Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions src/qlever/commands/update_wikidata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from datetime import datetime, timezone
from enum import Enum, auto
from pathlib import Path
from typing import Tuple

import rdflib.term
import requests_sse
from rdflib import Graph
from rdflib import Graph, URIRef
from termcolor import colored
from tqdm.contrib.logging import tqdm_logging_redirect

Expand Down Expand Up @@ -266,6 +267,40 @@ def handle_ctrl_c(self, signal_received, frame):
else:
self.ctrl_c_pressed = True

def unmunge_wdqs(self, triple):
    """
    Undo the WDQS "munging" of the Wikidata data model for one triple.

    The Wikidata data model describes each entity using two nodes: an
    entity node and a data node that carries the metadata. For the WDQS,
    the data nodes are merged into the entity nodes, and the update
    stream emits these merged nodes. This method tries to undo that
    merge so that metadata triples are attached to the data node again.

    `triple` is an `(s, p, o)` tuple of rdflib terms. Returns a list of
    one or two `(s, p, o)` tuples (two when the additional data-node
    triples must be emitted, see below).
    """
    from rdflib import Namespace
    from rdflib.namespace import RDF as rdf

    wikibase = Namespace("http://wikiba.se/ontology#")
    wd = Namespace("http://www.wikidata.org/entity/")
    data = Namespace("https://www.wikidata.org/wiki/Special:EntityData/")
    schema = Namespace("http://schema.org/")

    # Predicates whose triples belong to the data node, not the entity
    # node.
    metadata_predicates = (
        wikibase.identifiers,
        wikibase.sitelinks,
        wikibase.statements,
        schema.version,
    )

    def should_rewrite(s: rdflib.term.Node, p: rdflib.term.Node) -> bool:
        # Only URI subjects can be entity nodes; blank nodes and
        # literals are left untouched.
        if not isinstance(s, rdflib.URIRef):
            return False
        # `schema:dateModified` also occurs on the dump node itself,
        # which must not be rewritten.
        return p in metadata_predicates or (
            p == schema.dateModified and s != wikibase.Dump
        )

    s, p, o = triple
    # The data node URI is the entity URI with the entity namespace
    # replaced by the `Special:EntityData` namespace.
    data_node = URIRef(str(s).replace(str(wd), str(data)))
    if should_rewrite(s, p):
        triples = [(data_node, p, o)]
    else:
        triples = [(s, p, o)]
    # Data nodes have two additional triples, `{data node} rdf:type
    # schema:Dataset` and `{data node} schema:about {entity node}`,
    # which are missing from the update stream due to the merging.
    # `schema:version` occurs on all data nodes and only on data nodes,
    # so delete/insert these two triples whenever a `schema:version`
    # triple is deleted/inserted.
    if p == schema.version:
        triples += [
            (data_node, rdf.type, schema.Dataset),
            (data_node, schema.about, s),
        ]
    return triples

def execute(self, args) -> bool:
# cURL command to get the date until which the updates of the
# SPARQL endpoint are complete.
Expand Down Expand Up @@ -776,15 +811,16 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
data=rdf_to_be_deleted_data,
format="turtle",
)
for s, p, o in graph:
triple = f"{s.n3()} {p.n3()} {node_to_sparql(o)}"
# NOTE: In case there was a previous `insert` of that
# triple, it is safe to remove that `insert`, but not
# the `delete` (in case the triple is contained in the
# original data).
if triple in insert_triples:
insert_triples.remove(triple)
delete_triples.add(triple)
for triple in graph:
for s, p, o in self.unmunge_wdqs(triple):
triple = f"{s.n3()} {p.n3()} {node_to_sparql(o)}"
# NOTE: In case there was a previous `insert` of that
# triple, it is safe to remove that `insert`, but not
# the `delete` (in case the triple is contained in the
# original data).
if triple in insert_triples:
insert_triples.remove(triple)
delete_triples.add(triple)
except Exception as e:
log.error(
f"Error reading `rdf_to_be_deleted_data`: {e}"
Expand All @@ -803,21 +839,22 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
)
graph = Graph()
log.debug(
"RDF to be added data: {rdf_to_be_added_data}"
f"RDF to be added data: {rdf_to_be_added_data}"
)
graph.parse(
data=rdf_to_be_added_data,
format="turtle",
)
for s, p, o in graph:
triple = f"{s.n3()} {p.n3()} {node_to_sparql(o)}"
# NOTE: In case there was a previous `delete` of that
# triple, it is safe to remove that `delete`, but not
# the `insert` (in case the triple is not contained in
# the original data).
if triple in delete_triples:
delete_triples.remove(triple)
insert_triples.add(triple)
for triple in graph:
for s, p, o in self.unmunge_wdqs(triple):
triple = f"{s.n3()} {p.n3()} {node_to_sparql(o)}"
# NOTE: In case there was a previous `delete` of that
# triple, it is safe to remove that `delete`, but not
# the `insert` (in case the triple is not contained in
# the original data).
if triple in delete_triples:
delete_triples.remove(triple)
insert_triples.add(triple)
except Exception as e:
log.error(
f"Error reading `rdf_to_be_added_data`: {e}"
Expand Down