diff --git a/feast/README.md b/feast/README.md
new file mode 100644
index 0000000..c2fcb7e
--- /dev/null
+++ b/feast/README.md
@@ -0,0 +1,123 @@
+# Feast Feature Store Examples
+
+These examples demonstrate how to use [Feast](https://docs.feast.dev/) on prokube
+for feature management in ML workflows.
+
+## Prerequisites
+
+- Feast must be enabled on your cluster (ask your admin)
+- You have `kubectl` access to your Kubeflow profile namespace
+
+## Quick Start
+
+### 1. Deploy a Redis instance
+
+Create a password secret and a Redis CR in your namespace:
+
+```bash
+# Generate a random password
+kubectl create secret generic redis-feast \
+  -n <your-namespace> \
+  --from-literal=password=$(openssl rand -base64 24 | tr -d '/')
+
+# Deploy the Redis CR
+kubectl apply -f redis-cr.yaml  # see note below — edit namespace first
+kubectl get redis -n <your-namespace> -w
+```
+
+`redis-cr.yaml` is a plain Kubernetes manifest for the OpsTree Redis operator.
+Create it with the contents shown in the prokube [user docs](https://docs.prokube.ai/user_docs/feast/).
+
+### 2. Create the Feast Redis secret
+
+```bash
+NAMESPACE=<your-namespace>
+PASSWORD=$(kubectl get secret redis-feast -n $NAMESPACE \
+  -o jsonpath='{.data.password}' | base64 -d)
+
+cat > /tmp/redis-config.yaml << EOF
+connection_string: "redis-feast.${NAMESPACE}.svc.cluster.local:6379,password=${PASSWORD}"
+EOF
+
+kubectl create secret generic feast-redis-config \
+  -n $NAMESPACE \
+  --from-file=redis=/tmp/redis-config.yaml
+
+rm /tmp/redis-config.yaml
+```
+
+### 3. Deploy a FeatureStore
+
+Edit `feast-cr.yaml` to set your namespace, then:
+
+```bash
+kubectl apply -f feast-cr.yaml
+kubectl get featurestore -n <your-namespace> -w  # wait until Ready
+```
+
+### 4. Run the notebook
+
+Open `feast_example.ipynb` in your Kubeflow notebook. The notebook reads the
+`feast-redis-config` secret automatically and builds `feature_store.yaml` for you.
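The `connection_string` written into the secret in step 2 uses Feast's Redis convention: `host:port` followed by comma-separated `key=value` options. As a rough sketch of taking such a string apart (the helper name here is ours, not part of Feast; the notebook itself only splits on `,` to hide the password when printing):

```python
def parse_redis_connection_string(conn: str) -> dict:
    """Split 'host:port,password=...' into its parts (illustrative helper)."""
    hostport, *options = conn.split(",")
    host, _, port = hostport.partition(":")
    parsed = {"host": host, "port": int(port)}
    for opt in options:
        key, _, value = opt.partition("=")
        parsed[key] = value
    return parsed

conn = "redis-feast.my-team.svc.cluster.local:6379,password=s3cret"
print(parse_redis_connection_string(conn))
# {'host': 'redis-feast.my-team.svc.cluster.local', 'port': 6379, 'password': 's3cret'}
```

In practice you never need to parse it yourself: the same string is pasted verbatim into the `online_store.connection_string` field of `feature_store.yaml`.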
+ +## Files + +| File | What it is | +|------|------------| +| `feast-cr.yaml` | Kubernetes manifest — deploys the FeatureStore CR | +| `feature_store.yaml` | Feast SDK config — tells the Python client where registry and stores are | +| `features.py` | Feature definitions — entities, data sources, feature views | +| `feast_example.ipynb` | End-to-end notebook: generate data, apply, train, materialize, serve | + +### Why two YAML files? + +`feast-cr.yaml` is a **Kubernetes resource** (`kind: FeatureStore`) that the operator +reads to provision PVCs and the Feast server pod. You apply it once with `kubectl`. + +`feature_store.yaml` is a **Feast SDK config file** (fixed filename — Feast convention) +that the Python client and CLI read to know how to connect to the registry and stores. +You use it in notebooks and scripts. + +## Architecture + +Feast has three stores. Here is what each one does and which backend prokube uses: + +| Store | Purpose | Prokube default | Alternatives | +|-------|---------|-----------------|--------------| +| **Registry** | Stores feature definitions (entities, feature views, sources). Written on `feast apply`, read at startup. | SQLite on PVC | SQL databases (PostgreSQL, etc.) for multi-replica or shared setups | +| **Online store** | Holds the *latest* feature value per entity. Read on every inference request — latency critical. | Redis (your `Redis` CR) | SQLite on PVC (dev/test only; not multi-replica safe) | +| **Offline store** | Historical feature records for point-in-time joins during training. Batch workload, not on serving path. | Parquet/file on PVC | Dask (same parquet files, distributed compute — use only if data exceeds pod memory); cloud warehouses (BigQuery, Snowflake, Redshift) | + +The offline store default is `type: file` (pandas). You can switch to `type: dask` in +`feast-cr.yaml` if your datasets are too large to fit in memory, but it adds complexity +and is rarely needed. 
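The point-in-time join described in the table above can be illustrated with plain pandas `merge_asof`. This is a simplified sketch of the semantics, not Feast's actual implementation:

```python
import pandas as pd

# Historical feature rows: snapshots of conv_rate per driver over time.
features = pd.DataFrame({
    "driver_id": [1001, 1001, 1002],
    "event_timestamp": pd.to_datetime([
        "2024-01-01 09:00", "2024-01-01 11:00", "2024-01-01 10:00",
    ]),
    "conv_rate": [0.5, 0.9, 0.7],
}).sort_values("event_timestamp")

# Entity rows: for each, fetch the latest feature value *as of* this time.
entities = pd.DataFrame({
    "driver_id": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2024-01-01 10:30", "2024-01-01 10:30"]),
}).sort_values("event_timestamp")

# Backward as-of join: takes the most recent feature row at or before each
# entity timestamp, so future values (driver 1001's 11:00 snapshot) never
# leak into training data.
joined = pd.merge_asof(
    entities, features, on="event_timestamp", by="driver_id", direction="backward"
)
print(joined[["driver_id", "conv_rate"]])
# driver 1001 -> 0.5 (09:00 snapshot), driver 1002 -> 0.7 (10:00 snapshot)
```

Feast does this per feature view, additionally enforcing the view's `ttl` so that rows older than the TTL at the entity timestamp are excluded.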
+ +``` + ┌─────────────────────────────────┐ + │ Your Namespace │ + │ │ + │ Redis CR (redis-feast) │ + │ - your private Redis instance │ + │ │ + feast apply ──────▶ SQLite /tmp/registry.db │ + (notebook) │ - feature definitions │ + │ - entity schemas │ + │ │ + materialize ──────▶ Redis online store │ + │ - latest feature values │ + │ - sub-ms latency │ + │ - persistent across sessions │ + │ │ + historical ──────▶ Parquet on PVC (offline store) │ + features │ - time-series feature data │ + │ │ + │ Feast Server pod │ + │ - HTTP API for online features │ + │ - registry on PVC (/data/...) │ + └─────────────────────────────────┘ +``` + +- **Redis** (per-namespace): your private online store. You own and manage it. +- **Registry** (SQLite): feature definitions. In notebook workflows, uses `/tmp/registry.db`. + The Feast server pod uses the registry PVC at `/data/registry/registry.db`. +- **Offline store** (parquet/PVC): historical feature data for training. diff --git a/feast/feast-cr.yaml b/feast/feast-cr.yaml new file mode 100644 index 0000000..7aa27fb --- /dev/null +++ b/feast/feast-cr.yaml @@ -0,0 +1,50 @@ +# Example FeatureStore CR for prokube. +# Edit the namespace to match your Kubeflow profile. 
+#
+# The operator will create:
+# - A Feast deployment + service (online feature server)
+# - PVCs for the SQLite registry and offline data store
+# - A ConfigMap (feast-<name>-client) with client connection info
+#
+# Prerequisites:
+# - feast-redis-config secret must exist in your namespace (see README)
+apiVersion: feast.dev/v1
+kind: FeatureStore
+metadata:
+  name: my-store
+  namespace: <your-namespace> # <-- change this
+spec:
+  feastProject: my_features
+  services:
+    runFeastApplyOnInit: false
+    securityContext:
+      runAsUser: 0
+    registry:
+      local:
+        persistence:
+          file:
+            pvc:
+              mountPath: /data/registry
+              create:
+                storageClassName: mayastor-no-redundancy # adjust for your cluster
+                resources:
+                  requests:
+                    storage: 1Gi
+    offlineStore:
+      persistence:
+        file:
+          type: file
+          pvc:
+            mountPath: /data/offline
+            create:
+              storageClassName: mayastor-no-redundancy # adjust for your cluster
+              resources:
+                requests:
+                  storage: 10Gi
+    onlineStore:
+      persistence:
+        store:
+          type: redis
+          secretRef:
+            name: feast-redis-config
+            secretKeyName: redis
diff --git a/feast/feast_example.ipynb b/feast/feast_example.ipynb
new file mode 100644
index 0000000..1bd09a6
--- /dev/null
+++ b/feast/feast_example.ipynb
@@ -0,0 +1,409 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "1eaa631f",
+   "metadata": {},
+   "source": [
+    "# Feast Feature Store — End-to-End Example\n",
+    "\n",
+    "This notebook walks through the full Feast workflow on prokube:\n",
+    "\n",
+    "1. Generate sample feature data\n",
+    "2. Configure the Feast client\n",
+    "3. Register features in the registry (`feast apply`)\n",
+    "4. Retrieve historical features for training\n",
+    "5. Train a model and log to MLflow\n",
+    "6. Materialize features to the Redis online store\n",
+    "7. 
Serve features online for inference\n", + "\n", + "## Prerequisites\n", + "\n", + "- A `FeatureStore` CR is deployed in your namespace (see `feast-cr.yaml`)\n", + "- The `feast-redis-config` secret exists in your namespace (ask your admin)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4b8f4c32", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -q feast scikit-learn mlflow" + ] + }, + { + "cell_type": "markdown", + "id": "3956d1d1", + "metadata": {}, + "source": [ + "## 1. Generate sample data\n", + "\n", + "We create a parquet file simulating hourly driver statistics over the past 7 days.\n", + "In a real scenario this would come from your data pipeline.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f2b288da", + "metadata": {}, + "outputs": [], + "source": [ + "import datetime\n", + "import os\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "np.random.seed(42)\n", + "n = 1000\n", + "now = datetime.datetime.now()\n", + "timestamps = [now - datetime.timedelta(hours=i) for i in range(n)]\n", + "\n", + "driver_df = pd.DataFrame({\n", + " \"driver_id\": np.random.choice([1001, 1002, 1003, 1004, 1005], n),\n", + " \"event_timestamp\": timestamps,\n", + " \"conv_rate\": np.random.uniform(0.1, 1.0, n).astype(np.float32),\n", + " \"acc_rate\": np.random.uniform(0.5, 1.0, n).astype(np.float32),\n", + " \"avg_daily_trips\": np.random.randint(1, 50, n).astype(np.int64),\n", + " \"created\": timestamps,\n", + "})\n", + "\n", + "os.makedirs(\"data\", exist_ok=True)\n", + "driver_df.to_parquet(\"data/driver_stats.parquet\")\n", + "print(f\"Created {n} rows for {driver_df['driver_id'].nunique()} drivers\")\n", + "driver_df.head()\n" + ] + }, + { + "cell_type": "markdown", + "id": "ef7e1942", + "metadata": {}, + "source": [ + "## 2. 
Configure the Feast client\n", + "\n", + "We build `feature_store.yaml` by reading the Redis connection string from the\n", + "`feast-redis-config` secret in our namespace.\n", + "\n", + "The registry uses a local SQLite file (`/tmp/registry.db`). This is ephemeral\n", + "within the notebook session — re-run `feast apply` at the start of each session\n", + "to repopulate it. The Redis online store is persistent across sessions.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a8804025", + "metadata": {}, + "outputs": [], + "source": [ + "import base64\n", + "import subprocess\n", + "import yaml\n", + "\n", + "\n", + "def get_namespace():\n", + " \"\"\"Read the current namespace from the pod's service account.\"\"\"\n", + " try:\n", + " with open(\"/var/run/secrets/kubernetes.io/serviceaccount/namespace\") as f:\n", + " return f.read().strip()\n", + " except FileNotFoundError:\n", + " return subprocess.check_output(\n", + " [\"kubectl\", \"config\", \"view\", \"--minify\", \"-o\", \"jsonpath={..namespace}\"]\n", + " ).decode().strip()\n", + "\n", + "\n", + "def get_redis_connection_string():\n", + " \"\"\"Read the Redis connection string from the feast-redis-config secret.\n", + "\n", + " The secret has key 'redis' whose value is a YAML map:\n", + " connection_string: \"host:port,password=...\"\n", + " \"\"\"\n", + " result = subprocess.run(\n", + " [\"kubectl\", \"get\", \"secret\", \"feast-redis-config\",\n", + " \"-o\", \"jsonpath={.data.redis}\"],\n", + " capture_output=True, text=True, check=True,\n", + " )\n", + " raw = base64.b64decode(result.stdout).decode()\n", + " return yaml.safe_load(raw)[\"connection_string\"]\n", + "\n", + "\n", + "NAMESPACE = get_namespace()\n", + "REDIS_CONNECTION_STRING = get_redis_connection_string()\n", + "FEAST_PROJECT = \"my_features\" # must match spec.feastProject in your FeatureStore CR\n", + "\n", + "feature_store_yaml = (\n", + " f\"project: {FEAST_PROJECT}\\n\"\n", + " \"provider: local\\n\"\n", + 
" \"offline_store:\\n\"\n", + " \" type: file\\n\"\n", + " \"online_store:\\n\"\n", + " \" type: redis\\n\"\n", + " f\" connection_string: \\\"{REDIS_CONNECTION_STRING}\\\"\\n\"\n", + " \"registry:\\n\"\n", + " \" registry_type: file\\n\"\n", + " \" path: /tmp/registry.db\\n\"\n", + " \"auth:\\n\"\n", + " \" type: no_auth\\n\"\n", + " \"entity_key_serialization_version: 3\\n\"\n", + ")\n", + "\n", + "with open(\"feature_store.yaml\", \"w\") as f:\n", + " f.write(feature_store_yaml)\n", + "\n", + "print(\"feature_store.yaml written\")\n", + "print(f\"Namespace: {NAMESPACE}\")\n", + "# Print host:port only — hide password\n", + "print(f\"Redis: {REDIS_CONNECTION_STRING.split(',')[0]}\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "358c2624", + "metadata": {}, + "source": [ + "## 3. Register features\n", + "\n", + "`feast apply` reads `features.py` and writes the entity, data source, and\n", + "feature view definitions to the local SQLite registry (`/tmp/registry.db`).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "52ecbcf6", + "metadata": {}, + "outputs": [], + "source": [ + "!feast apply" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10eb1055", + "metadata": {}, + "outputs": [], + "source": [ + "# Verify what was registered\n", + "!feast feature-views list" + ] + }, + { + "cell_type": "markdown", + "id": "c7f35090", + "metadata": {}, + "source": [ + "## 4. Retrieve historical features for training\n", + "\n", + "`get_historical_features` performs a **point-in-time join**: for each entity row,\n", + "it finds the most recent feature values as of that entity's timestamp. 
This\n", + "prevents data leakage — you only see features that were available when the\n", + "event occurred.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7796c5fc", + "metadata": {}, + "outputs": [], + "source": [ + "from feast import FeatureStore\n", + "\n", + "store = FeatureStore(repo_path=\".\")\n", + "\n", + "entity_df = pd.DataFrame({\n", + " \"driver_id\": [1001, 1002, 1003, 1004, 1005],\n", + " \"event_timestamp\": [now] * 5,\n", + "})\n", + "\n", + "training_df = store.get_historical_features(\n", + " entity_df=entity_df,\n", + " features=[\n", + " \"driver_hourly_stats:conv_rate\",\n", + " \"driver_hourly_stats:acc_rate\",\n", + " \"driver_hourly_stats:avg_daily_trips\",\n", + " ],\n", + ").to_df()\n", + "\n", + "print(\"Training data (point-in-time correct):\")\n", + "training_df\n" + ] + }, + { + "cell_type": "markdown", + "id": "5e15eee6", + "metadata": {}, + "source": [ + "## 5. Train a model and log to MLflow\n", + "\n", + "Use the Feast-provided training data to train a simple model, then log\n", + "everything to MLflow — including which Feast features were used.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "61f19018", + "metadata": {}, + "outputs": [], + "source": [ + "import mlflow\n", + "from sklearn.linear_model import LinearRegression\n", + "from sklearn.metrics import mean_squared_error, r2_score\n", + "from sklearn.model_selection import train_test_split\n", + "\n", + "# Adjust the tracking URI if your cluster uses a different location\n", + "mlflow.set_tracking_uri(\"http://mlflow-mlflow-tracking-server.mlflow.svc.cluster.local:80\")\n", + "mlflow.set_experiment(\"feast-driver-prediction\")\n", + "\n", + "FEATURE_COLS = [\"acc_rate\", \"avg_daily_trips\"]\n", + "TARGET = \"conv_rate\"\n", + "\n", + "X = training_df[FEATURE_COLS].fillna(0)\n", + "y = training_df[TARGET].fillna(0)\n", + "\n", + "X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, 
random_state=42)\n", + "\n", + "with mlflow.start_run(run_name=\"feast-driver-conv-rate\") as run:\n", + " model = LinearRegression()\n", + " model.fit(X_train, y_train)\n", + " y_pred = model.predict(X_test)\n", + "\n", + " mse = mean_squared_error(y_test, y_pred)\n", + " r2 = r2_score(y_test, y_pred)\n", + "\n", + " mlflow.log_param(\"feast_project\", FEAST_PROJECT)\n", + " mlflow.log_param(\"feast_feature_view\", \"driver_hourly_stats\")\n", + " mlflow.log_param(\"features\", \", \".join(FEATURE_COLS))\n", + " mlflow.log_param(\"target\", TARGET)\n", + " mlflow.log_param(\"n_training_samples\", len(X_train))\n", + " mlflow.log_metric(\"mse\", mse)\n", + " mlflow.log_metric(\"r2_score\", r2)\n", + " mlflow.sklearn.log_model(model, \"driver_conv_model\")\n", + "\n", + " print(f\"MLflow run: {run.info.run_id}\")\n", + " print(f\"MSE: {mse:.4f}, R2: {r2:.4f}\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "51b15c4b", + "metadata": {}, + "source": [ + "## 6. Materialize features to Redis\n", + "\n", + "Materialization reads the latest feature values from the offline parquet store\n", + "and writes them to Redis for low-latency online serving.\n", + "\n", + "In production you would run this on a schedule (e.g. hourly cron job).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6183ced7", + "metadata": {}, + "outputs": [], + "source": [ + "!feast materialize-incremental $(date -u +'%Y-%m-%dT%H:%M:%S')" + ] + }, + { + "cell_type": "markdown", + "id": "66ec423c", + "metadata": {}, + "source": [ + "## 7. Online feature serving\n", + "\n", + "Retrieve the latest feature values for specific entities. 
The Feast SDK reads\n", + "directly from Redis — no round-trip through the Feast server pod is needed.\n", + "\n", + "This is what you call at inference time: given a `driver_id`, get their\n", + "current features to feed into the model.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6982cdba", + "metadata": {}, + "outputs": [], + "source": [ + "online_features = store.get_online_features(\n", + " features=[\n", + " \"driver_hourly_stats:conv_rate\",\n", + " \"driver_hourly_stats:acc_rate\",\n", + " \"driver_hourly_stats:avg_daily_trips\",\n", + " ],\n", + " entity_rows=[{\"driver_id\": 1001}, {\"driver_id\": 1002}],\n", + ").to_dict()\n", + "\n", + "print(\"Online features (latest values from Redis):\")\n", + "for k, v in online_features.items():\n", + " print(f\" {k}: {v}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "72e36e86", + "metadata": {}, + "outputs": [], + "source": [ + "# Use online features for inference\n", + "inference_df = pd.DataFrame(online_features)\n", + "predictions = model.predict(inference_df[FEATURE_COLS])\n", + "\n", + "for driver_id, pred in zip(inference_df[\"driver_id\"], predictions):\n", + " print(f\"Driver {driver_id}: predicted conv_rate = {pred:.4f}\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "72a52751", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "| Step | Command / API | Purpose |\n", + "|------|--------------|--------|\n", + "| Define features | `features.py` | Declare entities, sources, feature views |\n", + "| Register | `feast apply` | Write definitions to local SQLite registry |\n", + "| Training data | `store.get_historical_features()` | Point-in-time correct join from parquet |\n", + "| Materialize | `feast materialize-incremental` | Push latest values to Redis online store |\n", + "| Online serving | `store.get_online_features()` | Sub-ms lookup from Redis by entity key |\n", + "\n", + "### When to use Feast vs raw parquet\n", + "\n", + "- 
**Use Feast** when you need consistent feature definitions across training and\n",
+    "  serving, point-in-time correctness, or low-latency online feature serving.\n",
+    "- **Use raw parquet** for one-off experiments where feature management overhead\n",
+    "  is not worth it.\n",
+    "\n",
+    "### Note on the SQLite registry\n",
+    "\n",
+    "This notebook uses `/tmp/registry.db` as the registry, which is local to the\n",
+    "notebook session. Re-run the \"Configure\" and `feast apply` cells at the start\n",
+    "of each new session. The Redis online store is persistent across sessions —\n",
+    "features materialized in one session are still available in subsequent ones.\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "name": "python",
+   "version": "3.11.0"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/feast/feature_store.yaml b/feast/feature_store.yaml
new file mode 100644
index 0000000..0d6b181
--- /dev/null
+++ b/feast/feature_store.yaml
@@ -0,0 +1,19 @@
+# Template — fill in your Redis connection details.
+# Get the Redis host and password from your admin.
+#
+# For workflows running inside the cluster, you can use /tmp/registry.db
+# as the registry path (ephemeral, single-run). For persistent registry
+# access, mount the registry PVC and use /data/registry/registry.db.
+project: my_features
+provider: local
+offline_store:
+  type: file
+online_store:
+  type: redis
+  connection_string: "<redis-host>:6379,password=<password>"
+registry:
+  registry_type: file
+  path: /tmp/registry.db
+auth:
+  type: no_auth
+entity_key_serialization_version: 3
diff --git a/feast/features.py b/feast/features.py
new file mode 100644
index 0000000..8116f89
--- /dev/null
+++ b/feast/features.py
@@ -0,0 +1,50 @@
+"""
+Feast feature definitions for the driver stats example.
+ +This file defines: +- An entity (driver_id) identifying what we're tracking features for +- A data source (parquet file with historical driver data) +- A feature view (driver_hourly_stats) with three features + +To register these with the Feast registry: + feast apply + +To materialize features to the online store: + feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S") +""" + +from datetime import timedelta + +from feast import Entity, FeatureView, Field, FileSource +from feast.types import Float32, Int64 + +# Entity: the "primary key" for feature lookups. +# When you request features, you provide entity values (e.g. driver_id=1001). +driver = Entity( + name="driver_id", + description="Unique driver identifier", +) + +# Data source: where historical feature data lives. +# This parquet file is generated by the notebook example. +driver_stats_source = FileSource( + path="data/driver_stats.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Feature view: a logical group of features from one data source. +# - `ttl` controls how stale a feature can be before it's considered expired +# - `online=True` means features are materialized to the online store +driver_hourly_stats = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=7), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + source=driver_stats_source, + online=True, +)
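The `ttl=timedelta(days=7)` on the feature view above bounds staleness: at lookup time, stored values older than the TTL are treated as missing rather than served. A simplified stdlib-only sketch of that check (not Feast's actual code):

```python
from datetime import datetime, timedelta

TTL = timedelta(days=7)  # mirrors ttl on driver_hourly_stats

def feature_is_fresh(event_timestamp: datetime, now: datetime,
                     ttl: timedelta = TTL) -> bool:
    """A feature value is usable only if it is no older than the TTL."""
    return now - event_timestamp <= ttl

now = datetime(2024, 1, 15)
print(feature_is_fresh(datetime(2024, 1, 10), now))  # True  (5 days old)
print(feature_is_fresh(datetime(2024, 1, 1), now))   # False (14 days old)
```

The same bound applies during `get_historical_features`: a point-in-time join will not reach further back than `ttl` before each entity timestamp.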