Coverage for apis_core/utils/rdf.py: 86%

93 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-06-25 10:00 +0000

1# SPDX-FileCopyrightText: 2025 Birger Schacht 

2# SPDX-License-Identifier: MIT 

3 

4import logging 

5import re 

6import tomllib 

7from collections import defaultdict 

8from pathlib import Path 

9 

10from AcdhArcheAssets.uri_norm_rules import get_normalized_uri 

11from django.apps import apps 

12from django.template.utils import get_app_template_dirs 

13from rdflib import RDF, BNode, Graph, URIRef 

14 

15logger = logging.getLogger(__name__) 

16 

17 

def resolve(obj, graph):
    """
    Convert a raw config value into the value used for graph lookups.

    The conversion depends on the type and shape of ``obj``:

    * a string that starts with ``<`` and ends with ``>`` is turned into
      a ``URIRef`` built from the text between the brackets,
    * any other string is treated as a CURIE and expanded with the
      namespace manager of ``graph``,
    * the boolean ``True`` is replaced by ``None``,
    * everything else is returned unchanged.
    """
    if not isinstance(obj, str):
        # Only the literal `True` is mapped to None; every other
        # non-string value (False, None, numbers, ...) passes through.
        return None if obj is True else obj
    is_bracketed = obj.startswith("<") and obj.endswith(">")
    if is_bracketed:
        return URIRef(obj[1:-1])
    return graph.namespace_manager.expand_curie(obj)

34 

35 

36def load_path(path: str | Path) -> dict: 

37 """ 

38 Load a tomlfile either from a path or from the directory 

39 `triple_configs` in any of the app directories. 

40 """ 

41 if isinstance(path, str): 

42 files = [ 

43 directory / path for directory in get_app_template_dirs("triple_configs") 

44 ] 

45 files = list(filter(lambda file: file.exists(), files)) 

46 if files: 

47 path = files[0] 

48 else: 

49 raise ValueError(f"Could not find {path}") 

50 return tomllib.loads(Path(path).read_text()) 

51 

52 

53def find_regex_matching_configs(uri: str, models: list | None = None) -> dict | None: 

54 """ 

55 Go through a list of models and return all the rdf configs 

56 that are configured in those models that have a regex that 

57 matches the given URI. 

58 """ 

59 models = models or apps.get_models() 

60 models_with_config = [model for model in models if hasattr(model, "rdf_configs")] 

61 configs = [] 

62 for model in models_with_config: 

63 for regex, path in model.rdf_configs().items(): 

64 if re.match(regex, uri): 

65 logger.debug(f"{uri} matched {regex}") 

66 config = load_path(path) 

67 config["path"] = path 

68 config["model"] = model 

69 configs.append(config) 

70 else: 

71 logger.debug(f"{uri} did not match {regex}") 

72 return configs 

73 

74 

def find_graph_matching_config(
    graph: Graph, configs: list[dict] | None = None
) -> dict | None:
    """
    Go through a list of RDF import configs and return the first
    one that has a filter whose triples are all found in the graph.

    Each filter is a mapping of predicate to object; both are passed
    through ``resolve`` and checked against the graph with an
    unspecified subject. A filter whose values cannot be resolved
    (``ValueError``) is logged and skipped.

    Args:
        graph: the graph the filters are checked against.
        configs: the configs to check; ``None`` is treated like an
            empty list.

    Returns:
        The first matching config, or ``None`` if no config matches.
    """
    for config in configs or []:
        # Configs without a "filters" key fall back to a single filter
        # that resolves to the pattern (None, None, None) -- presumably
        # matching any non-empty graph; confirm against rdflib.
        for _filter in config.get("filters", [{None: None}]):
            try:
                patterns = [
                    (None, resolve(predicate, graph), resolve(obj, graph))
                    for predicate, obj in _filter.items()
                ]
                matched = all(pattern in graph for pattern in patterns)
            except ValueError as e:
                logger.debug("Filter %s does not match: %s", _filter, e)
                continue
            if matched:
                # .get(): a config that was not loaded through
                # find_regex_matching_configs may lack the "path" key.
                logger.debug("Using %s for parsing graph", config.get("path"))
                return config
    return None

