Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/track_dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches:
- main

jobs:
generate-sbom:
runs-on: ubuntu-latest
Expand All @@ -28,4 +28,3 @@ jobs:
apikey: ${{ secrets.DEPENDENCY_TRACK_API_KEY }}
project: '8d39a492-bf9e-49fa-a58c-b391ed4a1243'
bomfilename: 'sbom.json'

2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
include README.md
include LICENSE
graft cosmotech/orchestrator_plugins
graft cosmotech/translation
graft cosmotech/translation
10 changes: 5 additions & 5 deletions cosmotech/coal/azure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
This module provides functions for interacting with Azure services like Storage and ADX.
"""

# Re-export blob functions for easier importing
from cosmotech.coal.azure.blob import (
dump_store_to_azure,
)

# Re-export storage functions for easier importing
from cosmotech.coal.azure.storage import (
upload_file,
upload_folder,
)

# Re-export blob functions for easier importing
from cosmotech.coal.azure.blob import (
dump_store_to_azure,
)
34 changes: 24 additions & 10 deletions cosmotech/coal/azure/adx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,36 @@
# etc., to any person is prohibited unless it has been previously and
# specifically authorized by written means by Cosmo Tech.

from cosmotech.coal.azure.adx.auth import create_kusto_client, create_ingest_client, initialize_clients
from cosmotech.coal.azure.adx.query import run_query, run_command_query
from cosmotech.coal.azure.adx.auth import (
create_ingest_client,
create_kusto_client,
initialize_clients,
)
from cosmotech.coal.azure.adx.ingestion import (
ingest_dataframe,
send_to_adx,
IngestionStatus,
check_ingestion_status,
monitor_ingestion,
handle_failures,
IngestionStatus,
ingest_dataframe,
monitor_ingestion,
send_to_adx,
)
from cosmotech.coal.azure.adx.tables import table_exists, create_table, check_and_create_table, _drop_by_tag
from cosmotech.coal.azure.adx.utils import type_mapping, create_column_mapping
from cosmotech.coal.azure.adx.store import send_pyarrow_table_to_adx, send_table_data, process_tables, send_store_to_adx
from cosmotech.coal.azure.adx.query import run_command_query, run_query
from cosmotech.coal.azure.adx.runner import (
prepare_csv_content,
construct_create_query,
insert_csv_files,
prepare_csv_content,
send_runner_data,
)
from cosmotech.coal.azure.adx.store import (
process_tables,
send_pyarrow_table_to_adx,
send_store_to_adx,
send_table_data,
)
from cosmotech.coal.azure.adx.tables import (
_drop_by_tag,
check_and_create_table,
create_table,
table_exists,
)
from cosmotech.coal.azure.adx.utils import create_column_mapping, type_mapping
24 changes: 10 additions & 14 deletions cosmotech/coal/azure/adx/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,24 @@
# etc., to any person is prohibited unless it has been previously and
# specifically authorized by written means by Cosmo Tech.

import os
import time
from enum import Enum
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
from typing import Dict, Iterator, List, Optional, Tuple

import os
import pandas as pd
import time
import tqdm
from azure.kusto.data import KustoClient
from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import IngestionProperties
from azure.kusto.ingest import QueuedIngestClient
from azure.kusto.ingest import ReportLevel
from azure.kusto.ingest.status import FailureMessage
from azure.kusto.ingest.status import KustoIngestStatusQueues
from azure.kusto.ingest.status import SuccessMessage
from azure.kusto.ingest import IngestionProperties, QueuedIngestClient, ReportLevel
from azure.kusto.ingest.status import (
FailureMessage,
KustoIngestStatusQueues,
SuccessMessage,
)
from cosmotech.orchestrator.utils.translate import T

from cosmotech.coal.azure.adx.tables import create_table, _drop_by_tag
from cosmotech.coal.azure.adx.tables import _drop_by_tag, create_table
from cosmotech.coal.azure.adx.utils import type_mapping
from cosmotech.coal.utils.logger import LOGGER

Expand Down
2 changes: 1 addition & 1 deletion cosmotech/coal/azure/adx/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from azure.kusto.data import KustoClient
from azure.kusto.data.response import KustoResponseDataSet
from cosmotech.orchestrator.utils.translate import T

from cosmotech.coal.utils.logger import LOGGER
from cosmotech.orchestrator.utils.translate import T


def run_query(client: KustoClient, database: str, query: str) -> KustoResponseDataSet:
Expand Down
4 changes: 2 additions & 2 deletions cosmotech/coal/azure/adx/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
# etc., to any person is prohibited unless it has been previously and
# specifically authorized by written means by Cosmo Tech.

import dateutil.parser
from typing import Any, Dict

import dateutil.parser
import pyarrow
from cosmotech.orchestrator.utils.translate import T

from cosmotech.coal.utils.logger import LOGGER
from cosmotech.orchestrator.utils.translate import T


def create_column_mapping(data: pyarrow.Table) -> Dict[str, str]:
Expand Down
34 changes: 14 additions & 20 deletions cosmotech/coal/azure/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from cosmotech.orchestrator.utils.translate import T

from cosmotech.coal.store.store import Store
from cosmotech.coal.utils.configuration import Configuration
from cosmotech.coal.utils.logger import LOGGER

VALID_TYPES = (
Expand All @@ -31,42 +32,35 @@


def dump_store_to_azure(
store_folder: str,
account_name: str,
container_name: str,
tenant_id: str,
client_id: str,
client_secret: str,
output_type: str = "sqlite",
file_prefix: str = "",
configuration: Configuration = Configuration(),
selected_tables: list[str] = [],
) -> None:
"""
Dump Store data to Azure Blob Storage.

