Coverage for apis_core/utils/rdf.py: 76%

103 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-10-10 13:36 +0000

1# SPDX-FileCopyrightText: 2025 Birger Schacht 

2# SPDX-License-Identifier: MIT 

3 

4import logging 

5import re 

6import tomllib 

7from collections import defaultdict 

8from pathlib import Path 

9 

10from AcdhArcheAssets.uri_norm_rules import get_normalized_uri 

11from django.apps import apps 

12from django.template.utils import get_app_template_dirs 

13from rdflib import RDF, BNode, Graph, URIRef 

14from rdflib.exceptions import ParserError 

15 

16logger = logging.getLogger(__name__) 

17 

18 

def resolve(obj, graph):
    """
    Turn a raw value from a filter definition into a term that can be
    used in a triple pattern.

    * A string wrapped in angle brackets (``<...>``) becomes a ``URIRef``
      built from the text between the brackets.
    * Any other string is treated as a CURIE and expanded using the
      namespace manager of ``graph``.
    * The boolean ``True`` becomes ``None`` (presumably so that it acts
      as a wildcard in a triple pattern -- confirm against the callers).
    * Every other value is handed back untouched.
    """
    if obj is True:
        return None
    if not isinstance(obj, str):
        return obj
    is_bracketed = obj.startswith("<") and obj.endswith(">")
    if is_bracketed:
        return URIRef(obj[1:-1])
    return graph.namespace_manager.expand_curie(obj)

35 

36 

37def load_path(path: str | Path) -> dict: 

38 """ 

39 Load a tomlfile either from a path or from the directory 

40 `triple_configs` in any of the app directories. 

41 """ 

42 if isinstance(path, str): 

43 files = [ 

44 directory / path for directory in get_app_template_dirs("triple_configs") 

45 ] 

46 files = list(filter(lambda file: file.exists(), files)) 

47 if files: 

48 path = files[0] 

49 else: 

50 raise ValueError(f"Could not find {path}") 

51 return tomllib.loads(Path(path).read_text()) 

52 

53 

54def find_regex_matching_configs(uri: str, models: list | None = None) -> dict | None: 

55 """ 

56 Go through a list of models and return all the rdf configs 

57 that are configured in those models that have a regex that 

58 matches the given URI. 

59 """ 

60 models = models or apps.get_models() 

61 models_with_config = [model for model in models if hasattr(model, "rdf_configs")] 

62 configs = [] 

63 for model in models_with_config: 

64 for regex, path in model.rdf_configs().items(): 

65 if re.match(regex, uri): 

66 logger.debug(f"{uri} matched {regex}") 

67 config = load_path(path) 

68 config["path"] = path 

69 config["model"] = model 

70 configs.append(config) 

71 else: 

72 logger.debug(f"{uri} did not match {regex}") 

73 return configs 

74 

75 

def find_graph_matching_config(
    graph: Graph, configs: list[dict] | None = None
) -> dict | None:
    """
    Go through a list of RDF import configs and return the first one
    that has a filter matching the given graph.

    The ``filters`` entry of a config is a list of
    ``predicate -> object`` mappings. A filter matches if, after passing
    predicate and object through ``resolve``, every
    ``(None, predicate, object)`` pattern is contained in ``graph``.
    A config without a ``filters`` key is checked against the single
    pattern ``(None, None, None)``.

    A filter that raises a ``ValueError`` while being evaluated is
    logged and skipped.

    Args:
        graph: the graph the filters are tested against.
        configs: the configs to test; ``None`` is treated as no configs.

    Returns:
        The first matching config dict or ``None``.
    """
    for config in configs or []:
        for _filter in config.get("filters", [{None: None}]):
            try:
                patterns = [
                    (None, resolve(predicate, graph), resolve(obj, graph))
                    for predicate, obj in _filter.items()
                ]
                matches = all(pattern in graph for pattern in patterns)
            except ValueError as e:
                logger.debug("Filter %s does not match: %s", _filter, e)
                continue
            if matches:
                # Configs handed in directly by a caller are not
                # guaranteed to carry a "path" key, so it must not be
                # indexed unconditionally just for a log message.
                logger.debug("Using %s for parsing graph", config.get("path"))
                return config
    return None

96 

97 

