Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stac reader #70

Merged
merged 30 commits into from
Sep 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
420bcd8
wip stac reader
geospatial-jeff Sep 14, 2020
3027e7e
add composite reader
geospatial-jeff Sep 17, 2020
1424b8e
update stac reader
geospatial-jeff Sep 17, 2020
3b30e4b
add read
geospatial-jeff Sep 17, 2020
6e6cfd6
align CogReader and CompositeReader interfaces
geospatial-jeff Sep 17, 2020
436ec16
add base class
geospatial-jeff Sep 17, 2020
12e0338
add composite tiler
geospatial-jeff Sep 17, 2020
eb087f7
better type check
geospatial-jeff Sep 17, 2020
c1fc4a8
allow reader aliasing
geospatial-jeff Sep 17, 2020
dcfe7cb
update stac reader
geospatial-jeff Sep 17, 2020
d6a51f1
this is better
geospatial-jeff Sep 17, 2020
41e9d1e
save progress
geospatial-jeff Sep 17, 2020
cb7fd65
update composite reader
geospatial-jeff Sep 19, 2020
45b7af7
improve asset filtering
geospatial-jeff Sep 19, 2020
a1bd197
fix filter type
geospatial-jeff Sep 19, 2020
2ec0cdb
read stac item from s3
geospatial-jeff Sep 19, 2020
b96ba85
add filesystem method to request json
geospatial-jeff Sep 19, 2020
0548a9d
update logs
geospatial-jeff Sep 19, 2020
674996a
use filesystems for stac
geospatial-jeff Sep 19, 2020
ac23e91
update init
geospatial-jeff Sep 19, 2020
0a3de78
add asset reader
geospatial-jeff Sep 20, 2020
f68b01d
remove imports
geospatial-jeff Sep 20, 2020
fcb8ed7
filter before reading headers
geospatial-jeff Sep 20, 2020
8175bfb
add composite reader test cases
geospatial-jeff Sep 20, 2020
4a70079
update setup.py
geospatial-jeff Sep 20, 2020
52b59c4
Update config.yml
geospatial-jeff Sep 26, 2020
61dbad7
update circle config
geospatial-jeff Sep 26, 2020
8f5592b
raise exception on missing assets, add stac tests
geospatial-jeff Sep 26, 2020
25c1f65
add stac test cases
geospatial-jeff Sep 26, 2020
caae839
remove comment
geospatial-jeff Sep 26, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
# https://nose.readthedocs.io
- run:
name: run tests
command: python setup.py pytest
command: pip install -e .[dev] && pytest -v

- codecov/upload:
file: "coverage.xml"
file: "coverage.xml"
3 changes: 2 additions & 1 deletion aiocogeo/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .cog import COGReader
from .cog import COGReader, CompositeReader
from .stac import STACReader
108 changes: 106 additions & 2 deletions aiocogeo/cog.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import abc
import asyncio
from dataclasses import dataclass, field
import logging
import math
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urljoin
import uuid

Expand All @@ -23,7 +24,44 @@


@dataclass
class COGReader(PartialReadInterface):
class ReaderMixin(abc.ABC):

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
...

@abc.abstractmethod
async def get_tile(self, x: int, y: int, z: int) -> Union[np.ndarray, List[np.ndarray]]:
...

@abc.abstractmethod
async def read(
self,
bounds: Tuple[float, float, float, float],
shape: Tuple[int, int],
resample_method: int = Image.NEAREST,
) -> Union[Union[np.ndarray, np.ma.masked_array], List[Union[np.ndarray, np.ma.masked_array]]]:
...

@abc.abstractmethod
async def point(self, x: Union[float, int], y: Union[float, int]) -> Union[Union[np.ndarray, np.ma.masked_array], List[Union[np.ndarray, np.ma.masked_array]]]:
...

@abc.abstractmethod
async def preview(
self,
max_size: int = 1024,
height: Optional[int] = None,
width: Optional[int] = None,
resample_method: int = Image.NEAREST
) -> Union[Union[np.ndarray, np.ma.masked_array], List[Union[np.ndarray, np.ma.masked_array]]]:
...