Args:
store_folder: Folder containing the Store
account_name: Azure Storage account name
container_name: Azure Storage container name
tenant_id: Azure tenant ID
client_id: Azure client ID
client_secret: Azure client secret
output_type: Output file type (sqlite, csv, or parquet)
file_prefix: Prefix for uploaded files
configuration: Configuration utils class
selected_tables: List of table names

Raises:
ValueError: If the output type is invalid
"""
_s = Store(store_location=store_folder)
_s = Store(configuration=configuration)
output_type = configuration.safe_get("azure.output_type", default="sqlite")
file_prefix = configuration.safe_get("azure.file_prefix", default="")

if output_type not in VALID_TYPES:
LOGGER.error(T("coal.common.validation.invalid_output_type").format(output_type=output_type))
raise ValueError(T("coal.common.validation.invalid_output_type").format(output_type=output_type))

container_client = BlobServiceClient(
account_url=f"https://{account_name}.blob.core.windows.net/",
credential=ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret),
).get_container_client(container_name)
account_url=f"https://{configuration.azure.account_name}.blob.core.windows.net/",
credential=ClientSecretCredential(
tenant_id=configuration.azure.tenant_id,
client_id=configuration.azure.client_id,
client_secret=configuration.azure.client_secret,
),
).get_container_client(configuration.azure.container_name)

def data_upload(data_stream: BytesIO, file_name: str):
uploaded_file_name = file_prefix + file_name
Expand Down
2 changes: 1 addition & 1 deletion cosmotech/coal/postgresql/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def send_runner_metadata_to_postgresql(
LOGGER.info(T("coal.services.postgresql.metadata"))
sql_upsert = f"""
INSERT INTO {schema_table} (id, name, last_csm_run_id, run_template_id)
VALUES(%s, %s, %s, %s)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id)
DO
UPDATE SET name = EXCLUDED.name, last_csm_run_id = EXCLUDED.last_csm_run_id;
Expand Down
34 changes: 18 additions & 16 deletions cosmotech/coal/postgresql/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,27 @@ def dump_store_to_postgresql(
selected_tables: list of tables to send
fk_id: foreign key id to add to all rows of all tables
"""
_c = Configuration()
_c.postgres.host = postgres_host
_c.postgres.port = postgres_port
_c.postgres.db_name = postgres_db
_c.postgres.db_schema = postgres_schema
_c.postgres.user_name = postgres_user
_c.postgres.user_password = postgres_password
_c.postgres.password_encoding = force_encode
_c.postgres.table_prefix = table_prefix

dump_store_to_postgresql_from_conf(
configuration=_c, store_folder=store_folder, replace=replace, selected_tables=selected_tables, fk_id=fk_id
_c = Configuration(
{
"coal": {"store": store_folder},
"postgres": {
"host": postgres_host,
"port": postgres_port,
"db_name": postgres_db,
"db_schema": postgres_schema,
"user_name": postgres_user,
"user_password": postgres_password,
"password_encoding": force_encode,
"table_prefix": table_prefix,
},
}
)

dump_store_to_postgresql_from_conf(configuration=_c, replace=replace, selected_tables=selected_tables, fk_id=fk_id)


def dump_store_to_postgresql_from_conf(
configuration: Configuration,
store_folder: str,
replace: bool = True,
selected_tables: list[str] = [],
fk_id: str = None,
Expand All @@ -80,13 +83,12 @@ def dump_store_to_postgresql_from_conf(

Args:
configuration: coal Configuration
store_folder: Folder containing the Store
replace: Whether to replace existing tables
selected_tables: list of tables to send
fk_id: foreign key id to add to all rows of all tables
"""
_psql = PostgresUtils(configuration)
_s = Store(store_location=store_folder)
_s = Store(configuration=configuration)

tables = list(_s.list_tables())
if selected_tables:
Expand All @@ -104,7 +106,7 @@ def dump_store_to_postgresql_from_conf(
f"""
ALTER TABLE {table_name}
ADD csm_run_id TEXT NOT NULL
DEFAULT ('{fk_id})
DEFAULT ('{fk_id}')
"""
)
data = _s.get_table(table_name)
Expand Down
3 changes: 2 additions & 1 deletion cosmotech/coal/postgresql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def full_uri(self) -> str:
f"/{self.db_name}"
)

