Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Change Log
All notable changes to this project will be documented in this file.

## 2.1.5 - 2026-03
### Runner
- Added multi-column partitioning support via `target_partition_column` accepting a list
- Added `date_column` config option to explicitly specify which column receives the info_date
- Added `filters` option to dependencies for filtering by partition column values
- Added `target_table` config option to override the target table name
- Added `target_filters` config option for completion checking with multi-column partitions

## 2.1.4 - 2026-02
### Common
- table reader optimization
Expand Down
159 changes: 155 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,53 @@ This library currently contains:
Runner is the orchestrator and scheduler of Rialto. It can be used to execute any [job](#jobs) but is primarily designed to execute feature [maker](#maker) jobs.
The core of runner is execution of a Transformation class that can be extended for any purpose and the execution configuration that defines the handling of i/o, scheduling, dependencies and reporting.

Runner operates on the assumption that your Databricks tables contain a date column (partition column) that indicates the date of data arrival. This enables:

1. **Time-aware computation**: Run operations on dated tables while managing time-related dependencies automatically
2. **Retrospective simulation**: Run computations as they would have occurred on a specific date by setting `run_date` - ensuring data newer than that date is never used
3. **Dependency tracking**: Automatically verify that input data meets required freshness constraints relative to the run date
4. **Automatic completion detection**: Skip computations when output data already exists (configurable with `rerun` parameter)

#### Scheduling and Execution

Runner uses a schedule-based approach:
* Define a schedule (e.g., weekly on day 2, monthly on day 6) in the configuration
* Specify a watch period (how far back to look for missing runs)
* Runner finds all scheduled run dates within the watch period and executes missing ones
* For each date, dependencies are checked and target existence is verified before execution

#### Data Flow

For each pipeline execution:
1. **Dependency verification**: Check that all required input tables have data within specified time intervals
2. **Transformation execution**: Run your transformation to produce a Spark DataFrame
3. **Automatic enrichment**: Runner adds `INFORMATION_DATE` (the run date) and `VERSION` (package version) columns
4. **Partitioned write**: Data is written to Databricks with partitioning configuration
5. **Reporting**: Optional email notifications on failure and run information stored to tracking table

#### Dependency Tracking

Runner's dependency tracking ensures that all required input data is available before executing a pipeline. For each dependency:

* **Date-based checking**: Runner looks for data in the dependency table's date column
* **Interval calculation**: The required date is calculated by subtracting the dependency's interval from the run date
* **Existence verification**: Runner checks if data exists for the calculated date (and within any specified filters)
* **Missing data handling**: If required data is missing, Runner raises an error for that specific pipeline/date but continues executing other pipelines and dates in the queue

**Example:** If you're running a pipeline on 2024-01-15 with a dependency that has a 7-day interval:
* Runner checks if the dependency table has data for 2024-01-08 (15 days - 7 days)
* If the dependency has `filters: {VERSION: "v2"}`, it specifically checks for data where VERSION='v2'
* If data exists, the pipeline proceeds; otherwise, an error is raised for this specific execution, but other scheduled runs continue


### Transformation
For the details on the interface see the [implementation](rialto/runner/transformation.py)
Inside the transformation you have access to a [TableReader](#common), date of running, and if provided to Runner, a live spark session and [metadata manager](#metadata).
You can either implement your jobs directly via extending the Transformation class, or by using the [jobs](#jobs) abstraction.

### Runner
### Basic Usage

Bellow is the minimal code necessary to execute the runner.
Below is the minimal code necessary to execute the runner.

```python
from rialto.runner import Runner
Expand All @@ -64,6 +103,12 @@ Transformations are not included in the runner itself, it imports them dynamical

### Configuration

Runner is supplied with a run configuration that defines the computations it will execute. In each pipeline configuration you define:
- **Module**: Python transformation class to execute
- **Schedule**: When to run (daily/weekly/monthly) and on which day
- **Dependencies**: Input tables to check, with required freshness intervals and optional filters
- **Target**: Output location, partitioning strategy, and optional completion filters

```yaml
runner:
watched_period_units: "months" # unit of default run period
Expand Down Expand Up @@ -97,12 +142,15 @@ pipelines: # a list of pipelines to run
value: 1
- table: catalog.schema.table2
name: "table2"
date_col: info_date
interval:
units: "months"
value: 1
target:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
target_partition_column: INFORMATION_DATE # date to partition new tables on (can be a list for multi-column partitioning)
date_column: INFORMATION_DATE # Optional: explicitly specify which column receives the info_date value
target_table: custom_table_name # Optional: override the target table name (defaults to pipeline name)
metadata_manager: # optional
metadata_schema: catalog.metadata # schema where metadata is stored
feature_loader: # optional
Expand Down Expand Up @@ -131,9 +179,102 @@ pipelines: # a list of pipelines to run
value: 6
target:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
target_partition_column: # can be a single column or list for multi-column partitioning
- INFORMATION_DATE
- VERSION
```

### Multi-Column Partitioning

By default, Rialto partitions tables by a single date column (e.g., `INFORMATION_DATE`). However, you can partition by multiple columns to support use cases like A/B testing. For example, partitioning by `[INFORMATION_DATE, VERSION]` allows you to store multiple model versions side-by-side and track experiment variants.

#### Basic Multi-Column Partitioning

```yaml
target:
target_schema: catalog.schema
target_partition_column:
- INFORMATION_DATE # Date partition (receives run date automatically)
- VERSION # Additional partition (must exist in transformation output)
```

#### Explicit Date Column

If you want the date column to not be the first partition, specify it explicitly:

```yaml
target:
target_schema: catalog.schema
target_partition_column:
- VERSION # Primary partition
- INFORMATION_DATE # Secondary partition
date_column: INFORMATION_DATE # Explicit: this column receives the run date
```

#### Filtering Dependencies by Partition Values

When working with multi-partitioned tables, you may need to ensure dependencies come from specific partition values. For example, when a multi-partitioned table is used as a dependency further in the pipeline, use `filters` to specify required values:

```yaml
dependencies:
- table: catalog.schema.features
date_col: INFORMATION_DATE
interval:
units: days
value: 7
filters:
VERSION: "v2" # Only check for data where VERSION='v2'
REGION: "US" # Can filter on multiple columns
```

#### Target Filters for Completion Checking

When your target table is multi-partitioned, you need to specify which partition combination to check for completion:

```yaml
target:
target_schema: catalog.schema
target_partition_column:
- INFORMATION_DATE
- VERSION
target_filters:
VERSION: "v2" # Only skip computation if v2 data exists
```

**Example scenario:**
* Table has data for `INFORMATION_DATE=2024-01-01, VERSION=v1`
* You want to compute `INFORMATION_DATE=2024-01-01, VERSION=v2`
* Without `target_filters`: Runner sees the date exists → skips execution
* With `target_filters`: Runner checks for v2 specifically → doesn't exist → runs computation


### Using Filters in Transformations

When you've defined dependency filters in your config, you should use the same filters when reading data in your transformation. The `TableReader.get_latest()` method supports a `filters` parameter:

```python
from rialto.runner import Transformation
from rialto.runner.utils import find_dependency

class MyTransformation(Transformation):
def run(self, reader, run_date, spark):
# Get dependency config including filters
dep = find_dependency(self.pipeline_config, "table1")

# Read with same filters used in dependency checking
df = reader.get_latest(
table=dep.table,
date_column=dep.date_col,
date_until=run_date,
filters=dep.filters # Use filters from config
)

# Your transformation logic here
return df
```

### Runtime Parameterization

The configuration can be dynamically overridden by providing a dictionary of overrides to the runner. All overrides must adhere to configurations schema, with pipeline.extras section available for custom schema.
Here are few examples of overrides:

Expand Down Expand Up @@ -685,7 +826,17 @@ until = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

df = reader.get_latest(table="catalog.schema.table", date_until=until, date_column="information_date")

# most recent partition with filters (for multi-partitioned tables)
df = reader.get_latest(
table="catalog.schema.table",
date_column="information_date",
date_until=until,
filters={"VERSION": "v2", "REGION": "US"} # Find latest within filtered subset
)
```

The `filters` parameter is particularly useful when working with multi-column partitioned tables. It ensures "latest" means the latest date within the filtered subset, not the latest date overall.

For full information on parameters and their optionality see technical documentation.

_TableReader_ needs an active spark session and an information which column is the **date column**.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "rialto"
version = "2.1.4"
version = "2.1.5"
description = "Rialto is a framework for building and deploying machine learning features in a scalable and reusable way. It provides a set of tools that make it easy to define and deploy features and models, and it provides a way to orchestrate the execution of these features and models."
authors = [
{ name = "Marek Dobransky", email = "marekdobr@gmail.com" },
Expand Down
10 changes: 9 additions & 1 deletion rialto/common/table_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import abc
import datetime
from typing import Optional
from typing import Dict, Optional

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession
Expand All @@ -37,13 +37,15 @@ def get_latest(
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
filters: Optional[Dict[str, str]] = None,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date

:param table: input table path
:param date_until: Optional until date (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:param filters: Optional dict of column filters to apply before finding latest date
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get your point that it's applied before the date filter if this is an outside partition like your usecase, but once you give people an option to filter, they will filter, and it would be faster to first select date, which is always a physical partition and then filter on anything else after that.

:return: Dataframe
"""
raise NotImplementedError
Expand Down Expand Up @@ -102,6 +104,7 @@ def get_latest(
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
filters: Optional[Dict[str, str]] = None,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date
Expand All @@ -110,10 +113,15 @@ def get_latest(
:param date_until: Optional until date (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param uppercase_columns: Option to refactor all column names to uppercase
:param filters: Optional dict of column filters to apply before finding latest date
:return: Dataframe
"""
df = self.spark.read.table(table)

if filters:
for col, val in filters.items():
df = df.filter(df[col] == val)

selected_date = self._get_latest_available_date(df, date_column, date_until)
df = df.filter(F.col(date_column) == selected_date)

Expand Down
8 changes: 6 additions & 2 deletions rialto/runner/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"get_pipelines_config",
]

from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from pydantic import BaseModel, ConfigDict

Expand Down Expand Up @@ -44,6 +44,7 @@ class DependencyConfig(BaseConfig):
name: Optional[str] = None
date_col: str
interval: IntervalConfig
filters: Optional[Dict[str, str]] = None


class ModuleConfig(BaseConfig):
Expand All @@ -68,7 +69,10 @@ class RunnerConfig(BaseConfig):

class TargetConfig(BaseConfig):
target_schema: str
target_partition_column: str
target_partition_column: Union[str, List[str]]
date_column: Optional[str] = None
target_table: Optional[str] = None
target_filters: Optional[Dict[str, str]] = None


class MetadataManagerConfig(BaseConfig):
Expand Down
Loading