Coverage for apis_core/utils/rdf.py: 76%
103 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-10 13:36 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-10 13:36 +0000
1# SPDX-FileCopyrightText: 2025 Birger Schacht
2# SPDX-License-Identifier: MIT
4import logging
5import re
6import tomllib
7from collections import defaultdict
8from pathlib import Path
10from AcdhArcheAssets.uri_norm_rules import get_normalized_uri
11from django.apps import apps
12from django.template.utils import get_app_template_dirs
13from rdflib import RDF, BNode, Graph, URIRef
14from rdflib.exceptions import ParserError
16logger = logging.getLogger(__name__)
def resolve(obj, graph):
    """
    Convert a raw config value into the term used in a triple pattern.

    * A string wrapped in angle brackets (``<...>``) becomes a ``URIRef``
      built from the text between the brackets.
    * Any other string is treated as a CURIE and expanded with the
      namespace manager of ``graph``.
    * The boolean ``True`` becomes ``None``.
    * Every other value is returned unchanged.
    """
    if not isinstance(obj, str):
        # Only the literal ``True`` maps to ``None``; ``False`` and all
        # other non-string values are passed through as they are.
        return None if obj is True else obj
    is_bracketed = obj.startswith("<") and obj.endswith(">")
    if is_bracketed:
        return URIRef(obj[1:-1])
    return graph.namespace_manager.expand_curie(obj)
37def load_path(path: str | Path) -> dict:
38 """
39 Load a tomlfile either from a path or from the directory
40 `triple_configs` in any of the app directories.
41 """
42 if isinstance(path, str):
43 files = [
44 directory / path for directory in get_app_template_dirs("triple_configs")
45 ]
46 files = list(filter(lambda file: file.exists(), files))
47 if files:
48 path = files[0]
49 else:
50 raise ValueError(f"Could not find {path}")
51 return tomllib.loads(Path(path).read_text())
54def find_regex_matching_configs(uri: str, models: list | None = None) -> dict | None:
55 """
56 Go through a list of models and return all the rdf configs
57 that are configured in those models that have a regex that
58 matches the given URI.
59 """
60 models = models or apps.get_models()
61 models_with_config = [model for model in models if hasattr(model, "rdf_configs")]
62 configs = []
63 for model in models_with_config:
64 for regex, path in model.rdf_configs().items():
65 if re.match(regex, uri):
66 logger.debug(f"{uri} matched {regex}")
67 config = load_path(path)
68 config["path"] = path
69 config["model"] = model
70 configs.append(config)
71 else:
72 logger.debug(f"{uri} did not match {regex}")
73 return configs
def find_graph_matching_config(
    graph: Graph, configs: list[dict] | None = None
) -> dict | None:
    """
    Return the first RDF import config whose filters match the graph.

    Each config may define ``filters``, a list of mappings of
    predicate -> object. Predicate and object are passed through
    ``resolve`` and combined with an unspecified subject into triple
    patterns. A filter matches when every one of its patterns is
    contained in ``graph``; the first config with a matching filter wins.
    A config without ``filters`` gets a single ``{None: None}`` filter.

    Args:
        graph: the graph to test the filters against.
        configs: the configs to look through; ``None`` is treated as empty.

    Returns:
        The matching config or ``None`` if no config matches.
    """
    for config in configs or []:
        for _filter in config.get("filters", [{None: None}]):
            try:
                patterns = [
                    (None, resolve(predicate, graph), resolve(obj, graph))
                    for predicate, obj in _filter.items()
                ]
                matched = all(pattern in graph for pattern in patterns)
            except ValueError as e:
                # A ValueError raised while resolving or matching only
                # disqualifies this filter, not the whole lookup.
                logger.debug("Filter %s does not match: %s", _filter, e)
                continue
            if matched:
                # Configs handed in by callers are not guaranteed to have
                # a "path" key, so do not index it directly.
                logger.debug("Using %s for parsing graph", config.get("path"))
                return config
    return None
