Merged
32 commits
e94e6ec
Merge pull request #5 from datacontract/main
dmaresma Jun 11, 2025
657b68d
init. snowflake sql ddl import to datacontract
dmaresma Jun 11, 2025
a224aba
apply ruff check and format
dmaresma Jun 11, 2025
327c21a
align import
dmaresma Jun 11, 2025
234c2fb
add dialect
dmaresma Jun 11, 2025
5d412fd
sqlglot ${} token bypass and waiting for NOORDER ORDER AUTOINCREMENT …
dmaresma Jun 13, 2025
76d53b8
fix regression on sql server side (no formal or declarative comments)
dmaresma Jun 13, 2025
020d879
type variant not allow in lint DataContract(data_contract_str=expect…
dmaresma Jun 14, 2025
e2ee1e8
remove simple-ddl-parser dependency
dmaresma Jun 14, 2025
ab60f5c
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jun 29, 2025
dd2a399
fix error message
dmaresma Jul 10, 2025
6d2a8df
Merge branch 'feat/snowflake_ddl_sql_import' of https://github.com/dm…
dmaresma Jul 10, 2025
d3759c9
fix specification version in test
dmaresma Jul 10, 2025
d29d770
refactor get_model_form_parsed add table desc, table tag
dmaresma Jul 10, 2025
cff64f8
fix format issue
dmaresma Jul 10, 2025
c6bf517
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jul 10, 2025
f569c9f
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jul 11, 2025
1186bb3
add script token remover function
dmaresma Jul 27, 2025
eb718c5
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jul 28, 2025
593358c
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Aug 5, 2025
1b44135
add money datatype #751
dmaresma Aug 25, 2025
29f371e
ignore jinja
dmaresma Aug 27, 2025
9669bc1
Merge branch 'current_main' into feat/snowflake_ddl_sql_import
dmaresma Jan 26, 2026
e55b4af
odcs 3.1 introduce timestamp as logicalType, and fix typos
dmaresma Jan 26, 2026
8202145
fix dbml logicalType with 3.1 timestamp allow instead of date
dmaresma Jan 26, 2026
14ac615
logicalType: date to timestamp when physicalType is timestamp
dmaresma Jan 26, 2026
ecb5caa
Merge remote-tracking branch 'origin/main' into fork/dmaresma/feat/sn…
jschoedl Mar 23, 2026
2ae8970
fix: clean up SQL importer — fix variable token substitution, remove …
jschoedl Mar 23, 2026
58a4eec
run ruff format
jschoedl Mar 23, 2026
3f9207f
fix: use tags.expressions directly instead of find(Property) for tag …
jschoedl Mar 23, 2026
fd300b3
fix: return (logicalType, format) from map_type_from_sql per ODCS v3.1.0
jschoedl Mar 24, 2026
18f1749
add changelog entries
jschoedl Mar 24, 2026
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -10,9 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Escape single quotes in string values for SodaCL checks (#1090)
- Fixed catalog export SpecView not having a tags property for the index.html template (#1059)
- Fix SQL importer type mappings: binary types, datetime/time, uuid now map to correct ODCS logicalType and format (#790)

### Added
- Support additional PyArrow types in Parquet importer (#1091)
- Populate `logicalTypeOptions.format` for SQL import from binary and uuid types (#790)
- Snowflake DDL import with tags, descriptions, and template variable handling (#790)

## [0.11.6] - 2026-03-17

3 changes: 2 additions & 1 deletion datacontract/imports/dbml_importer.py
@@ -97,7 +97,7 @@ def import_table_fields(table, references) -> List[SchemaProperty]:
description = field.note.text if field.note else None
is_primary_key = field.pk
is_unique = field.unique
logical_type = map_type_from_sql(field.type)
logical_type, format = map_type_from_sql(field.type)

ref = get_reference(field, references)

@@ -109,6 +109,7 @@ def import_table_fields(table, references) -> List[SchemaProperty]:
name=field_name,
logical_type=logical_type if logical_type else "string",
physical_type=field.type,
format=format,
description=description,
required=required if required else None,
primary_key=is_primary_key if is_primary_key else None,
3 changes: 3 additions & 0 deletions datacontract/imports/odcs_helper.py
@@ -34,6 +34,7 @@ def create_schema_object(
description: str = None,
business_name: str = None,
properties: List[SchemaProperty] = None,
tags: List[str] = None,
) -> SchemaObject:
"""Create a SchemaObject (equivalent to DCS Model)."""
schema = SchemaObject(
@@ -48,6 +49,8 @@ def create_schema_object(
schema.businessName = business_name
if properties:
schema.properties = properties
if tags:
schema.tags = tags
return schema


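The new `tags` parameter above follows the same guard-before-assign pattern the function already uses for `business_name` and `properties`: the attribute is only set when the value is truthy, so empty lists never reach the exported YAML. A minimal sketch of that pattern — using a hypothetical stand-in dataclass, not the real `open_data_contract_standard` `SchemaObject`:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SchemaObject:
    """Hypothetical stand-in for the ODCS SchemaObject model."""

    name: str
    physicalType: str
    tags: Optional[List[str]] = None


def create_schema_object(name: str, physical_type: str, tags: Optional[List[str]] = None) -> SchemaObject:
    schema = SchemaObject(name=name, physicalType=physical_type)
    if tags:  # skip falsy values so an empty tag list stays None and is omitted on export
        schema.tags = tags
    return schema


print(create_schema_object("orders", "table", tags=["pii=false"]).tags)
```

Passing `tags=[]` or omitting the argument leaves `schema.tags` as `None`, which keeps the serialized contract free of empty `tags:` keys.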
170 changes: 101 additions & 69 deletions datacontract/imports/sql_importer.py
@@ -1,5 +1,6 @@
import logging
import os
import re

import sqlglot
from open_data_contract_standard.model import OpenDataContractStandard
@@ -18,17 +19,17 @@

class SqlImporter(Importer):
def import_source(self, source: str, import_args: dict) -> OpenDataContractStandard:
return import_sql(self.import_format, source, import_args)
return import_sql(source, import_args)


def import_sql(format: str, source: str, import_args: dict = None) -> OpenDataContractStandard:
def import_sql(source: str, import_args: dict = None) -> OpenDataContractStandard:
sql = read_file(source)
dialect = to_dialect(import_args)

try:
parsed = sqlglot.parse_one(sql=sql, read=dialect)
except Exception as e:
logging.error(f"Error parsing SQL: {str(e)}")
logging.error(f"Error sqlglot SQL: {str(e)}")
raise DataContractException(
type="import",
name=f"Reading source from {source}",
@@ -57,12 +58,13 @@ def import_sql(format: str, source: str, import_args: dict = None) -> OpenDataCo

col_name = column.this.name
col_type = to_col_type(column, dialect)
logical_type = map_type_from_sql(col_type)
logical_type, format = map_type_from_sql(col_type)
col_description = get_description(column)
max_length = get_max_length(column)
precision, scale = get_precision_scale(column)
is_primary_key = get_primary_key(column)
is_required = column.find(sqlglot.exp.NotNullColumnConstraint) is not None or None
tags = get_tags(column)

prop = create_property(
name=col_name,
Expand All @@ -72,19 +74,36 @@ def import_sql(format: str, source: str, import_args: dict = None) -> OpenDataCo
max_length=max_length,
precision=precision,
scale=scale,
format=format,
primary_key=is_primary_key,
primary_key_position=primary_key_position if is_primary_key else None,
required=is_required if is_required else None,
tags=tags,
)

if is_primary_key:
primary_key_position += 1

properties.append(prop)

table_comment_property = parsed.find(sqlglot.expressions.SchemaCommentProperty)

table_description = None
if table_comment_property:
table_description = table_comment_property.this.this

table_tags = None
table_props = parsed.find(sqlglot.expressions.Properties)
if table_props:
tags = table_props.find(sqlglot.expressions.Tags)
if tags:
table_tags = [str(t) for t in tags.expressions]

schema_obj = create_schema_object(
name=table_name,
physical_type="table",
description=table_description,
tags=table_tags,
properties=properties,
)
odcs.schema_.append(schema_obj)
@@ -112,27 +131,9 @@ def to_dialect(import_args: dict) -> Dialects | None:
return Dialects.TSQL
if dialect.upper() in Dialects.__members__:
return Dialects[dialect.upper()]
if dialect == "sqlserver":
return Dialects.TSQL
return None


def to_physical_type_key(dialect: Dialects | str | None) -> str:
dialect_map = {
Dialects.TSQL: "sqlserverType",
Dialects.POSTGRES: "postgresType",
Dialects.BIGQUERY: "bigqueryType",
Dialects.SNOWFLAKE: "snowflakeType",
Dialects.REDSHIFT: "redshiftType",
Dialects.ORACLE: "oracleType",
Dialects.MYSQL: "mysqlType",
Dialects.DATABRICKS: "databricksType",
}
if isinstance(dialect, str):
dialect = Dialects[dialect.upper()] if dialect.upper() in Dialects.__members__ else None
return dialect_map.get(dialect, "physicalType")


def to_server_type(source, dialect: Dialects | None) -> str | None:
if dialect is None:
return None
@@ -166,10 +167,22 @@ def to_col_type_normalized(column):

def get_description(column: sqlglot.expressions.ColumnDef) -> str | None:
if column.comments is None:
return None
description = column.find(sqlglot.expressions.CommentColumnConstraint)
if description:
return description.this.this
else:
return None
return " ".join(comment.strip() for comment in column.comments)


def get_tags(column: sqlglot.expressions.ColumnDef) -> list[str] | None:
tags = column.find(sqlglot.expressions.Tags)
if tags:
return [str(t) for t in tags.expressions]
else:
return None


def get_max_length(column: sqlglot.expressions.ColumnDef) -> int | None:
col_type = to_col_type_normalized(column)
if col_type is None:
@@ -212,79 +225,97 @@ def get_precision_scale(column)
return None, None


def map_type_from_sql(sql_type: str) -> str | None:
"""Map SQL type to ODCS logical type."""
def map_type_from_sql(sql_type: str) -> tuple[str, str | None]:
"""Map SQL type to ODCS logical type and optional format.

Returns (logicalType, format).
The format corresponds to ODCS logicalTypeOptions.format (e.g. "binary", "uuid").
"""
if sql_type is None:
return None
return ("string", None)

sql_type_normed = sql_type.lower().strip()

if sql_type_normed.startswith("varchar"):
return "string"
return ("string", None)
elif sql_type_normed.startswith("char"):
return "string"
return ("string", None)
elif sql_type_normed.startswith("string"):
return "string"
return ("string", None)
elif sql_type_normed.startswith("nchar"):
return "string"
return ("string", None)
elif sql_type_normed.startswith("text"):
return "string"
return ("string", None)
elif sql_type_normed.startswith("nvarchar"):
return "string"
return ("string", None)
elif sql_type_normed.startswith("ntext"):
return "string"
elif sql_type_normed.startswith("int") and not sql_type_normed.startswith("interval"):
return "integer"
elif sql_type_normed.startswith("bigint"):
return "integer"
elif sql_type_normed.startswith("tinyint"):
return "integer"
elif sql_type_normed.startswith("smallint"):
return "integer"
return ("string", None)
elif sql_type_normed.endswith("int") and not sql_type_normed.endswith("point"):
return ("integer", None)
elif sql_type_normed.endswith("integer"):
return ("integer", None)
elif sql_type_normed.startswith("float"):
return "number"
return ("number", None)
elif sql_type_normed.startswith("double"):
return "number"
elif sql_type_normed.startswith("decimal"):
return "number"
return ("number", None)
elif sql_type_normed == "real":
return ("number", None)
elif sql_type_normed.startswith("number"):
return ("number", None)
elif sql_type_normed.startswith("numeric"):
return "number"
return ("number", None)
elif sql_type_normed.startswith("decimal"):
return ("number", None)
elif sql_type_normed.startswith("money"):
return ("number", None)
elif sql_type_normed.startswith("bool"):
return "boolean"
return ("boolean", None)
elif sql_type_normed.startswith("bit"):
return "boolean"
return ("boolean", None)
elif sql_type_normed.startswith("binary"):
return "array"
return ("string", "binary")
elif sql_type_normed.startswith("varbinary"):
return "array"
return ("string", "binary")
elif sql_type_normed.startswith("raw"):
return "array"
elif sql_type_normed == "blob" or sql_type_normed == "bfile":
return "array"
return ("string", "binary")
elif sql_type_normed == "blob":
return ("string", "binary")
elif sql_type_normed == "bfile":
return ("string", "binary")
elif sql_type_normed.startswith("bytea"):
return ("string", "binary")
elif sql_type_normed == "image":
return ("string", "binary")
elif sql_type_normed == "date":
return "date"
return ("date", None)
elif sql_type_normed == "time":
return "string"
return ("time", None)
elif sql_type_normed.startswith("timestamp"):
return "date"
elif sql_type_normed == "datetime" or sql_type_normed == "datetime2":
return "date"
return ("timestamp", None)
elif sql_type_normed == "smalldatetime":
return "date"
elif sql_type_normed == "datetimeoffset":
return "date"
return ("timestamp", None)
elif sql_type_normed.startswith("datetime"): # tsql datetime2, datetimeoffset
return ("timestamp", None)
elif sql_type_normed == "uniqueidentifier": # tsql
return "string"
return ("string", "uuid")
elif sql_type_normed == "json":
return "object"
return ("object", None)
elif sql_type_normed == "xml": # tsql
return "string"
elif sql_type_normed.startswith("number"):
return "number"
return ("string", None)
elif sql_type_normed == "clob" or sql_type_normed == "nclob":
return "string"
return ("string", None)
else:
return "object"
return ("object", None)


def remove_variable_tokens(sql_script: str) -> str:
"""Replace templating placeholders with bare variable names so sqlglot can parse the SQL."""
variable_pattern = re.compile(
r"\$\((\w+)\)" # $(var) — sqlcmd (T-SQL)
r"|\$\{(\w+)\}" # ${var} — Liquibase
r"|\{\{(\w+)\}\}" # {{var}} — Jinja / dbt
)
return variable_pattern.sub(lambda m: m.group(1) or m.group(2) or m.group(3), sql_script)


def read_file(path):
@@ -298,4 +329,5 @@ def read_file(path):
)
with open(path, "r") as file:
file_content = file.read()
return file_content

return remove_variable_tokens(file_content)
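The `remove_variable_tokens` helper introduced in this diff can be exercised on its own. The sketch below reproduces the regex and substitution from the diff above (stdlib `re` only) and checks the three placeholder styles it targets — sqlcmd `$(var)`, Liquibase `${var}`, and Jinja/dbt `{{var}}`:

```python
import re

# Pattern copied from the diff: one alternation branch per templating style,
# each capturing the bare variable name.
variable_pattern = re.compile(
    r"\$\((\w+)\)"  # $(var) — sqlcmd (T-SQL)
    r"|\$\{(\w+)\}"  # ${var} — Liquibase
    r"|\{\{(\w+)\}\}"  # {{var}} — Jinja / dbt
)


def remove_variable_tokens(sql_script: str) -> str:
    """Replace templating placeholders with bare variable names so sqlglot can parse the SQL."""
    # Exactly one group matches per hit, so `or` picks the captured name.
    return variable_pattern.sub(lambda m: m.group(1) or m.group(2) or m.group(3), sql_script)


sql = "CREATE TABLE $(db).${schema}.{{table}} (id INT)"
print(remove_variable_tokens(sql))  # CREATE TABLE db.schema.table (id INT)
```

Plain SQL without placeholders passes through unchanged, which is why the importer can apply the helper unconditionally in `read_file`.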
3 changes: 2 additions & 1 deletion datacontract/imports/unity_importer.py
@@ -152,14 +152,15 @@ def import_table_fields(columns: List[ColumnInfo]) -> List[SchemaProperty]:
def _to_property(column: ColumnInfo) -> SchemaProperty:
"""Convert a Unity ColumnInfo to an ODCS SchemaProperty."""
sql_type = str(column.type_text) if column.type_text else "string"
logical_type = map_type_from_sql(sql_type)
logical_type, format = map_type_from_sql(sql_type)
required = column.nullable is None or not column.nullable

return create_property(
name=column.name,
logical_type=logical_type if logical_type else "string",
physical_type=sql_type,
description=column.comment,
format=format,
required=required if required else None,
custom_properties={"databricksType": sql_type} if sql_type else None,
)
2 changes: 1 addition & 1 deletion tests/fixtures/databricks-unity/import/datacontract.yaml
@@ -53,7 +53,7 @@ schema:
customProperties:
- property: databricksType
value: timestamp
logicalType: date
logicalType: timestamp
- name: is_active
physicalType: boolean
customProperties:
4 changes: 2 additions & 2 deletions tests/fixtures/dbml/import/datacontract.yaml
@@ -26,7 +26,7 @@ schema:
physicalType: timestamp
description: The business timestamp in UTC when the order was successfully registered
in the source system and the payment was successful.
logicalType: date
logicalType: timestamp
required: true
- name: order_total
physicalType: record
@@ -46,7 +46,7 @@
- name: processed_timestamp
physicalType: timestamp
description: The timestamp when the record was processed by the data platform.
logicalType: date
logicalType: timestamp
required: true
- name: line_items
physicalType: table
4 changes: 2 additions & 2 deletions tests/fixtures/dbml/import/datacontract_table_filtered.yaml
@@ -26,7 +26,7 @@ schema:
physicalType: timestamp
description: The business timestamp in UTC when the order was successfully registered
in the source system and the payment was successful.
logicalType: date
logicalType: timestamp
required: true
- name: order_total
physicalType: record
@@ -46,5 +46,5 @@
- name: processed_timestamp
physicalType: timestamp
description: The timestamp when the record was processed by the data platform.
logicalType: date
logicalType: timestamp
required: true