Coverage for apis_core/utils/rdf.py: 87%

79 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-10-30 12:03 +0000

1# SPDX-FileCopyrightText: 2025 Birger Schacht 

2# SPDX-License-Identifier: MIT 

3 

4import logging 

5import tomllib 

6from collections import defaultdict 

7from pathlib import Path 

8 

9from AcdhArcheAssets.uri_norm_rules import get_normalized_uri 

10from django.template.utils import get_app_template_dirs 

11from rdflib import RDF, BNode, Graph, URIRef 

12from rdflib.exceptions import ParserError 

13 

14logger = logging.getLogger(__name__) 

15 

16 

def resolve(obj, graph):
    """
    Translate a value taken from a config into a term that can be
    used in a triple pattern.

    * A string that starts and ends with angle brackets (``<...>``)
      is turned into a ``URIRef`` built from the text between the
      brackets.
    * Any other string is treated as a CURIE and expanded with the
      namespace manager of ``graph``.
    * The boolean ``True`` is turned into ``None`` (presumably so it
      can serve as a wildcard in a triple pattern -- confirm against
      the callers).
    * Every other value is returned unchanged.
    """
    if not isinstance(obj, str):
        # Only the literal ``True`` is mapped to ``None``; ``False`` and
        # all other non-string values pass through untouched.
        return None if obj is True else obj
    is_bracketed = obj.startswith("<") and obj.endswith(">")
    if is_bracketed:
        # Drop the surrounding "<" and ">".
        return URIRef(obj[1:-1])
    return graph.namespace_manager.expand_curie(obj)

33 

34 

35def load_path(path: str | Path) -> dict: 

36 """ 

37 Load a tomlfile either from a path or from the directory 

38 `triple_configs` in any of the app directories. 

39 """ 

40 if isinstance(path, str): 

41 files = [ 

42 directory / path for directory in get_app_template_dirs("triple_configs") 

43 ] 

44 files = list(filter(lambda file: file.exists(), files)) 

45 if files: 

46 path = files[0] 

47 else: 

48 raise ValueError(f"Could not find {path}") 

49 return tomllib.loads(Path(path).read_text()) 

50 

51 

def graph_matches_config(graph: Graph, configfile: Path) -> dict:
    """
    Load ``configfile`` and check whether one of its filters matches
    ``graph``.

    Every entry of the config's ``filters`` list maps predicates to
    objects. Both are passed through ``resolve`` and a filter matches
    when each resulting ``(None, predicate, object)`` pattern is
    contained in the graph. If the config has no ``filters`` key, a
    single ``{None: None}`` filter is used instead.

    Returns the config as dict as soon as one filter matches,
    otherwise an empty dict. A filter that raises a ``ValueError``
    is logged and skipped.
    """
    config = load_path(configfile)
    fallback_filters = [{None: None}]
    for _filter in config.get("filters", fallback_filters):
        try:
            patterns = [
                (None, resolve(predicate, graph), resolve(obj, graph))
                for predicate, obj in _filter.items()
            ]
            hits = [pattern in graph for pattern in patterns]
            if not all(hits):
                continue
        except ValueError as e:
            logger.debug("Filter %s does not match: %s", _filter, e)
            continue
        logger.debug("Using %s for parsing graph", configfile)
        return config
    return {}

71 

72 

def build_sparql_query(curie: str) -> str:
    """
    Build a SPARQL query with language preferences.

    Args:
        curie: predicate to filter on as defined in the toml.
               Needs to include the predicate and optionally a
               lang tag to filter for, separated with a comma,
               e.g. "wdt:P122,en". Whitespace around the predicate
               and the lang tag is ignored, so "wdt:P122, en" works
               as well. A value that (ignoring case and surrounding
               whitespace) starts with "select" or "prefix" is
               treated as a complete query and returned unchanged.

    Returns:
        A SPARQL query string
    """
    if curie.lower().strip().startswith(("select", "prefix")):
        return curie
    predicate, has_lang_tag, lang_tag = curie.partition(",")
    predicate = predicate.strip()
    lang_filter = ""
    if has_lang_tag:
        # A language range with stray whitespace (" en") never matches,
        # so strip it before it goes into the filter.
        lang_filter = f'FILTER LANGMATCHES(LANG(?object), "{lang_tag.strip()}")'
    # Literal braces have to be doubled inside an f-string.
    query = f"""
    SELECT ?object
    WHERE {{
        ?subject {predicate} ?object {lang_filter}
    }}
    """

    logger.debug("Generated SPARQL query: %s", query)
    return query

101 

102 

def get_value_graph(graph: Graph, curies: str | list[str]) -> list:
    """
    Collect the values the given ``curies`` select from ``graph``.

    Each curie (a single string is treated as a one-element list) is
    turned into a query with ``build_sparql_query`` and run against
    the graph. The first column of every result row is used:

    * if it is a ``BNode``, all objects of triples that have this
      node as subject are added, except ``RDF.Seq``;
    * otherwise the value itself is added.

    All values are converted to ``str``. The returned list contains
    every value once, in the order of first appearance.
    """
    if isinstance(curies, str):
        curies = [curies]
    collected = []
    for curie in curies:
        rows = graph.query(build_sparql_query(curie))
        nodes = [row[0] for row in rows]
        for node in nodes:
            if not isinstance(node, BNode):
                collected.append(str(node))
                continue
            members = graph.objects(subject=node)
            collected.extend(
                [str(member) for member in members if member != RDF.Seq]
            )
    # dict keys keep insertion order, so this removes duplicates
    # without reordering the values.
    return list(dict.fromkeys(collected))

122 

123 

def load_uri_using_path(uri, configfile: Path) -> dict | None:
    """
    Parse the RDF data behind ``uri`` and extract the values
    described by ``configfile``.

    The uri is normalized with ``get_normalized_uri`` and parsed into
    a new ``Graph``. A ``ParserError`` is logged with level info and
    processing continues with whatever the graph contains at that
    point.

    Returns:
        ``None`` if ``graph_matches_config`` finds no matching config
        for the graph. Otherwise a dict that maps every name from the
        config's ``attributes`` table to the list of values found for
        its curies. The key ``relations`` maps every name from the
        config's ``relations`` table to its details, with ``curies``
        replaced by the values found in the graph.
    """
    uri = get_normalized_uri(uri)
    graph = Graph()
    try:
        graph.parse(uri)
    except ParserError as e:
        logger.info(e)

    config = graph_matches_config(graph, configfile)
    if not config:
        return None

    result = defaultdict(list)
    result["relations"] = defaultdict(list)
    for attribute, curies in config.get("attributes", {}).items():
        result[attribute].extend(get_value_graph(graph, curies))
    for relation, details in config.get("relations", {}).items():
        # Swap the configured curies for the values they select.
        details["curies"] = get_value_graph(graph, details.get("curies", []))
        result["relations"][relation] = details
    return dict(result)