Coverage for apis_core / utils / rdf.py: 83%
116 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 11:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 11:37 +0000
1# SPDX-FileCopyrightText: 2025 Birger Schacht
2# SPDX-License-Identifier: MIT
4import inspect
5import logging
6import tomllib
7from collections import defaultdict
8from copy import copy
9from dataclasses import dataclass
10from pathlib import Path
11from typing import Dict, List, Tuple, Union
12from warnings import deprecated
14from AcdhArcheAssets.uri_norm_rules import get_normalized_uri
15from django.template.utils import get_app_template_dirs
16from rdflib import RDF, BNode, Graph, URIRef
17from rdflib.exceptions import ParserError
19logger = logging.getLogger(__name__)
22@dataclass
23class Attribute:
24 value: Union[str | list[str]]
@dataclass
class Relation:
    """
    Configuration entry describing one relation that should be
    extracted from a graph.

    When a configuration class is passed to `graph_matches_config`, every
    `Relation` member is stored in the config's ``relations`` mapping,
    keyed by `name`.
    """

    # Key under which the relation ends up in the ``relations`` mapping.
    name: str
    # Details of the relation. `load_uri_using_path` reads the ``curies``
    # entry of this mapping; other keys are passed through untouched.
    value: Dict[str, str]
@dataclass
class Filter:
    """
    Configuration entry listing the predicate/object pairs a graph has
    to contain for a configuration to apply.

    When a configuration class is passed to `graph_matches_config`, every
    `Filter` member contributes one entry to the config's ``filters``
    list; each entry is a list of ``(predicate, object)`` pairs.
    """

    # A list of (predicate, object) pairs. The original annotation
    # ``Tuple[str:str]`` was a slice expression, not a two-element tuple.
    value: List[Tuple[str, str]]
def resolve(obj, graph):
    """
    Convert a configured value into the form used when building
    triple patterns.

    - The boolean ``True`` is turned into ``None``.
    - A string that starts with ``<`` and ends with ``>`` is
      interpreted as a URI; the brackets are stripped and a
      ``URIRef`` is returned.
    - Any other string is interpreted as a curie and expanded
      using the namespace manager of `graph`.
    - Every other value is returned unchanged.
    """
    if obj is True:
        return None
    if not isinstance(obj, str):
        return obj
    is_bracketed = obj.startswith("<") and obj.endswith(">")
    if is_bracketed:
        return URIRef(obj[1:-1])
    return graph.namespace_manager.expand_curie(obj)
56@deprecated("Please switch to using objects instead of toml files.")
57def load_path(path: str | Path) -> dict:
58 """
59 Load a tomlfile either from a path or from the directory
60 `triple_configs` in any of the app directories.
61 """
62 if isinstance(path, str):
63 files = [
64 directory / path for directory in get_app_template_dirs("triple_configs")
65 ]
66 files = list(filter(lambda file: file.exists(), files))
67 if files:
68 path = files[0]
69 else:
70 raise ValueError(f"Could not find {path}")
71 return tomllib.loads(Path(path).read_text())
def graph_matches_config(graph: Graph, configfile: Path) -> dict:
    """
    Check whether a configuration applies to this graph and if
    so, return the configuration as dict. Otherwise return an
    empty dict.

    `configfile` may be a class: its non-dunder members of type
    `Filter`, `Attribute` and `Relation` are then collected into a
    config dict. Anything else is handed to `load_path`.

    A configuration applies as soon as one of its filters matches,
    i.e. every predicate/object pair of that filter is contained
    in the graph.
    """
    if inspect.isclass(configfile):
        config = {"relations": {}, "filters": [], "attributes": {}}
        member_names = [name for name in dir(configfile) if not name.startswith("__")]
        for member_name in member_names:
            member = getattr(configfile, member_name)
            if isinstance(member, Filter):
                config["filters"].append(copy(member.value))
            elif isinstance(member, Attribute):
                config["attributes"][member_name.lower()] = copy(member.value)
            elif isinstance(member, Relation):
                config["relations"][member.name] = copy(member.value)
    else:
        config = load_path(configfile)

    # Without a "filters" key a single (None, None) pair is used as filter.
    for _filter in config.get("filters", [[(None, None)]]):
        pairs = _filter.items() if isinstance(_filter, dict) else _filter
        try:
            # Resolve every pair first; an unresolvable value raises
            # ValueError and discards the whole filter.
            patterns = [
                (None, resolve(predicate, graph), resolve(obj, graph))
                for predicate, obj in pairs
            ]
            contained = [pattern in graph for pattern in patterns]
            if all(contained):
                logger.debug("Using %s for parsing graph", configfile)
                return config
        except ValueError as e:
            logger.debug("Filter %s does not match: %s", pairs, e)
    return {}
