Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ dependencies = [
"requests",
"parsl",
"jinja2",
"pint"
"pint",
"CMIP7-data-request-api>=1.3"
]
dynamic = ["version"]

Expand Down
19 changes: 12 additions & 7 deletions src/access_moppy/atmosphere.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import numpy as np
import xarray as xr

from access_moppy.base import CMIP6_CMORiser
from access_moppy.base import CMORiser
from access_moppy.derivations import custom_functions, evaluate_expression
from access_moppy.utilities import (
calculate_latitude_bounds,
Expand All @@ -13,9 +13,9 @@
)


class CMIP6_Atmosphere_CMORiser(CMIP6_CMORiser):
class Atmosphere_CMORiser(CMORiser):
"""
Handles CMORisation of NetCDF datasets using CMIP6 metadata (Atmosphere/Land).
Handles CMORisation of NetCDF datasets for Atmosphere/Land variables across CMIP versions.
"""

def calculate_missing_bounds_variables(self, bnds_required):
Expand Down Expand Up @@ -64,7 +64,7 @@ def calculate_missing_bounds_variables(self, bnds_required):
else:
# For other coordinates, we could add more handlers or skip
warnings.warn(
f"No automatic calculation available for '{bnds_var}'. This may cause CMIP6 compliance issues.",
f"No automatic calculation available for '{bnds_var}'. This may cause CMIP compliance issues.",
UserWarning,
stacklevel=3,
)
Expand Down Expand Up @@ -211,9 +211,14 @@ def select_and_process_variables(self):
self.ds = self.ds.rename(rename_map)

# Transpose the data variable according to the CMOR dimensions
cmor_dims = re.sub(
r"\w*level", "lev", self.vocab.variable["dimensions"]
).split()
# Handle both string and list dimension formats
dimensions = self.vocab.variable["dimensions"]
try:
# Try treating as string (space-separated)
cmor_dims = re.sub(r"\w*level", "lev", dimensions).split()
except TypeError:
# If re.sub() fails (TypeError for list input), it's already a list
cmor_dims = [re.sub(r"\w*level", "lev", dim) for dim in dimensions]