@dataclass
class COGReader(ReaderMixin, PartialReadInterface):
filepath: str
ifds: Optional[List[ImageIFD]] = field(default_factory=lambda: [])
mask_ifds: Optional[List[MaskIFD]] = field(default_factory=lambda: [])
Expand Down Expand Up @@ -323,3 +361,69 @@ def create_tile_matrix_set(self, identifier: str = None) -> Dict[str, Any]:
"tileMatrix": list(reversed(matrices))
}
return tms


FilterType = Callable[[COGReader], Any]
MapType = Callable[[COGReader], Any]
ReduceType = Callable[[List[Union[np.ndarray, np.ma.masked_array]]], Any]

@dataclass
class CompositeReader(ReaderMixin):
readers: Optional[List[COGReader]] = field(default_factory=list)
filter: FilterType = lambda a: a
default_reducer: ReduceType = lambda r: r

def __iter__(self):
return iter(self.readers)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
pass

async def map(self, func: MapType) -> List[Any]:
futs = [func(reader) for reader in filter(self.filter, self.readers)]
return await asyncio.gather(*futs)

async def get_tile(self, x: int, y: int, z: int, reducer: Optional[ReduceType] = None) -> List[np.ndarray]:
"""Fetch a tile from all readers"""
tiles = await self.map(
func=lambda r: r.get_tile(x, y, z),
)
reducer = reducer or self.default_reducer
return reducer(tiles)

async def read(
self,
bounds: Tuple[float, float, float, float],
shape: Tuple[int, int],
resample_method: int = Image.NEAREST,
reducer: Optional[ReduceType] = None
):
reads = await self.map(
func=lambda r: r.read(bounds, shape, resample_method)
)
reducer = reducer or self.default_reducer
return reducer(reads)

async def point(self, x: Union[float, int], y: Union[float, int], reducer: Optional[ReduceType] = None) -> List[Union[np.ndarray, np.ma.masked_array]]:
points = await self.map(
func=lambda r: r.point(x, y)
)
reducer = reducer or self.default_reducer
return reducer(points)

async def preview(
self,
max_size: int = 1024,
height: Optional[int] = None,
width: Optional[int] = None,
resample_method: int = Image.NEAREST,
reducer: Optional[ReduceType] = None,
) -> List[Union[np.ndarray, np.ma.masked_array]]:
previews = await self.map(
func=lambda r: r.preview(max_size, height, width, resample_method)
)
reducer = reducer or self.default_reducer
return reducer(previews)
5 changes: 5 additions & 0 deletions aiocogeo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ class InvalidTiffError(CogReadError):
@dataclass
class TileNotFoundError(CogReadError):
...


