to s3 #2

Merged · 8 commits · Jul 10, 2017
60 changes: 60 additions & 0 deletions README.md
@@ -1,2 +1,62 @@
# AWS Extensions for datapackage-pipelines

## Install

```
# clone the repo and install it with pip

git clone https://github.com/frictionlessdata/datapackage-pipelines-aws.git
pip install -e .
```

## Usage

You can use datapackage-pipelines-aws as a plugin for [dpp](https://github.com/frictionlessdata/datapackage-pipelines#datapackage-pipelines). In `pipeline-spec.yaml` it looks like this:

```yaml
...
- run: aws.to_s3
```

You will need AWS credentials to be set up. See [the guide to setting up credentials](http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html).
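boto3 resolves credentials on its own once they are configured, so no keys need to appear in pipeline code. A minimal sketch of this (the region value is an assumption for illustration):

```python
import os
import boto3

# boto3 resolves credentials automatically; common sources include:
#   1. AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables
#   2. the shared credentials file at ~/.aws/credentials
os.environ.setdefault('AWS_DEFAULT_REGION', 'us-east-1')  # assumed region

s3 = boto3.client('s3')  # picks up credentials from the environment
print(s3.list_buckets()['Buckets'])  # raises NoCredentialsError if none are configured
```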

### to_s3

Saves the DataPackage to AWS S3.

_Parameters:_

* `bucket` - Name of the bucket where the DataPackage will be stored (the bucket must already exist!)
* `path` - Path (key/prefix) to the DataPackage. May contain format placeholders that are filled from `datapackage.json` properties, e.g. `my/example/path/{owner}/{name}/{version}`


_Example:_

```yaml
datahub:
  title: datahub-to-s3
  pipeline:
    -
      run: load_metadata
      parameters:
        url: http://example.com/my-datapackage/datapackage.json
    -
      run: load_resource
      parameters:
        url: http://example.com/my-datapackage/datapackage.json
        resource: my-resource
    -
      run: aws.to_s3
      parameters:
        bucket: my.bucket.name
        path: path/{owner}/{name}/{version}
    -
      run: aws.to_s3
      parameters:
        bucket: my.another.bucket
        path: another/path/{version}
```

Executing the pipeline above will save the DataPackage under the following keys on S3:
* my.bucket.name/path/my-name/py-package-name/latest/...
* my.another.bucket/another/path/latest/...
12 changes: 0 additions & 12 deletions datapackage_pipelines_aws/__init__.py
@@ -1,12 +0,0 @@
# -*- coding: utf-8 -*-

import io
import os

from .generator import Generator

VERSION_FILE = os.path.join(os.path.dirname(__file__), 'VERSION')

__version__ = io.open(VERSION_FILE, encoding='utf-8').readline().strip()

__all__ = ['Generator']
13 changes: 13 additions & 0 deletions datapackage_pipelines_aws/helpers.py
@@ -0,0 +1,13 @@
import os
import logging


def generate_path(file_path, base_path='', datapackage={}):
    format_params = {'version': 'latest'}
    format_params.update(datapackage)
    try:
        base_path = base_path.format(**format_params)
    except KeyError as e:
        logging.error('datapackage.json is missing property: %s', e)
        raise
    return os.path.join(base_path, file_path)
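As a quick illustration of this helper (mirroring the tests further down): placeholders are filled from `datapackage.json` properties, and `version` falls back to `latest` when the datapackage does not carry one.

```python
from datapackage_pipelines_aws import helpers

# Placeholders are filled from datapackage.json properties;
# 'version' falls back to 'latest' when absent.
key = helpers.generate_path(
    'datapackage.json',
    base_path='my/test/path/{owner}/{name}/{version}',
    datapackage={'owner': 'me', 'name': 'my-package'},
)
assert key == 'my/test/path/me/my-package/latest/datapackage.json'
```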
25 changes: 25 additions & 0 deletions datapackage_pipelines_aws/processors/to_s3.py
@@ -0,0 +1,25 @@
import boto3

from datapackage_pipelines.lib.dump.dumper_base import CSVDumper
from datapackage_pipelines_aws import helpers


class S3Dumper(CSVDumper):

    def initialize(self, params):
        super(S3Dumper, self).initialize(params)
        self.bucket = params['bucket']

> **Member:** What about AWS access key and secret?
>
> **Contributor (author):** @akariv boto3 takes care of credentials if they are set up: it looks in the AWS config file or in the environment variables (http://boto3.readthedocs.io/en/latest/guide/configuration.html#aws-config-file). But we can have them as part of the spec as well if that is needed.

        self.client = boto3.client('s3')
        self.base_path = params.get('path', '')

    def prepare_datapackage(self, datapackage, _):
        self.datapackage = datapackage
        return datapackage

    def write_file_to_output(self, filename, path):
        key = helpers.generate_path(path, self.base_path, self.datapackage)
        self.client.put_object(
            Body=open(filename, 'rb'), Bucket=self.bucket, Key=key)


S3Dumper()()
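Following up on the review thread above: if credentials were ever made part of the spec, a minimal sketch could look like the following (the parameter names `access_key_id` and `secret_access_key` are hypothetical, not part of this PR).

```python
import boto3


# Hypothetical extension (not in this PR): read optional credentials
# from the processor parameters and fall back to boto3's default
# resolution (config file / environment variables) when absent.
def make_s3_client(params):
    access_key = params.get('access_key_id')      # hypothetical parameter
    secret_key = params.get('secret_access_key')  # hypothetical parameter
    if access_key and secret_key:
        return boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
    return boto3.client('s3')  # default credential chain
```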
28 changes: 28 additions & 0 deletions tests/test_helpers.py
@@ -0,0 +1,28 @@
import unittest

from datapackage_pipelines_aws import helpers


class TestToS3Processor(unittest.TestCase):
    def test_generate_path(self):
        inpath = 'datapackage.json'
        basepath = 'my/test/path'
        expected = 'my/test/path/datapackage.json'
        datapackage = {'name': 'my-package'}
        out = helpers.generate_path(inpath, basepath, datapackage)
        self.assertEqual(out, expected)

    def test_generate_path_with_formatted_string(self):
        inpath = 'datapackage.json'
        basepath = 'my/test/path/{owner}/{name}/{version}'
        expected = 'my/test/path/me/my-package/latest/datapackage.json'
        datapackage = {'name': 'my-package', 'owner': 'me'}
        out = helpers.generate_path(inpath, basepath, datapackage)
        self.assertEqual(out, expected)

    def test_generate_path_errors_without_owner_in_datapackage(self):
        inpath = 'datapackage.json'
        basepath = 'my/test/path/{owner}/{name}/{version}'
        datapackage = {'name': 'my-package'}
        with self.assertRaises(KeyError):
            helpers.generate_path(inpath, basepath, datapackage)
94 changes: 94 additions & 0 deletions tests/test_to_s3.py
@@ -0,0 +1,94 @@
import json
import os
import unittest
import datetime

from moto import mock_s3
import boto3

from datapackage_pipelines.utilities.lib_test_helpers import (
    mock_processor_test
)

import datapackage_pipelines_aws.processors

import logging
log = logging.getLogger(__name__)


class TestToS3Processor(unittest.TestCase):
    def setUp(self):
        self.bucket = 'my.test.bucket'
        self.resources = [{
            'name': 'resource',
            "format": "csv",
            "path": "data/test.csv",
            "schema": {
                "fields": [
                    {
                        "name": "Date",
                        "type": "date",
                    },
                    {
                        "name": "Name",
                        "type": "string",
                    }
                ]
            }
        }]
        self.datapackage = {
            'owner': 'me',
            'name': 'my-datapackage',
            'project': 'my-project',
            'resources': self.resources
        }
        self.params = {
            'bucket': self.bucket,
            'path': 'my/test/path/{owner}/{name}/{version}'
        }
        # Path to the processor we want to test
        self.processor_dir = \
            os.path.dirname(datapackage_pipelines_aws.processors.__file__)
        self.processor_path = os.path.join(self.processor_dir, 'to_s3.py')

    @mock_s3
    def test_puts_datapackage_on_s3(self):
        # Should be in setUp, but requires the mock to be active
        s3 = boto3.resource('s3')
        s3.create_bucket(Bucket=self.bucket)
        bucket = s3.Bucket(self.bucket)

        class TempList(list):
            pass

        res = TempList([{'Date': datetime.datetime(1, 1, 1), 'Name': 'Name'}])
        res.spec = self.resources[0]
        res_iter = [res]

        spew_args, _ = mock_processor_test(self.processor_path,
                                           (self.params,
                                            self.datapackage,
                                            res_iter))

        spew_res_iter = spew_args[1]
        # We need to actually read the rows to execute the iterator(s)
        rows = [list(res) for res in spew_res_iter]

        keys = [key.key for key in bucket.objects.all()]

        dp_path = 'my/test/path/me/my-datapackage/latest/datapackage.json'
        csv_path = 'my/test/path/me/my-datapackage/latest/data/test.csv'
        assert dp_path in keys
        assert csv_path in keys

        # Check datapackage.json content
        content = s3.Object(self.bucket, dp_path).get()['Body']\
            .read().decode("utf-8")
        self.assertEqual(json.loads(content)['owner'], 'me')
        self.assertEqual(json.loads(content)['name'], 'my-datapackage')

        # Check csv content
        content = s3.Object(self.bucket, csv_path).get()['Body']\
            .read().decode("utf-8")
        expected_csv = 'Date,Name\r\n0001-01-01 00:00:00,Name\r\n'
        self.assertEqual(content, expected_csv)
4 changes: 2 additions & 2 deletions tox.ini
@@ -6,8 +6,8 @@ envlist=

[testenv]
deps=
    mock
    requests-mock
    google-compute-engine
    moto
    pytest
    pytest-cov
    coverage