transpose_order = [
self.vocab.axes[dim]["out_name"]
Expand Down
146 changes: 48 additions & 98 deletions src/access_moppy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import numpy as np
import psutil
import xarray as xr
from cftime import date2num, num2date
from cftime import date2num
from dask.distributed import get_client

from access_moppy.utilities import (
Expand Down Expand Up @@ -165,9 +165,9 @@ def rechunk_dataset(self, ds: xr.Dataset) -> xr.Dataset:
return rechunked_ds


class CMIP6_CMORiser:
class CMORiser:
"""
Base class for CMIP6 CMORisers, providing shared logic for CMORisation.
Base class for CMORisers, providing shared logic for CMORisation across different CMIP versions.
"""

type_mapping = type_mapping
Expand All @@ -177,7 +177,7 @@ def __init__(
input_data: Optional[Union[str, List[str], xr.Dataset, xr.DataArray]] = None,
*,
output_path: str,
cmip6_vocab: Any,
vocab: Any,
variable_mapping: Dict[str, Any],
compound_name: str,
drs_root: Optional[Path] = None,
Expand Down Expand Up @@ -229,7 +229,7 @@ def __init__(
self.output_path = output_path
# Extract cmor_name from compound_name
_, self.cmor_name = compound_name.split(".")
self.vocab = cmip6_vocab
self.vocab = vocab
self.mapping = variable_mapping
self.drs_root = Path(drs_root) if drs_root is not None else None
self.version_date = datetime.now().strftime("%Y%m%d")
Expand Down Expand Up @@ -340,22 +340,31 @@ def _preprocess(ds):
)
else:
try:
# Enhanced validation with CMIP6 frequency compatibility
detected_freq, resampling_required = (
validate_cmip6_frequency_compatibility(
self.input_paths,
self.compound_name,
time_coord="time",
interactive=True,
)
)
if resampling_required:
print(
f"✓ Temporal resampling will be applied: {detected_freq} → CMIP6 target frequency"
# Enhanced validation with CMIP frequency compatibility
# Use CMIP6-specific validation if available, otherwise skip
if (
hasattr(self.vocab, "__class__")
and "CMIP6" in self.vocab.__class__.__name__
):
detected_freq, resampling_required = (
validate_cmip6_frequency_compatibility(
self.input_paths,
self.compound_name,
time_coord="time",
interactive=True,
)
)
if resampling_required:
print(
f"✓ Temporal resampling will be applied: {detected_freq} → CMIP6 target frequency"
)
else:
print(
f"✓ Validated compatible temporal frequency: {detected_freq}"
)
else:
print(
f"✓ Validated compatible temporal frequency: {detected_freq}"
"✓ Skipping detailed frequency validation for this CMIP version"
)
except (FrequencyMismatchError, IncompatibleFrequencyError) as e:
raise e # Re-raise these specific errors as-is
Expand Down Expand Up @@ -394,7 +403,7 @@ def _preprocess(ds):
)

if was_resampled:
print("✅ Applied temporal resampling to match CMIP6 requirements")
print("✅ Applied temporal resampling to match CMIP requirements")
else:
print("✅ No resampling needed - frequency already compatible")

Expand Down Expand Up @@ -727,19 +736,16 @@ def ordered(ds, core=("lat", "lon", "time", "height")):
self.ds = ordered(self.ds)

def _build_drs_path(self, attrs: Dict[str, str]) -> Path:
drs_components = [
attrs.get("mip_era", "CMIP6"),
attrs["activity_id"],
attrs["institution_id"],
attrs["source_id"],
attrs["experiment_id"],
attrs["variant_label"],
attrs["table_id"],
attrs["variable_id"],
attrs["grid_label"],
f"v{self.version_date}",
]
return self.drs_root.joinpath(*drs_components)
"""
Build DRS path using the vocabulary class's controlled vocabulary specifications.
"""
if not hasattr(self.vocab, "build_drs_path"):
raise AttributeError(
f"Vocabulary class {type(self.vocab).__name__} does not implement build_drs_path() method. "
"Please ensure you are using a proper CMIP vocabulary class (CMIP6Vocabulary or CMIP7Vocabulary)."
)

return self.vocab.build_drs_path(self.drs_root, self.version_date)

def _update_latest_symlink(self, versioned_path: Path):
latest_link = versioned_path.parent / "latest"
Expand Down Expand Up @@ -782,18 +788,15 @@ def write(self):
aux_coords.append(name)

attrs = self.ds.attrs
required_keys = [
"variable_id",
"table_id",
"source_id",
"experiment_id",
"variant_label",
"grid_label",
]

# Get required attributes from the vocabulary (works for both CMIP6 and CMIP7)
required_keys = self.vocab.get_required_attribute_names()

missing = [k for k in required_keys if k not in attrs]
if missing:
raise ValueError(
f"Missing required CMIP6 global attributes for filename: {missing}"
print(f"⚠️ Warning: Missing required global attributes: {missing}")
print(
" Some attributes may be required for CMIP compliance but file will still be written."
)

# ========== Memory Check ==========
Expand Down Expand Up @@ -878,62 +881,9 @@ def estimate_data_size(ds, cmor_name):
f"Data size: {data_size / 1024**3:.2f} GB, Available memory: {available_memory / 1024**3:.2f} GB"
)

# Generate filename based on whether time coordinate exists
if "time" in self.ds[self.cmor_name].coords:
# Time-dependent variable: include time range in filename
time_var = self.ds[self.cmor_name].coords["time"]
units = time_var.attrs["units"]
calendar = time_var.attrs.get("calendar", "standard").lower()
times = num2date(time_var.values[[0, -1]], units=units, calendar=calendar)
start, end = [f"{t.year:04d}{t.month:02d}" for t in times]
time_range = f"{start}-{end}"
filename = (
f"{attrs['variable_id']}_{attrs['table_id']}_{attrs['source_id']}_"
f"{attrs['experiment_id']}_{attrs['variant_label']}_"
f"{attrs['grid_label']}_{time_range}.nc"
)
else:
# Time-independent variable: use "fx" (fixed) indicator instead of time range
filename = (
f"{attrs['variable_id']}_{attrs['table_id']}_{attrs['source_id']}_"
f"{attrs['experiment_id']}_{attrs['variant_label']}_"
f"{attrs['grid_label']}_fx.nc"
)

# Check if this is sub-daily or daily data based on table_id or compound_name
is_subdaily_data = False
is_daily_data = False

if hasattr(self, "compound_name") and self.compound_name:
table_name = self.compound_name.split(".")[0]
table_lower = table_name.lower()
is_subdaily_data = any(freq in table_lower for freq in ["3hr", "6hr", "hr"])
is_daily_data = "day" in table_lower
elif "table_id" in attrs:
table_lower = attrs["table_id"].lower()
is_subdaily_data = any(freq in table_lower for freq in ["3hr", "6hr", "hr"])
is_daily_data = "day" in table_lower

# Format time range based on frequency
if is_subdaily_data:
# Sub-daily data: include hour and minute (YYYYMMDDHHMM)
start, end = [
f"{t.year:04d}{t.month:02d}{t.day:02d}{t.hour:02d}{t.minute:02d}"
for t in times
]
elif is_daily_data:
# Daily data: include day (YYYYMMDD)
start, end = [f"{t.year:04d}{t.month:02d}{t.day:02d}" for t in times]
else:
# Monthly or other data: year and month only (YYYYMM)
start, end = [f"{t.year:04d}{t.month:02d}" for t in times]

time_range = f"{start}-{end}"

filename = (
f"{attrs['variable_id']}_{attrs['table_id']}_{attrs['source_id']}_"
f"{attrs['experiment_id']}_{attrs['variant_label']}_"
f"{attrs['grid_label']}_{time_range}.nc"
# Generate filename using vocabulary-specific logic
filename = self.vocab.generate_filename(
attrs, self.ds, self.cmor_name, self.compound_name
)

if self.drs_root:
Expand Down
Loading