@dataclass
class MissingAssets(CogReadError):
...
45 changes: 43 additions & 2 deletions aiocogeo/filesystems.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import asyncio
from dataclasses import dataclass, field
import json
import logging
import time
from typing import Any, Callable, Dict, Union
Expand Down Expand Up @@ -86,6 +87,10 @@ async def range_request(self, start: int, offset: int) -> bytes:
"""
return await self._range_request(start, offset)

@abc.abstractmethod
async def request_json(self):
...

@abc.abstractmethod
async def _range_request(self, start: int, offset: int) -> bytes:
"""Perform a range request"""
Expand Down Expand Up @@ -149,6 +154,17 @@ async def _range_request(self, start: int, offset: int) -> bytes:
raise FileNotFoundError(f"File not found: {self.filepath}") from e
return data

async def request_json(self) -> Dict:
try:
async with self.session.get(self.filepath) as resp:
resp.raise_for_status()
data = await resp.json()
except (aiohttp.ClientError, aiohttp.ClientResponseError) as e:
await self._close()
raise FileNotFoundError(f"File not found: {self.filepath}") from e
return data


async def _close(self) -> None:
if 'session' not in self.kwargs:
await self.session.close()
Expand All @@ -173,10 +189,11 @@ async def _on_request_start(self, session, trace_config_ctx, params):
async def _on_request_end(self, session, trace_config_ctx, params):
if params.response.status < 400:
elapsed = round(asyncio.get_event_loop().time() - trace_config_ctx.start, 3)
content_range = params.response.headers['Content-Range']
content_range = params.response.headers.get('Content-Range')
self._total_bytes_requested += int(params.response.headers["Content-Length"])
self._total_requests += 1
self._requested_ranges.append(tuple([int(v) for v in content_range.split(' ')[-1].split('/')[0].split('-')]))
if content_range:
self._requested_ranges.append(tuple([int(v) for v in content_range.split(' ')[-1].split('/')[0].split('-')]))
if config.VERBOSE_LOGS:
debug_statement = [f"\n < HTTP/{session.version.major}.{session.version.minor}"]
debug_statement += [f"\n < {k}: {v}" for (k, v) in params.response.headers.items()]
Expand All @@ -200,6 +217,9 @@ async def _range_request(self, start: int, offset: int) -> bytes:
logger.debug(f" FINISHED REQUEST in {elapsed} seconds: <STATUS 206> ({start}-{start+offset})")
return data

async def request_json(self):
return json.load(self.file)

async def _close(self) -> None:
await self.file.close()

Expand Down Expand Up @@ -232,6 +252,27 @@ async def _range_request(self, start: int, offset: int) -> bytes:
data = await req['Body'].read()
return data

async def request_json(self):
kwargs = {}
if config.AWS_REQUEST_PAYER:
kwargs['RequestPayer'] = config.AWS_REQUEST_PAYER
begin = time.time()
try:
req = await self.object.get()
except botocore.exceptions.ClientError as e:
await self._close()
raise FileNotFoundError(f"File not found: {self.filepath}") from e
elapsed = time.time() - begin
content_range = req['ResponseMetadata']['HTTPHeaders']['content-range']
if not config.VERBOSE_LOGS:
status = req['ResponseMetadata']['HTTPStatusCode']
logger.debug(f" FINISHED REQUEST in {elapsed} seconds: <STATUS {status}> ({content_range})")
self._total_bytes_requested += int(req['ResponseMetadata']['HTTPHeaders']['content-length'])
self._total_requests += 1
self._requested_ranges.append(tuple([int(v) for v in content_range.split(' ')[-1].split('/')[0].split('-')]))
data = json.loads(await req['Body'].read().decode('utf-8'))
return data

async def _close(self) -> None:
await self.resource.__aexit__('', '', '')

Expand Down
50 changes: 50 additions & 0 deletions aiocogeo/stac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

from stac_pydantic.shared import Asset, MimeTypes

from .cog import COGReader, CompositeReader
from .filesystems import Filesystem
from .errors import MissingAssets


@dataclass
class AssetReader(COGReader):
asset: Asset = Asset


@dataclass
class STACReader(CompositeReader):
filepath: Optional[str] = None
include_types: Set[MimeTypes] = field(default_factory=lambda: {MimeTypes.cog})

kwargs: Optional[Dict] = field(default_factory=dict)


async def __aenter__(self):
async with Filesystem.create_from_filepath(self.filepath, **self.kwargs) as file_reader:
self._file_reader = file_reader
item = await file_reader.request_json()

# Create a reader for each asset with a COG mime type
reader_futs = []
for asset in item["assets"]:
if item["assets"][asset]["type"] in self.include_types:
reader = AssetReader(
filepath=item["assets"][asset]["href"],
asset=Asset(name=asset, **item['assets'][asset])
)
reader_futs.append(reader)

if not reader_futs:
raise MissingAssets(f"No assets found of type {self.include_types}")

reader_futs = map(lambda r: r.__aenter__(), filter(self.filter, reader_futs))
self.readers = await asyncio.gather(*reader_futs)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._file_reader._close()
for reader in self.readers:
await reader._file_reader._close()
Loading