Coverage for apis_core/utils/rdf.py: 86%
93 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-06-25 10:00 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-06-25 10:00 +0000
1# SPDX-FileCopyrightText: 2025 Birger Schacht
2# SPDX-License-Identifier: MIT
4import logging
5import re
6import tomllib
7from collections import defaultdict
8from pathlib import Path
10from AcdhArcheAssets.uri_norm_rules import get_normalized_uri
11from django.apps import apps
12from django.template.utils import get_app_template_dirs
13from rdflib import RDF, BNode, Graph, URIRef
15logger = logging.getLogger(__name__)
def resolve(obj, graph):
    """
    Translate a raw filter value into the term used for graph matching.

    * ``True`` is mapped to ``None``.
    * A string that starts and ends with angle brackets (``<...>``) is
      turned into a ``URIRef`` built from the text between the brackets.
    * Any other string is treated as a CURIE and expanded using the
      namespace manager of ``graph``.
    * Every other value is returned unchanged.
    """
    if obj is True:
        return None
    if not isinstance(obj, str):
        return obj
    is_bracketed = obj.startswith("<") and obj.endswith(">")
    if is_bracketed:
        return URIRef(obj[1:-1])
    return graph.namespace_manager.expand_curie(obj)
36def load_path(path: str | Path) -> dict:
37 """
38 Load a tomlfile either from a path or from the directory
39 `triple_configs` in any of the app directories.
40 """
41 if isinstance(path, str):
42 files = [
43 directory / path for directory in get_app_template_dirs("triple_configs")
44 ]
45 files = list(filter(lambda file: file.exists(), files))
46 if files:
47 path = files[0]
48 else:
49 raise ValueError(f"Could not find {path}")
50 return tomllib.loads(Path(path).read_text())
53def find_regex_matching_configs(uri: str, models: list | None = None) -> dict | None:
54 """
55 Go through a list of models and return all the rdf configs
56 that are configured in those models that have a regex that
57 matches the given URI.
58 """
59 models = models or apps.get_models()
60 models_with_config = [model for model in models if hasattr(model, "rdf_configs")]
61 configs = []
62 for model in models_with_config:
63 for regex, path in model.rdf_configs().items():
64 if re.match(regex, uri):
65 logger.debug(f"{uri} matched {regex}")
66 config = load_path(path)
67 config["path"] = path
68 config["model"] = model
69 configs.append(config)
70 else:
71 logger.debug(f"{uri} did not match {regex}")
72 return configs
def find_graph_matching_config(
    graph: Graph, configs: list[dict] | None = None
) -> dict | None:
    """
    Go through a list of RDF import configs and return the first one
    that has a filter matching the given graph.

    Each entry of a config's ``"filters"`` list is a mapping of
    predicate -> object. Both sides are passed through ``resolve`` and a
    filter matches if *all* of its ``(None, predicate, object)`` patterns
    are contained in the graph. One matching filter is enough for the
    config to be selected.

    Args:
        graph: the graph the filters are tested against.
        configs: the candidate configs; ``None`` is treated like an
            empty list.

    Returns:
        The first matching config, or ``None`` if no config matches.
    """
    for config in configs or []:
        # A config without filters falls back to a single all-`None`
        # pattern. `None` is the wildcard already used for the subject
        # position, so such a config is only tested for "any triple".
        for _filter in config.get("filters", [{None: None}]):
            try:
                patterns = [
                    (None, resolve(predicate, graph), resolve(obj, graph))
                    for predicate, obj in _filter.items()
                ]
                if all(pattern in graph for pattern in patterns):
                    # `.get` so that a caller-supplied config without a
                    # "path" key does not fail just for a debug message.
                    logger.debug("Using %s for parsing graph", config.get("path"))
                    return config
            except ValueError as e:
                # Raised e.g. when a filter value cannot be resolved; the
                # filter is skipped and the next one is tried.
                logger.debug("Filter %s does not match: %s", _filter, e)
    return None
