diff --git a/Data_Ingestion/Ingest_data_using_yaml/.ipynb_checkpoints/Ingest_data_using_YAML-checkpoint.ipynb b/Data_Ingestion/Ingest_data_using_yaml/.ipynb_checkpoints/Ingest_data_using_YAML-checkpoint.ipynb deleted file mode 100644 index 0c1431e..0000000 --- a/Data_Ingestion/Ingest_data_using_yaml/.ipynb_checkpoints/Ingest_data_using_YAML-checkpoint.ipynb +++ /dev/null @@ -1,525 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "95c2a4f0-715a-4fcc-98b1-4303b6f37095", - "metadata": {}, - "outputs": [], - "source": [ - "Oracle AI Data Platform v1.0\n", - "\n", - "Copyright © 2025, Oracle and/or its affiliates.\n", - "\n", - "Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/" - ] - }, - { - "cell_type": "markdown", - "id": "06893007-c10b-45a9-a689-8fb746510a29", - "metadata": { - "execution": { - "iopub.status.busy": "2025-03-31T17:59:13.953Z" - }, - "type": "python" - }, - "source": [ - "# YAML-Driven Data Ingestion\n", - "\n", - "You have multiple data sources (CSV, JSON, other file formats, or even JDBC) that need to be ingested into Delta tables on AI Data Platform. Instead of hardcoding the file paths, schema, and other parameters in the PySpark script, you define them in a YAML configuration file. You can also ingest from other cloud storage platforms, including AWS S3 and Azure ADLS, provided the connectors are installed in the cluster. This makes the pipeline flexible and easy to maintain.\n", - "\n", - "This notebook demonstrates this using the PyYAML (https://pyyaml.org/wiki/PyYAMLDocumentation) framework. It covers:\n", - " \n", - " 1. **Ingesting from cloud storage (also includes external volumes)**\n", - " 2. **Schema validation and data rules - simple demo**\n", - " 3. **Data preparation in Spark SQL as part of ingest**\n", - " 4. **Ingesting from a JDBC data source**\n", - "\n", - " **Prerequisites**\n", - "\n", - "Before you begin, ensure you have:\n", - " - The necessary IAM policies for accessing AI Data Platform. Learn more about permissions.\n", - " - A configured AI Data Platform environment with a compute cluster created - install the requirements file into the cluster libraries; this includes:\n", - "   - pyyaml\n", - " - The config files updated with information for your file locations - your bucket/namespace or external volume paths\n", - "\n", - " **Key Benefits:**\n", - " - Config-Driven – No need to modify the script for new data sources.\n", - " - Scalable – Easily add more sources to the YAML file.\n", - " - Flexible – Supports multiple file formats dynamically.\n", - " - Delta Lake Benefits – ACID transactions, schema evolution, and time travel.\n", - "\n", - "\n", - "**Next Steps**\n", - "\n", - "Now that you’ve explored this sample YAML-driven ingestion in the notebook, try it out with your own data sources by changing the config files!"
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f8ac9433-eac6-4dde-bc6d-ccca50c8f2ba", - "metadata": { - "type": "sql" - }, - "outputs": [], - "source": [ - "%sql\n", - "create catalog if not exists lake;\n", - "create schema if not exists lake.bronze;" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "8347ec12-c77e-4a0d-aa41-39459519e02d", - "metadata": { - "execution": { - "iopub.status.busy": "2025-04-05T16:16:07.636Z" - }, - "type": "python" - }, - "outputs": [], - "source": [ - "import yaml\n", - "from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType\n", - "from pyspark.sql.functions import col\n", - "\n", - "# Load YAML configuration file\n", - "def load_config(config_path):\n", - "    with open(config_path, \"r\") as file:\n", - "        return yaml.safe_load(file)\n", - "\n", - "# Function to parse schema from YAML\n", - "def get_spark_schema(schema_config):\n", - "    type_mapping = {\n", - "        \"integer\": IntegerType(),\n", - "        \"string\": StringType(),\n", - "        \"double\": DoubleType()\n", - "    }\n", - "    fields = []\n", - "    for col_def in schema_config:\n", - "        name, dtype = col_def.split(\":\")\n", - "        if dtype not in type_mapping:\n", - "            raise ValueError(f\"Unsupported data type: {dtype}\")\n", - "        fields.append(StructField(name, type_mapping[dtype], True))\n", - "    return StructType(fields)\n", - "\n", - "# Data quality checks\n", - "def validate_data(df, quality_checks):\n", - "    for column in quality_checks.get(\"not_null\", []):\n", - "        if df.filter(col(column).isNull()).count() > 0:\n", - "            raise ValueError(f\"Null values found in required column: {column}\")\n", - "\n", - "    for column, min_val in quality_checks.get(\"min_value\", {}).items():\n", - "        if df.filter(col(column) < min_val).count() > 0:\n", - "            raise ValueError(f\"Values in column {column} are below minimum allowed: {min_val}\")\n", - "\n", - "# Apply preprocessing SQL\n", - "def apply_sql_transformations(df, name, sql_query):\n", - "    df.createOrReplaceTempView(name+\"_stg\")\n", - "    return spark.sql(sql_query)\n", - "\n", - "def send_alert(message):\n", - "    print(f\"ALERT: {message}\")\n", - "    return\n", - "\n", - "# Ingestion function with schema validation, data quality checks, and partitioning\n", - "def ingest_data(source):\n", - "    try:\n", - "        # Get expected schema - this will be checked against input data\n", - "        expected_schema = get_spark_schema(source.get(\"schema\", {}))\n", - "\n", - "        # Read data\n", - "        path = source.get(\"path\")\n", - "        if path is None:\n", - "            df = spark.read.format(source[\"format\"]).options(**source.get(\"options\",{})).load()\n", - "        else:\n", - "            df = spark.read.format(source[\"format\"]).options(**source.get(\"options\",{})).load(source[\"path\"])\n", - "\n", - "        # Validate schema\n", - "        if expected_schema is not None and len(expected_schema) > 0:\n", - "            df_schema = set(df.schema.names)\n", - "            expected_schema_names = set(field.name for field in expected_schema)\n", - "\n", - "            if df_schema != expected_schema_names:\n", - "                raise ValueError(f\"Schema mismatch for {source['name']}. 
Expected: {expected_schema_names}, Found: {df_schema}\")\n", - "\n", - "        # Apply preprocessing SQL if defined\n", - "        if \"preprocessing_sql\" in source:\n", - "            df = apply_sql_transformations(df, source[\"name\"], source[\"preprocessing_sql\"])\n", - "\n", - "        # Perform data quality checks\n", - "        validate_data(df, source.get(\"quality_checks\", {}))\n", - "\n", - "        # Write to Delta Lake with partitioning\n", - "        partition_cols = source.get(\"partition_columns\", [])\n", - "        if partition_cols:\n", - "            df.write.format(\"delta\").mode(\"overwrite\").partitionBy(*partition_cols).saveAsTable(source[\"target_table\"])\n", - "        else:\n", - "            df.write.format(\"delta\").mode(\"overwrite\").saveAsTable(source[\"target_table\"])\n", - "\n", - "    except Exception as e:\n", - "        error_message = f\"Error processing {source['name']}: {str(e)}\"\n", - "        send_alert(error_message)" - ] - }, - { - "cell_type": "markdown", - "id": "b329c996-76ce-4748-b623-1498b8957562", - "metadata": { - "type": "markdown" - }, - "source": [ - "# Ingest CSV, JSON data easily\n", - "\n", - "1. You can define data sources to import in the YAML.\n", - "2. The sources can be any Spark-supported file type, with type-specific options (see CSV and JSON below).\n", - "3. The target Delta table should be specified as catalog.schema.table.\n", - "\n", - "```\n", - "data_sources:\n", - "  - name: sales_data\n", - "    path: \"oci://your_bucket/your_namespace/sales.csv\"\n", - "    format: \"csv\"\n", - "    options:\n", - "      header: \"true\"\n", - "      inferSchema: \"true\"\n", - "    target_table: \"lake.bronze.sales\"\n", - "\n", - "  - name: user_activity\n", - "    path: \"oci://your_bucket/your_namespace/user_activity.json\"\n", - "    format: \"json\"\n", - "    options:\n", - "      multiline: \"true\"\n", - "    target_table: \"lake.bronze.user_activity\"\n", - "```\n", - "4. If you have configured S3, ADLS, etc., you can change the file path to s3a://, abfss://, etc." - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "452b3db0-85ef-42d4-94e1-7128cd833f71", - "metadata": { - "execution": { - "iopub.status.busy": "2025-04-05T15:48:56.551Z" - }, - "type": "python" - }, - "outputs": [], - "source": [ - "config = load_config(\"/Workspace/config/config_file_data.yaml\")\n", - "\n", - "for source in config[\"data_sources\"]:\n", - "    ingest_data(source)" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "4a077ec4-22a2-482a-b8e6-fa1fa1b1c7bb", - "metadata": { - "execution": { - "iopub.status.busy": "2025-04-05T16:36:06.207Z" - }, - "type": "python" - }, - "outputs": [ - { - "data": { - "text/html": [ - "
+-------------+----------+----------+---------------+--------+------------+-----------+\n", - "|TransactionID|      Date|CustomerID|ProductCategory|Quantity|PricePerUnit|TotalAmount|\n", - "+-------------+----------+----------+---------------+--------+------------+-----------+\n", - "|            1|2025-03-28|      C123|    Electronics|       2|        50.0|      100.0|\n", - "|            2|2025-03-28|      C456|       Clothing|       1|        25.0|       25.0|\n", - "|            3|2025-03-27|      C789|    Electronics|       1|       100.0|      100.0|\n", - "|            4|2025-03-27|      C101|       Clothing|       3|        15.0|       45.0|\n", - "|            5|2025-03-26|      C123|    Electronics|       1|        75.0|       75.0|\n", - "+-------------+----------+----------+---------------+--------+------------+-----------+\n", - "\n", - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "spark.sql(\"select * from lake.bronze.sales\").show()" - ] - }, - { - "cell_type": "markdown", - "id": "9cf8d18f-ee83-4787-9209-3c8156a901d4", - "metadata": { - "type": "markdown" - }, - "source": [ - "# Schema enforcement, validation and rules\n", - "\n", - "1. You can define a schema with the names and data types of the columns to be validated. Below, TransactionID is expected to be an integer.\n", - "2. Data quality can be checked by specifying rules such as not_null and min_value.\n", - "3. The data can be written to the target using partitioning by specifying a partition column.\n", - "\n", - "```\n", - "data_sources:\n", - "  - name: sales_data\n", - "    path: \"oci://your_bucket/your_namespace/sales.csv\"\n", - "    format: \"csv\"\n", - "    options:\n", - "      header: \"true\"\n", - "      inferSchema: \"true\"\n", - "    target_table: \"lake.bronze.salesv2\"\n", - "    schema:\n", - "      - \"TransactionID:integer\"\n", - "      - \"Date:string\"\n", - "      - \"CustomerID:string\"\n", - "      - \"ProductCategory:string\"\n", - "      - \"Quantity:integer\"\n", - "      - \"PricePerUnit:double\"\n", - "      - \"TotalAmount:double\"\n", - "    partition_columns: [\"Date\"]\n", - "    quality_checks:\n", - "      not_null: [\"TransactionID\", \"CustomerID\", \"TotalAmount\"]\n", - "      min_value: {\"TotalAmount\": 0.01}\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "700235f5-503d-4511-82d0-0f22559b41ae", - "metadata": { - "execution": { - "iopub.status.busy": "2025-03-31T18:54:54.235Z" - }, - "type": "python" - }, - "outputs": [], - "source": [ - "config = load_config(\"/Workspace/config/config_validation.yaml\")\n", - "\n", - "for source in config[\"data_sources\"]:\n", - "    ingest_data(source)" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "f81a69ed-43a2-4405-a7dd-d1062446efe5", - "metadata": { - "execution": { - "iopub.status.busy": "2025-03-31T18:53:07.109Z" - }, - "type": "python" - }, - "outputs": [ - { - "data": { - "text/html": [ - "
+-------------+----------+----------+---------------+--------+------------+-----------+\n", - "|TransactionID|      Date|CustomerID|ProductCategory|Quantity|PricePerUnit|TotalAmount|\n", - "+-------------+----------+----------+---------------+--------+------------+-----------+\n", - "|            5|2025-03-26|      C123|    Electronics|       1|        75.0|       75.0|\n", - "|            1|2025-03-28|      C123|    Electronics|       2|        50.0|      100.0|\n", - "|            2|2025-03-28|      C456|       Clothing|       1|        25.0|       25.0|\n", - "|            3|2025-03-27|      C789|    Electronics|       1|       100.0|      100.0|\n", - "|            4|2025-03-27|      C101|       Clothing|       3|        15.0|       45.0|\n", - "+-------------+----------+----------+---------------+--------+------------+-----------+\n", - "\n", - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "spark.sql(\"select * from lake.bronze.salesv2\").show()" - ] - }, - { - "cell_type": "markdown", - "id": "caa83e4e-24f5-4e6a-8c6f-77f0e450f08d", - "metadata": { - "type": "markdown" - }, - "source": [ - "# Data Preparation in SQL\n", - "\n", - "Data preparation in Spark SQL (generates a temporary view named {data_source_name}_stg).\n", - "\n", - "In the example here, we query the sales_slice_data_stg temporary view on the data being ingested, keep only the CustomerID column, and multiply Quantity by 10 to create NewQuantity.\n", - "```\n", - "data_sources:\n", - "  - name: sales_slice_data\n", - "    path: \"oci://your_bucket/your_namespace/sales.csv\"\n", - "    format: \"csv\"\n", - "    options:\n", - "      header: \"true\"\n", - "      inferSchema: \"true\"\n", - "    preprocessing_sql: |\n", - "      SELECT \n", - "        CustomerID, \n", - "        Quantity*10 NewQuantity\n", - "      FROM sales_slice_data_stg\n", - "    target_table: \"lake.bronze.sales_slicev3\"\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "4811de69-0e87-4712-b73c-936c61994f6b", - "metadata": { - "execution": { - "iopub.status.busy": "2025-04-02T19:17:58.421Z" - }, - "type": "python" - }, - "outputs": [], - "source": [ - "config = load_config(\"/Workspace/config/config_sql_prep.yaml\")\n", - "\n", - "for source in config[\"data_sources\"]:\n", - "    ingest_data(source)" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "7d4aad04-0c8a-4bf1-84ea-20a88cda2a04", - "metadata": { - "execution": { - "iopub.status.busy": "2025-04-02T19:18:11.174Z" - }, - "type": "python" - }, - "outputs": [ - { - "data": { - "text/html": [ - "
+----------+-----------+\n", - "|CustomerID|NewQuantity|\n", - "+----------+-----------+\n", - "|      C123|         20|\n", - "|      C456|         10|\n", - "|      C789|         10|\n", - "|      C101|         30|\n", - "|      C123|         10|\n", - "+----------+-----------+\n", - "\n", - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "spark.sql(\"select * from lake.bronze.sales_slicev3\").show()" - ] - }, - { - "cell_type": "markdown", - "id": "7c1553da-6ee2-4a9d-a138-82b8570deb73", - "metadata": { - "type": "markdown" - }, - "source": [ - "# Ingest data from JDBC easily\n", - "\n", - "1. You can define data sources to import in the YAML, including files like those above or JDBC.\n", - "2. The source can be any JDBC source, and dbtable can be a table name or even a query.\n", - "3. The target Delta table should be specified as catalog.schema.table.\n", - "\n", - "```\n", - "data_sources:\n", - "  - name: user_data\n", - "    format: \"jdbc\"\n", - "    options:\n", - "      driver: \"org.sqlite.JDBC\"\n", - "      user: \"sa\"\n", - "      password: \"\"\n", - "      fetchsize: \"100\"\n", - "      dbtable: \"(SELECT 1 c1, 2 c2)\"\n", - "      url: \"jdbc:sqlite:memory:myDb\"\n", - "    target_table: \"lake.bronze.user_data\"\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "c5be2948-7808-4a5a-86ad-75da2313e4b1", - "metadata": { - "execution": { - "iopub.status.busy": "2025-04-05T16:34:14.794Z" - }, - "type": "python" - }, - "outputs": [], - "source": [ - "config = load_config(\"/Workspace/config/config_jdbc.yaml\")\n", - "\n", - "for source in config[\"data_sources\"]:\n", - "    ingest_data(source)" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "cc18a5ad-706e-4122-877f-3d9ad2d5ab84", - "metadata": { - "execution": { - "iopub.status.busy": "2025-04-05T16:36:19.921Z" - }, - "type": "python" - }, - "outputs": [ - { - "data": { - "text/html": [ - "
+--------------------+--------------------+\n", - "|                  c1|                  c2|\n", - "+--------------------+--------------------+\n", - "|1.000000000000000000|2.000000000000000000|\n", - "+--------------------+--------------------+\n", - "\n", - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "\n", - "spark.sql(\"select * from lake.bronze.user_data\").show()\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.12" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -}
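
Aside, not part of the diff above: the deleted file is a Jupyter auto-save copy under .ipynb_checkpoints/. A .gitignore entry along the following lines (a suggested follow-up, not something this change contains, and the placement in this repo is an assumption) would keep such checkpoint copies from being committed again:

# suggested, hypothetical .gitignore entry: keep Jupyter auto-save checkpoints out of version control
.ipynb_checkpoints/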