95 

96 

def build_sparql_query(curie: str) -> str:
    """
    Build a SPARQL query with language preferences.

    Args:
        curie: predicate to filter on as defined in the toml.
               Needs to include the predicate and optionally
               a lang tag to filter for, separated with a comma.
               Eg "wdt:P122,en". Whitespace around the predicate
               and the lang tag is ignored ("wdt:P122, en").
               A value that already starts with "select" or
               "prefix" (case-insensitive) is treated as a complete
               query and returned unchanged.

    Returns:
        A SPARQL query string
    """
    if curie.lower().strip().startswith(("select", "prefix")):
        return curie
    predicate, separator, lang = curie.partition(",")
    predicate = predicate.strip()
    lang_filter = ""
    if separator:
        # Strip the tag: a language range like " en" would not match.
        lang_filter = f'FILTER LANGMATCHES(LANG(?object), "{lang.strip()}")'
    query = f"""
    SELECT ?object 
    WHERE {{ 
        ?subject {predicate} ?object {lang_filter}
    }}
    """

    logger.debug("Generated SPARQL query: %s", query)
    return query

125 

126 

def get_value_graph(graph: Graph, curies: str | list[str]) -> list:
    """
    Query the graph for every given curie and collect the results.

    Each curie is turned into a query with ``build_sparql_query`` and the
    first column of every result row is collected. If that value is a
    ``BNode``, the objects of all triples that have this node as subject
    are collected instead (``RDF.Seq`` is left out).

    Args:
        graph: the graph to query.
        curies: a single curie or a list of curies.

    Returns:
        The collected values as strings, without duplicates, in the
        order they were first seen.
    """
    curie_list = [curies] if isinstance(curies, str) else curies
    collected = []
    for curie in curie_list:
        rows = graph.query(build_sparql_query(curie))
        for node in [row[0] for row in rows]:
            if not isinstance(node, BNode):
                collected.append(str(node))
                continue
            for value in graph.objects(subject=node):
                if value != RDF.Seq:
                    collected.append(str(value))
    # dict.fromkeys drops duplicates while keeping insertion order.
    return list(dict.fromkeys(collected))

146 

147 

def get_something_from_uri(uri: str, models: list | None = None) -> dict | None:
    """
    Parse the data behind a URI and extract values based on a matching
    RDF import config.

    The URI is normalized with ``get_normalized_uri`` and parsed into a
    graph. The configs whose regex matches the URI are then checked
    against the graph, and the first matching config decides which
    attributes and relations are extracted.

    Args:
        uri: the URI to parse.
        models: passed on to ``find_regex_matching_configs``.

    Returns:
        ``None`` if no config matches. Otherwise a dict with the key
        ``"model"`` (the model of the matching config), the key
        ``"relations"`` (relation name -> relation details, with
        ``"curies"`` replaced by the values found in the graph) and one
        key per configured attribute holding the list of found values.
    """
    uri = get_normalized_uri(uri)
    graph = Graph()
    graph.parse(uri)

    candidates = find_regex_matching_configs(uri, models)
    config = find_graph_matching_config(graph, candidates)
    if not config:
        return None

    result = defaultdict(list)
    result["model"] = config["model"]
    result["relations"] = defaultdict(list)

    attributes = config.get("attributes", {})
    for attribute, curies in attributes.items():
        result[attribute].extend(get_value_graph(graph, curies))

    relations = config.get("relations", {})
    for relation, details in relations.items():
        # The configured curies are replaced by the values found for them.
        details["curies"] = get_value_graph(graph, details.get("curies", []))
        result["relations"][relation] = details
    return dict(result)