
Extensibility

The Open Climate Service supports three plugin types, all following the same pattern: place files in the appropriate subdirectory of plugins_dir and the service picks them up automatically — no forking or patching of core code required.

Plugin type   Location                 Format
Datasets      plugins_dir/datasets/    .yaml + .py
Processes     plugins_dir/processes/   .py
Workflows     plugins_dir/workflows/   .json
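The layout in the table amounts to a directory scan keyed by subdirectory and file extension. The sketch below is a hypothetical illustration of that convention, not the service's actual loader: `discover_plugins` and `PLUGIN_LAYOUT` are invented names, and the sketch only groups file paths without importing or parsing anything.

```python
from __future__ import annotations

from pathlib import Path

# Hypothetical mapping taken from the table: subdirectory -> accepted suffixes.
PLUGIN_LAYOUT = {
    "datasets": {".yaml", ".py"},
    "processes": {".py"},
    "workflows": {".json"},
}


def discover_plugins(plugins_dir: str | Path) -> dict[str, list[Path]]:
    """Group plugin files under plugins_dir by plugin type (illustrative only)."""
    root = Path(plugins_dir)
    found: dict[str, list[Path]] = {}
    for kind, suffixes in PLUGIN_LAYOUT.items():
        subdir = root / kind
        if not subdir.is_dir():
            # A missing subdirectory simply means no plugins of that type.
            found[kind] = []
            continue
        # Ignore files whose extension does not match the plugin type.
        found[kind] = sorted(
            p for p in subdir.iterdir() if p.is_file() and p.suffix in suffixes
        )
    return found


if __name__ == "__main__":
    for kind, files in discover_plugins("./plugins/").items():
        print(kind, [f.name for f in files])
```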

Datasets

Dataset templates are YAML files that describe a data source. Built-ins live in the package (open_climate_service/plugins/datasets/). Custom templates are loaded from plugins_dir/datasets/.

plugins/
└── datasets/
    ├── enacts_rainfall.yaml    # dataset template
    └── enacts.py               # streaming plugin class

# climate-service.yaml
plugins_dir: ./plugins/

All *.yaml files in plugins_dir/datasets/ are merged with the built-ins. A custom template with the same id as a built-in overrides it — useful for adjusting display ranges or availability settings on an existing dataset.
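The override rule can be pictured as a dictionary merge keyed on `id`. This is a minimal sketch with a hypothetical `merge_templates` function operating on already-parsed templates; whether the service replaces an overridden template wholesale or merges it field by field is not stated here, so the sketch assumes wholesale replacement.

```python
from typing import Any

Template = dict[str, Any]


def merge_templates(builtins: list[Template], custom: list[Template]) -> dict[str, Template]:
    """Merge dataset templates by id (hypothetical sketch).

    Assumption: a custom template with the same id replaces the built-in
    template entirely rather than being merged field by field.
    """
    merged: dict[str, Template] = {t["id"]: t for t in builtins}
    for template in custom:
        merged[template["id"]] = template  # custom entries win on id clashes
    return merged


if __name__ == "__main__":
    builtin = [{"id": "chirps_rainfall_daily", "display_range": [0, 100]}]
    custom = [{"id": "chirps_rainfall_daily", "display_range": [0, 200]}]
    print(merge_templates(builtin, custom)["chirps_rainfall_daily"]["display_range"])
```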

A Python plugin class is declared alongside the YAML through the ingestion.plugin key, which holds the class's dotted import path. Plugins subclass BaseDatasetPlugin and implement only periods() and fetch_period(); the base class provides the concurrency defaults and canonical dimension names. fetch_period can be either a regular (blocking) method, which the service runs in a worker thread, or an async def for natively async sources. Any data transformations (unit conversion, dimension renaming, nodata masking, bbox clipping) are applied inside fetch_period before it returns the xr.Dataset, typically via the normalize_period helper. The grid (shape, dtype, nodata, CRS) is inferred from the first fetched period; a source on a projected grid declares its CRS via the crs class attribute.

See Adding custom datasets for the full template field reference, the streaming plugin contract, and the available helpers.
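Supporting both blocking and async fetch_period implementations implies a dispatch step. The sketch below shows one common standard-library pattern for it; `run_fetch`, `blocking_fetch` and `async_fetch` are hypothetical names, plain dicts stand in for xr.Dataset, and none of this reflects BaseDatasetPlugin's actual internals, which are covered in Adding custom datasets.

```python
import asyncio
import inspect
import time
from typing import Any, Callable


async def run_fetch(fetch_period: Callable[..., Any], period: str) -> Any:
    """Call a fetch implementation that may be blocking or async (hypothetical).

    Coroutine functions are awaited on the event loop; plain functions are
    offloaded to a worker thread so a slow read does not stall the loop.
    """
    if inspect.iscoroutinefunction(fetch_period):
        return await fetch_period(period)
    return await asyncio.to_thread(fetch_period, period)


def blocking_fetch(period: str) -> dict:
    time.sleep(0.01)  # stands in for a synchronous download or file read
    return {"period": period, "source": "blocking"}


async def async_fetch(period: str) -> dict:
    await asyncio.sleep(0.01)  # stands in for a natively async request
    return {"period": period, "source": "async"}


async def main() -> list[dict]:
    # Both styles can run concurrently through the same dispatcher.
    return await asyncio.gather(
        run_fetch(blocking_fetch, "2020-01"),
        run_fetch(async_fetch, "2020-02"),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```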


Processes

Custom processes are Python functions decorated with @process and placed in plugins_dir/processes/. They appear in GET /processes alongside standard openEO processes and are callable directly by process_id in any process graph.

# plugins/processes/indices.py
import xarray as xr
from open_climate_service.process import process

@process(summary="Precipitation anomaly relative to a baseline mean")
def precip_anomaly(pr: xr.DataArray, baseline: float = 0.0) -> xr.DataArray:
    """Deviation of precipitation from a long-term baseline mean."""
    return pr - baseline

The @process decorator derives the process id (function name), summary, parameter names, types, and defaults from the function signature and docstring. Use explicit metadata to override descriptions:

@process(
    summary="Precipitation anomaly relative to a baseline mean",
    parameters={"baseline": {"description": "Long-term mean precipitation (kg m-2 s-1)."}},
)
def precip_anomaly(pr: xr.DataArray, baseline: float = 0.0) -> xr.DataArray:
    ...
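The derivation rules for @process can be approximated with the standard inspect module. The helper below, `describe`, is hypothetical and is not the @process implementation; its output keys loosely follow openEO process descriptions, and a plain float stands in for xr.DataArray so the sketch runs without xarray.

```python
import inspect
from typing import Any, Callable, Optional


def describe(
    func: Callable[..., Any],
    summary: Optional[str] = None,
    parameters: Optional[dict[str, dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Build a process description from a function signature (hypothetical)."""
    overrides = parameters or {}
    params = []
    for name, p in inspect.signature(func).parameters.items():
        entry: dict[str, Any] = {"name": name}
        if p.annotation is not inspect.Parameter.empty:
            entry["type"] = getattr(p.annotation, "__name__", str(p.annotation))
        if p.default is not inspect.Parameter.empty:
            entry["default"] = p.default
            entry["optional"] = True
        entry.update(overrides.get(name, {}))  # explicit metadata wins
        params.append(entry)
    doc = inspect.getdoc(func) or ""
    return {
        "id": func.__name__,  # process id is the function name
        "summary": summary or doc.split("\n", 1)[0],
        "description": doc,
        "parameters": params,
    }


def precip_anomaly_demo(pr: float, baseline: float = 0.0) -> float:
    """Deviation of precipitation from a long-term baseline mean."""
    return pr - baseline
```

For example, `describe(precip_anomaly_demo, parameters={"baseline": {"description": "..."}})` yields a `baseline` entry carrying the type, default and the overriding description.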

A plugin process with the same id as an existing process overrides it. The server must be restarted to pick up new process files. For built-in climate indices, see Climate indices.


Workflows

Reusable pipeline compositions are implemented as UDPs (User Defined Processes) — JSON process graph files placed in plugins_dir/workflows/. A UDP is a named, parameterised composition of existing openEO processes callable by name from any openEO client.

plugins/
└── workflows/
    └── monthly_rainfall.json

Example: monthly rainfall totals

{
  "id": "monthly_rainfall",
  "summary": "Monthly total precipitation for a collection and time range",
  "parameters": [
    {"name": "collection_id", "description": "Collection to load", "schema": {"type": "string"}},
    {"name": "temporal_extent", "description": "Time range [start, end]", "schema": {"type": "array"}}
  ],
  "process_graph": {
    "load": {
      "process_id": "load_collection",
      "arguments": {
        "id": {"from_parameter": "collection_id"},
        "temporal_extent": {"from_parameter": "temporal_extent"}
      }
    },
    "aggregate": {
      "process_id": "aggregate_temporal_period",
      "arguments": {
        "data": {"from_node": "load"},
        "period": "month",
        "reducer": {
          "process_graph": {
            "sum": {
              "process_id": "sum",
              "arguments": {"data": {"from_parameter": "data"}},
              "result": true
            }
          }
        }
      }
    },
    "save": {
      "process_id": "save_result",
      "arguments": {"data": {"from_node": "aggregate"}, "format": "Zarr"},
      "result": true
    }
  }
}
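Calling a UDP means binding the caller's arguments to each {"from_parameter": ...} reference in the graph. The sketch below, with a hypothetical `bind_parameters` function, illustrates that step; it is not how the service executes UDPs. It deliberately leaves nested callbacks such as the reducer untouched, because there "data" is the callback's own parameter, and as a consequence it cannot resolve UDP parameters referenced from inside a callback.

```python
import copy
from typing import Any


def bind_parameters(udp: dict[str, Any], arguments: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of udp["process_graph"] with UDP parameters substituted.

    Hypothetical sketch: parameters with a "default" may be omitted; any other
    missing argument raises ValueError.
    """
    values = dict(arguments)
    declared: set[str] = set()
    for spec in udp.get("parameters", []):
        name = spec["name"]
        declared.add(name)
        if name not in values:
            if "default" in spec:
                values[name] = spec["default"]
            else:
                raise ValueError(f"missing argument: {name}")

    def substitute(value: Any) -> Any:
        if isinstance(value, dict):
            ref = value.get("from_parameter")
            if len(value) == 1 and ref in declared:
                return values[ref]
            if "process_graph" in value:
                return value  # callback: keep its own parameter references
            return {key: substitute(item) for key, item in value.items()}
        if isinstance(value, list):
            return [substitute(item) for item in value]
        return value

    graph = copy.deepcopy(udp["process_graph"])  # never mutate the stored UDP
    return {node_id: substitute(node) for node_id, node in graph.items()}
```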

Calling the workflow from any openEO client:

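import openeo

conn = openeo.connect("http://your-instance:8000")

# A one-node process graph that invokes the monthly_rainfall UDP by id.
process_graph = {
    "process_graph": {
        "result": {
            "process_id": "monthly_rainfall",
            "arguments": {
                "collection_id": "chirps_rainfall_daily",
                "temporal_extent": ["2020-01-01", "2023-12-31"]
            },
            "result": True  # Python boolean, not JSON true
        }
    }
}

# Submit as a batch job, block until it finishes, then download the outputs.
job = conn.create_job(process_graph)
job.start_and_wait()
job.get_results().download_files("output/")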

Workflow JSON files are loaded on each request to GET /process_graphs, so changes on disk take effect without restarting the server. A plugin workflow with the same id as a built-in overrides it.
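Reading the workflow files on every request is what makes edits visible without a restart. The sketch below illustrates that behaviour with a hypothetical `load_workflows` function; it is not the service's code and it omits error handling for malformed JSON.

```python
import json
from pathlib import Path
from typing import Any


def load_workflows(
    workflows_dir: Path, builtins: dict[str, dict[str, Any]]
) -> dict[str, dict[str, Any]]:
    """Re-read workflow JSON from disk on every call (hypothetical sketch).

    Nothing is cached, so file edits show up on the next call. A file whose
    "id" matches a built-in replaces that built-in.
    """
    workflows = dict(builtins)
    if workflows_dir.is_dir():
        for path in sorted(workflows_dir.glob("*.json")):
            with path.open(encoding="utf-8") as fh:
                udp = json.load(fh)
            workflows[udp["id"]] = udp
    return workflows
```

Calling `load_workflows` from the GET /process_graphs handler, rather than once at startup, is the design choice that trades a small amount of disk I/O per request for hot reloading.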