Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ __pycache__/
eo_api.egg-info/
pygeoapi-openapi.yml
target/
src/eo_api/datasets/cache/
5 changes: 5 additions & 0 deletions pygeoapi-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ resources:
processor:
name: eo_api.routers.ogcapi.plugins.processes.zonal_statistics.ZonalStatisticsProcessor

temporal-reduce:
type: process
processor:
name: eo_api.routers.ogcapi.plugins.processes.temporal_reduce.TemporalReduceProcessor

aggregation:
type: process
processor:
Expand Down
4 changes: 2 additions & 2 deletions src/eo_api/datasets/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def build_dataset_cache(
start: str,
end: str | None = None,
overwrite: bool = False,
background_tasks: BackgroundTasks | None = None,
background_tasks: BackgroundTasks = None,
) -> dict[str, str]:
"""Download and cache dataset as local netcdf files direct from the source."""
dataset = _get_dataset_or_404(dataset_id)
Expand All @@ -52,7 +52,7 @@ def build_dataset_cache(
@router.get("/{dataset_id}/optimize_cache", response_model=dict)
def optimize_dataset_cache(
dataset_id: str,
background_tasks: BackgroundTasks | None = None,
background_tasks: BackgroundTasks = None,
) -> dict[str, str]:
"""Optimize dataset cache by collecting all cache files to a single zarr archive."""
dataset = _get_dataset_or_404(dataset_id)
Expand Down
1 change: 1 addition & 0 deletions src/eo_api/datasets/raster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def get_data(dataset: dict[str, Any], start: str, end: str) -> xr.Dataset:
logger.info("Opening dataset")
zarr_path = cache.get_zarr_path(dataset)
if zarr_path:
logger.info(f'Using optimized zarr file: {zarr_path}')
ds = xr.open_zarr(zarr_path, consolidated=True)
else:
logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion src/eo_api/datasets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def numpy_period_array(t_array: np.ndarray[Any, Any], period_type: str) -> np.nd
return s.astype(f"U{lengths[period_type]}")


def pandas_period_string(column: pd.Series[Any], period_type: str) -> pd.Series[Any]:
def pandas_period_string(column: 'pd.Series[Any]', period_type: str) -> 'pd.Series[Any]':
"""Format a pandas datetime column as period strings."""
if period_type == "hourly":
return column.dt.strftime("%Y-%m-%dT%H") # type: ignore[no-any-return]
Expand Down
2 changes: 2 additions & 0 deletions src/eo_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import eo_api.startup # noqa: F401 # pyright: ignore[reportUnusedImport]
from eo_api.lifecycle import lifespan
from eo_api.routers import cog, ogcapi, pipelines, prefect, root
from eo_api.datasets import api as datasets

app = FastAPI(lifespan=lifespan)

Expand All @@ -20,6 +21,7 @@
app.include_router(root.router)
app.include_router(cog.router, prefix="/cog", tags=["Cloud Optimized GeoTIFF"])
app.include_router(pipelines.router, prefix="/pipelines", tags=["Pipelines"])
app.include_router(datasets.router, prefix="/datasets", tags=["Datasets"])

app.mount(path="/ogcapi", app=ogcapi.app)
app.mount(path="/", app=prefect.app)
1 change: 1 addition & 0 deletions src/eo_api/routers/ogcapi/plugins/processes/build_zarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
'''Dummy module for a process that would build a zarr file from netcdf file given a dataset id.'''
23 changes: 23 additions & 0 deletions src/eo_api/routers/ogcapi/plugins/processes/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,29 @@ def validate_stats(self) -> "ZonalStatisticsInput":
return self


class TemporalReduceInput(BaseModel):
    """Inputs for raster temporal reduce process.

    Mirrors the ``temporal-reduce`` PROCESS_METADATA inputs: ``raster``,
    ``band``, ``period_type`` and ``time_period`` are required there
    (``minOccurs: 1``); ``stats`` is optional and defaults to ``["mean"]``.
    """

    raster: str = Field(..., description="Raster path or URI")
    # BUG FIX: these three fields previously used ``Field(default=1, ...)``.
    # Pydantic v2 does not validate defaults, so omitting any of them silently
    # produced the integer ``1`` on a ``str`` field instead of an error. The
    # process metadata declares them required, so make them required here too.
    band: str = Field(..., description="Raster band name")
    period_type: str = Field(..., description="Period type")
    time_period: str = Field(..., description="Time period")
    stats: list[str] = Field(
        default_factory=lambda: ["mean"],
        min_length=1,
        description="Statistics to compute",
    )

    @model_validator(mode="after")
    def validate_stats(self) -> "TemporalReduceInput":
        """Ensure all requested statistics are supported."""
        supported = {"count", "sum", "mean", "min", "max"}
        invalid = [stat for stat in self.stats if stat not in supported]
        if invalid:
            raise ValueError(f"Unsupported stats requested: {invalid}. Allowed stats: {sorted(supported)}")
        return self


class CHIRPS3DHIS2PipelineInput(BaseModel):
"""Inputs for CHIRPS3 -> DHIS2 data value pipeline."""

Expand Down
126 changes: 126 additions & 0 deletions src/eo_api/routers/ogcapi/plugins/processes/temporal_reduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
'''Temporal reduce process plugin for raster and time period input.

Example invocation with curl (Windows):

curl -X POST "http://localhost:8000/ogcapi/processes/temporal-reduce/execution" ^
-H "Content-Type: application/json" ^
-d "{\"inputs\": {\"raster\": \"2m_temperature_hourly\", \"band\": \"t2m\", \"period_type\": \"monthly\", \"time_period\": \"2025-12\", \"stats\": [\"mean\"]}}" ^
--output temporal_reduce_map.png

'''

from __future__ import annotations

import logging
from typing import Any

from pydantic import ValidationError
from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError

from eo_api.routers.ogcapi.plugins.processes.schemas import TemporalReduceInput
from eo_api.datasets import raster as raster_ops
from eo_api.datasets import serialize
from eo_api.datasets import registry


logger = logging.getLogger(__name__)


# OGC API - Processes description for the ``temporal-reduce`` process.
# Consumed by pygeoapi (via TemporalReduceProcessor.__init__) to advertise the
# process and describe its inputs/outputs; keep it consistent with
# TemporalReduceInput in schemas.py.
PROCESS_METADATA = {
    "version": "0.1.0",
    "id": "temporal-reduce",
    "title": "Temporal reduce",
    "description": "Reduces a raster dataset to a specified period type and time period.",
    "jobControlOptions": ["sync-execute", "async-execute"],
    "keywords": ["raster", "temporal", "time", "reduce", "aggregation"],
    "inputs": {
        "raster": {
            "title": "Raster dataset id",
            "description": "Raster dataset id as defined and created by /datasets endpoints.",
            "schema": {"type": "string"},
            "minOccurs": 1,
            "maxOccurs": 1,
        },
        "band": {
            "title": "Raster band name",
            "description": "Name of raster band or variable to reduce.",
            "schema": {"type": "string"},
            "minOccurs": 1,
            "maxOccurs": 1,
        },
        "period_type": {
            "title": "Period type",
            "description": "Type of period to reduce to: daily, monthly.",
            "schema": {"type": "string"},
            "minOccurs": 1,
            "maxOccurs": 1,
        },
        "time_period": {
            "title": "Time period",
            "description": "Time period to reduce to, in ISO format corresponding to period_type.",
            "schema": {"type": "string"},
            "minOccurs": 1,
            "maxOccurs": 1,
        },
        # Optional; enum must stay in sync with the ``supported`` set checked
        # by TemporalReduceInput.validate_stats.
        "stats": {
            "title": "Statistics",
            "description": "Statistics to compute for the specified time period.",
            "schema": {
                "type": "array",
                "items": {
                    "type": "string",
                    "enum": ["count", "sum", "mean", "min", "max"],
                },
                "default": ["mean"],
            },
            "minOccurs": 0,
            "maxOccurs": 1,
        },
    },
    "outputs": {
        # Single output: a PNG map rendering of the reduced raster.
        "preview": {
            "title": "Rendered map image of reduced raster",
            "schema": {"type": "object", "contentMediaType": "image/png"},
        }
    },
}


class TemporalReduceProcessor(BaseProcessor):
    """pygeoapi processor that reduces a raster dataset over time.

    Resolves the dataset id against the local registry, loads data for the
    requested time period, reduces along the time dimension using the first
    requested statistic, and returns a rendered PNG preview.
    """

    def __init__(self, processor_def: dict[str, Any]) -> None:
        """Initialize the processor from the pygeoapi processor definition."""
        super().__init__(processor_def, PROCESS_METADATA)
        self.supports_outputs = True

    def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, bytes]:
        """Run the temporal-reduce process.

        Args:
            data: Raw process inputs; validated against ``TemporalReduceInput``.
            outputs: Requested outputs (unused; only one output is produced).

        Returns:
            Tuple of media type (``"image/png"``) and the encoded image bytes.

        Raises:
            ProcessorExecuteError: If the inputs fail validation or the
                dataset id is not found in the registry.
        """
        # Keep the try body minimal: only validation can raise ValidationError.
        try:
            inputs = TemporalReduceInput.model_validate(data)
        except ValidationError as err:
            raise ProcessorExecuteError(str(err)) from err
        # Lazy %-args: the message is only rendered when INFO is enabled.
        logger.info("Process inputs: %s", inputs.model_dump())

        # get dataset metadata
        dataset_id = inputs.raster
        dataset = registry.get_dataset(dataset_id)
        if not dataset:
            raise ProcessorExecuteError(f"Dataset '{dataset_id}' not found")

        # retrieve and limit raster to a single time period
        start = end = inputs.time_period
        ds = raster_ops.get_data(dataset, start, end)

        # reduce to time dimension
        # NOTE: only allows a single statistic for now
        statistic = inputs.stats[0]
        ds = raster_ops.to_timeperiod(ds, dataset, inputs.period_type, statistic=statistic)

        # serialize to image preview
        image_data = serialize.xarray_to_preview(ds, dataset, inputs.period_type)

        logger.info("Process finished")
        return "image/png", image_data

    def __repr__(self) -> str:
        """Return a short debug representation."""
        return "<TemporalReduceProcessor>"
Loading