diff --git a/.github/workflows/track_dependencies.yml b/.github/workflows/track_dependencies.yml index 46a80702..ec28c3c1 100644 --- a/.github/workflows/track_dependencies.yml +++ b/.github/workflows/track_dependencies.yml @@ -4,7 +4,7 @@ on: push: branches: - main - + jobs: generate-sbom: runs-on: ubuntu-latest @@ -28,4 +28,3 @@ jobs: apikey: ${{ secrets.DEPENDENCY_TRACK_API_KEY }} project: '8d39a492-bf9e-49fa-a58c-b391ed4a1243' bomfilename: 'sbom.json' - diff --git a/MANIFEST.in b/MANIFEST.in index 1a445074..28eb28d5 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include README.md include LICENSE graft cosmotech/orchestrator_plugins -graft cosmotech/translation \ No newline at end of file +graft cosmotech/translation diff --git a/cosmotech/coal/azure/__init__.py b/cosmotech/coal/azure/__init__.py index 96b47c88..8bf15248 100644 --- a/cosmotech/coal/azure/__init__.py +++ b/cosmotech/coal/azure/__init__.py @@ -11,13 +11,13 @@ This module provides functions for interacting with Azure services like Storage and ADX. """ +# Re-export blob functions for easier importing +from cosmotech.coal.azure.blob import ( + dump_store_to_azure, +) + # Re-export storage functions for easier importing from cosmotech.coal.azure.storage import ( upload_file, upload_folder, ) - -# Re-export blob functions for easier importing -from cosmotech.coal.azure.blob import ( - dump_store_to_azure, -) diff --git a/cosmotech/coal/azure/adx/__init__.py b/cosmotech/coal/azure/adx/__init__.py index c8f5fbe7..98a68579 100644 --- a/cosmotech/coal/azure/adx/__init__.py +++ b/cosmotech/coal/azure/adx/__init__.py @@ -5,22 +5,36 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from cosmotech.coal.azure.adx.auth import create_kusto_client, create_ingest_client, initialize_clients -from cosmotech.coal.azure.adx.query import run_query, run_command_query +from cosmotech.coal.azure.adx.auth import ( + create_ingest_client, + create_kusto_client, + initialize_clients, +) from cosmotech.coal.azure.adx.ingestion import ( - ingest_dataframe, - send_to_adx, + IngestionStatus, check_ingestion_status, - monitor_ingestion, handle_failures, - IngestionStatus, + ingest_dataframe, + monitor_ingestion, + send_to_adx, ) -from cosmotech.coal.azure.adx.tables import table_exists, create_table, check_and_create_table, _drop_by_tag -from cosmotech.coal.azure.adx.utils import type_mapping, create_column_mapping -from cosmotech.coal.azure.adx.store import send_pyarrow_table_to_adx, send_table_data, process_tables, send_store_to_adx +from cosmotech.coal.azure.adx.query import run_command_query, run_query from cosmotech.coal.azure.adx.runner import ( - prepare_csv_content, construct_create_query, insert_csv_files, + prepare_csv_content, send_runner_data, ) +from cosmotech.coal.azure.adx.store import ( + process_tables, + send_pyarrow_table_to_adx, + send_store_to_adx, + send_table_data, +) +from cosmotech.coal.azure.adx.tables import ( + _drop_by_tag, + check_and_create_table, + create_table, + table_exists, +) +from cosmotech.coal.azure.adx.utils import create_column_mapping, type_mapping diff --git a/cosmotech/coal/azure/adx/ingestion.py b/cosmotech/coal/azure/adx/ingestion.py index 481f07e9..c9f96902 100644 --- a/cosmotech/coal/azure/adx/ingestion.py +++ b/cosmotech/coal/azure/adx/ingestion.py @@ -5,28 +5,24 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
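# Illustrative usage of the re-exported entry points above (not part of the
# patch; assumes an environment where the Azure settings are configured):
from cosmotech.coal.azure import dump_store_to_azure, upload_file, upload_folder
from cosmotech.coal.azure.adx import create_kusto_client, run_query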
+import os +import time from enum import Enum -from typing import Dict -from typing import Iterator -from typing import List -from typing import Optional -from typing import Tuple +from typing import Dict, Iterator, List, Optional, Tuple -import os import pandas as pd -import time import tqdm from azure.kusto.data import KustoClient from azure.kusto.data.data_format import DataFormat -from azure.kusto.ingest import IngestionProperties -from azure.kusto.ingest import QueuedIngestClient -from azure.kusto.ingest import ReportLevel -from azure.kusto.ingest.status import FailureMessage -from azure.kusto.ingest.status import KustoIngestStatusQueues -from azure.kusto.ingest.status import SuccessMessage +from azure.kusto.ingest import IngestionProperties, QueuedIngestClient, ReportLevel +from azure.kusto.ingest.status import ( + FailureMessage, + KustoIngestStatusQueues, + SuccessMessage, +) from cosmotech.orchestrator.utils.translate import T -from cosmotech.coal.azure.adx.tables import create_table, _drop_by_tag +from cosmotech.coal.azure.adx.tables import _drop_by_tag, create_table from cosmotech.coal.azure.adx.utils import type_mapping from cosmotech.coal.utils.logger import LOGGER diff --git a/cosmotech/coal/azure/adx/query.py b/cosmotech/coal/azure/adx/query.py index 50850a7d..0916f17a 100644 --- a/cosmotech/coal/azure/adx/query.py +++ b/cosmotech/coal/azure/adx/query.py @@ -7,9 +7,9 @@ from azure.kusto.data import KustoClient from azure.kusto.data.response import KustoResponseDataSet +from cosmotech.orchestrator.utils.translate import T from cosmotech.coal.utils.logger import LOGGER -from cosmotech.orchestrator.utils.translate import T def run_query(client: KustoClient, database: str, query: str) -> KustoResponseDataSet: diff --git a/cosmotech/coal/azure/adx/utils.py b/cosmotech/coal/azure/adx/utils.py index 9481e81c..a68d2771 100644 --- a/cosmotech/coal/azure/adx/utils.py +++ b/cosmotech/coal/azure/adx/utils.py @@ -5,13 +5,13 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import dateutil.parser from typing import Any, Dict +import dateutil.parser import pyarrow +from cosmotech.orchestrator.utils.translate import T from cosmotech.coal.utils.logger import LOGGER -from cosmotech.orchestrator.utils.translate import T def create_column_mapping(data: pyarrow.Table) -> Dict[str, str]: diff --git a/cosmotech/coal/azure/blob.py b/cosmotech/coal/azure/blob.py index 902db7c3..1bada51a 100644 --- a/cosmotech/coal/azure/blob.py +++ b/cosmotech/coal/azure/blob.py @@ -21,6 +21,7 @@ from cosmotech.orchestrator.utils.translate import T from cosmotech.coal.store.store import Store +from cosmotech.coal.utils.configuration import Configuration from cosmotech.coal.utils.logger import LOGGER VALID_TYPES = ( @@ -31,42 +32,35 @@ def dump_store_to_azure( - store_folder: str, - account_name: str, - container_name: str, - tenant_id: str, - client_id: str, - client_secret: str, - output_type: str = "sqlite", - file_prefix: str = "", + configuration: Configuration = Configuration(), selected_tables: list[str] = [], ) -> None: """ Dump Store data to Azure Blob Storage. 
Args: - store_folder: Folder containing the Store - account_name: Azure Storage account name - container_name: Azure Storage container name - tenant_id: Azure tenant ID - client_id: Azure client ID - client_secret: Azure client secret - output_type: Output file type (sqlite, csv, or parquet) - file_prefix: Prefix for uploaded files + configuration: Configuration object carrying the Azure connection and store settings + selected_tables: list of table names to send Raises: ValueError: If the output type is invalid """ - _s = Store(store_location=store_folder) + _s = Store(configuration=configuration) + output_type = configuration.safe_get("azure.output_type", default="sqlite") + file_prefix = configuration.safe_get("azure.file_prefix", default="") if output_type not in VALID_TYPES: LOGGER.error(T("coal.common.validation.invalid_output_type").format(output_type=output_type)) raise ValueError(T("coal.common.validation.invalid_output_type").format(output_type=output_type)) container_client = BlobServiceClient( - account_url=f"https://{account_name}.blob.core.windows.net/", - credential=ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret), - ).get_container_client(container_name) + account_url=f"https://{configuration.azure.account_name}.blob.core.windows.net/", + credential=ClientSecretCredential( + tenant_id=configuration.azure.tenant_id, + client_id=configuration.azure.client_id, + client_secret=configuration.azure.client_secret, + ), + ).get_container_client(configuration.azure.container_name) def data_upload(data_stream: BytesIO, file_name: str): uploaded_file_name = file_prefix + file_name diff --git a/cosmotech/coal/postgresql/runner.py b/cosmotech/coal/postgresql/runner.py index 12d58dc9..15d74e9f 100644 --- a/cosmotech/coal/postgresql/runner.py +++ b/cosmotech/coal/postgresql/runner.py @@ -61,7 +61,7 @@ def send_runner_metadata_to_postgresql( LOGGER.info(T("coal.services.postgresql.metadata")) sql_upsert = f""" INSERT INTO {schema_table} (id, name, last_csm_run_id, run_template_id) - VALUES(%s, %s, %s, %s) + VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, last_csm_run_id = EXCLUDED.last_csm_run_id; diff --git a/cosmotech/coal/postgresql/store.py b/cosmotech/coal/postgresql/store.py index 5ad4b557..14167ec9 100644 --- a/cosmotech/coal/postgresql/store.py +++ b/cosmotech/coal/postgresql/store.py @@ -53,24 +53,27 @@ def dump_store_to_postgresql( selected_tables: list of tables to send fk_id: foreign key id to add to all table on all rows """ - _c = Configuration() - _c.postgres.host = postgres_host - _c.postgres.port = postgres_port - _c.postgres.db_name = postgres_db - _c.postgres.db_schema = postgres_schema - _c.postgres.user_name = postgres_user - _c.postgres.user_password = postgres_password - _c.postgres.password_encoding = force_encode - _c.postgres.table_prefix = table_prefix - - dump_store_to_postgresql_from_conf( - configuration=_c, store_folder=store_folder, replace=replace, selected_tables=selected_tables, fk_id=fk_id + _c = Configuration( + { + "coal": {"store": store_folder}, + "postgres": { + "host": postgres_host, + "port": postgres_port, + "db_name": postgres_db, + "db_schema": postgres_schema, + "user_name": postgres_user, + "user_password": postgres_password, + "password_encoding": force_encode, + "table_prefix": table_prefix, + }, + } ) + dump_store_to_postgresql_from_conf(configuration=_c, replace=replace, selected_tables=selected_tables, fk_id=fk_id) + def dump_store_to_postgresql_from_conf( configuration: Configuration, - store_folder: str, replace:
bool = True, selected_tables: list[str] = [], fk_id: str = None, @@ -80,13 +83,12 @@ def dump_store_to_postgresql_from_conf( Args: configuration: coal Configuration - store_folder: Folder containing the Store replace: Whether to replace existing tables selected_tables: list of tables to send fk_id: foreign key id to add to all table on all rows """ _psql = PostgresUtils(configuration) - _s = Store(store_location=store_folder) + _s = Store(configuration=configuration) tables = list(_s.list_tables()) if selected_tables: @@ -104,7 +106,7 @@ def dump_store_to_postgresql_from_conf( f""" ALTER TABLE {table_name} ADD csm_run_id TEXT NOT NULL - DEFAULT ('{fk_id}) + DEFAULT ('{fk_id}') """ ) data = _s.get_table(table_name) diff --git a/cosmotech/coal/postgresql/utils.py b/cosmotech/coal/postgresql/utils.py index 95a1aa7b..62fed80c 100644 --- a/cosmotech/coal/postgresql/utils.py +++ b/cosmotech/coal/postgresql/utils.py @@ -75,7 +75,8 @@ def full_uri(self) -> str: f"/{self.db_name}" ) - def metadata_table_name(self): + @property + def metadata_table_name(self) -> str: return f"{self.table_prefix}RunnerMetadata" def get_postgresql_table_schema(self, target_table_name: str) -> Optional[pa.Schema]: diff --git a/cosmotech/coal/singlestore/store.py b/cosmotech/coal/singlestore/store.py index 57876637..4d234350 100644 --- a/cosmotech/coal/singlestore/store.py +++ b/cosmotech/coal/singlestore/store.py @@ -12,15 +12,16 @@ for store operations. """ +import csv import pathlib import time -import csv + import singlestoredb as s2 +from cosmotech.orchestrator.utils.translate import T from cosmotech.coal.store.csv import store_csv_file from cosmotech.coal.store.store import Store from cosmotech.coal.utils.logger import LOGGER -from cosmotech.orchestrator.utils.translate import T def _get_data(table_name: str, output_directory: str, cursor) -> None: diff --git a/cosmotech/coal/store/__init__.py b/cosmotech/coal/store/__init__.py index b5e6bf92..64b02a79 100644 --- a/cosmotech/coal/store/__init__.py +++ b/cosmotech/coal/store/__init__.py @@ -12,31 +12,34 @@ including loading and converting data. 
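# Illustrative sketch (not part of the patch): the re-exported Store now reads
# its folder from the coal.store key, as the store.py change below makes
# explicit. "/tmp/demo_store" is a placeholder path.
from cosmotech.coal.store import Store
from cosmotech.coal.utils.configuration import Configuration

conf = Configuration({"coal": {"store": "/tmp/demo_store"}})
tables = list(Store(configuration=conf).list_tables())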
""" -# Re-export the Store class -from cosmotech.coal.store.store import Store - # Re-export functions from the csv module from cosmotech.coal.store.csv import ( - store_csv_file, convert_store_table_to_csv, + store_csv_file, ) # Re-export functions from the native_python module from cosmotech.coal.store.native_python import ( - store_pylist, convert_table_as_pylist, + store_pylist, ) - -# Re-export functions from the pandas module (if available) - from cosmotech.coal.store.pandas import ( - store_dataframe, convert_store_table_to_dataframe as convert_store_table_to_pandas_dataframe, ) - -# Re-export functions from the pyarrow module (if available) - +from cosmotech.coal.store.pandas import ( + store_dataframe, +) from cosmotech.coal.store.pyarrow import ( - store_table, convert_store_table_to_dataframe as convert_store_table_to_pyarrow_table, ) +from cosmotech.coal.store.pyarrow import ( + store_table, +) + +# Re-export the Store class +from cosmotech.coal.store.store import Store + +# Re-export functions from the pandas module (if available) + + +# Re-export functions from the pyarrow module (if available) diff --git a/cosmotech/coal/store/output/aws_channel.py b/cosmotech/coal/store/output/aws_channel.py index d05707ae..c5a24428 100644 --- a/cosmotech/coal/store/output/aws_channel.py +++ b/cosmotech/coal/store/output/aws_channel.py @@ -6,7 +6,10 @@ from cosmotech.orchestrator.utils.translate import T from cosmotech.coal.aws import S3 -from cosmotech.coal.store.output.channel_interface import ChannelInterface +from cosmotech.coal.store.output.channel_interface import ( + ChannelInterface, + MissingChannelConfigError, +) from cosmotech.coal.store.store import Store from cosmotech.coal.utils.configuration import Configuration, Dotdict from cosmotech.coal.utils.logger import LOGGER @@ -14,20 +17,18 @@ class AwsChannel(ChannelInterface): required_keys = { - "cosmotech": [ - "dataset_absolute_path", - ], + "coal": ["store"], "s3": ["access_key_id", "endpoint_url", "secret_access_key"], } requirement_string = required_keys def __init__(self, dct: Dotdict = None): - self.configuration = Configuration(dct) + super().__init__(dct) self._s3 = S3(self.configuration) def send(self, filter: Optional[list[str]] = None) -> bool: - _s = Store(store_location=self.configuration.cosmotech.parameters_absolute_path) + _s = Store(configuration=self.configuration) if self._s3.output_type not in ("sqlite", "csv", "parquet"): LOGGER.error(T("coal.common.errors.data_invalid_output_type").format(output_type=self._s3.output_type)) diff --git a/cosmotech/coal/store/output/az_storage_channel.py b/cosmotech/coal/store/output/az_storage_channel.py index 1cd467c0..a51eec8d 100644 --- a/cosmotech/coal/store/output/az_storage_channel.py +++ b/cosmotech/coal/store/output/az_storage_channel.py @@ -1,15 +1,16 @@ from typing import Optional from cosmotech.coal.azure.blob import dump_store_to_azure -from cosmotech.coal.store.output.channel_interface import ChannelInterface +from cosmotech.coal.store.output.channel_interface import ( + ChannelInterface, + MissingChannelConfigError, +) from cosmotech.coal.utils.configuration import Configuration, Dotdict class AzureStorageChannel(ChannelInterface): required_keys = { - "cosmotech": [ - "dataset_absolute_path", - ], + "coal": ["store"], "azure": [ "account_name", "container_name", @@ -22,19 +23,9 @@ class AzureStorageChannel(ChannelInterface): } requirement_string = required_keys - def __init__(self, dct: Dotdict = None): - self.configuration = Configuration(dct) - def send(self, 
filter: Optional[list[str]] = None) -> bool: dump_store_to_azure( - store_folder=self.configuration.cosmotech.dataset_absolute_path, - account_name=self.configuration.azure.account_name, - container_name=self.configuration.azure.container_name, - tenant_id=self.configuration.azure.tenant_id, - client_id=self.configuration.azure.client_id, - client_secret=self.configuration.azure.client_secret, - output_type=self.configuration.azure.output_type, - file_prefix=self.configuration.azure.file_prefix, + self.configuration, selected_tables=filter, ) diff --git a/cosmotech/coal/store/output/channel_interface.py b/cosmotech/coal/store/output/channel_interface.py index d7609683..53a86491 100644 --- a/cosmotech/coal/store/output/channel_interface.py +++ b/cosmotech/coal/store/output/channel_interface.py @@ -2,11 +2,18 @@ from cosmotech.orchestrator.utils.translate import T +from cosmotech.coal.utils.configuration import Configuration, Dotdict + class ChannelInterface: required_keys = {} requirement_string: str = T("coal.store.output.data_interface.requirements") + def __init__(self, dct: Dotdict = None): + self.configuration = Configuration(dct) + if not self.is_available(): + raise MissingChannelConfigError(self) + def send(self, filter: Optional[list[str]] = None) -> bool: raise NotImplementedError() @@ -21,3 +28,11 @@ def is_available(self) -> bool: ) except KeyError: return False + + +class MissingChannelConfigError(Exception): + def __init__(self, interface_class): + self.message = T("coal.store.output.split.requirements").format( + interface_name=interface_class.__class__.__name__, requirements=interface_class.requirement_string + ) + super().__init__(self.message) diff --git a/cosmotech/coal/store/output/channel_spliter.py b/cosmotech/coal/store/output/channel_spliter.py index 9916bfcd..671173ed 100644 --- a/cosmotech/coal/store/output/channel_spliter.py +++ b/cosmotech/coal/store/output/channel_spliter.py @@ -6,7 +6,7 @@ from cosmotech.coal.store.output.az_storage_channel import AzureStorageChannel from cosmotech.coal.store.output.channel_interface import ChannelInterface from cosmotech.coal.store.output.postgres_channel import PostgresChannel -from cosmotech.coal.utils.configuration import Configuration +from cosmotech.coal.utils.configuration import Dotdict from cosmotech.coal.utils.logger import LOGGER @@ -19,9 +19,11 @@ class ChannelSpliter(ChannelInterface): "postgres": PostgresChannel, } - def __init__(self, configuration: Configuration): - self.configuration = configuration + def __init__(self, dct: Dotdict = None): + super().__init__(dct) self.targets = [] + if "outputs" not in self.configuration: + raise AttributeError(T("coal.store.output.split.no_targets")) for output in self.configuration.outputs: channel = self.available_interfaces[output.type] _i = channel(output.conf) diff --git a/cosmotech/coal/store/output/postgres_channel.py b/cosmotech/coal/store/output/postgres_channel.py index 8cc6accc..e1f850fc 100644 --- a/cosmotech/coal/store/output/postgres_channel.py +++ b/cosmotech/coal/store/output/postgres_channel.py @@ -11,10 +11,11 @@ class PostgresChannel(ChannelInterface): required_keys = { - "cosmotech": ["dataset_absolute_path", "organization_id", "workspace_id", "runner_id"], + "coal": ["store"], + "cosmotech": ["organization_id", "workspace_id", "runner_id"], "postgres": [ "host", - "post", + "port", "db_name", "db_schema", "user_name", @@ -23,14 +24,10 @@ class PostgresChannel(ChannelInterface): } requirement_string = required_keys - def __init__(self, dct: Dotdict = 
None): - self.configuration = Configuration(dct) - def send(self, filter: Optional[list[str]] = None) -> bool: run_id = send_runner_metadata_to_postgresql(self.configuration) dump_store_to_postgresql_from_conf( - self.configuration, - store_folder=self.configuration.cosmotech.dataset_absolute_path, + configuration=self.configuration, selected_tables=filter, fk_id=run_id, ) diff --git a/cosmotech/coal/store/pandas.py b/cosmotech/coal/store/pandas.py index 31773ad1..96dfc3bd 100644 --- a/cosmotech/coal/store/pandas.py +++ b/cosmotech/coal/store/pandas.py @@ -5,10 +5,10 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. +import pandas as pd import pyarrow from cosmotech.coal.store.store import Store -import pandas as pd def store_dataframe( diff --git a/cosmotech/coal/store/pyarrow.py b/cosmotech/coal/store/pyarrow.py index 68440ba2..d6385b75 100644 --- a/cosmotech/coal/store/pyarrow.py +++ b/cosmotech/coal/store/pyarrow.py @@ -5,10 +5,10 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from cosmotech.coal.store.store import Store - import pyarrow as pa +from cosmotech.coal.store.store import Store + def store_table( table_name: str, diff --git a/cosmotech/coal/store/store.py b/cosmotech/coal/store/store.py index 36bd656b..86015d88 100644 --- a/cosmotech/coal/store/store.py +++ b/cosmotech/coal/store/store.py @@ -5,14 +5,14 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import os import pathlib import pyarrow from adbc_driver_sqlite import dbapi +from cosmotech.orchestrator.utils.translate import T +from cosmotech.coal.utils.configuration import Configuration from cosmotech.coal.utils.logger import LOGGER -from cosmotech.orchestrator.utils.translate import T class Store: @@ -20,11 +20,8 @@ class Store: def sanitize_column(column_name: str) -> str: return column_name.replace(" ", "_") - def __init__( - self, - reset=False, - store_location: pathlib.Path = pathlib.Path(os.environ.get("CSM_PARAMETERS_ABSOLUTE_PATH", ".")), - ): + def __init__(self, reset=False, configuration: Configuration = Configuration()): + store_location = configuration.safe_get("coal.store", ".") self.store_location = pathlib.Path(store_location) / ".coal/store" self.store_location.mkdir(parents=True, exist_ok=True) self._tables = dict() diff --git a/cosmotech/coal/utils/configuration.py b/cosmotech/coal/utils/configuration.py index 3e627772..496be5ef 100644 --- a/cosmotech/coal/utils/configuration.py +++ b/cosmotech/coal/utils/configuration.py @@ -6,6 +6,10 @@ from cosmotech.coal.utils.logger import LOGGER +class ReferenceKeyError(KeyError): + pass + + class Dotdict(dict): """dot.notation access to dictionary attributes""" @@ -17,9 +21,13 @@ def __getattr__(self, key): _v = self.__getitem__(key) if isinstance(_v, str) and _v.startswith("$"): _r = self.root - for _p in _v[1:].split("."): - _r = _r[_p] - return _r + try: + for _p in _v[1:].split("."): + _r = _r.__getattr__(_p) + return _r + except (KeyError, AttributeError): + LOGGER.warning(f"dotdict Ref {_v} doesn't exist") + raise ReferenceKeyError(_v) return _v __delattr__ = dict.__delitem__ @@ -35,48 +43,56 @@ def update(data): return data for k, v in dct.items(): - self[k] = update(v) + self.__setattr__(k, update(v)) def merge(self, d): for k, v in d.items(): if isinstance(v, Dotdict) and k in self: self[k].merge(v) else: -
self[k] = v + # this triggers the dict to Dotdict conversion at merge + self.__setattr__(k, v) class Configuration(Dotdict): - - # HARD CODED ENVVAR CONVERSION - CONVERSION_DICT = { - "outputs": [ - { - "type": "postgres", - "conf": { - "cosmotech": { - "dataset_absolute_path": "$cosmotech.dataset_absolute_path", - "organization_id": "$cosmotech.organization_id", - "workspace_id": "$cosmotech.workspace_id", - "runner_id": "$cosmotech.runner_id", - }, - "postgres": { - "host": "$postgres.host", - "post": "$postgres.post", - "db_name": "$postgres.db_name", - "db_schema": "$postgres.db_schema", - "user_name": "$postgres.user_name", - "user_password": "$postgres.user_password", - }, + # Env vars set by the cosmotech api at runtime + API_ENV_DICT = { + "secrets": { + "cosmotech": { + "twin_cache": { + "host": "TWIN_CACHE_HOST", + "port": "TWIN_CACHE_PORT", + "password": "TWIN_CACHE_PASSWORD", + "username": "TWIN_CACHE_USERNAME", + }, + "idp": { + "base_url": "IDP_BASE_URL", + "tenant_id": "IDP_TENANT_ID", + "client_id": "IDP_CLIENT_ID", + "client_secret": "IDP_CLIENT_SECRET", }, + "api": {"url": "CSM_API_URL", "scope": "CSM_API_SCOPE"}, + "dataset_absolute_path": "CSM_DATASET_ABSOLUTE_PATH", + "parameters_absolute_path": "CSM_PARAMETERS_ABSOLUTE_PATH", + "organization_id": "CSM_ORGANIZATION_ID", + "workspace_id": "CSM_WORKSPACE_ID", + "runner_id": "CSM_RUNNER_ID", + "run_id": "CSM_RUN_ID", + "run_template_id": "CSM_RUN_TEMPLATE_ID", } - ], + } + } + # HARD CODED ENVVAR CONVERSION + CONVERSION_DICT = { "secrets": { "log_level": "LOG_LEVEL", "s3": { "access_key_id": "AWS_ACCESS_KEY_ID", "endpoint_url": "AWS_ENDPOINT_URL", "secret_access_key": "AWS_SECRET_ACCESS_KEY", + "bucket_name": "CSM_DATA_BUCKET_NAME", "bucket_prefix": "CSM_DATA_BUCKET_PREFIX", + "ca_bundle": "CSM_S3_CA_BUNDLE", }, "azure": { "account_name": "AZURE_ACCOUNT_NAME", @@ -87,30 +103,16 @@ class Configuration(Dotdict): "data_explorer_resource_ingest_uri": "AZURE_DATA_EXPLORER_RESOURCE_INGEST_URI", "data_explorer_resource_uri": "AZURE_DATA_EXPLORER_RESOURCE_URI", "storage_blob_name": "AZURE_STORAGE_BLOB_NAME", + "data_blob_prefix": "CSM_DATA_BLOB_PREFIX", + "data_prefix": "CSM_DATA_PREFIX", "storage_sas_url": "AZURE_STORAGE_SAS_URL", "tenant_id": "AZURE_TENANT_ID", }, "cosmotech": { - "api_url": "CSM_API_URL", - "container_mode": "CSM_CONTAINER_MODE", "data_adx_tag": "CSM_DATA_ADX_TAG", "data_adx_wait_ingestion": "CSM_DATA_ADX_WAIT_INGESTION", - "data_blob_prefix": "CSM_DATA_BLOB_PREFIX", - "data_bucket_name": "CSM_DATA_BUCKET_NAME", - "data_prefix": "CSM_DATA_PREFIX", - "dataset_absolute_path": "CSM_DATASET_ABSOLUTE_PATH", - "organization_id": "CSM_ORGANIZATION_ID", - "parameters_absolute_path": "CSM_PARAMETERS_ABSOLUTE_PATH", - "run_id": "CSM_RUN_ID", - "runner_id": "CSM_RUNNER_ID", - "run_template_id": "CSM_RUN_TEMPLATE_ID", - "s3_ca_bundle": "CSM_S3_CA_BUNDLE", - "scenario_id": "CSM_SCENARIO_ID", "send_datawarehouse_datasets": "CSM_SEND_DATAWAREHOUSE_DATASETS", "send_datawarehouse_parameters": "CSM_SEND_DATAWAREHOUSE_PARAMETERS", - "workspace_id": "CSM_WORKSPACE_ID", - "fetch_dataset": "FETCH_DATASET", - "fetch_datasets_in_parallel": "FETCH_DATASETS_IN_PARALLEL", }, "postgres": { "db_name": "POSTGRES_DB_NAME", @@ -132,38 +134,64 @@ }, } + # HARD CODED configmap mount path set in K8s simulation pod by API run function + K8S_CONFIG = "/mnt/coal/coal-config.toml" def __init__(self, dct: dict = None): - if dct: + if isinstance(dct, Dotdict): + super().__init__(dct, root=dct.root) + elif isinstance(dct,
dict): super().__init__(dct) elif config_path := os.environ.get("CONFIG_FILE_PATH", default=None): - with open(config_path, "rb") as f: - super().__init__(tomllib.load(f)) + with open(config_path, "rb") as config: + super().__init__(tomllib.load(config)) + elif os.path.isfile(self.K8S_CONFIG): + with open(self.K8S_CONFIG, "rb") as config: + super().__init__(tomllib.load(config)) else: LOGGER.info(T("coal.utils.configuration.no_config_file")) super().__init__(self.CONVERSION_DICT) + # add env vars set by the API + self.merge(Dotdict(self.API_ENV_DICT)) + if "secrets" in self: self.secrets = self._env_swap_recusion(self.secrets) # set secret section back to respective section self.merge(self.secrets) del self.secrets + # add coal.store default value if not defined + if self.safe_get("coal.store") is None: + self.merge({"coal": {"store": "$cosmotech.parameters_absolute_path"}}) + # swap configured env var names for their environment values def _env_swap_recusion(self, dic): for k, v in dic.items(): if isinstance(v, Dotdict): dic[k] = self._env_swap_recusion(v) - # remove value not found dic[k] = {k: v for k, v in dic[k].items() if v is not None} elif isinstance(v, list): dic[k] = list(self._env_swap_recusion(_v) for _v in v) elif isinstance(v, str): dic[k] = os.environ.get(v) + # remove value not found + dic = {k: v for k, v in dic.items() if v} return dic def merge_in(self, dic): trans_dic = self._env_swap_recusion(dic) - self._merge(trans_dic) + self.merge(trans_dic) + + def safe_get(self, key, default=None): + try: + _r = self + for _k in key.split("."): + _r = _r.__getattr__(_k) + return _r + except (KeyError, AttributeError) as err: + LOGGER.warning(err) + return default ENVIRONMENT_CONFIGURATION = Configuration() diff --git a/cosmotech/csm_data/commands/adx_send_data.py b/cosmotech/csm_data/commands/adx_send_data.py index a1a7ad7a..d5ddda8f 100644 --- a/cosmotech/csm_data/commands/adx_send_data.py +++ b/cosmotech/csm_data/commands/adx_send_data.py @@ -7,7 +7,7 @@ from cosmotech.orchestrator.utils.translate import T from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help +from cosmotech.csm_data.utils.decorators import translate_help, web_help @click.command() diff --git a/cosmotech/csm_data/commands/adx_send_runnerdata.py b/cosmotech/csm_data/commands/adx_send_runnerdata.py index 77abb9a4..4d985dc6 100644 --- a/cosmotech/csm_data/commands/adx_send_runnerdata.py +++ b/cosmotech/csm_data/commands/adx_send_runnerdata.py @@ -5,10 +5,11 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + @click.command() @web_help("csm-data/adx-send-runner-data") diff --git a/cosmotech/csm_data/commands/az_storage_upload.py b/cosmotech/csm_data/commands/az_storage_upload.py index 7d06d20e..60e5d3cb 100644 --- a/cosmotech/csm_data/commands/az_storage_upload.py +++ b/cosmotech/csm_data/commands/az_storage_upload.py @@ -5,10 +5,11 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech.
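# Illustrative sketch (not part of the patch) of the resolution rules added
# above; all values are placeholders. "$"-prefixed strings resolve against the
# Dotdict root, and safe_get() turns a missing key into a default instead of an
# exception.
from cosmotech.coal.utils.configuration import Configuration

conf = Configuration(
    {
        "cosmotech": {"parameters_absolute_path": "/tmp/params"},
        "coal": {"store": "$cosmotech.parameters_absolute_path"},
    }
)
assert conf.coal.store == "/tmp/params"  # "$" reference followed via __getattr__
assert conf.safe_get("postgres.host") is None  # missing key -> default, no raise
# A dangling "$..." value raises ReferenceKeyError when the attribute is read.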
-from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + @click.command() @click.option( diff --git a/cosmotech/csm_data/commands/store/dump_to_azure.py b/cosmotech/csm_data/commands/store/dump_to_azure.py index c6e06632..b086d837 100644 --- a/cosmotech/csm_data/commands/store/dump_to_azure.py +++ b/cosmotech/csm_data/commands/store/dump_to_azure.py @@ -5,10 +5,11 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + VALID_TYPES = ( "sqlite", "csv", diff --git a/cosmotech/csm_data/commands/store/dump_to_postgresql.py b/cosmotech/csm_data/commands/store/dump_to_postgresql.py index 88ea603a..b0dd321a 100644 --- a/cosmotech/csm_data/commands/store/dump_to_postgresql.py +++ b/cosmotech/csm_data/commands/store/dump_to_postgresql.py @@ -5,10 +5,11 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + @click.command() @web_help("csm-data/store/dump-to-postgres") diff --git a/cosmotech/csm_data/commands/store/list_tables.py b/cosmotech/csm_data/commands/store/list_tables.py index 3049bdc9..902de700 100644 --- a/cosmotech/csm_data/commands/store/list_tables.py +++ b/cosmotech/csm_data/commands/store/list_tables.py @@ -5,10 +5,11 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + @click.command() @web_help("csm-data/store/list-tables") diff --git a/cosmotech/csm_data/commands/store/load_csv_folder.py b/cosmotech/csm_data/commands/store/load_csv_folder.py index aca47287..593b9c73 100644 --- a/cosmotech/csm_data/commands/store/load_csv_folder.py +++ b/cosmotech/csm_data/commands/store/load_csv_folder.py @@ -4,10 +4,10 @@ # Any use, reproduction, translation, broadcasting, transmission, distribution, # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
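# Illustrative sketch (not part of the patch) of a custom channel built on the
# reworked ChannelInterface shown earlier in this diff; "DemoChannel", its
# required keys, and the path are made up for the example.
from typing import Optional

from cosmotech.coal.store.output.channel_interface import ChannelInterface


class DemoChannel(ChannelInterface):
    required_keys = {"coal": ["store"]}
    requirement_string = required_keys

    def send(self, filter: Optional[list[str]] = None) -> bool:
        # ChannelInterface.__init__ has already validated required_keys and
        # raised MissingChannelConfigError if anything was missing.
        return True


channel = DemoChannel({"coal": {"store": "/tmp/store"}})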
+from cosmotech.orchestrator.utils.translate import T from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help -from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.decorators import translate_help, web_help @click.command() @@ -34,10 +34,16 @@ def load_csv_folder(store_folder, csv_folder): # Import the modules and functions at the start of the command import pathlib + from cosmotech.coal.store.csv import store_csv_file from cosmotech.coal.store.store import Store + from cosmotech.coal.utils.configuration import Configuration from cosmotech.coal.utils.logger import LOGGER + _conf = Configuration() + + _conf.coal.store = store_folder + for csv_path in pathlib.Path(csv_folder).glob("*.csv"): LOGGER.info(T("coal.services.azure_storage.found_file").format(file=csv_path.name)) - store_csv_file(csv_path.name[:-4], csv_path, store=Store(False, store_folder)) + store_csv_file(csv_path.name[:-4], csv_path, store=Store(False, _conf)) diff --git a/cosmotech/csm_data/commands/store/load_from_singlestore.py b/cosmotech/csm_data/commands/store/load_from_singlestore.py index 289a4dbf..958309ed 100644 --- a/cosmotech/csm_data/commands/store/load_from_singlestore.py +++ b/cosmotech/csm_data/commands/store/load_from_singlestore.py @@ -5,10 +5,11 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + @click.command() @web_help("csm-data/store/load-from-singlestore") diff --git a/cosmotech/csm_data/commands/store/reset.py b/cosmotech/csm_data/commands/store/reset.py index d6808d71..a1fbe732 100644 --- a/cosmotech/csm_data/commands/store/reset.py +++ b/cosmotech/csm_data/commands/store/reset.py @@ -4,10 +4,10 @@ # Any use, reproduction, translation, broadcasting, transmission, distribution, # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. +from cosmotech.orchestrator.utils.translate import T from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help -from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.decorators import translate_help, web_help @click.command() @@ -25,7 +25,12 @@ def reset(store_folder): # Import the modules and functions at the start of the command from cosmotech.coal.store.store import Store + from cosmotech.coal.utils.configuration import Configuration from cosmotech.coal.utils.logger import LOGGER - Store(True, store_folder) + _conf = Configuration() + + _conf.coal.store = store_folder + + Store(True, _conf) LOGGER.info(T("coal.services.database.store_reset").format(folder=store_folder)) diff --git a/cosmotech/csm_data/main.py b/cosmotech/csm_data/main.py index 147580bb..27d931bd 100644 --- a/cosmotech/csm_data/main.py +++ b/cosmotech/csm_data/main.py @@ -5,20 +5,20 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
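# The same wiring as the load_csv_folder command above, written as plain Python
# (not part of the patch; both folder paths are placeholders):
import pathlib

from cosmotech.coal.store.csv import store_csv_file
from cosmotech.coal.store.store import Store
from cosmotech.coal.utils.configuration import Configuration

_conf = Configuration()
_conf.coal.store = "/tmp/store_folder"
_store = Store(False, _conf)
for csv_path in pathlib.Path("/tmp/csv_folder").glob("*.csv"):
    store_csv_file(csv_path.name[:-4], csv_path, store=_store)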
import click_log +from cosmotech.orchestrator.utils.translate import T from cosmotech.coal import __version__ +from cosmotech.coal.utils.logger import LOGGER +from cosmotech.csm_data.commands.adx_send_data import adx_send_data from cosmotech.csm_data.commands.adx_send_runnerdata import adx_send_runnerdata from cosmotech.csm_data.commands.api.api import api from cosmotech.csm_data.commands.az_storage_upload import az_storage_upload +from cosmotech.csm_data.commands.s3_bucket_delete import s3_bucket_delete from cosmotech.csm_data.commands.s3_bucket_download import s3_bucket_download from cosmotech.csm_data.commands.s3_bucket_upload import s3_bucket_upload -from cosmotech.csm_data.commands.s3_bucket_delete import s3_bucket_delete -from cosmotech.csm_data.commands.adx_send_data import adx_send_data from cosmotech.csm_data.commands.store.store import store from cosmotech.csm_data.utils.click import click from cosmotech.csm_data.utils.decorators import translate_help, web_help -from cosmotech.coal.utils.logger import LOGGER -from cosmotech.orchestrator.utils.translate import T def print_version(ctx, param, value): diff --git a/cosmotech/csm_data/utils/decorators.py b/cosmotech/csm_data/utils/decorators.py index 49a95fa5..b16c74b6 100644 --- a/cosmotech/csm_data/utils/decorators.py +++ b/cosmotech/csm_data/utils/decorators.py @@ -9,10 +9,11 @@ import webbrowser from functools import wraps +from cosmotech.orchestrator.utils.translate import T + from cosmotech.coal.utils import WEB_DOCUMENTATION_ROOT -from cosmotech.csm_data.utils.click import click from cosmotech.coal.utils.logger import LOGGER -from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click def translate_help(translation_key): diff --git a/docs/javascript/tablesort.js b/docs/javascript/tablesort.js index ee04e900..98717aed 100644 --- a/docs/javascript/tablesort.js +++ b/docs/javascript/tablesort.js @@ -3,4 +3,4 @@ document$.subscribe(function () { tables.forEach(function (table) { new Tablesort(table) }) -}) \ No newline at end of file +}) diff --git a/docs/overrides/partials/copyright.html b/docs/overrides/partials/copyright.html index 18c5ac5b..7a0a14ba 100644 --- a/docs/overrides/partials/copyright.html +++ b/docs/overrides/partials/copyright.html @@ -11,4 +11,4 @@

Change cookie settings - \ No newline at end of file + diff --git a/docs/scripts/generate_command_helps.py b/docs/scripts/generate_command_helps.py index fb949941..2cd8084c 100644 --- a/docs/scripts/generate_command_helps.py +++ b/docs/scripts/generate_command_helps.py @@ -7,9 +7,9 @@ import contextlib import io +import os import pathlib import re -import os import click diff --git a/docs/scripts/generate_references.py b/docs/scripts/generate_references.py index 5bd8a3b1..16531870 100644 --- a/docs/scripts/generate_references.py +++ b/docs/scripts/generate_references.py @@ -1,8 +1,7 @@ import os import mkdocs_gen_files -from griffe import Alias -from griffe import Module +from griffe import Alias, Module pyhand = mkdocs_gen_files.config["plugins"]["mkdocstrings"].handlers.get_handler("python") module_name = "cosmotech.coal" diff --git a/docs/scripts/generic_ref.md.template b/docs/scripts/generic_ref.md.template index 21d0f4cb..a4c2cbd6 100644 --- a/docs/scripts/generic_ref.md.template +++ b/docs/scripts/generic_ref.md.template @@ -3,4 +3,3 @@ show_root_heading: true show_root_full_path: false show_source: true - diff --git a/docs/stylesheets/templates.css b/docs/stylesheets/templates.css index 5a3aab6b..727ced81 100644 --- a/docs/stylesheets/templates.css +++ b/docs/stylesheets/templates.css @@ -32,4 +32,4 @@ article > .tpl_table { .hidden { visibility: hidden; display: none; -} \ No newline at end of file +} diff --git a/docs/tutorials/cosmotech-api.md b/docs/tutorials/cosmotech-api.md index dfa5d22b..f5935e0c 100644 --- a/docs/tutorials/cosmotech-api.md +++ b/docs/tutorials/cosmotech-api.md @@ -31,7 +31,7 @@ The API integration is organized into several modules, each focused on specific !!! info "API vs CLI" While the `csm-data` CLI provides command-line tools for many common operations, the direct API integration offers more flexibility and programmatic control. Use the API integration when you need to: - + - Build custom workflows - Integrate with other Python code - Perform complex operations not covered by the CLI @@ -108,10 +108,10 @@ The `download_workspace_file` function downloads a file from the workspace to a ```python downloaded_file = download_workspace_file( - api_client, - organization_id, - workspace_id, - file_to_download, + api_client, + organization_id, + workspace_id, + file_to_download, target_directory ) ``` @@ -139,7 +139,7 @@ The `workspace_destination` parameter can be: !!! tip "Workspace Paths" When working with workspace paths: - + - Use forward slashes (`/`) regardless of your operating system - End directory paths with a trailing slash (`/`) - Use relative paths from the workspace root @@ -244,10 +244,10 @@ This function: !!! tip "Dataset References" Runners can reference datasets in two ways: - + - Through parameters with the `%DATASETID%` variable type - Through the `dataset_list` property - + The `download_runner_data` function handles both types of references. ## Complete Workflow Example @@ -268,7 +268,7 @@ This workflow: !!! 
tip "Real-world Workflows" In real-world scenarios, you might: - + - Use more complex data transformations - Integrate with external systems - Implement error handling and retries diff --git a/docs/tutorials/datastore.md b/docs/tutorials/datastore.md index 11cab217..9135e688 100644 --- a/docs/tutorials/datastore.md +++ b/docs/tutorials/datastore.md @@ -93,7 +93,7 @@ The datastore provides specialized adapters for working with various data format ### Joining multiple tables -```python title="Joining tables in the datastore" linenums="1" +```python title="Joining tables in the datastore" linenums="1" --8<-- 'tutorial/datastore/joining_tables.py' ``` diff --git a/find_untested_functions.py b/find_untested_functions.py index 7aa854cf..46341441 100644 --- a/find_untested_functions.py +++ b/find_untested_functions.py @@ -3,11 +3,11 @@ Script to find functions in cosmotech/coal/ that don't have corresponding tests. """ -import os import ast +import os import re -from pathlib import Path from collections import defaultdict +from pathlib import Path def get_functions_from_file(file_path): diff --git a/generate_test_files.py b/generate_test_files.py index f9783849..38ac0b64 100755 --- a/generate_test_files.py +++ b/generate_test_files.py @@ -6,12 +6,12 @@ corresponding tests and generates test files for them based on a template. """ -import os +import argparse import ast +import os import re -from pathlib import Path from collections import defaultdict -import argparse +from pathlib import Path def get_functions_from_file(file_path): @@ -268,7 +268,6 @@ def generate_test_file(module_path, functions, overwrite=False): except SyntaxError: print(f"Warning: Could not parse existing test file {test_file}") # If we can't parse the file, we'll just append our new tests to it - pass # If the file exists and we're not overwriting, check if we need to add any tests if test_file.exists() and not overwrite: diff --git a/tests/unit/coal/test_aws/test_aws_s3.py b/tests/unit/coal/test_aws/test_aws_s3.py index 80f5244a..51d7cd0b 100644 --- a/tests/unit/coal/test_aws/test_aws_s3.py +++ b/tests/unit/coal/test_aws/test_aws_s3.py @@ -17,13 +17,18 @@ @pytest.fixture def base_configuration(): - _c = Configuration() - _c.s3.use_ssl = True - _c.s3.endpoint_url = "https://s3.example.com" - _c.s3.access_key_id = "test-access-id" - _c.s3.secret_access_key = "test-secret-key" - _c.s3.bucket_name = "test-bucket" - _c.s3.bucket_prefix = "prefix/" + _c = Configuration( + { + "s3": { + "use_ssl": True, + "endpoint_url": "https://s3.example.com", + "access_key_id": "test-access-id", + "secret_access_key": "test-secret-key", + "bucket_name": "test-bucket", + "bucket_prefix": "prefix/", + } + } + ) return _c diff --git a/tests/unit/coal/test_azure/test_adx/test_adx_auth.py b/tests/unit/coal/test_azure/test_adx/test_adx_auth.py index 79774b6d..971c9f69 100644 --- a/tests/unit/coal/test_azure/test_adx/test_adx_auth.py +++ b/tests/unit/coal/test_azure/test_adx/test_adx_auth.py @@ -5,14 +5,17 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
-import os -import pytest from unittest.mock import MagicMock, patch +import pytest from azure.kusto.data import KustoClient, KustoConnectionStringBuilder from azure.kusto.ingest import QueuedIngestClient -from cosmotech.coal.azure.adx.auth import create_kusto_client, create_ingest_client, get_cluster_urls +from cosmotech.coal.azure.adx.auth import ( + create_ingest_client, + create_kusto_client, + get_cluster_urls, +) class TestAuthFunctions: diff --git a/tests/unit/coal/test_azure/test_adx/test_adx_ingestion.py b/tests/unit/coal/test_azure/test_adx/test_adx_ingestion.py index 7bd38725..41a7c094 100644 --- a/tests/unit/coal/test_azure/test_adx/test_adx_ingestion.py +++ b/tests/unit/coal/test_azure/test_adx/test_adx_ingestion.py @@ -6,22 +6,26 @@ # specifically authorized by written means by Cosmo Tech. import time -import pytest -import pandas as pd -from unittest.mock import MagicMock, patch, call +from unittest.mock import MagicMock, patch +import pandas as pd +import pytest from azure.kusto.data import KustoClient -from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, ReportLevel -from azure.kusto.ingest.status import KustoIngestStatusQueues, SuccessMessage, FailureMessage +from azure.kusto.ingest import IngestionProperties, QueuedIngestClient, ReportLevel +from azure.kusto.ingest.status import ( + FailureMessage, + KustoIngestStatusQueues, + SuccessMessage, +) from cosmotech.coal.azure.adx.ingestion import ( - ingest_dataframe, - send_to_adx, - check_ingestion_status, - clear_ingestion_status_queues, IngestionStatus, _ingest_status, _ingest_times, + check_ingestion_status, + clear_ingestion_status_queues, + ingest_dataframe, + send_to_adx, ) @@ -103,7 +107,7 @@ def test_ingest_dataframe_no_drop_by_tag(self, mock_ingest_client, mock_datafram mock_ingest_client.ingest_from_dataframe.return_value = mock_ingestion_result # Act - result = ingest_dataframe(mock_ingest_client, database, table_name, mock_dataframe) + ingest_dataframe(mock_ingest_client, database, table_name, mock_dataframe) # Assert mock_ingest_client.ingest_from_dataframe.assert_called_once() diff --git a/tests/unit/coal/test_azure/test_adx/test_adx_ingestion_edge_cases.py b/tests/unit/coal/test_azure/test_adx/test_adx_ingestion_edge_cases.py index 4614f21b..5076b16f 100644 --- a/tests/unit/coal/test_azure/test_adx/test_adx_ingestion_edge_cases.py +++ b/tests/unit/coal/test_azure/test_adx/test_adx_ingestion_edge_cases.py @@ -6,18 +6,18 @@ # specifically authorized by written means by Cosmo Tech. import time -import pytest from unittest.mock import MagicMock, patch +import pytest from azure.kusto.ingest import QueuedIngestClient from azure.kusto.ingest.status import KustoIngestStatusQueues - from cosmotech.orchestrator.utils.translate import T + from cosmotech.coal.azure.adx.ingestion import ( - check_ingestion_status, IngestionStatus, _ingest_status, _ingest_times, + check_ingestion_status, ) diff --git a/tests/unit/coal/test_azure/test_adx/test_adx_query.py b/tests/unit/coal/test_azure/test_adx/test_adx_query.py index 3c0ca74f..5e4017df 100644 --- a/tests/unit/coal/test_azure/test_adx/test_adx_query.py +++ b/tests/unit/coal/test_azure/test_adx/test_adx_query.py @@ -5,13 +5,13 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
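# Since the fixture now returns a real Configuration (a Dotdict), a single test
# can still override one key through attribute access (illustrative sketch, not
# part of the patch):
def test_bucket_prefix_override(base_configuration):
    base_configuration.s3.bucket_prefix = "other-prefix/"
    assert base_configuration.s3.bucket_prefix == "other-prefix/"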
-import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock +import pytest from azure.kusto.data import KustoClient from azure.kusto.data.response import KustoResponseDataSet -from cosmotech.coal.azure.adx.query import run_query, run_command_query +from cosmotech.coal.azure.adx.query import run_command_query, run_query class TestQueryFunctions: diff --git a/tests/unit/coal/test_azure/test_adx/test_adx_runner.py b/tests/unit/coal/test_azure/test_adx/test_adx_runner.py index 46cdc8e2..c2b95b96 100644 --- a/tests/unit/coal/test_azure/test_adx/test_adx_runner.py +++ b/tests/unit/coal/test_azure/test_adx/test_adx_runner.py @@ -5,22 +5,20 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import os -import tempfile -import pytest -from unittest.mock import MagicMock, patch, call, mock_open +from unittest.mock import MagicMock, call, patch +import pytest +from azure.kusto.data import KustoClient from azure.kusto.data.response import KustoResponseDataSet +from azure.kusto.ingest import QueuedIngestClient +from cosmotech.coal.azure.adx.ingestion import IngestionStatus from cosmotech.coal.azure.adx.runner import ( - prepare_csv_content, construct_create_query, insert_csv_files, + prepare_csv_content, send_runner_data, ) -from azure.kusto.data import KustoClient -from azure.kusto.ingest import QueuedIngestClient -from cosmotech.coal.azure.adx.ingestion import IngestionStatus class TestRunnerFunctions: diff --git a/tests/unit/coal/test_azure/test_azure_blob.py b/tests/unit/coal/test_azure/test_azure_blob.py index 59d0e677..0c2a09e2 100644 --- a/tests/unit/coal/test_azure/test_azure_blob.py +++ b/tests/unit/coal/test_azure/test_azure_blob.py @@ -6,32 +6,41 @@ # specifically authorized by written means by Cosmo Tech. 
import io -import pytest -from unittest.mock import MagicMock, patch, mock_open +from unittest.mock import MagicMock, mock_open, patch import pyarrow as pa -import pyarrow.csv as pc -import pyarrow.parquet as pq +import pytest from azure.identity import ClientSecretCredential from azure.storage.blob import BlobServiceClient, ContainerClient -from cosmotech.coal.azure.blob import dump_store_to_azure, VALID_TYPES +from cosmotech.coal.azure.blob import dump_store_to_azure from cosmotech.coal.store.store import Store +from cosmotech.coal.utils.configuration import Configuration + + +@pytest.fixture +def base_azure_blob_config(): + return Configuration( + { + "coal": {"store": "path/to/store"}, + "azure": { + "account_name": "teststorageaccount", + "client_id": "test-client-id", + "client_secret": "test-client-secret", + "container_name": "testcontainer", + "tenant_id": "test-tenant-id", + }, + } + ) class TestBlobFunctions: """Tests for top-level functions in the blob module.""" - def test_dump_store_to_azure_invalid_output_type(self): + def test_dump_store_to_azure_invalid_output_type(self, base_azure_blob_config): """Test the dump_store_to_azure function with an invalid output type.""" # Arrange - store_folder = "/path/to/store" - account_name = "teststorageaccount" - container_name = "testcontainer" - tenant_id = "test-tenant-id" - client_id = "test-client-id" - client_secret = "test-client-secret" - output_type = "invalid_type" # Not in VALID_TYPES + base_azure_blob_config.azure.output_type = "invalid_type" # Not in VALID_TYPES # Mock Store mock_store = MagicMock(spec=Store) @@ -39,27 +48,13 @@ def test_dump_store_to_azure_invalid_output_type(self): with patch("cosmotech.coal.azure.blob.Store", return_value=mock_store): # Act & Assert with pytest.raises(ValueError, match="Invalid output type"): - dump_store_to_azure( - store_folder=store_folder, - account_name=account_name, - container_name=container_name, - tenant_id=tenant_id, - client_id=client_id, - client_secret=client_secret, - output_type=output_type, - ) - - def test_dump_store_to_azure_sqlite(self): + dump_store_to_azure(configuration=base_azure_blob_config) + + def test_dump_store_to_azure_sqlite(self, base_azure_blob_config): """Test the dump_store_to_azure function with SQLite output type.""" # Arrange - store_folder = "/path/to/store" - account_name = "teststorageaccount" - container_name = "testcontainer" - tenant_id = "test-tenant-id" - client_id = "test-client-id" - client_secret = "test-client-secret" - output_type = "sqlite" - file_prefix = "prefix_" + base_azure_blob_config.azure.output_type = "sqlite" + base_azure_blob_config.azure.file_prefix = "prefix_" # Mock Store mock_store = MagicMock(spec=Store) @@ -83,19 +78,12 @@ def test_dump_store_to_azure_sqlite(self): patch("builtins.open", mock_open(read_data=mock_file_data)), ): # Act - dump_store_to_azure( - store_folder=store_folder, - account_name=account_name, - container_name=container_name, - tenant_id=tenant_id, - client_id=client_id, - client_secret=client_secret, - output_type=output_type, - file_prefix=file_prefix, - ) + dump_store_to_azure(configuration=base_azure_blob_config) # Assert - mock_blob_service_client.get_container_client.assert_called_once_with(container_name) + mock_blob_service_client.get_container_client.assert_called_once_with( + base_azure_blob_config.azure.container_name + ) mock_container_client.upload_blob.assert_called_once() # Check the call arguments without comparing the exact mock object @@ -104,17 +92,11 @@ def 
test_dump_store_to_azure_sqlite(self): assert call_args.kwargs["overwrite"] is True # We don't check the exact data object since it's a mock and the identity might differ - def test_dump_store_to_azure_csv(self): + def test_dump_store_to_azure_csv(self, base_azure_blob_config): """Test the dump_store_to_azure function with CSV output type.""" # Arrange - store_folder = "/path/to/store" - account_name = "teststorageaccount" - container_name = "testcontainer" - tenant_id = "test-tenant-id" - client_id = "test-client-id" - client_secret = "test-client-secret" - output_type = "csv" - file_prefix = "prefix_" + base_azure_blob_config.azure.output_type = "csv" + base_azure_blob_config.azure.file_prefix = "prefix_" # Mock Store mock_store = MagicMock(spec=Store) @@ -155,19 +137,12 @@ def get_table_side_effect(table_name): patch("pyarrow.csv.write_csv") as mock_write_csv, ): # Act - dump_store_to_azure( - store_folder=store_folder, - account_name=account_name, - container_name=container_name, - tenant_id=tenant_id, - client_id=client_id, - client_secret=client_secret, - output_type=output_type, - file_prefix=file_prefix, - ) + dump_store_to_azure(configuration=base_azure_blob_config) # Assert - mock_blob_service_client.get_container_client.assert_called_once_with(container_name) + mock_blob_service_client.get_container_client.assert_called_once_with( + base_azure_blob_config.azure.container_name + ) assert mock_container_client.upload_blob.call_count == 2 # Only for non-empty tables mock_container_client.upload_blob.assert_any_call( name="prefix_table1.csv", data=mock_bytesio, length=len(b"csv data"), overwrite=True @@ -179,17 +154,11 @@ def get_table_side_effect(table_name): mock_write_csv.assert_any_call(table1, mock_bytesio) mock_write_csv.assert_any_call(table2, mock_bytesio) - def test_dump_store_to_azure_parquet(self): + def test_dump_store_to_azure_parquet(self, base_azure_blob_config): """Test the dump_store_to_azure function with Parquet output type.""" # Arrange - store_folder = "/path/to/store" - account_name = "teststorageaccount" - container_name = "testcontainer" - tenant_id = "test-tenant-id" - client_id = "test-client-id" - client_secret = "test-client-secret" - output_type = "parquet" - file_prefix = "prefix_" + base_azure_blob_config.azure.output_type = "parquet" + base_azure_blob_config.azure.file_prefix = "prefix_" # Mock Store mock_store = MagicMock(spec=Store) @@ -230,19 +199,12 @@ def get_table_side_effect(table_name): patch("pyarrow.parquet.write_table") as mock_write_table, ): # Act - dump_store_to_azure( - store_folder=store_folder, - account_name=account_name, - container_name=container_name, - tenant_id=tenant_id, - client_id=client_id, - client_secret=client_secret, - output_type=output_type, - file_prefix=file_prefix, - ) + dump_store_to_azure(configuration=base_azure_blob_config) # Assert - mock_blob_service_client.get_container_client.assert_called_once_with(container_name) + mock_blob_service_client.get_container_client.assert_called_once_with( + base_azure_blob_config.azure.container_name + ) assert mock_container_client.upload_blob.call_count == 2 # Only for non-empty tables mock_container_client.upload_blob.assert_any_call( name="prefix_table1.parquet", data=mock_bytesio, length=len(b"parquet data"), overwrite=True @@ -254,16 +216,10 @@ def get_table_side_effect(table_name): mock_write_table.assert_any_call(table1, mock_bytesio) mock_write_table.assert_any_call(table2, mock_bytesio) - def test_dump_store_to_azure_empty_tables(self): + def 
test_dump_store_to_azure_empty_tables(self, base_azure_blob_config): """Test the dump_store_to_azure function with empty tables.""" # Arrange - store_folder = "/path/to/store" - account_name = "teststorageaccount" - container_name = "testcontainer" - tenant_id = "test-tenant-id" - client_id = "test-client-id" - client_secret = "test-client-secret" - output_type = "csv" + base_azure_blob_config.azure.output_type = "csv" # Mock Store with only empty tables mock_store = MagicMock(spec=Store) @@ -285,21 +241,14 @@ def test_dump_store_to_azure_empty_tables(self): patch("cosmotech.coal.azure.blob.Store", return_value=mock_store), patch("cosmotech.coal.azure.blob.BlobServiceClient", return_value=mock_blob_service_client), patch("cosmotech.coal.azure.blob.ClientSecretCredential", return_value=mock_credential), - patch("cosmotech.coal.azure.blob.BytesIO") as mock_bytesio, patch("pyarrow.csv.write_csv") as mock_write_csv, ): # Act - dump_store_to_azure( - store_folder=store_folder, - account_name=account_name, - container_name=container_name, - tenant_id=tenant_id, - client_id=client_id, - client_secret=client_secret, - output_type=output_type, - ) + dump_store_to_azure(configuration=base_azure_blob_config) # Assert - mock_blob_service_client.get_container_client.assert_called_once_with(container_name) + mock_blob_service_client.get_container_client.assert_called_once_with( + base_azure_blob_config.azure.container_name + ) mock_container_client.upload_blob.assert_not_called() # No uploads for empty tables mock_write_csv.assert_not_called() # No writes for empty tables diff --git a/tests/unit/coal/test_azure/test_azure_storage.py b/tests/unit/coal/test_azure/test_azure_storage.py index d67c8921..a2186dd8 100644 --- a/tests/unit/coal/test_azure/test_azure_storage.py +++ b/tests/unit/coal/test_azure/test_azure_storage.py @@ -5,10 +5,10 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import os import pathlib +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import MagicMock, patch, mock_open from cosmotech.coal.azure.storage import upload_file, upload_folder diff --git a/tests/unit/coal/test_csm/test_csm_engine.py b/tests/unit/coal/test_csm/test_csm_engine.py index 1489fe54..0f6d4ba0 100644 --- a/tests/unit/coal/test_csm/test_csm_engine.py +++ b/tests/unit/coal/test_csm/test_csm_engine.py @@ -5,11 +5,9 @@ # etc. to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import os -import glob import json -import tempfile -from unittest.mock import MagicMock, patch, mock_open +import os +from unittest.mock import MagicMock, mock_open, patch import pytest @@ -36,7 +34,6 @@ def test_apply_simple_csv_parameter_to_simulator(self, mock_file, mock_glob, moc # Create mock entities mock_entity1 = MagicMock() mock_entity2 = MagicMock() - mock_entity_not_found = None # Configure model to return entities def find_entity_by_name(name): diff --git a/tests/unit/coal/test_csm/test_engine/__init__.py b/tests/unit/coal/test_csm/test_engine/__init__.py index 1489fe54..0f6d4ba0 100644 --- a/tests/unit/coal/test_csm/test_engine/__init__.py +++ b/tests/unit/coal/test_csm/test_engine/__init__.py @@ -5,11 +5,9 @@ # etc. to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
-import os -import glob import json -import tempfile -from unittest.mock import MagicMock, patch, mock_open +import os +from unittest.mock import MagicMock, mock_open, patch import pytest @@ -36,7 +34,6 @@ def test_apply_simple_csv_parameter_to_simulator(self, mock_file, mock_glob, moc # Create mock entities mock_entity1 = MagicMock() mock_entity2 = MagicMock() - mock_entity_not_found = None # Configure model to return entities def find_entity_by_name(name): diff --git a/tests/unit/coal/test_postgresql/test_postgresql_store.py b/tests/unit/coal/test_postgresql/test_postgresql_store.py index f6dfa10c..6b548eec 100644 --- a/tests/unit/coal/test_postgresql/test_postgresql_store.py +++ b/tests/unit/coal/test_postgresql/test_postgresql_store.py @@ -10,7 +10,7 @@ import pyarrow as pa from cosmotech.coal.postgresql.store import dump_store_to_postgresql -from cosmotech.coal.postgresql.utils import Configuration, PostgresUtils +from cosmotech.coal.postgresql.utils import Configuration class TestStoreFunctions: @@ -45,6 +45,24 @@ def test_dump_store_to_postgresql_with_tables(self, mock_send_to_postgresql, moc table_prefix = "Test_" replace = True + _config = Configuration( + { + "coal": { + "store": store_folder, + }, + "postgres": { + "host": postgres_host, + "port": postgres_port, + "db_name": postgres_db, + "db_schema": postgres_schema, + "user_name": postgres_user, + "user_password": postgres_password, + "password_encoding": False, + "table_prefix": table_prefix, + }, + } + ) + # Act dump_store_to_postgresql( store_folder, @@ -60,7 +78,7 @@ def test_dump_store_to_postgresql_with_tables(self, mock_send_to_postgresql, moc # Assert # Check that Store was initialized with the correct parameters - mock_store_class.assert_called_once_with(store_location=store_folder) + mock_store_class.assert_called_once_with(configuration=_config) # Check that list_tables was called mock_store_instance.list_tables.assert_called_once() @@ -106,6 +124,24 @@ def test_dump_store_to_postgresql_empty_store(self, mock_send_to_postgresql, moc postgres_user = "user" postgres_password = "password" + _config = Configuration( + { + "coal": { + "store": store_folder, + }, + "postgres": { + "host": postgres_host, + "port": postgres_port, + "db_name": postgres_db, + "db_schema": postgres_schema, + "user_name": postgres_user, + "user_password": postgres_password, + "password_encoding": False, + "table_prefix": "Cosmotech_", + }, + } + ) + # Act dump_store_to_postgresql( store_folder, postgres_host, postgres_port, postgres_db, postgres_schema, postgres_user, postgres_password @@ -113,7 +149,7 @@ def test_dump_store_to_postgresql_empty_store(self, mock_send_to_postgresql, moc # Assert # Check that Store was initialized with the correct parameters - mock_store_class.assert_called_once_with(store_location=store_folder) + mock_store_class.assert_called_once_with(configuration=_config) # Check that list_tables was called mock_store_instance.list_tables.assert_called_once() @@ -150,6 +186,24 @@ def test_dump_store_to_postgresql_empty_table(self, mock_send_to_postgresql, moc postgres_password = "password" table_prefix = "Test_" + _config = Configuration( + { + "coal": { + "store": store_folder, + }, + "postgres": { + "host": postgres_host, + "port": postgres_port, + "db_name": postgres_db, + "db_schema": postgres_schema, + "user_name": postgres_user, + "user_password": postgres_password, + "password_encoding": False, + "table_prefix": table_prefix, + }, + } + ) + # Act dump_store_to_postgresql( store_folder, @@ -164,7 +218,7 @@ def 
test_dump_store_to_postgresql_empty_table(self, mock_send_to_postgresql, moc # Assert # Check that Store was initialized with the correct parameters - mock_store_class.assert_called_once_with(store_location=store_folder) + mock_store_class.assert_called_once_with(configuration=_config) # Check that list_tables was called mock_store_instance.list_tables.assert_called_once() diff --git a/tests/unit/coal/test_postgresql/test_postgresql_utils.py b/tests/unit/coal/test_postgresql/test_postgresql_utils.py index 2ee3d5a5..2c3eafc0 100644 --- a/tests/unit/coal/test_postgresql/test_postgresql_utils.py +++ b/tests/unit/coal/test_postgresql/test_postgresql_utils.py @@ -19,14 +19,19 @@ @pytest.fixture def base_configuration(): - _c = Configuration() - _c.postgres.host = "localhost" - _c.postgres.port = "5432" - _c.postgres.db_name = "testdb" - _c.postgres.db_schema = "dbschema" - _c.postgres.user_name = "user" - _c.postgres.user_password = "password" - _c.postgres.password_encoding = False + _c = Configuration( + { + "postgres": { + "host": "localhost", + "port": "5432", + "db_name": "testdb", + "db_schema": "dbschema", + "user_name": "user", + "user_password": "password", + "password_encoding": False, + } + } + ) return _c diff --git a/tests/unit/coal/test_singlestore/test_singlestore_store.py b/tests/unit/coal/test_singlestore/test_singlestore_store.py index d3063445..4fe5e6b1 100644 --- a/tests/unit/coal/test_singlestore/test_singlestore_store.py +++ b/tests/unit/coal/test_singlestore/test_singlestore_store.py @@ -5,16 +5,12 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import csv -import os -import pathlib -import tempfile -from unittest.mock import MagicMock, patch, mock_open, call +from unittest.mock import MagicMock, mock_open, patch import pytest import singlestoredb as s2 -from cosmotech.coal.singlestore.store import load_from_singlestore, _get_data +from cosmotech.coal.singlestore.store import _get_data, load_from_singlestore class TestStoreFunctions: diff --git a/tests/unit/coal/test_store/test_output/test_aws_channel.py b/tests/unit/coal/test_store/test_output/test_aws_channel.py index 005c1f56..33e83a1f 100644 --- a/tests/unit/coal/test_store/test_output/test_aws_channel.py +++ b/tests/unit/coal/test_store/test_output/test_aws_channel.py @@ -13,20 +13,28 @@ from cosmotech.coal.store.output.aws_channel import AwsChannel +@pytest.fixture +def base_aws_config(): + return { + "coal": {"store": "$cosmotech.parameters_absolute_path"}, + "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, + "s3": { + "access_key_id": "test_key", + "endpoint_url": "http://test.url", + "secret_access_key": "test_secret", + "bucket_prefix": "prefix/", + }, + } + + class TestAwsChannel: """Tests for the AwsChannel class.""" - def test_init_with_configuration(self): + def test_init_with_configuration(self, base_aws_config): """Test AwsChannel initialization with configuration.""" - # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": {"access_key_id": "test_key", "endpoint_url": "http://test.url", "secret_access_key": "test_secret"}, - } - # Act with patch("cosmotech.coal.store.output.aws_channel.S3"): - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Assert assert channel.configuration is not None @@ -34,28 +42,18 @@ def test_init_with_configuration(self): def 
test_required_keys(self): """Test that required_keys are properly defined.""" # Assert - assert "cosmotech" in AwsChannel.required_keys + assert "coal" in AwsChannel.required_keys assert "s3" in AwsChannel.required_keys - assert "dataset_absolute_path" in AwsChannel.required_keys["cosmotech"] + assert "store" in AwsChannel.required_keys["coal"] assert "access_key_id" in AwsChannel.required_keys["s3"] assert "endpoint_url" in AwsChannel.required_keys["s3"] assert "secret_access_key" in AwsChannel.required_keys["s3"] @patch("cosmotech.coal.store.output.aws_channel.Store") @patch("cosmotech.coal.store.output.aws_channel.S3") - def test_send_sqlite(self, mock_s3_class, mock_store_class): + def test_send_sqlite(self, mock_s3_class, mock_store_class, base_aws_config): """Test sending data as SQLite database.""" # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": { - "access_key_id": "test_key", - "endpoint_url": "http://test.url", - "secret_access_key": "test_secret", - "bucket_prefix": "prefix/", - }, - } - mock_s3 = MagicMock() mock_s3.output_type = "sqlite" mock_s3_class.return_value = mock_s3 @@ -64,7 +62,7 @@ def test_send_sqlite(self, mock_s3_class, mock_store_class): mock_store._database_path = "/path/to/db.sqlite" mock_store_class.return_value = mock_store - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Act channel.send() @@ -75,19 +73,9 @@ def test_send_sqlite(self, mock_s3_class, mock_store_class): @patch("cosmotech.coal.store.output.aws_channel.Store") @patch("cosmotech.coal.store.output.aws_channel.S3") @patch("cosmotech.coal.store.output.aws_channel.pc.write_csv") - def test_send_csv(self, mock_write_csv, mock_s3_class, mock_store_class): + def test_send_csv(self, mock_write_csv, mock_s3_class, mock_store_class, base_aws_config): """Test sending data as CSV files.""" # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": { - "access_key_id": "test_key", - "endpoint_url": "http://test.url", - "secret_access_key": "test_secret", - "bucket_prefix": "prefix/", - }, - } - mock_s3 = MagicMock() mock_s3.output_type = "csv" mock_s3_class.return_value = mock_s3 @@ -98,7 +86,7 @@ def test_send_csv(self, mock_write_csv, mock_s3_class, mock_store_class): mock_store.get_table.return_value = mock_table mock_store_class.return_value = mock_store - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Act channel.send() @@ -110,19 +98,9 @@ def test_send_csv(self, mock_write_csv, mock_s3_class, mock_store_class): @patch("cosmotech.coal.store.output.aws_channel.Store") @patch("cosmotech.coal.store.output.aws_channel.S3") @patch("cosmotech.coal.store.output.aws_channel.pq.write_table") - def test_send_parquet(self, mock_write_parquet, mock_s3_class, mock_store_class): + def test_send_parquet(self, mock_write_parquet, mock_s3_class, mock_store_class, base_aws_config): """Test sending data as Parquet files.""" # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": { - "access_key_id": "test_key", - "endpoint_url": "http://test.url", - "secret_access_key": "test_secret", - "bucket_prefix": "prefix/", - }, - } - mock_s3 = MagicMock() mock_s3.output_type = "parquet" mock_s3_class.return_value = mock_s3 @@ -133,7 +111,7 @@ def test_send_parquet(self, mock_write_parquet, mock_s3_class, mock_store_class) 
mock_store.get_table.return_value = mock_table mock_store_class.return_value = mock_store - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Act channel.send() @@ -144,19 +122,9 @@ def test_send_parquet(self, mock_write_parquet, mock_s3_class, mock_store_class) @patch("cosmotech.coal.store.output.aws_channel.Store") @patch("cosmotech.coal.store.output.aws_channel.S3") - def test_send_with_tables_filter(self, mock_s3_class, mock_store_class): + def test_send_with_tables_filter(self, mock_s3_class, mock_store_class, base_aws_config): """Test sending data with table filter.""" # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": { - "access_key_id": "test_key", - "endpoint_url": "http://test.url", - "secret_access_key": "test_secret", - "bucket_prefix": "prefix/", - }, - } - mock_s3 = MagicMock() mock_s3.output_type = "csv" mock_s3_class.return_value = mock_s3 @@ -167,7 +135,7 @@ def test_send_with_tables_filter(self, mock_s3_class, mock_store_class): mock_store.get_table.return_value = mock_table mock_store_class.return_value = mock_store - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Act channel.send(filter=["table1", "table3"]) @@ -180,19 +148,9 @@ def test_send_with_tables_filter(self, mock_s3_class, mock_store_class): @patch("cosmotech.coal.store.output.aws_channel.Store") @patch("cosmotech.coal.store.output.aws_channel.S3") - def test_send_empty_table(self, mock_s3_class, mock_store_class): + def test_send_empty_table(self, mock_s3_class, mock_store_class, base_aws_config): """Test sending data with empty table.""" # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": { - "access_key_id": "test_key", - "endpoint_url": "http://test.url", - "secret_access_key": "test_secret", - "bucket_prefix": "prefix/", - }, - } - mock_s3 = MagicMock() mock_s3.output_type = "csv" mock_s3_class.return_value = mock_s3 @@ -204,7 +162,7 @@ def test_send_empty_table(self, mock_s3_class, mock_store_class): mock_store.get_table.return_value = mock_table mock_store_class.return_value = mock_store - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Act channel.send() @@ -215,14 +173,9 @@ def test_send_empty_table(self, mock_s3_class, mock_store_class): @patch("cosmotech.coal.store.output.aws_channel.Store") @patch("cosmotech.coal.store.output.aws_channel.S3") - def test_send_invalid_output_type(self, mock_s3_class, mock_store_class): + def test_send_invalid_output_type(self, mock_s3_class, mock_store_class, base_aws_config): """Test sending data with invalid output type.""" # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": {"access_key_id": "test_key", "endpoint_url": "http://test.url", "secret_access_key": "test_secret"}, - } - mock_s3 = MagicMock() mock_s3.output_type = "invalid_type" mock_s3_class.return_value = mock_s3 @@ -230,25 +183,20 @@ def test_send_invalid_output_type(self, mock_s3_class, mock_store_class): mock_store = MagicMock() mock_store_class.return_value = mock_store - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Act & Assert with pytest.raises(ValueError): channel.send() @patch("cosmotech.coal.store.output.aws_channel.S3") - def test_delete(self, mock_s3_class): + def test_delete(self, mock_s3_class, base_aws_config): """Test delete method.""" 
# Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, - "s3": {"access_key_id": "test_key", "endpoint_url": "http://test.url", "secret_access_key": "test_secret"}, - } - mock_s3 = MagicMock() mock_s3_class.return_value = mock_s3 - channel = AwsChannel(config) + channel = AwsChannel(base_aws_config) # Act channel.delete() diff --git a/tests/unit/coal/test_store/test_output/test_az_storage_channel.py b/tests/unit/coal/test_store/test_output/test_az_storage_channel.py index 24b06cdf..fad986d2 100644 --- a/tests/unit/coal/test_store/test_output/test_az_storage_channel.py +++ b/tests/unit/coal/test_store/test_output/test_az_storage_channel.py @@ -10,16 +10,15 @@ import pytest from cosmotech.coal.store.output.az_storage_channel import AzureStorageChannel +from cosmotech.coal.utils.configuration import Configuration -class TestAzureStorageChannel: - """Tests for the AzureStorageChannel class.""" - - def test_init_with_configuration(self): - """Test AzureStorageChannel initialization with configuration.""" - # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset"}, +@pytest.fixture +def base_azure_storage_config(): + return Configuration( + { + "coal": {"store": "$cosmotech.parameters_absolute_path"}, + "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, "azure": { "account_name": "test_account", "container_name": "test_container", @@ -30,9 +29,16 @@ def test_init_with_configuration(self): "file_prefix": "prefix_", }, } + ) + +class TestAzureStorageChannel: + """Tests for the AzureStorageChannel class.""" + + def test_init_with_configuration(self, base_azure_storage_config): + """Test AzureStorageChannel initialization with configuration.""" # Act - channel = AzureStorageChannel(config) + channel = AzureStorageChannel(base_azure_storage_config) # Assert assert channel.configuration is not None @@ -40,9 +46,9 @@ def test_init_with_configuration(self): def test_required_keys(self): """Test that required_keys are properly defined.""" # Assert - assert "cosmotech" in AzureStorageChannel.required_keys + assert "coal" in AzureStorageChannel.required_keys + assert "store" in AzureStorageChannel.required_keys["coal"] assert "azure" in AzureStorageChannel.required_keys - assert "dataset_absolute_path" in AzureStorageChannel.required_keys["cosmotech"] assert "account_name" in AzureStorageChannel.required_keys["azure"] assert "container_name" in AzureStorageChannel.required_keys["azure"] assert "tenant_id" in AzureStorageChannel.required_keys["azure"] @@ -52,58 +58,25 @@ def test_required_keys(self): assert "file_prefix" in AzureStorageChannel.required_keys["azure"] @patch("cosmotech.coal.store.output.az_storage_channel.dump_store_to_azure") - def test_send_without_filter(self, mock_dump): + @patch("cosmotech.coal.azure.blob.Store") + @patch("cosmotech.coal.azure.blob.ClientSecretCredential") + def test_send_without_filter(self, mock_client_secret, mock_store, mock_dump, base_azure_storage_config): """Test sending data without table filter.""" - # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset"}, - "azure": { - "account_name": "test_account", - "container_name": "test_container", - "tenant_id": "test_tenant", - "client_id": "test_client", - "client_secret": "test_secret", - "output_type": "csv", - "file_prefix": "prefix_", - }, - } - - channel = AzureStorageChannel(config) + channel = 
AzureStorageChannel(base_azure_storage_config) # Act channel.send() # Assert - mock_dump.assert_called_once_with( - store_folder="/path/to/dataset", - account_name="test_account", - container_name="test_container", - tenant_id="test_tenant", - client_id="test_client", - client_secret="test_secret", - output_type="csv", - file_prefix="prefix_", - selected_tables=None, - ) + mock_dump.assert_called_once_with(base_azure_storage_config, selected_tables=None) @patch("cosmotech.coal.store.output.az_storage_channel.dump_store_to_azure") - def test_send_with_filter(self, mock_dump): + @patch("cosmotech.coal.azure.blob.Store") + @patch("cosmotech.coal.azure.blob.ClientSecretCredential") + def test_send_with_filter(self, mock_client_secret, mock_store, mock_dump, base_azure_storage_config): """Test sending data with table filter.""" - # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset"}, - "azure": { - "account_name": "test_account", - "container_name": "test_container", - "tenant_id": "test_tenant", - "client_id": "test_client", - "client_secret": "test_secret", - "output_type": "parquet", - "file_prefix": "data_", - }, - } - channel = AzureStorageChannel(config) + channel = AzureStorageChannel(base_azure_storage_config) tables_filter = ["table1", "table2"] # Act @@ -111,34 +84,13 @@ def test_send_with_filter(self, mock_dump): # Assert mock_dump.assert_called_once_with( - store_folder="/path/to/dataset", - account_name="test_account", - container_name="test_container", - tenant_id="test_tenant", - client_id="test_client", - client_secret="test_secret", - output_type="parquet", - file_prefix="data_", + base_azure_storage_config, selected_tables=["table1", "table2"], ) - def test_delete(self): + def test_delete(self, base_azure_storage_config): """Test delete method (should do nothing).""" - # Arrange - config = { - "cosmotech": {"dataset_absolute_path": "/path/to/dataset"}, - "azure": { - "account_name": "test_account", - "container_name": "test_container", - "tenant_id": "test_tenant", - "client_id": "test_client", - "client_secret": "test_secret", - "output_type": "csv", - "file_prefix": "prefix_", - }, - } - - channel = AzureStorageChannel(config) + channel = AzureStorageChannel(base_azure_storage_config) # Act result = channel.delete() diff --git a/tests/unit/coal/test_store/test_output/test_channel_spliter.py b/tests/unit/coal/test_store/test_output/test_channel_spliter.py index fd607b14..e22755c3 100644 --- a/tests/unit/coal/test_store/test_output/test_channel_spliter.py +++ b/tests/unit/coal/test_store/test_output/test_channel_spliter.py @@ -10,6 +10,7 @@ import pytest from cosmotech.coal.store.output.channel_spliter import ChannelSpliter +from cosmotech.coal.utils.configuration import Dotdict class TestChannelSpliter: @@ -18,11 +19,10 @@ class TestChannelSpliter: def test_init_with_single_target(self): """Test ChannelSpliter initialization with a single valid target.""" # Arrange - mock_config = MagicMock() mock_output = MagicMock() mock_output.type = "s3" mock_output.conf = {"test": "config"} - mock_config.outputs = [mock_output] + mock_config = Dotdict({"outputs": [mock_output]}) # Mock the channel class in the available_interfaces dict mock_channel_class = MagicMock() @@ -45,8 +45,6 @@ def test_init_with_single_target(self): def test_init_with_multiple_targets(self, mock_aws, mock_azure, mock_postgres): """Test ChannelSpliter initialization with multiple valid targets.""" # Arrange - mock_config = MagicMock() - mock_output1 = MagicMock() mock_output1.type = 
"s3" mock_output1.conf = {"s3": "config"} @@ -59,7 +57,7 @@ def test_init_with_multiple_targets(self, mock_aws, mock_azure, mock_postgres): mock_output3.type = "postgres" mock_output3.conf = {"postgres": "config"} - mock_config.outputs = [mock_output1, mock_output2, mock_output3] + mock_config = Dotdict({"outputs": [mock_output1, mock_output2, mock_output3]}) # Create mock instances mock_aws_instance = MagicMock() @@ -86,11 +84,10 @@ def test_init_with_multiple_targets(self, mock_aws, mock_azure, mock_postgres): def test_init_with_unavailable_target(self): """Test ChannelSpliter initialization when target is not available.""" # Arrange - mock_config = MagicMock() mock_output = MagicMock() mock_output.type = "s3" mock_output.conf = {"test": "config"} - mock_config.outputs = [mock_output] + mock_config = Dotdict({"outputs": [mock_output]}) mock_channel_class = MagicMock() mock_channel_instance = MagicMock() @@ -107,8 +104,6 @@ def test_init_with_unavailable_target(self): def test_init_with_mixed_availability(self, mock_aws, mock_azure): """Test ChannelSpliter initialization with mixed target availability.""" # Arrange - mock_config = MagicMock() - mock_output1 = MagicMock() mock_output1.type = "s3" mock_output1.conf = {"s3": "config"} @@ -117,7 +112,7 @@ def test_init_with_mixed_availability(self, mock_aws, mock_azure): mock_output2.type = "az_storage" mock_output2.conf = {"azure": "config"} - mock_config.outputs = [mock_output1, mock_output2] + mock_config = Dotdict({"outputs": [mock_output1, mock_output2]}) # AWS is available mock_aws_instance = MagicMock() @@ -146,11 +141,10 @@ def test_init_with_mixed_availability(self, mock_aws, mock_azure): def test_send_success(self): """Test send method when all targets succeed.""" # Arrange - mock_config = MagicMock() mock_output = MagicMock() mock_output.type = "s3" mock_output.conf = {"test": "config"} - mock_config.outputs = [mock_output] + mock_config = Dotdict({"outputs": [mock_output]}) mock_channel_class = MagicMock() mock_channel_instance = MagicMock() @@ -170,11 +164,10 @@ def test_send_success(self): def test_send_with_filter(self): """Test send method with filter.""" # Arrange - mock_config = MagicMock() mock_output = MagicMock() mock_output.type = "s3" mock_output.conf = {"test": "config"} - mock_config.outputs = [mock_output] + mock_config = Dotdict({"outputs": [mock_output]}) mock_channel_class = MagicMock() mock_channel_instance = MagicMock() @@ -196,8 +189,6 @@ def test_send_with_filter(self): def test_send_multiple_targets(self, mock_aws, mock_azure): """Test send method with multiple targets.""" # Arrange - mock_config = MagicMock() - mock_output1 = MagicMock() mock_output1.type = "s3" mock_output1.conf = {"s3": "config"} @@ -206,7 +197,7 @@ def test_send_multiple_targets(self, mock_aws, mock_azure): mock_output2.type = "az_storage" mock_output2.conf = {"azure": "config"} - mock_config.outputs = [mock_output1, mock_output2] + mock_config = Dotdict({"outputs": [mock_output1, mock_output2]}) mock_aws_instance = MagicMock() mock_aws_instance.is_available.return_value = True @@ -239,11 +230,10 @@ def test_send_multiple_targets(self, mock_aws, mock_azure): def test_send_with_exception(self): """Test send method when target raises exception.""" # Arrange - mock_config = MagicMock() mock_output = MagicMock() mock_output.type = "s3" mock_output.conf = {"test": "config"} - mock_config.outputs = [mock_output] + mock_config = Dotdict({"outputs": [mock_output]}) mock_channel_class = MagicMock() mock_channel_instance = MagicMock() @@ -262,11 
+252,10 @@ def test_send_with_exception(self): def test_delete_success(self): """Test delete method when all targets succeed.""" # Arrange - mock_config = MagicMock() mock_output = MagicMock() mock_output.type = "s3" mock_output.conf = {"test": "config"} - mock_config.outputs = [mock_output] + mock_config = Dotdict({"outputs": [mock_output]}) mock_channel_class = MagicMock() mock_channel_instance = MagicMock() @@ -288,8 +277,6 @@ def test_delete_success(self): def test_delete_multiple_targets(self, mock_aws, mock_azure): """Test delete method with multiple targets.""" # Arrange - mock_config = MagicMock() - mock_output1 = MagicMock() mock_output1.type = "s3" mock_output1.conf = {"s3": "config"} @@ -298,7 +285,7 @@ def test_delete_multiple_targets(self, mock_aws, mock_azure): mock_output2.type = "az_storage" mock_output2.conf = {"azure": "config"} - mock_config.outputs = [mock_output1, mock_output2] + mock_config = Dotdict({"outputs": [mock_output1, mock_output2]}) mock_aws_instance = MagicMock() mock_aws_instance.is_available.return_value = True @@ -330,11 +317,10 @@ def test_delete_multiple_targets(self, mock_aws, mock_azure): def test_delete_with_exception(self): """Test delete method when target raises exception.""" # Arrange - mock_config = MagicMock() mock_output = MagicMock() mock_output.type = "s3" mock_output.conf = {"test": "config"} - mock_config.outputs = [mock_output] + mock_config = Dotdict({"outputs": [mock_output]}) mock_channel_class = MagicMock() mock_channel_instance = MagicMock() diff --git a/tests/unit/coal/test_store/test_output/test_postgres_channel.py b/tests/unit/coal/test_store/test_output/test_postgres_channel.py index a58dc42b..19046000 100644 --- a/tests/unit/coal/test_store/test_output/test_postgres_channel.py +++ b/tests/unit/coal/test_store/test_output/test_postgres_channel.py @@ -10,33 +10,38 @@ import pytest from cosmotech.coal.store.output.postgres_channel import PostgresChannel +from cosmotech.coal.utils.configuration import Configuration -class TestPostgresChannel: - """Tests for the PostgresChannel class.""" - - def test_init_with_configuration(self): - """Test PostgresChannel initialization with configuration.""" - # Arrange - config = { +@pytest.fixture +def base_postgres_config(): + return Configuration( + { "cosmotech": { - "dataset_absolute_path": "/path/to/dataset", + "parameters_absolute_path": "/path/to/dataset", "organization_id": "org123", "workspace_id": "ws456", "runner_id": "run789", }, "postgres": { "host": "localhost", - "post": "5432", + "port": "5432", "db_name": "testdb", "db_schema": "public", "user_name": "testuser", "user_password": "testpass", }, } + ) + +class TestPostgresChannel: + """Tests for the PostgresChannel class.""" + + def test_init_with_configuration(self, base_postgres_config): + """Test PostgresChannel initialization with configuration.""" # Act - channel = PostgresChannel(config) + channel = PostgresChannel(base_postgres_config) # Assert assert channel.configuration is not None @@ -44,14 +49,15 @@ def test_init_with_configuration(self): def test_required_keys(self): """Test that required_keys are properly defined.""" # Assert + assert "coal" in PostgresChannel.required_keys + assert "store" in PostgresChannel.required_keys["coal"] assert "cosmotech" in PostgresChannel.required_keys - assert "postgres" in PostgresChannel.required_keys - assert "dataset_absolute_path" in PostgresChannel.required_keys["cosmotech"] assert "organization_id" in PostgresChannel.required_keys["cosmotech"] assert "workspace_id" in 
PostgresChannel.required_keys["cosmotech"] assert "runner_id" in PostgresChannel.required_keys["cosmotech"] + assert "postgres" in PostgresChannel.required_keys assert "host" in PostgresChannel.required_keys["postgres"] - assert "post" in PostgresChannel.required_keys["postgres"] + assert "port" in PostgresChannel.required_keys["postgres"] assert "db_name" in PostgresChannel.required_keys["postgres"] assert "db_schema" in PostgresChannel.required_keys["postgres"] assert "user_name" in PostgresChannel.required_keys["postgres"] @@ -59,28 +65,10 @@ def test_required_keys(self): @patch("cosmotech.coal.store.output.postgres_channel.dump_store_to_postgresql_from_conf") @patch("cosmotech.coal.store.output.postgres_channel.send_runner_metadata_to_postgresql") - def test_send_without_filter(self, mock_send_metadata, mock_dump): + def test_send_without_filter(self, mock_send_metadata, mock_dump, base_postgres_config): """Test sending data without table filter.""" - # Arrange - config = { - "cosmotech": { - "dataset_absolute_path": "/path/to/dataset", - "organization_id": "org123", - "workspace_id": "ws456", - "runner_id": "run789", - }, - "postgres": { - "host": "localhost", - "post": "5432", - "db_name": "testdb", - "db_schema": "public", - "user_name": "testuser", - "user_password": "testpass", - }, - } - mock_send_metadata.return_value = "run_id_123" - channel = PostgresChannel(config) + channel = PostgresChannel(base_postgres_config) # Act channel.send() @@ -91,34 +79,16 @@ def test_send_without_filter(self, mock_send_metadata, mock_dump): # Check the arguments passed to dump_store_to_postgresql_from_conf call_args = mock_dump.call_args - assert call_args[1]["store_folder"] == "/path/to/dataset" - assert call_args[1]["selected_tables"] is None - assert call_args[1]["fk_id"] == "run_id_123" + assert call_args.kwargs["configuration"] == base_postgres_config + assert call_args.kwargs["selected_tables"] is None + assert call_args.kwargs["fk_id"] == "run_id_123" @patch("cosmotech.coal.store.output.postgres_channel.dump_store_to_postgresql_from_conf") @patch("cosmotech.coal.store.output.postgres_channel.send_runner_metadata_to_postgresql") - def test_send_with_filter(self, mock_send_metadata, mock_dump): + def test_send_with_filter(self, mock_send_metadata, mock_dump, base_postgres_config): """Test sending data with table filter.""" - # Arrange - config = { - "cosmotech": { - "dataset_absolute_path": "/path/to/dataset", - "organization_id": "org123", - "workspace_id": "ws456", - "runner_id": "run789", - }, - "postgres": { - "host": "localhost", - "post": "5432", - "db_name": "testdb", - "db_schema": "public", - "user_name": "testuser", - "user_password": "testpass", - }, - } - mock_send_metadata.return_value = "run_id_456" - channel = PostgresChannel(config) + channel = PostgresChannel(base_postgres_config) tables_filter = ["table1", "table2", "table3"] # Act @@ -130,32 +100,15 @@ def test_send_with_filter(self, mock_send_metadata, mock_dump): # Check the arguments passed to dump_store_to_postgresql_from_conf call_args = mock_dump.call_args - assert call_args[1]["store_folder"] == "/path/to/dataset" + assert call_args[1]["configuration"] == base_postgres_config assert call_args[1]["selected_tables"] == ["table1", "table2", "table3"] assert call_args[1]["fk_id"] == "run_id_456" @patch("cosmotech.coal.store.output.postgres_channel.remove_runner_metadata_from_postgresql") - def test_delete(self, mock_remove_metadata): + def test_delete(self, mock_remove_metadata, base_postgres_config): """Test delete 
method.""" - # Arrange - config = { - "cosmotech": { - "dataset_absolute_path": "/path/to/dataset", - "organization_id": "org123", - "workspace_id": "ws456", - "runner_id": "run789", - }, - "postgres": { - "host": "localhost", - "post": "5432", - "db_name": "testdb", - "db_schema": "public", - "user_name": "testuser", - "user_password": "testpass", - }, - } - channel = PostgresChannel(config) + channel = PostgresChannel(base_postgres_config) # Act channel.delete() @@ -164,4 +117,4 @@ def test_delete(self, mock_remove_metadata): mock_remove_metadata.assert_called_once() # Check that configuration was passed call_args = mock_remove_metadata.call_args - assert call_args[0][0] is not None # Configuration object passed + assert call_args.args[0] == base_postgres_config diff --git a/tests/unit/coal/test_store/test_store_csv.py b/tests/unit/coal/test_store/test_store_csv.py index dfff0717..b186c942 100644 --- a/tests/unit/coal/test_store/test_store_csv.py +++ b/tests/unit/coal/test_store/test_store_csv.py @@ -6,13 +6,13 @@ # specifically authorized by written means by Cosmo Tech. import pathlib -import pytest from unittest.mock import MagicMock, patch import pyarrow as pa import pyarrow.csv as pc +import pytest -from cosmotech.coal.store.csv import store_csv_file, convert_store_table_to_csv +from cosmotech.coal.store.csv import convert_store_table_to_csv, store_csv_file from cosmotech.coal.store.store import Store diff --git a/tests/unit/coal/test_store/test_store_import_errors.py b/tests/unit/coal/test_store/test_store_import_errors.py index 294af9b7..8bdd64b1 100644 --- a/tests/unit/coal/test_store/test_store_import_errors.py +++ b/tests/unit/coal/test_store/test_store_import_errors.py @@ -5,7 +5,6 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from unittest.mock import patch, MagicMock import pytest diff --git a/tests/unit/coal/test_store/test_store_init.py b/tests/unit/coal/test_store/test_store_init.py index 3f20ebc0..9c52ddc6 100644 --- a/tests/unit/coal/test_store/test_store_init.py +++ b/tests/unit/coal/test_store/test_store_init.py @@ -5,9 +5,7 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -from unittest.mock import patch, MagicMock import pytest -import importlib class TestStoreInit: diff --git a/tests/unit/coal/test_store/test_store_native_python.py b/tests/unit/coal/test_store/test_store_native_python.py index b4d6eb40..9aa97216 100644 --- a/tests/unit/coal/test_store/test_store_native_python.py +++ b/tests/unit/coal/test_store/test_store_native_python.py @@ -5,12 +5,12 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import pytest from unittest.mock import MagicMock, patch import pyarrow as pa +import pytest -from cosmotech.coal.store.native_python import store_pylist, convert_table_as_pylist +from cosmotech.coal.store.native_python import convert_table_as_pylist, store_pylist from cosmotech.coal.store.store import Store diff --git a/tests/unit/coal/test_store/test_store_pandas.py b/tests/unit/coal/test_store/test_store_pandas.py index da7e0105..838cf201 100644 --- a/tests/unit/coal/test_store/test_store_pandas.py +++ b/tests/unit/coal/test_store/test_store_pandas.py @@ -5,13 +5,16 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
-import pytest from unittest.mock import MagicMock, patch -import pyarrow as pa import pandas as pd +import pyarrow as pa +import pytest -from cosmotech.coal.store.pandas import store_dataframe, convert_store_table_to_dataframe +from cosmotech.coal.store.pandas import ( + convert_store_table_to_dataframe, + store_dataframe, +) from cosmotech.coal.store.store import Store diff --git a/tests/unit/coal/test_store/test_store_pyarrow.py b/tests/unit/coal/test_store/test_store_pyarrow.py index de0be512..4e3c4083 100644 --- a/tests/unit/coal/test_store/test_store_pyarrow.py +++ b/tests/unit/coal/test_store/test_store_pyarrow.py @@ -5,12 +5,12 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pyarrow as pa +import pytest -from cosmotech.coal.store.pyarrow import store_table, convert_store_table_to_dataframe +from cosmotech.coal.store.pyarrow import convert_store_table_to_dataframe, store_table from cosmotech.coal.store.store import Store diff --git a/tests/unit/coal/test_store/test_store_store.py b/tests/unit/coal/test_store/test_store_store.py index f920549f..8098ac3e 100644 --- a/tests/unit/coal/test_store/test_store_store.py +++ b/tests/unit/coal/test_store/test_store_store.py @@ -5,15 +5,15 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. -import os import pathlib -import pytest -from unittest.mock import MagicMock, patch, mock_open +from unittest.mock import MagicMock, patch import pyarrow as pa +import pytest from adbc_driver_sqlite import dbapi from cosmotech.coal.store.store import Store +from cosmotech.coal.utils import configuration class TestStore: @@ -348,13 +348,15 @@ def test_init_with_reset(self, mock_unlink, mock_exists, mock_mkdir): def test_init_with_custom_location(self, mock_mkdir): """Test the __init__ method with a custom store_location.""" # Arrange - custom_location = pathlib.Path("/custom/path") + _c = configuration.Configuration() + custom_location = "/custom/path" + _c.coal.store = custom_location # Act - store = Store(store_location=custom_location) + store = Store(configuration=_c) # Assert mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) - assert store.store_location == custom_location / ".coal/store" - assert store._database_path == custom_location / ".coal/store" / "db.sqlite" + assert store.store_location == pathlib.Path(custom_location) / ".coal/store" + assert store._database_path == pathlib.Path(custom_location) / ".coal/store" / "db.sqlite" assert store._database == str(store._database_path) diff --git a/tests/unit/coal/test_utils/test_utils_configuration.py b/tests/unit/coal/test_utils/test_utils_configuration.py index 6004a529..6668255e 100644 --- a/tests/unit/coal/test_utils/test_utils_configuration.py +++ b/tests/unit/coal/test_utils/test_utils_configuration.py @@ -10,11 +10,19 @@ import pytest from cosmotech.coal.utils import configuration -from cosmotech.coal.utils.configuration import Dotdict +from cosmotech.coal.utils.configuration import Dotdict, ReferenceKeyError class TestUtilsConfiguration: + @pytest.fixture(autouse=True) + def reset_environ(self): + if "CONFIG_FILE_PATH" in os.environ: + os.environ.pop("CONFIG_FILE_PATH") + if "LOG_LEVEL" in os.environ: + os.environ.pop("LOG_LEVEL") + yield + def test_no_config_file(self): c = configuration.Configuration() assert 
isinstance(c, Dotdict) @@ -29,7 +37,6 @@ def test_no_config_file_with_env_var(self): assert c.log_level == "test_value" def test_config_file_with_secrets(self): - os.environ["CONFIG_FILE_PATH"] = os.path.abspath(os.path.join(os.path.dirname(__file__), "conf.ini")) os.environ["faismoinlmalin"] = "la" @@ -40,6 +47,36 @@ def test_config_file_with_secrets(self): c.secrets assert c.foo.alors == "la" + def test_config_file_with_ref(self): + os.environ["CONFIG_FILE_PATH"] = os.path.abspath(os.path.join(os.path.dirname(__file__), "conf.ini")) + c = configuration.Configuration() + c.section.ref = "$section.sub.TEST" + + assert c.section.ref == c.section.sub.TEST + + def test_safe_get(self): + os.environ["LOG_LEVEL"] = "test_value" + c = configuration.Configuration() + + assert c.safe_get("log_level") == "test_value" + + def test_safe_get_none_dict(self): + os.environ["LOG_LEVEL"] = "test_value" + c = configuration.Configuration() + + assert c.safe_get("log_level.test") is None + + def test_safe_get_sub_section(self): + os.environ["CONFIG_FILE_PATH"] = os.path.abspath(os.path.join(os.path.dirname(__file__), "conf.ini")) + c = configuration.Configuration() + + assert c.safe_get("DEFAULT.A.test") == 1 + + def test_safe_get_default_value(self): + c = configuration.Configuration() + + assert c.safe_get("log_level", "thats_a_no") == "thats_a_no" + class TestUtilsDotdict: @@ -65,7 +102,6 @@ def test_nested_merge(self): db1.merge(db2) - print(db1) expected = {"pain": {"baguette": 4, "sesame": 4, "noix": 1}, "croissant": 5, "chocolatine": 5} assert db1 == expected @@ -75,6 +111,16 @@ def test_ref_value(self): assert dotdict_a.ref.ref_lvl3 == "here" + def test_nested_ref_value(self): + dict_a = { + "lvl1": {"lvl2": {"lvl3": "here"}}, + "ref": {"ref_lvl3": "$lvl1.lvl2.lvl3", "ref_ref": "$ref.ref_lvl3"}, + } + + dotdict_a = Dotdict(dict_a) + + assert dotdict_a.ref.ref_ref == "here" + def test_ref_dict_lvl(self): dict_a = {"lvl1": {"lvl2": {"lvl3": "here"}}, "ref": {"ref_lvl2": "$lvl1.lvl2"}} dotdict_a = Dotdict(dict_a) @@ -97,11 +143,11 @@ def test_ref_dict_update(self): dotdict_a.lvl1.lvl2.lvl3 = "there" assert dotdict_a.ref.ref_lvl2 == {"lvl3": "there"} - def test_unknow_ref_key_error(self): + def test_unknown_ref_key_error(self): dict_a = {"lvl1": {"lvl2": {"lvl3": "here"}}, "ref_lvl99": "$lvl1.lvl2.lvl99"} dotdict_a = Dotdict(dict_a) - with pytest.raises(KeyError): + with pytest.raises(ReferenceKeyError): dotdict_a.ref_lvl99 def test_ref_in_sub_dict(self): diff --git a/tests/unit/coal/test_utils/test_utils_init.py b/tests/unit/coal/test_utils/test_utils_init.py index ef9ac46d..8bcbb59f 100644 --- a/tests/unit/coal/test_utils/test_utils_init.py +++ b/tests/unit/coal/test_utils/test_utils_init.py @@ -7,7 +7,7 @@ import pytest -from cosmotech.coal.utils import strtobool, WEB_DOCUMENTATION_ROOT +from cosmotech.coal.utils import WEB_DOCUMENTATION_ROOT, strtobool class TestUtilsInit: diff --git a/tutorial/contributing/command/command.py b/tutorial/contributing/command/command.py index 03ae71db..9f37657a 100644 --- a/tutorial/contributing/command/command.py +++ b/tutorial/contributing/command/command.py @@ -5,10 +5,11 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
-from cosmotech.csm_data.utils.click import click -from cosmotech.csm_data.utils.decorators import web_help, translate_help from cosmotech.orchestrator.utils.translate import T +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + @click.command() @web_help("csm-data/store/dump-to-mongodb") diff --git a/tutorial/contributing/command/register.py b/tutorial/contributing/command/register.py index d210e26a..cda14ccc 100644 --- a/tutorial/contributing/command/register.py +++ b/tutorial/contributing/command/register.py @@ -6,12 +6,16 @@ # specifically authorized by written means by Cosmo Tech. from cosmotech.csm_data.commands.store.dump_to_azure import dump_to_azure +from cosmotech.csm_data.commands.store.dump_to_mongodb import ( + dump_to_mongodb, # Add this line +) from cosmotech.csm_data.commands.store.dump_to_postgresql import dump_to_postgresql from cosmotech.csm_data.commands.store.dump_to_s3 import dump_to_s3 -from cosmotech.csm_data.commands.store.dump_to_mongodb import dump_to_mongodb # Add this line from cosmotech.csm_data.commands.store.list_tables import list_tables from cosmotech.csm_data.commands.store.load_csv_folder import load_csv_folder -from cosmotech.csm_data.commands.store.load_from_singlestore import load_from_singlestore +from cosmotech.csm_data.commands.store.load_from_singlestore import ( + load_from_singlestore, +) from cosmotech.csm_data.commands.store.reset import reset from cosmotech.csm_data.commands.store.store import store diff --git a/tutorial/contributing/mongodb/store.py b/tutorial/contributing/mongodb/store.py index 7c5eb965..65d8af1b 100644 --- a/tutorial/contributing/mongodb/store.py +++ b/tutorial/contributing/mongodb/store.py @@ -13,12 +13,13 @@ """ from time import perf_counter + import pyarrow import pymongo +from cosmotech.orchestrator.utils.translate import T from cosmotech.coal.store.store import Store from cosmotech.coal.utils.logger import LOGGER -from cosmotech.orchestrator.utils.translate import T def send_pyarrow_table_to_mongodb( diff --git a/tutorial/contributing/testing/store_test.py b/tutorial/contributing/testing/store_test.py index 15551909..d7972531 100644 --- a/tutorial/contributing/testing/store_test.py +++ b/tutorial/contributing/testing/store_test.py @@ -5,14 +5,16 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. 
-import os import tempfile -from unittest.mock import patch, MagicMock +from unittest.mock import MagicMock, patch import pyarrow import pytest -from cosmotech.coal.mongodb.store import send_pyarrow_table_to_mongodb, dump_store_to_mongodb +from cosmotech.coal.mongodb.store import ( + dump_store_to_mongodb, + send_pyarrow_table_to_mongodb, +) from cosmotech.coal.store.store import Store diff --git a/tutorial/cosmotech-api/complete_workflow.py b/tutorial/cosmotech-api/complete_workflow.py index 44a4b761..df7d432b 100644 --- a/tutorial/cosmotech-api/complete_workflow.py +++ b/tutorial/cosmotech-api/complete_workflow.py @@ -1,21 +1,23 @@ # Example: Complete workflow using the CosmoTech API +import csv +import json import os import pathlib -import json -import csv + +from cosmotech_api.api.dataset_api import DatasetApi +from cosmotech_api.api.twin_graph_api import TwinGraphApi + from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.coal.cosmotech_api.runner import ( - get_runner_data, download_runner_data, + get_runner_data, ) +from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile from cosmotech.coal.cosmotech_api.workspace import ( - list_workspace_files, download_workspace_file, + list_workspace_files, upload_workspace_file, ) -from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile -from cosmotech_api.api.twin_graph_api import TwinGraphApi -from cosmotech_api.api.dataset_api import DatasetApi from cosmotech.coal.utils.logger import LOGGER # Set up environment variables for authentication @@ -139,27 +141,27 @@ """ # Create a dataset dataset_api = DatasetApi(api_client) - + new_dataset = { "name": "Customers with Loyalty Scores", "description": "Processed customer data with calculated loyalty scores", "tags": ["processed", "customers", "loyalty"] } - + try: dataset = dataset_api.create_dataset( organization_id=organization_id, workspace_id=workspace_id, dataset=new_dataset ) - + dataset_id = dataset.id print(f"Created dataset with ID: {dataset_id}") - + # Upload the processed file to the dataset # This would typically involve additional API calls # ... 
- + except Exception as e: print(f"Error creating dataset: {e}") """ @@ -178,14 +180,14 @@ # In a real scenario, you would send this data to the Twin Data Layer """ twin_graph_api = TwinGraphApi(api_client) - + # For each customer, create a node in the Twin Data Layer with open(processed_file, "r") as f: reader = csv.DictReader(f) for row in reader: # Create parameters for the Cypher query params = {k: v for k, v in row.items()} - + # Execute the query twin_graph_api.run_twin_graph_cypher_query( organization_id=organization_id, diff --git a/tutorial/cosmotech-api/connection_setup.py b/tutorial/cosmotech-api/connection_setup.py index 10485ff3..7926cc6e 100644 --- a/tutorial/cosmotech-api/connection_setup.py +++ b/tutorial/cosmotech-api/connection_setup.py @@ -1,5 +1,6 @@ # Example: Setting up connections to the CosmoTech API import os + from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.coal.utils.logger import LOGGER diff --git a/tutorial/cosmotech-api/runner_operations.py b/tutorial/cosmotech-api/runner_operations.py index 4d7c2ce6..27f290c9 100644 --- a/tutorial/cosmotech-api/runner_operations.py +++ b/tutorial/cosmotech-api/runner_operations.py @@ -1,12 +1,13 @@ # Example: Working with runners and runs in the CosmoTech API import os import pathlib + from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.coal.cosmotech_api.runner import ( + download_datasets, + download_runner_data, get_runner_data, get_runner_parameters, - download_runner_data, - download_datasets, ) from cosmotech.coal.utils.logger import LOGGER @@ -79,15 +80,15 @@ # Example 5: Download specific datasets """ from cosmotech.coal.cosmotech_api.runner import get_dataset_ids_from_runner - + # Get dataset IDs from the runner dataset_ids = get_dataset_ids_from_runner(runner_data) - + if dataset_ids: # Create a directory for the datasets specific_dataset_dir = pathlib.Path("./specific_datasets") specific_dataset_dir.mkdir(exist_ok=True, parents=True) - + # Download the datasets datasets = download_datasets( organization_id=organization_id, @@ -96,7 +97,7 @@ read_files=True, parallel=True, ) - + print("\nDownloaded specific datasets:") for dataset_id, dataset_info in datasets.items(): print(f" - Dataset ID: {dataset_id}") diff --git a/tutorial/cosmotech-api/twin_data_layer.py b/tutorial/cosmotech-api/twin_data_layer.py index 6356c48a..0036f658 100644 --- a/tutorial/cosmotech-api/twin_data_layer.py +++ b/tutorial/cosmotech-api/twin_data_layer.py @@ -1,10 +1,12 @@ # Example: Working with the Twin Data Layer in the CosmoTech API +import csv import os import pathlib -import csv + +from cosmotech_api.api.twin_graph_api import TwinGraphApi + from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.coal.cosmotech_api.twin_data_layer import CSVSourceFile -from cosmotech_api.api.twin_graph_api import TwinGraphApi from cosmotech.coal.utils.logger import LOGGER # Set up environment variables for authentication @@ -85,7 +87,7 @@ for row in reader: # Create parameters for the Cypher query params = {k: v for k, v in row.items()} - + # Execute the query twin_graph_api.run_twin_graph_cypher_query( organization_id=organization_id, @@ -96,14 +98,14 @@ "parameters": params } ) - + # For relationships, you would typically: with open(knows_file, "r") as f: reader = csv.DictReader(f) for row in reader: # Create parameters for the Cypher query params = {k: v for k, v in row.items()} - + # Execute the query twin_graph_api.run_twin_graph_cypher_query( 
organization_id=organization_id, @@ -128,7 +130,7 @@ "parameters": {} } ) - + # Process the results print("\nPerson nodes in the Twin Data Layer:") for record in result.records: diff --git a/tutorial/cosmotech-api/workspace_operations.py b/tutorial/cosmotech-api/workspace_operations.py index 4db59623..90114ee1 100644 --- a/tutorial/cosmotech-api/workspace_operations.py +++ b/tutorial/cosmotech-api/workspace_operations.py @@ -1,10 +1,11 @@ # Example: Working with workspaces in the CosmoTech API import os import pathlib + from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.coal.cosmotech_api.workspace import ( - list_workspace_files, download_workspace_file, + list_workspace_files, upload_workspace_file, ) from cosmotech.coal.utils.logger import LOGGER diff --git a/tutorial/datastore/basic_example.py b/tutorial/datastore/basic_example.py index c68b79e5..87d0afbd 100644 --- a/tutorial/datastore/basic_example.py +++ b/tutorial/datastore/basic_example.py @@ -1,5 +1,5 @@ -from cosmotech.coal.store.store import Store from cosmotech.coal.store.native_python import store_pylist +from cosmotech.coal.store.store import Store # We initialize and reset the data store my_datastore = Store(reset=True) diff --git a/tutorial/datastore/complete_pipeline.py b/tutorial/datastore/complete_pipeline.py index e725992d..96e559cf 100644 --- a/tutorial/datastore/complete_pipeline.py +++ b/tutorial/datastore/complete_pipeline.py @@ -1,7 +1,8 @@ -from cosmotech.coal.store.store import Store -from cosmotech.coal.store.native_python import store_pylist, convert_table_as_pylist import pathlib -from cosmotech.coal.store.csv import store_csv_file, convert_store_table_to_csv + +from cosmotech.coal.store.csv import convert_store_table_to_csv, store_csv_file +from cosmotech.coal.store.native_python import convert_table_as_pylist, store_pylist +from cosmotech.coal.store.store import Store # Initialize the store store = Store(reset=True) @@ -14,7 +15,7 @@ store.execute_query( """ CREATE TABLE cleaned_data AS - SELECT + SELECT id, TRIM(name) as name, UPPER(category) as category, diff --git a/tutorial/datastore/csv_files.py b/tutorial/datastore/csv_files.py index 724f35c6..c99d58b9 100644 --- a/tutorial/datastore/csv_files.py +++ b/tutorial/datastore/csv_files.py @@ -1,6 +1,7 @@ import pathlib + +from cosmotech.coal.store.csv import convert_store_table_to_csv, store_csv_file from cosmotech.coal.store.store import Store -from cosmotech.coal.store.csv import store_csv_file, convert_store_table_to_csv # Initialize the store store = Store(reset=True) @@ -12,7 +13,7 @@ # Query the data high_value_customers = store.execute_query( """ - SELECT * FROM customers + SELECT * FROM customers WHERE annual_spend > 10000 ORDER BY annual_spend DESC """ diff --git a/tutorial/datastore/joining_tables.py b/tutorial/datastore/joining_tables.py index eeb632dc..499beea2 100644 --- a/tutorial/datastore/joining_tables.py +++ b/tutorial/datastore/joining_tables.py @@ -1,5 +1,5 @@ -from cosmotech.coal.store.store import Store from cosmotech.coal.store.native_python import store_pylist +from cosmotech.coal.store.store import Store store = Store(reset=True) diff --git a/tutorial/datastore/pandas_dataframes.py b/tutorial/datastore/pandas_dataframes.py index 8841536b..26f8706b 100644 --- a/tutorial/datastore/pandas_dataframes.py +++ b/tutorial/datastore/pandas_dataframes.py @@ -1,6 +1,10 @@ import pandas as pd + +from cosmotech.coal.store.pandas import ( + convert_store_table_to_dataframe, + store_dataframe, +) from 
cosmotech.coal.store.store import Store -from cosmotech.coal.store.pandas import store_dataframe, convert_store_table_to_dataframe # Initialize the store store = Store(reset=True) diff --git a/tutorial/datastore/pyarrow_tables.py b/tutorial/datastore/pyarrow_tables.py index fd919359..d7c3cee8 100644 --- a/tutorial/datastore/pyarrow_tables.py +++ b/tutorial/datastore/pyarrow_tables.py @@ -1,6 +1,7 @@ import pyarrow as pa -from cosmotech.coal.store.store import Store + from cosmotech.coal.store.pyarrow import store_table +from cosmotech.coal.store.store import Store # Initialize the store store = Store(reset=True) diff --git a/tutorial/datastore/step1_load_data.py b/tutorial/datastore/step1_load_data.py index 7a993b52..8bfeb2b4 100644 --- a/tutorial/datastore/step1_load_data.py +++ b/tutorial/datastore/step1_load_data.py @@ -1,7 +1,8 @@ -from cosmotech.coal.store.store import Store -from cosmotech.coal.store.csv import store_csv_file import pathlib +from cosmotech.coal.store.csv import store_csv_file +from cosmotech.coal.store.store import Store + # Initialize the store store = Store(reset=True) diff --git a/tutorial/datastore/step2_clean_data.py b/tutorial/datastore/step2_clean_data.py index 81e9beae..b5a47f6d 100644 --- a/tutorial/datastore/step2_clean_data.py +++ b/tutorial/datastore/step2_clean_data.py @@ -2,7 +2,7 @@ store.execute_query( """ CREATE TABLE cleaned_data AS - SELECT + SELECT id, TRIM(name) as name, UPPER(category) as category, diff --git a/tutorial/datastore/step4_export_results.py b/tutorial/datastore/step4_export_results.py index fd5a9e15..5de690c9 100644 --- a/tutorial/datastore/step4_export_results.py +++ b/tutorial/datastore/step4_export_results.py @@ -1,7 +1,8 @@ -from cosmotech.coal.store.native_python import convert_table_as_pylist -from cosmotech.coal.store.csv import convert_store_table_to_csv import pathlib +from cosmotech.coal.store.csv import convert_store_table_to_csv +from cosmotech.coal.store.native_python import convert_table_as_pylist + # Export to Python list summary_data = convert_table_as_pylist("summary_data", store=store) print(summary_data)
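
Taken together, the test changes above all migrate toward one pattern: entry points that previously took a long list of keyword arguments (store folder, account name, credentials, output options) now take a single Configuration object built from a nested dict, read back with attribute access, "$section.key" references, and safe_get fallbacks. The following is a minimal sketch of that pattern as the migrated tests exercise it, not code from this changeset; the account name, paths, credentials, and the "INFO" fallback below are placeholder values.

from cosmotech.coal.azure.blob import dump_store_to_azure
from cosmotech.coal.utils.configuration import Configuration

# Nested dict mirroring the test fixtures; every value here is illustrative.
config = Configuration(
    {
        "coal": {"store": "/path/to/store"},  # store root, read as config.coal.store
        "azure": {
            "account_name": "myaccount",
            "container_name": "mycontainer",
            "tenant_id": "my-tenant-id",
            "client_id": "my-client-id",
            "client_secret": "my-client-secret",
            "output_type": "sqlite",  # the tests also exercise "csv" and "parquet"
            "file_prefix": "run_",
        },
    }
)

# Optional keys can be read with a default, as test_safe_get_default_value shows.
log_level = config.safe_get("log_level", "INFO")

# One configuration object replaces the former explicit keyword arguments.
dump_store_to_azure(configuration=config, selected_tables=["table1"])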