Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions import/migrate/neo4j/complete_migration/complete_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,20 @@ def ensure_neo4j_has_data():
session.run("MATCH (n) DETACH DELETE n")

print("Creating complete dataset in Neo4j...")

with driver.session() as session:
try:
session.run("CREATE CONSTRAINT my_constraint FOR (n:Label1) REQUIRE n.id IS UNIQUE")
except Exception as e:
print(f"Constraint creation failed: {e}")
try:
session.run("CREATE CONSTRAINT my_constraint FOR (n:Label1) REQUIRE n.name IS NOT NULL")
except Exception as e:
print(f"Constraint creation failed: {e}")
try:
session.run("CREATE INDEX node_range_index_name FOR (n:Label1) ON (n.created_at)")
except Exception as e:
print(f"Index creation failed: {e}")

# Insert all nodes for all labels
with driver.session() as session:
for label in LABELS:
Expand Down Expand Up @@ -132,7 +145,7 @@ def execute_query(query):
# Get comprehensive schema information using apoc.meta.schema()
schema_result = list(memgraph.execute_and_fetch(
"""
call migrate_neo4j_driver2.neo4j("CALL apoc.meta.schema() YIELD value RETURN value", {host: "neo4j", port: 7687}) YIELD row RETURN row.value as schema
CALL migrate_neo4j_driver2.neo4j("CALL apoc.meta.schema() YIELD value RETURN value", {host: "neo4j", port: 7687}) YIELD row RETURN row.value AS schema
"""
))

Expand All @@ -152,8 +165,8 @@ def execute_query(query):

# Create indexes for all discovered labels
for label in discovered_labels:
# Later we will inspect the schema for constraints and indices
memgraph.execute(f"CREATE INDEX ON :{label}")
memgraph.execute(f"CREATE INDEX ON :{label}(id)")

print("[Worker 1] Starting migration of nodes...")

Expand All @@ -163,7 +176,7 @@ def execute_query(query):
execute_query(
f"""
call migrate_neo4j_driver2.neo4j(
"MATCH (n:{label}) RETURN elementId(n) AS elementId, labels(n) as labels, properties(n) AS props",
"MATCH (n:{label}) RETURN elementId(n) AS elementId, labels(n) AS labels, properties(n) AS props",
{{host: "neo4j", port: 7687}}
) YIELD row
MERGE (n:{label}:__MigrationNode__ {{__elementId__: row.elementId}})
Expand All @@ -180,7 +193,7 @@ def execute_query(query):
print(f"[Worker 1] Migrating {rel_type} relationships...")
execute_query(
f"""
call migrate_neo4j_driver2.neo4j(
CALL migrate_neo4j_driver2.neo4j(
"MATCH (a)-[r:{rel_type}]->(b) RETURN elementId(a) AS from_elementId, elementId(b) AS to_elementId, properties(r) AS rel_props",
{{host: "neo4j", port: 7687}}
) YIELD row
Expand All @@ -192,7 +205,51 @@ def execute_query(query):
)
print(f"[Worker 1] Completed migration of {rel_type} relationships")

print("[Worker 1] Migration complete.")
print("[Worker 1] Completed migration of relationships.")

print("[Worker 1] Starting migration of indices...")
indexes_result = list(memgraph.execute_and_fetch(
"""
CALL migrate_neo4j_driver2.neo4j("SHOW INDEXES", {host: "neo4j", port: 7687}) YIELD row RETURN row
"""
))
range_indexes_result = [x["row"] for x in indexes_result if x["row"]["type"] in ["RANGE", "TEXT"] and len(x["row"]["labelsOrTypes"]) == 1]
for index in range_indexes_result:
props = ",".join(index["properties"])
label_or_type = index["labelsOrTypes"][0]
if index["entityType"] == "NODE":
memgraph.execute(f"CREATE INDEX ON :{label_or_type}({props})")
elif index["entityType"] == "RELATIONSHIP" and len(index["properties"]) == 1:
memgraph.execute(f"CREATE EDGE INDEX ON :{label_or_type}({props})")

fulltext_indexes_result = [x["row"] for x in indexes_result if x["row"]["type"] == "FULLTEXT" and len(x["row"]["labelsOrTypes"]) == 1]
for index in fulltext_indexes_result:
text_index_name = index["name"]
props = ",".join(index["properties"])
label_or_type = index["labelsOrTypes"][0]
if index["entityType"] == "NODE":
memgraph.execute(f"CREATE TEXT INDEX {text_index_name} ON :{label_or_type}({props})")

print("[Worker 1] Completed migration of indices.")

print("[Worker 1] Starting migration of constraints...")
constraints_result = list(memgraph.execute_and_fetch(
"""
CALL migrate_neo4j_driver2.neo4j("SHOW CONSTRAINTS", {host: "neo4j", port: 7687}) YIELD row RETURN row
"""
))
constraints_result = [x["row"] for x in constraints_result if x["row"]["entityType"] == "NODE" and x["row"]["type"] in ["UNIQUENESS", "NODE_PROPERTY_EXISTENCE", "NODE_PROPERTY_TYPE", "NODE_KEY"] and len(x["row"]["labelsOrTypes"]) == 1]
for constraint in constraints_result:
label = constraint["labelsOrTypes"][0]
props = [f"n.{x}" for x in constraint["properties"]]
joined_props = ", ".join(props)
if constraint["type"] == "UNIQUENESS":
memgraph.execute(f"CREATE CONSTRAINT ON (n:{label}) ASSERT {joined_props} IS UNIQUE")
elif constraint["type"] == "NODE_PROPERTY_EXISTENCE":
memgraph.execute(f"CREATE CONSTRAINT ON (n:{label}) ASSERT EXISTS ({joined_props})")


print("[Worker 1] Completed migration of constraints.")

# Verify migration results
print("[Worker 1] Verifying migration results...")
Expand All @@ -209,6 +266,9 @@ def execute_query(query):
print("[Worker 1] Creating snapshot...")
memgraph.execute("CREATE SNAPSHOT")

print("[Worker 1] Created snapshot.")
print("[Worker 1] Migration complete.")

except Exception as e:
print(f"[Worker 1] Error during migration: {e}")

Expand Down