def build_sparql_query(curie: str) -> str:
    """
    Build a SPARQL query with language preferences.

    Args:
        curie: predicate to filter on as defined in the toml.
               Needs to include the predicate and optionally
               a lang tag to filter for, separated with a comma.
               Eg "wdt:P122,en". Whitespace around the predicate and
               around the lang tag is ignored. A value that starts
               with "select" or "prefix" (case-insensitive, leading
               whitespace ignored) is taken to be a complete query
               and is returned unchanged.

    Returns:
        A SPARQL query string
    """
    if curie.lower().strip().startswith(("select", "prefix")):
        return curie
    predicate, separator, lang_tag = curie.partition(",")
    predicate = predicate.strip()
    lang_filter = ""
    if separator:
        # Strip the tag so that "wdt:P122, en" does not filter for " en".
        lang_filter = f'FILTER LANGMATCHES(LANG(?object), "{lang_tag.strip()}")'
    # Literal braces have to be doubled inside an f-string.
    query = f"""
    SELECT ?object
    WHERE {{
        ?subject {predicate} ?object {lang_filter}
    }}
    """

    logger.debug("Generated SPARQL query: %s", query)
    return query

126 

127 

def get_value_graph(graph: Graph, curies: str | list[str]) -> list:
    """
    Query ``graph`` for every given curie and return the values found
    as a list of unique strings, in order of first appearance.

    ``curies`` is either a single string or a list of strings; each one
    is turned into a query with ``build_sparql_query``. Only the first
    column of every result row is used. If that value is a blank node,
    the objects of all triples that have the blank node as subject are
    collected instead (leaving out ``RDF.Seq``).
    """
    curie_list = [curies] if isinstance(curies, str) else curies
    collected = []
    for curie in curie_list:
        sparql = build_sparql_query(curie)
        nodes = [row[0] for row in graph.query(sparql)]
        for node in nodes:
            if not isinstance(node, BNode):
                collected.append(str(node))
                continue
            for value in graph.objects(subject=node):
                if value != RDF.Seq:
                    collected.append(str(value))
    # dict keys keep insertion order, so this removes duplicates while
    # preserving the order in which the values were found.
    return list(dict.fromkeys(collected))

147 

148 

def get_something_from_uri(
    uri: str, models: list | None = None, configs: list[dict] | None = None
) -> dict | None:
    """
    Fetch the RDF data behind a URI and extract values from it as
    described by the first matching import config.

    The URI is normalized with ``get_normalized_uri`` and parsed into a
    fresh graph. A ``ParserError`` raised while parsing is logged and
    processing continues with the graph as it is at that point.

    Args:
        uri: the URI to fetch.
        models: models to search for configs; only used if no
            ``configs`` are given (see ``find_regex_matching_configs``).
        configs: import configs to use. If empty or ``None``, the
            configs are looked up via ``find_regex_matching_configs``.
            The given configs are not modified.

    Returns:
        ``None`` if no config matches the graph. Otherwise a dict with
        one list of values per configured attribute, a ``relations``
        mapping (relation name -> relation details, with ``curies``
        replaced by the values found in the graph) and, if the config
        has one, the ``model``.
    """
    uri = get_normalized_uri(uri)
    graph = Graph()
    try:
        graph.parse(uri)
    except ParserError as e:
        # Best effort: carry on with whatever ended up in the graph.
        logger.info(e)

    if not configs:
        configs = find_regex_matching_configs(uri, models)

    config = find_graph_matching_config(graph, configs)
    if not config:
        return None

    result = defaultdict(list)
    if model := config.get("model", False):
        result["model"] = model
    result["relations"] = defaultdict(list)

    for attribute, curies in config.get("attributes", {}).items():
        result[attribute].extend(get_value_graph(graph, curies))
    for relation, details in config.get("relations", {}).items():
        # Build a copy instead of writing the looked-up values back into
        # the config: the config may belong to the caller and would
        # otherwise be unusable for a second call.
        result["relations"][relation] = {
            **details,
            "curies": get_value_graph(graph, details.get("curies", [])),
        }
    return dict(result)

176 

177 

def load_uri_using_path(uri, configfile: Path) -> dict | None:
    """
    Import the data behind ``uri`` using exactly one config file.

    The config is loaded with ``load_path``, its ``path`` key is set to
    ``configfile`` and it is then passed as the only config to
    ``get_something_from_uri``.

    Returns:
        Whatever ``get_something_from_uri`` returns for that single
        config: a dict, or ``None``.
    """
    config = load_path(configfile)
    config["path"] = configfile
    return get_something_from_uri(uri=uri, configs=[config])