def metadata_table_name(self):
@property
def metadata_table_name(self) -> str:
return f"{self.table_prefix}RunnerMetadata"

def get_postgresql_table_schema(self, target_table_name: str) -> Optional[pa.Schema]:
Expand Down
5 changes: 3 additions & 2 deletions cosmotech/coal/singlestore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
for store operations.
"""

import csv
import pathlib
import time
import csv

import singlestoredb as s2
from cosmotech.orchestrator.utils.translate import T

from cosmotech.coal.store.csv import store_csv_file
from cosmotech.coal.store.store import Store
from cosmotech.coal.utils.logger import LOGGER
from cosmotech.orchestrator.utils.translate import T


def _get_data(table_name: str, output_directory: str, cursor) -> None:
Expand Down
29 changes: 16 additions & 13 deletions cosmotech/coal/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,34 @@
including loading and converting data.
"""

# Re-export the Store class
from cosmotech.coal.store.store import Store

# Re-export functions from the csv module
from cosmotech.coal.store.csv import (
store_csv_file,
convert_store_table_to_csv,
store_csv_file,
)

# Re-export functions from the native_python module
from cosmotech.coal.store.native_python import (
store_pylist,
convert_table_as_pylist,
store_pylist,
)

# Re-export functions from the pandas module (if available)

from cosmotech.coal.store.pandas import (
store_dataframe,
convert_store_table_to_dataframe as convert_store_table_to_pandas_dataframe,
)

# Re-export functions from the pyarrow module (if available)

from cosmotech.coal.store.pandas import (
store_dataframe,
)
from cosmotech.coal.store.pyarrow import (
store_table,
convert_store_table_to_dataframe as convert_store_table_to_pyarrow_table,
)
from cosmotech.coal.store.pyarrow import (
store_table,
)

# Re-export the Store class
from cosmotech.coal.store.store import Store

# Re-export functions from the pandas module (if available)


# Re-export functions from the pyarrow module (if available)
Loading
Loading