def build_sparql_query(curie: str) -> str:
    """
    Build a SPARQL query with language preferences.

    Args:
        curie: predicate to filter on as defined in the toml.
            Needs to include the predicate and optionally
            a lang tag to filter for, separated with a comma.
            Eg "wdt:P122,en". Whitespace around the predicate and the
            lang tag is ignored, so "wdt:P122, en" is equivalent.
            A value that already starts with ``select`` or ``prefix``
            (case-insensitive, leading whitespace ignored) is treated
            as a complete query and returned unchanged.

    Returns:
        A SPARQL query string
    """
    if curie.lower().strip().startswith(("select", "prefix")):
        return curie

    predicate, separator, lang_tag = curie.partition(",")
    predicate = predicate.strip()
    lang_filter = ""
    if separator:
        # The tag ends up inside a quoted string literal, so stray
        # whitespace (" en") would become part of the language range and
        # the filter would silently match nothing.
        lang_filter = f'FILTER LANGMATCHES(LANG(?object), "{lang_tag.strip()}")'

    query = f"""
    SELECT ?object
    WHERE {{
        ?subject {predicate} ?object {lang_filter}
    }}
    """

    logger.debug("Generated SPARQL query: %s", query)
    return query
def get_value_graph(graph: Graph, curies: str | list[str]) -> list:
    """
    Collect the string values that the given curies select from a graph.

    Each curie is turned into a query with ``build_sparql_query`` and the
    first column of every result row is collected. If such a value is a
    blank node, the objects of all triples that have this blank node as
    subject are collected instead (except ``RDF.Seq``).

    Args:
        graph: the graph to query.
        curies: a single curie/query or a list of them.

    Returns:
        The values as strings, without duplicates, in the order in which
        they were first found.
    """
    if isinstance(curies, str):
        curies = [curies]

    values = []
    for curie in curies:
        results = graph.query(build_sparql_query(curie))
        objects = [result[0] for result in results]
        for obj in objects:
            if obj is None:
                # Unbound variable (possible with hand-written queries,
                # e.g. using OPTIONAL); without this check it would end
                # up as the literal string "None" in the values.
                continue
            if isinstance(obj, BNode):
                values.extend(
                    str(value)
                    for value in graph.objects(subject=obj)
                    if value != RDF.Seq
                )
            else:
                values.append(str(obj))
    # dict.fromkeys removes duplicates while keeping the insertion order
    return list(dict.fromkeys(values))
def get_something_from_uri(uri: str, models: list | None = None) -> dict | None:
    """
    Parse the RDF data behind a URI and extract values from it using the
    matching import config.

    The URI is normalized with ``get_normalized_uri`` and parsed into a
    new ``Graph``. The configs whose regex matches the URI are looked up
    with ``find_regex_matching_configs`` and the one that matches the
    graph is picked with ``find_graph_matching_config``.

    Args:
        uri: the URI to parse.
        models: passed on to ``find_regex_matching_configs``.

    Returns:
        ``None`` if no config matches. Otherwise a dict with the keys
        ``"model"`` (the model of the config), ``"relations"`` (the
        relation details of the config, with ``"curies"`` replaced by the
        values found in the graph) and one key per configured attribute
        holding the list of values found in the graph.
    """
    uri = get_normalized_uri(uri)
    graph = Graph()
    graph.parse(uri)

    candidates = find_regex_matching_configs(uri, models)
    config = find_graph_matching_config(graph, candidates)
    if not config:
        return None

    result = defaultdict(list)
    result["model"] = config["model"]
    result["relations"] = defaultdict(list)

    attribute_curies = config.get("attributes", {})
    for attribute in attribute_curies:
        found = get_value_graph(graph, attribute_curies[attribute])
        result[attribute].extend(found)

    relation_details = config.get("relations", {})
    for relation in relation_details:
        details = relation_details[relation]
        details["curies"] = get_value_graph(graph, details.get("curies", []))
        result["relations"][relation] = details

    return dict(result)