def build_sparql_query(curie: str) -> str:
    """
    Build a SPARQL query with language preferences.

    Args:
        curie: predicate to filter on as defined in the toml.
               Needs to include the predicate and optionally
               a lang tag to filter for, separated with a comma.
               Eg "wdt:P122,en". Whitespace around the predicate and
               the lang tag is ignored. A value that already starts
               with ``select`` or ``prefix`` (case-insensitive, leading
               whitespace ignored) is treated as a complete query and
               returned unchanged.

    Returns:
        A SPARQL query string
    """
    if curie.lower().strip().startswith(("select", "prefix")):
        return curie
    predicate, separator, lang_tag = curie.partition(",")
    predicate = predicate.strip()
    lang_filter = ""
    if separator:
        # Strip the tag so that "wdt:P122, en" filters for "en", not " en".
        lang_filter = f'FILTER LANGMATCHES(LANG(?object), "{lang_tag.strip()}")'
    # Literal braces have to be doubled inside an f-string.
    query = f"""
    SELECT ?object
    WHERE {{
        ?subject {predicate} ?object {lang_filter}
    }}
    """

    logger.debug("Generated SPARQL query: %s", query)
    return query
def get_value_graph(graph: Graph, curies: str | list[str]) -> list:
    """
    Run the queries for the given curies and collect the result values.

    For every curie a query is built with ``build_sparql_query`` and the
    first column of each result row is taken. A blank node is replaced by
    the objects of all triples that have this node as subject, leaving
    out ``RDF.Seq``. All values are converted to strings.

    Args:
        graph: the graph to query.
        curies: a single curie/query or a list of them.

    Returns:
        The collected strings without duplicates, in order of first
        occurrence.
    """
    curie_list = [curies] if isinstance(curies, str) else curies
    collected = []
    for curie in curie_list:
        rows = graph.query(build_sparql_query(curie))
        first_column = [row[0] for row in rows]
        for node in first_column:
            if not isinstance(node, BNode):
                collected.append(str(node))
                continue
            collected.extend(
                str(member)
                for member in graph.objects(subject=node)
                if member != RDF.Seq
            )
    # Dict keys keep insertion order, which drops duplicates while
    # preserving the position of the first occurrence.
    unique = {}
    for value in collected:
        unique.setdefault(value, None)
    return list(unique)
def get_something_from_uri(
    uri: str, models: list | None = None, configs: list[dict] | None = None
) -> dict | None:
    """
    Parse the data behind a URI and extract values as described by a config.

    The URI is normalized with ``get_normalized_uri`` and parsed into a
    fresh graph; a ``ParserError`` is logged and processing continues with
    whatever the graph contains. If no ``configs`` are given, they are
    looked up with ``find_regex_matching_configs``. The first config whose
    filters match the graph is then used to extract the values.

    Args:
        uri: the URI to parse.
        models: the models whose configs are considered when ``configs``
            is empty.
        configs: configs to use instead of the ones defined on the models.
            The passed dicts are not modified.

    Returns:
        A dict with a list of values per configured attribute, a
        ``relations`` mapping and, if the config has one, the ``model``;
        ``None`` if no config matches the graph.
    """
    uri = get_normalized_uri(uri)
    graph = Graph()
    try:
        graph.parse(uri)
    except ParserError as e:
        logger.info(e)

    if not configs:
        configs = find_regex_matching_configs(uri, models)

    config = find_graph_matching_config(graph, configs)
    if not config:
        return None

    result = defaultdict(list)
    if model := config.get("model", False):
        result["model"] = model
    result["relations"] = defaultdict(list)

    for attribute, curies in config.get("attributes", {}).items():
        result[attribute].extend(get_value_graph(graph, curies))
    for relation, details in config.get("relations", {}).items():
        # Build a new dict instead of overwriting details["curies"]: the
        # config may belong to the caller, and a second run would otherwise
        # query with the already resolved values instead of the curies.
        result["relations"][relation] = {
            **details,
            "curies": get_value_graph(graph, details.get("curies", [])),
        }
    return dict(result)
def load_uri_using_path(uri: str, configfile: Path) -> dict | None:
    """
    Extract the data behind a URI using one specific config file.

    The config is loaded with ``load_path``, extended with its ``path``
    and passed as the only config to ``get_something_from_uri``.

    Returns:
        The result of ``get_something_from_uri``, which is ``None`` if
        the config does not match the parsed graph.
    """
    config = load_path(configfile)
    config["path"] = configfile
    return get_something_from_uri(uri=uri, configs=[config])