From a9541057ea70953b30e3b26cedcd737fcb64ff69 Mon Sep 17 00:00:00 2001 From: Karim Bahgat Date: Fri, 27 Feb 2026 12:27:45 +0100 Subject: [PATCH 1/2] added raster temporal reduce process plugin reusing components from datasets module --- .gitignore | 1 + pygeoapi-config.yml | 5 + src/eo_api/datasets/raster.py | 1 + src/eo_api/datasets/utils.py | 2 +- .../ogcapi/plugins/processes/build_zarr.py | 1 + .../ogcapi/plugins/processes/schemas.py | 23 ++++ .../plugins/processes/temporal_reduce.py | 126 ++++++++++++++++++ 7 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 src/eo_api/routers/ogcapi/plugins/processes/build_zarr.py create mode 100644 src/eo_api/routers/ogcapi/plugins/processes/temporal_reduce.py diff --git a/.gitignore b/.gitignore index a1da880..6265a4f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ __pycache__/ eo_api.egg-info/ pygeoapi-openapi.yml target/ +src/eo_api/datasets/cache/ \ No newline at end of file diff --git a/pygeoapi-config.yml b/pygeoapi-config.yml index ede57d7..0d55bc9 100644 --- a/pygeoapi-config.yml +++ b/pygeoapi-config.yml @@ -355,6 +355,11 @@ resources: processor: name: eo_api.routers.ogcapi.plugins.processes.zonal_statistics.ZonalStatisticsProcessor + temporal-reduce: + type: process + processor: + name: eo_api.routers.ogcapi.plugins.processes.temporal_reduce.TemporalReduceProcessor + aggregation: type: process processor: diff --git a/src/eo_api/datasets/raster.py b/src/eo_api/datasets/raster.py index 8dce903..f2b51d7 100644 --- a/src/eo_api/datasets/raster.py +++ b/src/eo_api/datasets/raster.py @@ -20,6 +20,7 @@ def get_data(dataset: dict[str, Any], start: str, end: str) -> xr.Dataset: logger.info("Opening dataset") zarr_path = cache.get_zarr_path(dataset) if zarr_path: + logger.info(f'Using optimized zarr file: {zarr_path}') ds = xr.open_zarr(zarr_path, consolidated=True) else: logger.warning( diff --git a/src/eo_api/datasets/utils.py b/src/eo_api/datasets/utils.py index 374ad5e..d421646 100644 --- 
class TemporalReduceInput(BaseModel):
    """Inputs for the raster temporal-reduce process.

    Mirrors the ``inputs`` section of the process's PROCESS_METADATA:
    ``raster``, ``band``, ``period_type`` and ``time_period`` are required
    (minOccurs 1); ``stats`` is optional and defaults to ``["mean"]``.
    """

    # Dataset id resolved through the registry (execute() calls
    # registry.get_dataset(raster)), not a filesystem path or URI.
    raster: str = Field(..., description="Raster dataset id")
    # BUG FIX: these three fields previously declared ``Field(default=1, ...)``
    # — an int default on str-typed fields that also silently made required
    # inputs optional. Declare them required with ``...`` to match the
    # process metadata (minOccurs: 1).
    band: str = Field(..., description="Raster band name")
    period_type: str = Field(..., description="Period type")
    time_period: str = Field(..., description="Time period")
    stats: list[str] = Field(
        default_factory=lambda: ["mean"],
        min_length=1,
        description="Statistics to compute",
    )

    @model_validator(mode="after")
    def validate_stats(self) -> "TemporalReduceInput":
        """Ensure all requested statistics are supported."""
        supported = {"count", "sum", "mean", "min", "max"}
        invalid = [stat for stat in self.stats if stat not in supported]
        if invalid:
            raise ValueError(
                f"Unsupported stats requested: {invalid}. Allowed stats: {sorted(supported)}"
            )
        return self
class TemporalReduceProcessor(BaseProcessor):
    """Processor that reduces a raster dataset to a single time period.

    Registered in pygeoapi-config.yml as the ``temporal-reduce`` process.
    """

    def __init__(self, processor_def: dict[str, Any]) -> None:
        """Initialize the processor from the pygeoapi processor definition."""
        super().__init__(processor_def, PROCESS_METADATA)
        # Advertise that this processor honours the ``outputs`` argument.
        self.supports_outputs = True

    def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, bytes]:
        """Run the temporal-reduce process.

        :param data: raw process inputs, validated against TemporalReduceInput
        :param outputs: requested outputs (unused; only the PNG preview is produced)
        :returns: tuple of (media type ``image/png``, encoded image bytes)
        :raises ProcessorExecuteError: on invalid inputs or an unknown dataset id
        """
        try:
            inputs = TemporalReduceInput.model_validate(data)
            logger.info(f'Process inputs: {inputs.model_dump()}')
        except ValidationError as err:
            # Surface pydantic validation problems as a pygeoapi process error.
            raise ProcessorExecuteError(str(err)) from err

        # Resolve dataset metadata from the registry.
        dataset_id = inputs.raster
        dataset = registry.get_dataset(dataset_id)
        if not dataset:
            raise ProcessorExecuteError(f"Dataset '{dataset_id}' not found")

        # Retrieve raster data limited to the single requested time period
        # (start == end == time_period).
        start = end = inputs.time_period
        ds = raster_ops.get_data(dataset, start, end)

        # Reduce along the time dimension.
        # NOTE: only a single statistic is applied for now; extras are ignored.
        statistic = inputs.stats[0]
        ds = raster_ops.to_timeperiod(ds, dataset, inputs.period_type, statistic=statistic)

        # Serialize the reduced raster to a PNG map preview.
        image_data = serialize.xarray_to_preview(ds, dataset, inputs.period_type)

        logger.info('Process finished')
        return "image/png", image_data

    def __repr__(self) -> str:
        # BUG FIX: previously returned "" — useless in logs and debuggers.
        # Follow the pygeoapi plugin convention of "<ClassName> {name}"
        # (self.name is set by BaseProcessor from the processor definition).
        return f"<TemporalReduceProcessor> {self.name}"
app.include_router(cog.router, prefix="/cog", tags=["Cloud Optimized GeoTIFF"]) app.include_router(pipelines.router, prefix="/pipelines", tags=["Pipelines"]) +app.include_router(datasets.router, prefix="/datasets", tags=["Datasets"]) app.mount(path="/ogcapi", app=ogcapi.app) app.mount(path="/", app=prefect.app)