Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
.DS_Store
__pycache__/
src/eo_api/datasets/cache
.venv/
.env
eo_api.egg-info/
Expand Down
Empty file removed __init__.py
Empty file.
9 changes: 2 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ dependencies = [
[tool.ruff]
target-version = "py313"
line-length = 120
exclude = [
"src/eo_api/datasets"
]

[tool.ruff.lint]
fixable = ["ALL"]
Expand All @@ -50,6 +47,7 @@ docstring-code-line-length = "dynamic"

[tool.mypy]
python_version = "3.13"
plugins = ["pydantic.mypy"]
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
Expand All @@ -58,16 +56,13 @@ no_implicit_optional = true
warn_unused_ignores = true
strict_equality = true
mypy_path = ["src"]
exclude = [
"src/eo_api/datasets"
]

[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false

[[tool.mypy.overrides]]
module = ["dhis2eo.*", "dhis2_client.*", "pygeoapi.*", "titiler.*", "rasterio.*", "pygeofilter.*", "prefect.*", "requests.*"]
module = ["dhis2eo.*", "dhis2_client.*", "pygeoapi.*", "titiler.*", "rasterio.*", "pygeofilter.*", "prefect.*", "requests.*", "geopandas.*", "earthkit.*", "metpy.*", "matplotlib.*", "yaml"]
ignore_missing_imports = true

[tool.pyright]
Expand Down
186 changes: 104 additions & 82 deletions src/eo_api/datasets/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
"""FastAPI router exposing dataset endpoints."""

from typing import Any

import xarray as xr
from fastapi import APIRouter, BackgroundTasks, HTTPException, Response
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
Expand All @@ -7,146 +11,164 @@

router = APIRouter()


@router.get("/")
def list_datasets() -> list[dict[str, Any]]:
    """Return the list of available datasets from the registry.

    Returns:
        A list of dataset metadata dictionaries as provided by ``registry``.
    """
    return registry.list_datasets()


def _get_dataset_or_404(dataset_id: str) -> dict[str, Any]:
    """Look up a dataset by ID or raise a 404.

    Args:
        dataset_id: Registry identifier of the dataset.

    Returns:
        The dataset metadata dictionary.

    Raises:
        HTTPException: 404 when no dataset with ``dataset_id`` exists.
    """
    dataset = registry.get_dataset(dataset_id)
    if not dataset:
        raise HTTPException(status_code=404, detail=f"Dataset '{dataset_id}' not found")
    return dataset


@router.get("/{dataset_id}", response_model=dict)
def get_dataset(dataset_id: str) -> dict[str, Any]:
    """Get a single dataset by ID, augmented with its cache status.

    The registry metadata is merged (in place) with cache information
    from ``cache.get_cache_info`` before being returned.
    """
    dataset = _get_dataset_or_404(dataset_id)
    cache_info = cache.get_cache_info(dataset)
    dataset.update(cache_info)
    return dataset


@router.get("/{dataset_id}/build_cache", response_model=dict)
def build_dataset_cache(
    dataset_id: str,
    start: str,
    end: str | None = None,
    overwrite: bool = False,
    background_tasks: BackgroundTasks | None = None,
) -> dict[str, str]:
    """Download and cache dataset as local netcdf files direct from the source.

    Args:
        dataset_id: Registry identifier of the dataset.
        start: Start of the period to cache.
        end: Optional end of the period; semantics of ``None`` are defined
            by ``cache.build_dataset_cache``.
        overwrite: Whether existing cache files are rebuilt.
        background_tasks: FastAPI-injected task queue; forwarded to the
            cache layer so the download can run after the response is sent.

    Returns:
        A status message confirming the request was submitted.
    """
    dataset = _get_dataset_or_404(dataset_id)
    cache.build_dataset_cache(dataset, start=start, end=end, overwrite=overwrite, background_tasks=background_tasks)
    return {"status": "Dataset caching request submitted for processing"}


@router.get("/{dataset_id}/optimize_cache", response_model=dict)
def optimize_dataset_cache(
    dataset_id: str,
    background_tasks: BackgroundTasks | None = None,
) -> dict[str, str]:
    """Optimize dataset cache by collecting all cache files to a single zarr archive.

    The optimization runs as a background task after the response is sent;
    when no task queue is available the request is a no-op.
    """
    dataset = _get_dataset_or_404(dataset_id)
    if background_tasks is not None:
        background_tasks.add_task(cache.optimize_dataset_cache, dataset)
    return {"status": "Dataset cache optimization submitted for processing"}


def _get_dataset_period_type(
    dataset: dict[str, Any],
    period_type: str,
    start: str,
    end: str,
    temporal_aggregation: str,
) -> xr.Dataset:
    """Load raster data for a dataset and aggregate it to the given period type.

    Args:
        dataset: Dataset metadata dictionary from the registry.
        period_type: Target temporal period type (e.g. a DHIS2-style period).
        start: Start of the requested period range.
        end: End of the requested period range.
        temporal_aggregation: Statistic used when collapsing to the period type.

    Returns:
        The temporally aggregated xarray dataset.
    """
    # TODO: maybe move this and similar somewhere better like a pipelines.py file?

    # get raster data, then aggregate to the requested period type
    ds = raster.get_data(dataset, start, end)
    ds = raster.to_timeperiod(ds, dataset, period_type, statistic=temporal_aggregation)
    return ds

@router.get("/{dataset_id}/{period_type}/orgunits", response_model=list)
def get_dataset_period_type_org_units(
    dataset_id: str,
    period_type: str,
    start: str,
    end: str,
    temporal_aggregation: str,
    spatial_aggregation: str,
) -> list[dict[str, Any]]:
    """Get a dataset aggregated to a given period type and org units as JSON values.

    The raster is first aggregated temporally (``temporal_aggregation``),
    then spatially over the org-unit features (``spatial_aggregation``).
    """
    dataset = _get_dataset_or_404(dataset_id)
    ds = _get_dataset_period_type(dataset, period_type, start, end, temporal_aggregation)

    # aggregate to geojson features
    df = raster.to_features(ds, dataset, features=constants.ORG_UNITS_GEOJSON, statistic=spatial_aggregation)

    # convert units if needed (inplace)
    # NOTE: done after aggregation to dataframe to speed up the computation
    units.convert_pandas_units(df, dataset)

    # serialize to json
    return serialize.dataframe_to_json_data(df, dataset, period_type)


@router.get("/{dataset_id}/{period_type}/orgunits/preview", response_model=list)
def get_dataset_period_type_org_units_preview(
    dataset_id: str,
    period_type: str,
    period: str,
    temporal_aggregation: str,
    spatial_aggregation: str,
) -> Response:
    """Preview a PNG map image of a dataset aggregated to a given period and org units."""
    # NOTE(review): response_model=list looks wrong for a PNG response — consider removing
    dataset = _get_dataset_or_404(dataset_id)

    # get dataset for period type and a single period
    start = end = period
    ds = _get_dataset_period_type(dataset, period_type, start, end, temporal_aggregation)

    # aggregate to geojson features
    df = raster.to_features(ds, dataset, features=constants.ORG_UNITS_GEOJSON, statistic=spatial_aggregation)

    # convert units if needed (inplace)
    # NOTE: done after aggregation to dataframe to speed up the computation
    units.convert_pandas_units(df, dataset)

    # serialize to image and return as PNG
    image_data = serialize.dataframe_to_preview(df, dataset, period_type)
    return Response(content=image_data, media_type="image/png")

@router.get("/{dataset_id}/{period_type}/raster")
def get_dataset_period_type_raster(
    dataset_id: str,
    period_type: str,
    start: str,
    end: str,
    temporal_aggregation: str,
) -> FileResponse:
    """Get a dataset aggregated to a given period type as a downloadable raster file.

    The result is written to a temporary netcdf file which is streamed back
    and deleted after the response completes.
    """
    dataset = _get_dataset_or_404(dataset_id)
    ds = _get_dataset_period_type(dataset, period_type, start, end, temporal_aggregation)

    # convert units if needed (inplace)
    units.convert_xarray_units(ds, dataset)

    # serialize to temporary netcdf
    file_path = serialize.xarray_to_temporary_netcdf(ds)

    # return as streaming file and delete after completion
    return FileResponse(
        file_path,
        media_type="application/x-netcdf",
        filename="eo-api-raster-download.nc",
        background=BackgroundTask(serialize.cleanup_file, file_path),
    )


@router.get("/{dataset_id}/{period_type}/raster/preview")
def get_dataset_period_type_raster_preview(
    dataset_id: str,
    period_type: str,
    period: str,
    temporal_aggregation: str,
) -> Response:
    """Preview a PNG map image of a dataset aggregated to a given period."""
    dataset = _get_dataset_or_404(dataset_id)

    # get dataset for period type and a single period
    start = end = period
    ds = _get_dataset_period_type(dataset, period_type, start, end, temporal_aggregation)

    # convert units if needed (inplace)
    units.convert_xarray_units(ds, dataset)

    # serialize to image and return as PNG
    image_data = serialize.xarray_to_preview(ds, dataset, period_type)
    return Response(content=image_data, media_type="image/png")


@router.get("/{dataset_id}/{period_type}/tiles")
def get_dataset_period_type_tiles(
    dataset_id: str,
    period_type: str,
    start: str,
    end: str,
    temporal_aggregation: str,
) -> None:
    """Placeholder for future tile-based dataset access (not yet implemented)."""
Loading