diff --git a/build.py b/build.py
index eb9f9d5..d8e0af2 100644
--- a/build.py
+++ b/build.py
@@ -22,7 +22,3 @@ for schema_file_path in schemas_file_paths:
 
     # Step 3 - translate and build each openMINDS schema as JSON-Schema
     JSONSchemaBuilder(schema_file_path, schema_loader.schemas_sources).build()
-
-
-
-
diff --git a/pipeline/translator.py b/pipeline/translator.py
index 2ba5baa..758ebcf 100644
--- a/pipeline/translator.py
+++ b/pipeline/translator.py
@@ -1,8 +1,14 @@
 import json
 import os.path
+
+from pathlib import Path
 from typing import List, Optional, Dict
 
+from pipeline.utils import find_embedded_schema, find_source_schema_path
+
 class JSONSchemaBuilder(object):
+    property_name = None
+    required_properties = []
 
     def __init__(self, schema_file_path:str, root_path:str):
         _relative_path_without_extension = schema_file_path[len(root_path)+1:].replace(".schema.omi.json", "").split("/")
@@ -14,12 +20,14 @@ def __init__(self, schema_file_path:str, root_path:str):
     def _target_file_without_extension(self) -> str:
         return os.path.join(self.version, "/".join(self.relative_path_without_extension))
 
-    def _resolve_string_property(self, property) -> Dict:
+    def _resolve_string_property(self, property: dict) -> Dict:
         string_property_spec = {}
         if "_formats" in property and property["_formats"]:
             if len(property["_formats"]) == 1:
                 string_property_spec["format"] = property["_formats"][0]
                 string_property_spec["type"] = property["type"]
+                if self.property_name not in self.required_properties:
+                    string_property_spec["type"] = [string_property_spec["type"], "null"]
             elif len(property["_formats"]) > 1:
                 string_property_spec["anyOf"] = []
                 for format in property["_formats"]:
@@ -27,8 +35,14 @@ def _resolve_string_property(self, property) -> Dict:
                         "type": property["type"],
                         "format": format
                     })
+                if self.property_name not in self.required_properties:
+                    string_property_spec["anyOf"].append({
+                        "type": "null"
+                    })
         else:
             string_property_spec["type"] = property["type"]
+            if self.property_name not in self.required_properties:
+                string_property_spec["type"] = [string_property_spec["type"], "null"]
         if "pattern" in property and property["pattern"]:
             string_property_spec["pattern"] = property["pattern"]
         if "minLength" in property and property["minLength"]:
@@ -37,8 +51,11 @@ def _resolve_string_property(self, property) -> Dict:
             string_property_spec["maxLength"] = property["maxLength"]
         return string_property_spec
 
-    def _resolve_number_property(self, property) -> Dict:
-        number_property_spec = {"type": property["type"]}
+    def _resolve_number_property(self, property: dict) -> Dict:
+        if self.property_name not in self.required_properties:
+            number_property_spec = {"type": [property["type"], "null"]}
+        else:
+            number_property_spec = {"type": property["type"]}
         if "multipleOf" in property and property["multipleOf"]:
             number_property_spec["multipleOf"] = property["multipleOf"]
         if "minimum" in property and property["minimum"]:
@@ -47,37 +64,73 @@ def _resolve_number_property(self, property) -> Dict:
             number_property_spec["maximum"] = property["maximum"]
         return number_property_spec
 
-    def _resolve_object_property(self, property):
+    def _resolve_embedded_schema(self, embedded_type:str):
+        """
+        Retrieves the embedded JSON Schema for the given type.
+
+        If a pre-generated embedded schema is found, it is returned directly.
+        Otherwise, the function attempts to locate the source schema and generates the embedded schema file.
+        """
+        embedded = find_embedded_schema(self.version, embedded_type.split('/')[-1])
+        if embedded:
+            return embedded
+        else:
+            embedded_path = find_source_schema_path(self.version, embedded_type.split('/')[-1])
+            if embedded_path:
+                return JSONSchemaBuilder(embedded_path, os.path.join(os.path.realpath("."), "sources", "schemas")).build()
+        return None
+
+    def _resolve_object_property(self, property: dict):
         object_property_spec = {}
         if "_embeddedTypes" in property and property["_embeddedTypes"]:
             if len(property["_embeddedTypes"]) == 1:
-                object_property_spec["type"] = "object"
-                object_property_spec["$ref"] = f"{property['_embeddedTypes'][0]}?format=json-schema"
+                embedded_schema = self._resolve_embedded_schema(property['_embeddedTypes'][0])
+                del embedded_schema['$schema']
+                object_property_spec = embedded_schema
             elif len(property["_embeddedTypes"]) > 1:
                 object_property_spec["anyOf"] = []
                 for embedded_type in property["_embeddedTypes"]:
+                    embedded_schema = self._resolve_embedded_schema(embedded_type)
+                    del embedded_schema['$schema']
+                    object_property_spec["anyOf"].append(embedded_schema)
+
+                if self.property_name not in self.required_properties:
                     object_property_spec["anyOf"].append({
-                        "type": "object",
-                        "$ref": f"{embedded_type}?format=json-schema"
+                        "type": "null"
                     })
         elif "_linkedTypes" in property and property["_linkedTypes"]:
             object_property_spec["type"] = "object"
             object_property_spec["if"] = {
                 "required": ["@type"]
             }
-            object_property_spec["then"] = {
-                "properties": {
-                    "@id": {
-                        "type": "string",
-                        "format": "iri"
-                    },
-                    "@type": {
-                        "type": "string",
-                        "format": "iri",
-                        "enum": property["_linkedTypes"]
-                    }},
-                "required": ["@id"]
-            }
+            if self.property_name not in self.required_properties:
+                object_property_spec["then"] = {
+                    "properties": {
+                        "@id": {
+                            "type": "string",
+                            "format": "iri"
+                        },
+                        "@type": {
+                            "type": ["string", "null"],
+                            "format": "iri",
+                            "enum": property["_linkedTypes"] + [ "null" ]
+                        }},
+                    "required": ["@id"]
+                }
+            else:
+                object_property_spec["then"] = {
+                    "properties": {
+                        "@id": {
+                            "type": "string",
+                            "format": "iri"
+                        },
+                        "@type": {
+                            "type": "string",
+                            "format": "iri",
+                            "enum": property["_linkedTypes"]
+                        }},
+                    "required": ["@id"]
+                }
             object_property_spec["else"] = {
                 "properties": {
                     "@id": {
@@ -92,6 +145,8 @@ def _resolve_object_property(self, property):
 
     def _resolve_array_property(self, property):
         array_property_spec = {"type": property["type"]}
+        if self.property_name not in self.required_properties:
+            array_property_spec["type"] = [property["type"], "null"]
         if "items" in property and property["items"]:
             if "type" in property["items"] and property["items"]["type"]:
                 if property["items"]["type"] == "string":
@@ -114,7 +169,7 @@ def _resolve_array_property(self, property):
 
     def _translate_property_specifications(self, property) -> Dict:
         self._translated_prop_spec = {}
-        prop_description = property["description"] if "description" in property else None
+        prop_description = property["description"] if "description" in property else "null"
         self._translated_prop_spec["description"] = prop_description
         self._translated_prop_spec["name"] = property["name"]
 
@@ -137,9 +192,10 @@ def _translate_property_specifications(self, property) -> Dict:
     def translate(self):
         # set required properties
         required_prop = self._schema_payload["required"] if "required" in self._schema_payload else []
+        self.required_properties = required_prop
         required_prop.extend(["@id", "@type"])
         # set description
-        description = self._schema_payload['description'] if "description" in self._schema_payload else None
+        description = self._schema_payload['description'] if "description" in self._schema_payload else "null"
 
         self._translated_schema = {
             "$id": f"{self._schema_payload['_type']}?format=json-schema",
@@ -163,13 +219,20 @@ def translate(self):
 
         if "properties" in self._schema_payload and self._schema_payload["properties"]:
             for prop_name, prop_spec in self._schema_payload["properties"].items():
+                self.property_name = prop_name
                 self._translated_schema["properties"][prop_name] = self._translate_property_specifications(prop_spec)
 
 
 
     def build(self):
         target_file = os.path.join("target", "schemas", f"{self._target_file_without_extension()}.schema.json")
+        path_target_file = Path(target_file)
+        if path_target_file.exists():
+            return json.loads(path_target_file.read_text())
+
         os.makedirs(os.path.dirname(target_file), exist_ok=True)
         self.translate()
         with open(target_file, "w") as target_file:
-            target_file.write(json.dumps(self._translated_schema, indent=2, sort_keys=True))
\ No newline at end of file
+            target_file.write(json.dumps(self._translated_schema, indent=2, sort_keys=True))
+
+        return self._translated_schema
diff --git a/pipeline/utils.py b/pipeline/utils.py
index 2c58591..dbb290e 100644
--- a/pipeline/utils.py
+++ b/pipeline/utils.py
@@ -1,6 +1,8 @@
 import glob
+import json
 import os
 import shutil
+from pathlib import Path
 from typing import List
 
 from git import Repo, GitCommandError
@@ -10,6 +12,32 @@ def clone_sources():
         shutil.rmtree("sources")
     Repo.clone_from("https://github.com/openMetadataInitiative/openMINDS.git", to_path="sources", depth=1)
 
+def find_embedded_schema(version, class_name):
+    """
+    Searches for and returns a pre-generated (embedded) JSON Schema file for a given class name and version.
+    """
+    directory = Path(f"./target/schemas/{version}/")
+    camel_case = class_name[:1].lower() + class_name[1:]
+
+    # Search for exact filename matches
+    for case in [camel_case, class_name]:
+        for file_path in directory.glob(f"**/{case}.schema.json"):
+            return json.loads(file_path.read_text())
+    return None
+
+def find_source_schema_path(version, class_name):
+    """
+    Locates the source JSON Schema file path for a given class name and version.
+    """
+    directory = Path(f"./sources/schemas/{version}/")
+    camel_case = class_name[:1].lower() + class_name[1:]
+
+    # Search for exact filename matches
+    for case in [camel_case, class_name]:
+        for file_path in directory.glob(f"**/{case}.schema.omi.json"):
+            return os.path.abspath(file_path)
+    return None
+
 class SchemaLoader(object):
 
     def __init__(self):
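Note (illustration only, not part of the patch): the checks against self.required_properties added in _resolve_string_property, _resolve_number_property and _resolve_array_property are expected to turn the JSON Schema "type" of any non-required property into a union with "null". A minimal, self-contained sketch of that behaviour, using a made-up property payload and property name:

# Hypothetical, standalone sketch of the optional-property handling above.
# "givenName" and the payload below are illustrative, not taken from the openMINDS sources.
import json

required_properties = ["@id", "@type"]                      # "givenName" is not required
property_payload = {"name": "givenName", "type": "string"}

spec = {"type": property_payload["type"]}
if property_payload["name"] not in required_properties:
    spec["type"] = [spec["type"], "null"]                   # optional -> also allow explicit JSON null

print(json.dumps(spec))                                     # {"type": ["string", "null"]}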