4 changes: 2 additions & 2 deletions biasanalyzer/api.py
@@ -40,7 +40,7 @@ def set_config(self, config_file_path: str):
except ValidationError as ex:
notify_users(f"configuration yaml file is not valid with validation error: {ex}", level="error")

def set_root_omop(self):
def set_root_omop(self, read_only=True):
if not self.config:
notify_users(
"no valid configuration to set root OMOP CDM data. "
@@ -62,7 +62,7 @@ def set_root_omop(self):
self.bias_db = BiasDatabase(":memory:", omop_db_url=db_url)
elif db_type == "duckdb":
db_path = self.config["root_omop_cdm_database"].get("database", ":memory:")
self.omop_cdm_db = OMOPCDMDatabase(db_path)
self.omop_cdm_db = OMOPCDMDatabase(db_path, read_only=read_only)
self.bias_db = BiasDatabase(":memory:", omop_db_url=db_path)
else:
notify_users(f"Unsupported database type: {db_type}")
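The new `read_only` flag on `set_root_omop()` defaults to `True`, so existing callers keep a read-only DuckDB connection; only callers that need writes (such as the test fixture in `tests/conftest.py`) pass `read_only=False`. A minimal usage sketch, assuming a local YAML config (the file name is a placeholder, not from this PR):

```python
# Hedged sketch: open the root OMOP CDM database writable for setup work.
# "config.yaml" is a placeholder path; BIAS and set_root_omop are from this PR.
from biasanalyzer.api import BIAS

bias = BIAS(config_file_path="config.yaml")
bias.set_root_omop(read_only=False)  # default is read_only=True
```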
4 changes: 2 additions & 2 deletions biasanalyzer/cohort.py
@@ -10,7 +10,7 @@
from biasanalyzer.concept import ConceptHierarchy
from biasanalyzer.config import load_cohort_creation_config
from biasanalyzer.database import BiasDatabase, OMOPCDMDatabase
from biasanalyzer.models import CohortDefinition, DOMAIN_MAPPING
from biasanalyzer.models import DOMAIN_MAPPING, CohortDefinition
from biasanalyzer.utils import clean_string, hellinger_distance, notify_users


@@ -60,7 +60,7 @@ def get_concept_stats(
Get cohort concept statistics such as concept prevalence
"""
if concept_type not in DOMAIN_MAPPING:
raise ValueError(f'input concept_type {concept_type} is not a valid concept type to get concept stats')
raise ValueError(f"input concept_type {concept_type} is not a valid concept type to get concept stats")

cohort_stats = self.bias_db.get_cohort_concept_stats(
self.cohort_id,
16 changes: 6 additions & 10 deletions biasanalyzer/database.py
@@ -1,6 +1,5 @@
# ruff: noqa: S608
import gc
import platform
from datetime import datetime
from typing import Optional

@@ -322,13 +321,13 @@ class OMOPCDMDatabase:
_instance = None # indicating a singleton with only one instance of the class ever created
_database_type = None

def __new__(cls, *args, **kwargs):
def __new__(cls, db_url, read_only=True):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialize(*args, **kwargs) # Initialize only once
cls._instance._initialize(db_url, read_only=read_only) # Initialize only once
return cls._instance

def _initialize(self, db_url):
def _initialize(self, db_url, read_only=True):
if db_url.endswith(".duckdb"):
# close any potential global connections if any
for obj in gc.get_objects(): # pragma: no cover
@@ -340,11 +339,8 @@ def _initialize(self, db_url):

# Handle DuckDB connection
try:
if platform.system().lower() == "windows": # pragma: no cover
# it is critical to set duckdb connection to be read-only on windows platform
self.engine = duckdb.connect(db_url, read_only=True)
else:
self.engine = duckdb.connect(db_url)
# it is critical to set duckdb connection to be read-only on windows and Mac platforms
self.engine = duckdb.connect(db_url, read_only=read_only)
notify_users(f"Connected to the DuckDB database: {db_url}.")
except duckdb.Error as e: # pragma: no cover
notify_users(f"Failed to connect to DuckDB: {e}", level="error")
@@ -573,4 +569,4 @@ def close(self):
else:
self.engine.dispose() # pragma: no cover
OMOPCDMDatabase._instance = None
notify_users("Connection to the OMOP CDM database closed.")
notify_users("Connection to the OMOP CDM database closed.")
152 changes: 91 additions & 61 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -10,9 +10,9 @@ include = [
{path = "biasanalyzer/sql_templates/*.sql", format=["sdist", "wheel"]}
]
[tool.poetry.dependencies]
python = ">=3.8.10,<3.13"
python = ">=3.9,<3.13"
duckdb = "^1.1.1"
pandas = "2.0.3"
pandas = "^2.1.4"

scipy = [
{version = ">=1.10.1,<1.11", markers = "python_version<'3.12'"},
78 changes: 34 additions & 44 deletions scripts/ingest_csvs_to_omop_duckdb.py
@@ -11,45 +11,30 @@
"""

import argparse
import sys
import csv
import sys
import time
from pathlib import Path

import duckdb


FILENAME_STEM_TO_TABLE_NAME_MAPPING = {
# 'demographics': 'person'
# 'conditions': 'condition_occurrence'
# 'drugs': 'drug_exposure'
# 'procedures': 'procedure_occurrence'
# 'visits': 'visit_occurrence'
'observations': 'observation'
"observations": "observation"
}

COLUMN_MAPPINGS = {
"person": {
"deid_pat_id": "person_id"
},
"condition_occurrence": {
"deid_pat_id": "person_id"
},
"drug_exposure": {
"deid_pat_id": "person_id"
},
"procedure_occurrence": {
"deid_pat_id": "person_id"
},
"visit_occurrence": {
"deid_pat_id": "person_id"
},
"observation": {
"deid_pat_id": "person_id"
},
"measurement": {
"deid_pat_id": "person_id"
},
"person": {"deid_pat_id": "person_id"},
"condition_occurrence": {"deid_pat_id": "person_id"},
"drug_exposure": {"deid_pat_id": "person_id"},
"procedure_occurrence": {"deid_pat_id": "person_id"},
"visit_occurrence": {"deid_pat_id": "person_id"},
"observation": {"deid_pat_id": "person_id"},
"measurement": {"deid_pat_id": "person_id"},
}

OMOP_TABLE_SCHEMAS = {
@@ -71,7 +56,7 @@
"race_source_value",
"race_source_concept_id",
"ethnicity_source_value",
"ethnicity_source_concept_id"
"ethnicity_source_concept_id",
],
"condition_occurrence": [
"condition_occurrence_id",
@@ -89,9 +74,9 @@
"visit_detail_id",
"condition_source_value",
"condition_source_concept_id",
"condition_status_source_value"
"condition_status_source_value",
],
'drug_exposure': [
"drug_exposure": [
"drug_exposure_id",
"person_id",
"drug_concept_id",
@@ -114,9 +99,9 @@
"drug_source_value",
"drug_source_concept_id",
"route_source_value",
"dose_unit_source_value"
"dose_unit_source_value",
],
'procedure_occurrence': [
"procedure_occurrence": [
"procedure_occurrence_id",
"person_id",
"procedure_concept_id",
@@ -132,9 +117,9 @@
"visit_detail_id",
"procedure_source_value",
"procedure_source_concept_id",
"modifier_source_value"
"modifier_source_value",
],
'visit_occurrence': [
"visit_occurrence": [
"visit_occurrence_id",
"person_id",
"visit_concept_id",
@@ -151,9 +136,9 @@
"admitted_from_source_value",
"discharged_to_concept_id",
"discharged_to_source_value",
"preceding_visit_occurrence_id"
"preceding_visit_occurrence_id",
],
'observation': [
"observation": [
"observation_id",
"person_id",
"observation_concept_id",
@@ -174,28 +159,29 @@
"qualifier_source_value",
"value_source_value",
"observation_event_id",
"obs_event_field_concept_id"
]
"obs_event_field_concept_id",
],
}


def load_csv_to_duckdb(con, csv_path: Path, table_name: str):
"""Load a single CSV file into DuckDB."""
t0 = time.time()
print(f"loading {table_name} from {csv_path}")

# read and normalize header
with open(csv_path, "r", newline="") as f:
with open(csv_path, newline="") as f:
reader = csv.reader(f)
raw_header = next(reader)

# normalize: lower case + strip quotes/spaces
raw_header = [h.strip().replace('"', '') for h in raw_header]
raw_header = [h.strip().replace('"', "") for h in raw_header]
header = [h.lower() for h in raw_header]
print(f'normalized header: {header}')
print(f"normalized header: {header}")

mapping = COLUMN_MAPPINGS.get(table_name, {})
final_cols = [mapping.get(col, col) for col in header]
print(f'mapped header: {final_cols}')
print(f"mapped header: {final_cols}")

expected = OMOP_TABLE_SCHEMAS.get(table_name, [])
final_set = set(final_cols)
@@ -209,15 +195,15 @@ def load_csv_to_duckdb(con, csv_path: Path, table_name: str):
extra = final_set - set(expected)
if extra:
print(f"WARNING: Extra columns in CSV for {table_name}: {sorted(extra)}")
print(f"Extra columns will NOT be ingested.")
print("Extra columns will NOT be ingested.")

select_clauses = []
for orig, new in zip(raw_header, final_cols):
if new not in expected:
# skip extra columns entirely
continue
if orig != new:
select_clauses.append(f'{orig} AS {new}')
select_clauses.append(f"{orig} AS {new}")
else:
select_clauses.append(orig)

@@ -268,9 +254,13 @@ def main():
required=False,
help="Directory containing OMOP vocabulary CSVs (concept, concept_relationship, etc.)",
)
parser.add_argument("--output", type=Path,
default=Path("Y:/OMOP_duckdb/omop.duckdb"),
required=False, help="Output DuckDB file path")
parser.add_argument(
"--output",
type=Path,
default=Path("Y:/OMOP_duckdb/omop.duckdb"),
required=False,
help="Output DuckDB file path",
)

args = parser.parse_args()

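The refactored `load_csv_to_duckdb()` keeps the same pipeline: read the raw CSV header, normalize it, rename columns via `COLUMN_MAPPINGS`, drop anything outside the OMOP table schema, and build a `SELECT ... AS ...` projection for DuckDB. A condensed sketch of that pipeline, assuming a hypothetical `observations.csv` and a trimmed two-column schema:

```python
# Condensed sketch of the normalize -> map -> project pipeline in
# load_csv_to_duckdb(). The CSV file and the two-column schema are
# hypothetical; the real schemas live in OMOP_TABLE_SCHEMAS.
import csv

import duckdb

mapping = {"deid_pat_id": "person_id"}      # from COLUMN_MAPPINGS
expected = {"observation_id", "person_id"}  # trimmed OMOP schema

with open("observations.csv", newline="") as f:
    raw_header = [h.strip().replace('"', "") for h in next(csv.reader(f))]

final_cols = [mapping.get(h.lower(), h.lower()) for h in raw_header]
select_clauses = [
    f"{orig} AS {new}" if orig != new else orig
    for orig, new in zip(raw_header, final_cols)
    if new in expected  # extra CSV columns are skipped, as in the script
]

con = duckdb.connect(":memory:")
con.execute(
    f"CREATE TABLE observation AS SELECT {', '.join(select_clauses)} "
    "FROM read_csv_auto('observations.csv', header=true)"
)
```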
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -273,7 +273,7 @@ def test_db():

# mock configuration file
bias = BIAS(config_file_path=config_file)
bias.set_root_omop()
bias.set_root_omop(read_only=False)

yield bias # Provide the connection to the test

28 changes: 14 additions & 14 deletions tests/query_based/test_cohort_creation.py
@@ -86,10 +86,10 @@ def test_cohort_creation_baseline(caplog, test_db):

patient_ids = set([item["subject_id"] for item in cohort.data])
assert_equal(len(patient_ids), 5)
assert_equal(patient_ids, {'106', '108', '110', '111', '112'})
assert_equal(patient_ids, {"106", "108", "110", "111", "112"})
# select two patients to check for cohort_start_date and cohort_end_date automatically computed
patient_106 = next(item for item in cohort.data if item["subject_id"] == '106')
patient_108 = next(item for item in cohort.data if item["subject_id"] == '108')
patient_106 = next(item for item in cohort.data if item["subject_id"] == "106")
patient_108 = next(item for item in cohort.data if item["subject_id"] == "108")

# Replace dates with actual values from your test data
assert_equal(
@@ -127,7 +127,7 @@ def test_cohort_creation_study(test_db):
assert cohort.data is not None, "Cohort creation wrongly returned None data"
patient_ids = set([item["subject_id"] for item in cohort.data])
assert_equal(len(patient_ids), 4)
assert_equal(patient_ids, {'108', '110', '111', '112'})
assert_equal(patient_ids, {"108", "110", "111", "112"})


def test_cohort_creation_study2(caplog, test_db):
@@ -155,7 +155,7 @@ def test_cohort_creation_study2(caplog, test_db):
assert cohort.data is not None, "Cohort creation wrongly returned None data"
patient_ids = set([item["subject_id"] for item in cohort.data])
assert_equal(len(patient_ids), 1)
assert_equal(patient_ids, {'106'})
assert_equal(patient_ids, {"106"})


def test_cohort_creation_all(caplog, test_db):
@@ -191,7 +191,7 @@ def test_cohort_creation_all(caplog, test_db):
patient_ids = set([item["subject_id"] for item in cohort.data])
print(f"patient_ids: {patient_ids}", flush=True)
assert_equal(len(patient_ids), 2)
assert_equal(patient_ids, {'108', '110'})
assert_equal(patient_ids, {"108", "110"})


def test_cohort_creation_multiple_temporary_groups_with_no_operator(test_db):
@@ -214,7 +214,7 @@ def test_cohort_creation_multiple_temporary_groups_with_no_operator(test_db):
patient_ids = set([item["subject_id"] for item in cohort.data])
print(f"patient_ids: {patient_ids}", flush=True)
assert_equal(len(patient_ids), 2)
assert_equal(patient_ids, {'108', '110'})
assert_equal(patient_ids, {"108", "110"})


def test_cohort_creation_mixed_domains(test_db):
@@ -242,7 +242,7 @@ def test_cohort_creation_mixed_domains(test_db):
patient_ids = set([item["subject_id"] for item in cohort.data])
print(f"patient_ids: {patient_ids}", flush=True)
assert_equal(len(patient_ids), 3)
assert_equal(patient_ids, {'1', '2', '6'})
assert_equal(patient_ids, {"1", "2", "6"})
start_dates = [item["cohort_start_date"] for item in cohort.data]
assert_equal(len(start_dates), 3)
assert_equal(start_dates, [datetime.date(2020, 6, 1), datetime.date(2020, 6, 1), datetime.date(2018, 1, 1)])
@@ -356,10 +356,10 @@ def test_cohort_creation_negative_instance(test_db):

patient_ids = set([item["subject_id"] for item in cohort.data])
assert_equal(len(patient_ids), 6) # Female patients 1, 2, 3, 5
assert_equal(patient_ids, {'1', '2', '3', '5', '6', '7'})
assert_equal(patient_ids, {"1", "2", "3", "5", "6", "7"})

# Verify dates for a specific patient (e.g., patient 1 with last diabetes diagnosis)
patient_1 = next(item for item in cohort.data if item["subject_id"] == '1')
patient_1 = next(item for item in cohort.data if item["subject_id"] == "1")
assert_equal(
patient_1["cohort_start_date"],
datetime.date(2020, 6, 1),
@@ -392,10 +392,10 @@ def test_cohort_creation_offset(test_db):

patient_ids = set([item["subject_id"] for item in cohort.data])
assert_equal(len(patient_ids), 6) # Female patients 1, 2, 3, 5
assert_equal(patient_ids, {'1', '2', '3', '5', '6', '7'})
assert_equal(patient_ids, {"1", "2", "3", "5", "6", "7"})

# Verify dates for a specific patient (e.g., patient 1 with offset)
patient_1 = next(item for item in cohort.data if item["subject_id"] == '1')
patient_1 = next(item for item in cohort.data if item["subject_id"] == "1")
# Diabetes on 2020-06-01: -730 days = 2018-06-02, +180 days = 2020-11-28
assert_equal(
patient_1["cohort_start_date"],
@@ -435,10 +435,10 @@ def test_cohort_creation_negative_instance_offset(test_db):

patient_ids = set([item["subject_id"] for item in cohort.data])
assert_equal(len(patient_ids), 6)
assert_equal(patient_ids, {'1', '2', '3', '5', '6', '7'})
assert_equal(patient_ids, {"1", "2", "3", "5", "6", "7"})

# Verify dates for a specific patient (e.g., patient 1 with last diabetes and offset)
patient_1 = next(item for item in cohort.data if item["subject_id"] == '1')
patient_1 = next(item for item in cohort.data if item["subject_id"] == "1")
# Last diabetes on 2020-06-01: +180 days = 2020-11-28
assert_equal(
patient_1["cohort_start_date"],