def build_sparql_query(curie: str) -> str:
    """
    Turn a configured lookup string into a SPARQL query.

    Args:
        curie: either a complete SPARQL query (anything starting with
               "select" or "prefix", ignoring case and leading
               whitespace), which is returned unchanged, or a
               predicate optionally followed by a comma and a
               language tag to filter for. Eg "wdt:P122,en".

    Returns:
        A SPARQL query string
    """
    looks_like_query = curie.lower().strip().startswith(("select", "prefix"))
    if looks_like_query:
        return curie

    # Only the first comma separates the predicate from the language tag.
    predicate, comma, language = curie.partition(",")
    lang_tag = (
        f'FILTER LANGMATCHES(LANG(?object), "{language}")' if comma else ""
    )
    query = f"""
    SELECT ?object
    WHERE {{
        ?subject {predicate} ?object {lang_tag}
    }}
    """

    logger.debug("Generated SPARQL query: %s", query)
    return query
def get_value_graph(graph: Graph, curies: str | list[str]) -> list:
    """
    Collect the values a graph yields for one or more lookups.

    Args:
        graph: the graph to query.
        curies: a single lookup string or a list of them; each one is
                converted with `build_sparql_query`. `None` yields an
                empty result.

    Returns:
        The string representations of all results, without
        duplicates and in the order they were first found. If a
        result is a blank node, the objects attached to that node
        (except `RDF.Seq`) are used instead of the node itself.
    """
    if curies is None:
        return []
    lookups = [curies] if isinstance(curies, str) else curies

    # dict keys keep insertion order, so this serves as ordered de-duplication
    found = {}
    for lookup in lookups:
        try:
            rows = graph.query(build_sparql_query(lookup))
        except Exception as e:
            logger.debug("Could not parse query: %s", e)
            rows = []
        for node in [row[0] for row in rows]:
            if not isinstance(node, BNode):
                found[str(node)] = None
                continue
            for member in graph.objects(subject=node):
                if member != RDF.Seq:
                    found[str(member)] = None
    return list(found)
def load_uri_using_path(uri, configfile: Path | str) -> dict:
    """
    Fetch the RDF data behind `uri` and extract values from it as
    described by the given configuration.

    Args:
        uri: the URI to load; it is normalized before use.
        configfile: the configuration, passed on to
                    `graph_matches_config`.

    Returns:
        A dict with the normalized URI under ``same_as``, the
        extracted relations under ``relations`` and one list of
        values per configured attribute. `None` if the configuration
        does not match the graph.
    """
    uri = get_normalized_uri(uri)
    graph = Graph()
    # workaround for a bug in d-nb: with the default list of accept
    # headers of rdflib, d-nb sometimes returns json-ld and sometimes turtle
    # with json-ld, rdflib has problems finding the namespaces
    parse_format = None
    if uri.startswith("https://d-nb.info/gnd/"):
        parse_format = "turtle"
    try:
        graph.parse(uri, format=parse_format)
    except ParserError as e:
        # A parser error is only logged; matching then runs on whatever
        # the graph contains at this point.
        logger.info(e)

    config = graph_matches_config(graph, configfile)
    if not config:
        return None

    result = defaultdict(list)
    result["same_as"] = [uri]
    result["relations"] = defaultdict(list)
    for attribute, curies in config.get("attributes", {}).items():
        result[attribute].extend(get_value_graph(graph, curies))
    for relation, details in config.get("relations", {}).items():
        # The configured lookups are replaced by the values they yield.
        details["curies"] = get_value_graph(graph, details.get("curies", []))
        result["relations"][relation] = details
    return dict(result)