Reference

This part of the documentation provides automatically generated information about RDFProxy symbols.


rdfproxy.adapter.SPARQLModelAdapter

Bases: Generic[_TModelInstance]

Adapter/Mapper for SPARQL query result set to Pydantic model conversions.

The rdfproxy.SPARQLModelAdapter class allows running a query against an endpoint and mapping a flat SPARQL query result set to a potentially nested Pydantic model.

SPARQLModelAdapter.query returns a Page model object with a default pagination size of 100 results.

SPARQL bindings are implicitly assigned to model fields of the same name; explicit SPARQL binding to model field allocation is available with rdfproxy.SPARQLBinding.

Result grouping is controlled through the model, i.e. grouping is triggered when a field of type list[pydantic.BaseModel] is encountered.

See https://github.com/acdh-oeaw/rdfproxy/tree/main/examples for examples.
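
For instance, a minimal usage sketch (the endpoint URL, query, and model below are illustrative assumptions, not part of RDFProxy):

from pydantic import BaseModel

from rdfproxy import SPARQLModelAdapter

class Person(BaseModel):
    name: str

adapter = SPARQLModelAdapter(
    target="https://example.org/sparql",  # hypothetical SPARQL endpoint
    query="""
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

    SELECT ?name
    WHERE { ?person rdfs:label ?name . }
    """,
    model=Person,
)

page = adapter.query()  # Page[Person]; default pagination: page=1, size=100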

Source code in rdfproxy/adapter.py
class SPARQLModelAdapter(Generic[_TModelInstance]):
    """Adapter/Mapper for SPARQL query result set to Pydantic model conversions.

    The rdfproxy.SPARQLModelAdapter class allows running a query against an endpoint
    and mapping a flat SPARQL query result set to a potentially nested Pydantic model.

    SPARQLModelAdapter.query returns a Page model object with a default pagination size of 100 results.

    SPARQL bindings are implicitly assigned to model fields of the same name,
    explicit SPARQL binding to model field allocation is available with rdfproxy.SPARQLBinding.

    Result grouping is controlled through the model,
    i.e. grouping is triggered when a field of list[pydantic.BaseModel] is encountered.

    See https://github.com/acdh-oeaw/rdfproxy/tree/main/examples for examples.
    """

    def __init__(
        self,
        target: str,
        query: str,
        model: type[_TModelInstance],
    ) -> None:
        self._target = target
        self._query = check_query(query)
        self._model = check_model(model)

        self.sparqlwrapper = SPARQLWrapper(self._target)

        logger.info("Initialized SPARQLModelAdapter.")
        logger.debug("Endpoint: %s", self._target)
        logger.debug("Model: %s", self._model)
        logger.debug("Query: \n%s", self._query)

    def query(
        self, query_parameters: QueryParameters = QueryParameters()
    ) -> Page[_TModelInstance]:
        """Run a query against an endpoint and return a Page model object."""
        logger.info(
            "Running SPARQLModelAdapter.query against endpoint '%'", self._target
        )

        query_constructor = _QueryConstructor(
            query=self._query,
            query_parameters=query_parameters,
            model=self._model,
        )

        count_query = query_constructor.get_count_query()
        items_query = query_constructor.get_items_query()

        logger.debug("Running items query: \n%s", items_query)

        items_query_bindings: Iterator[dict] = self.sparqlwrapper.query(items_query)
        mapper = _ModelBindingsMapper(self._model, items_query_bindings)
        items: list[_TModelInstance] = mapper.get_models()

        logger.debug("Running count query: \n%s", count_query)

        count_query_bindings: Iterator[dict] = self.sparqlwrapper.query(count_query)
        total: int = int(next(count_query_bindings)["cnt"])
        pages: int = math.ceil(total / query_parameters.size)

        return Page(
            items=items,
            page=query_parameters.page,
            size=query_parameters.size,
            total=total,
            pages=pages,
        )

query(query_parameters=QueryParameters())

Run a query against an endpoint and return a Page model object.

Source code in rdfproxy/adapter.py
def query(
    self, query_parameters: QueryParameters = QueryParameters()
) -> Page[_TModelInstance]:
    """Run a query against an endpoint and return a Page model object."""
    logger.info(
        "Running SPARQLModelAdapter.query against endpoint '%'", self._target
    )

    query_constructor = _QueryConstructor(
        query=self._query,
        query_parameters=query_parameters,
        model=self._model,
    )

    count_query = query_constructor.get_count_query()
    items_query = query_constructor.get_items_query()

    logger.debug("Running items query: \n%s", items_query)

    items_query_bindings: Iterator[dict] = self.sparqlwrapper.query(items_query)
    mapper = _ModelBindingsMapper(self._model, items_query_bindings)
    items: list[_TModelInstance] = mapper.get_models()

    logger.debug("Running count query: \n%s", count_query)

    count_query_bindings: Iterator[dict] = self.sparqlwrapper.query(count_query)
    total: int = int(next(count_query_bindings)["cnt"])
    pages: int = math.ceil(total / query_parameters.size)

    return Page(
        items=items,
        page=query_parameters.page,
        size=query_parameters.size,
        total=total,
        pages=pages,
    )
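
A usage sketch for query (assuming the adapter from the class-level example above; "name" stands in for an orderable field of the model):

from rdfproxy import QueryParameters

params = QueryParameters(page=2, size=25, order_by="name")
page = adapter.query(params)

print(page.total, page.pages)  # total number of results and number of pages
for person in page.items:
    print(person.name)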

rdfproxy.constructor._QueryConstructor

This class encapsulates the dynamic SPARQL query modification logic for implementing purely SPARQL-based, deterministic pagination.

The public methods get_items_query and get_count_query are used in rdfproxy.SPARQLModelAdapter to construct the queries that retrieve the arguments for Page object instantiation.

Source code in rdfproxy/constructor.py
class _QueryConstructor:
    """The class encapsulates dynamic SPARQL query modification logic
    for implementing purely SPARQL-based, deterministic pagination.

    Public methods get_items_query and get_count_query are used in rdfproxy.SPARQLModelAdapter
    to construct queries for retrieving arguments for Page object instantiation.
    """

    def __init__(
        self,
        query: str,
        query_parameters: QueryParameters,
        model: type[_TModelInstance],
    ) -> None:
        self.query = query
        self.query_parameters = query_parameters
        self.model = model

        self.bindings_map = FieldsBindingsMap(model)
        self.orderable_bindings_map = OrderableFieldsBindingsMap(model)

        self.group_by: str | None = self.bindings_map.get(
            model.model_config.get("group_by")
        )
        self.order_by: str | None = (
            None
            if self.query_parameters.order_by is None
            else self.orderable_bindings_map[self.query_parameters.order_by]
        )

    def get_items_query(self) -> str:
        """Construct a SPARQL items query for use in rdfproxy.SPARQLModelAdapter."""
        if self.group_by is None:
            return self._get_ungrouped_items_query()
        return self._get_grouped_items_query()

    def get_count_query(self) -> str:
        """Construct a SPARQL count query for use in rdfproxy.SPARQLModelAdapter"""
        if self.group_by is None:
            select_clause = "select (count(*) as ?cnt)"
        else:
            select_clause = f"select (count(distinct ?{self.group_by}) as ?cnt)"

        return replace_query_select_clause(self.query, select_clause)

    @staticmethod
    def _calculate_offset(page: int, size: int) -> int:
        """Calculate the offset value for paginated SPARQL templates."""
        match page:
            case 1:
                return 0
            case 2:
                return size
            case _:
                return size * (page - 1)

    def _get_grouped_items_query(self) -> str:
        """Construct a SPARQL items query for grouped models."""
        filter_clause: str | None = self._compute_filter_clause()
        select_clause: str = self._compute_select_clause()
        order_by_value: str = self._compute_order_by_value()
        limit, offset = self._compute_limit_offset()

        subquery = compose_left(
            remove_sparql_prefixes,
            component(replace_query_select_clause, repl=select_clause),
            component(inject_into_query, injectant=filter_clause),
            component(
                add_solution_modifier,
                order_by=order_by_value,
                limit=limit,
                offset=offset,
            ),
        )(self.query)

        return inject_into_query(self.query, subquery)

    def _get_ungrouped_items_query(self) -> str:
        """Construct a SPARQL items query for ungrouped models."""
        filter_clause: str | None = self._compute_filter_clause()
        order_by_value: str = self._compute_order_by_value()
        limit, offset = self._compute_limit_offset()

        return compose_left(
            component(inject_into_query, injectant=filter_clause),
            component(
                add_solution_modifier,
                order_by=order_by_value,
                limit=limit,
                offset=offset,
            ),
        )(self.query)

    def _compute_limit_offset(self) -> tuple[int, int]:
        """Calculate limit and offset values for SPARQL-based pagination."""
        limit = self.query_parameters.size
        offset = self._calculate_offset(
            self.query_parameters.page, self.query_parameters.size
        )

        return limit, offset

    def _compute_filter_clause(self) -> str | None:
        """Stub: Always None for now."""
        return None

    def _compute_select_clause(self):
        """Stub: Static SELECT clause for now."""
        return f"select distinct ?{self.group_by}"

    def _compute_order_by_value(self):
        """Compute a value for ORDER BY used in RDFProxy query modification."""
        match self.group_by, self.order_by:
            case None, None:
                return f"?{get_query_projection(self.query)[0]}"
            case group_by, None:
                return f"?{group_by}"

            case _, order_by:
                return f"{'DESC' if self.query_parameters.desc else 'ASC'}(?{order_by})"

            case _:  # pragma: no cover
                assert False, "Unreachable case in _compute_order_by_value"
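
Note that all three match arms of _calculate_offset reduce to the same arithmetic; a worked sketch:

def calculate_offset(page: int, size: int) -> int:
    # closed form equivalent to _calculate_offset above
    return size * (page - 1)

assert calculate_offset(1, 100) == 0    # first page starts at offset 0
assert calculate_offset(2, 100) == 100  # second page skips one full page
assert calculate_offset(3, 100) == 200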

get_count_query()

Construct a SPARQL count query for use in rdfproxy.SPARQLModelAdapter.

Source code in rdfproxy/constructor.py
def get_count_query(self) -> str:
    """Construct a SPARQL count query for use in rdfproxy.SPARQLModelAdapter"""
    if self.group_by is None:
        select_clause = "select (count(*) as ?cnt)"
    else:
        select_clause = f"select (count(distinct ?{self.group_by}) as ?cnt)"

    return replace_query_select_clause(self.query, select_clause)
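
Conceptually, get_count_query swaps out the SELECT clause and keeps the rest of the query intact; an illustrative before/after (hypothetical query, not verbatim RDFProxy output):

# original items query:
#   SELECT ?person ?name WHERE { ?person rdfs:label ?name . }

# ungrouped count query (counts all solutions):
#   select (count(*) as ?cnt) WHERE { ?person rdfs:label ?name . }

# grouped count query, group_by resolving to ?person (counts distinct group keys):
#   select (count(distinct ?person) as ?cnt) WHERE { ?person rdfs:label ?name . }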

get_items_query()

Construct a SPARQL items query for use in rdfproxy.SPARQLModelAdapter.

Source code in rdfproxy/constructor.py
def get_items_query(self) -> str:
    """Construct a SPARQL items query for use in rdfproxy.SPARQLModelAdapter."""
    if self.group_by is None:
        return self._get_ungrouped_items_query()
    return self._get_grouped_items_query()

rdfproxy.mapper._ModelBindingsMapper

Bases: Generic[_TModelInstance]

Utility class for mapping bindings to nested/grouped Pydantic models.

RDFProxy utilizes Pydantic models also as a modelling grammar for grouping and aggregation, mainly by treating the 'group_by' entry in ConfigDict in combination with list-type annotated model fields as grouping and aggregation indicators. _ModelBindingsMapper applies this grammar for mapping flat bindings to potentially nested and grouped Pydantic models.

Note: _ModelBindingsMapper is intended for use in rdfproxy.SPARQLModelAdapter and - since no model sanity checking runs in the mapper itself - is somewhat coupled to SPARQLModelAdapter. The mapper can be useful in its own right, though. For standalone use, the initializer should be overridden and model sanity checking should be added to the _ModelBindingsMapper subclass.
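
A minimal grouping sketch (binding and field names are assumptions; ConfigDict is assumed to be importable from the package root, its documented path being rdfproxy.utils._types.ConfigDict):

from pydantic import BaseModel

from rdfproxy import ConfigDict

class Work(BaseModel):
    title: str

class Person(BaseModel):
    model_config = ConfigDict(group_by="name")  # group flat bindings by the 'name' field

    name: str
    works: list[Work]  # list-type model field: aggregated per group

# Flat bindings like
#   {"name": "Alice", "title": "Work A"}
#   {"name": "Alice", "title": "Work B"}
# map to Person(name="Alice", works=[Work(title="Work A"), Work(title="Work B")]).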

Source code in rdfproxy/mapper.py
class _ModelBindingsMapper(Generic[_TModelInstance]):
    """Utility class for mapping bindings to nested/grouped Pydantic models.

    RDFProxy utilizes Pydantic models also as a modelling grammar for grouping
    and aggregation, mainly by treating the 'group_by' entry in ConfigDict in
    combination with list-type annotated model fields as grouping
    and aggregation indicators. _ModelBindingsMapper applies this grammar
    for mapping flat bindings to potentially nested and grouped Pydantic models.

    Note: _ModelBindingsMapper is intended for use in rdfproxy.SPARQLModelAdapter and -
    since no model sanity checking runs in the mapper itself - somewhat coupled to
    SPARQLModelAdapter. The mapper can be useful in its own right though.
    For standalone use, the initializer should be overridden and model sanity checking
    should be added to the _ModelBindingsMapper subclass.
    """

    def __init__(self, model: type[_TModelInstance], bindings: Iterable[dict]):
        self.model = model
        self.bindings = bindings

        self.df = pd.DataFrame(data=self.bindings, dtype=object)

    def get_models(self) -> list[_TModelInstance]:
        """Run the model mapping logic against bindings and collect a list of model instances."""
        return list(self._instantiate_models(self.df, self.model))

    def _instantiate_models(
        self, df: pd.DataFrame, model: type[_TModelInstance]
    ) -> Iterator[_TModelInstance]:
        """Generate potentially nested and grouped model instances from a dataframe.

        Note: The DataFrameGroupBy object must not be sorted,
        else the result set order will not be maintained.
        """
        alias_map = FieldsBindingsMap(model=model)

        if (_group_by := model.model_config.get("group_by")) is None:
            for _, row in df.iterrows():
                yield self._instantiate_ungrouped_model_from_row(row, model)
        else:
            group_by = alias_map[_group_by]
            group_by_object: DataFrameGroupBy = df.groupby(group_by, sort=False)

            for _, group_df in group_by_object:
                yield self._instantiate_grouped_model_from_df(group_df, model)

    def _get_model_union_field_value(self, field_info: FieldInfo, row: pd.Series):
        """Compute the value for model union fields.

        The method instantiates the first model of a model union type
        and runs model_bool against that model instance. If model_bool is falsey
        the required default value is returned instead of the model instance.
        """
        assert not field_info.is_required(), "Default value required."

        model_union = field_info.annotation
        nested_model: type[BaseModel] = next(
            filter(_is_pydantic_model_static_type, get_args(model_union))
        )
        nested_model_instance: BaseModel = self._instantiate_ungrouped_model_from_row(
            row,
            nested_model,  # type: ignore
        )

        model_bool_predicate: ModelBoolPredicate = get_model_bool_predicate(
            nested_model_instance
        )

        return (
            nested_model_instance
            if model_bool_predicate(nested_model_instance)
            else field_info.default
        )

    def _instantiate_ungrouped_model_from_row(
        self, row: pd.Series, model: type[_TModelInstance]
    ) -> _TModelInstance:
        """Instantiate an ungrouped model from a pd.Series row.

        This handles the UNGROUPED code path in _ModelBindingsMapper._instantiate_models.
        """
        alias_map = FieldsBindingsMap(model=model)
        curried_model = CurryModel(model=model)

        for field_name, field_info in model.model_fields.items():
            if isinstance(nested_model := field_info.annotation, type(BaseModel)):
                curried_model(
                    **{
                        field_name: self._instantiate_ungrouped_model_from_row(
                            row,
                            nested_model,  # type: ignore
                        )
                    }
                )
            elif _is_pydantic_model_union_static_type(field_info.annotation):
                value = self._get_model_union_field_value(
                    field_info=field_info, row=row
                )
                curried_model(**{field_name: value})
            else:
                _sentinel = object()
                field_value = (
                    field_info.default
                    if (value := row.get(alias_map[field_name], _sentinel)) is _sentinel
                    else value
                )
                curried_model(**{field_name: field_value})

        model_instance = curried_model()
        assert isinstance(model_instance, model)  # type narrow
        return model_instance

    @staticmethod
    def _get_unique_models(models: Iterator[_TModelInstance]) -> list[_TModelInstance]:
        """Get a list of unique models from an iterable.

        Note: Unless frozen=True is specified in a model class,
        Pydantic models instances are not hashable, i.e. dict.fromkeys
        is not feasible for acquiring ordered unique models.

        Note: StopIteration in _get_unique_models should be unreachable,
        because the result of _instantiate_models (the input of _get_unique_models
        when called in _instantiate_grouped_model_from_df) gets called
        on grouped dataframes and empty groups do not exist.
        """
        unique_models = []

        _model = next(models, None)
        assert _model is not None, "StopIteration should be unreachable"

        model_bool_predicate: ModelBoolPredicate = get_model_bool_predicate(_model)

        for model in chain([_model], models):
            if (model not in unique_models) and (model_bool_predicate(model)):
                unique_models.append(model)

        return unique_models

    def _instantiate_grouped_model_from_df(
        self, df: pd.DataFrame, model: type[_TModelInstance]
    ) -> _TModelInstance:
        """Instantiate a grouped model  pd.DataFrame (a group dataframe).

        This handles the GROUPED code path in _ModelBindingsMapper._instantiate_models.
        """
        alias_map = FieldsBindingsMap(model=model)
        curried_model = CurryModel(model=model)

        for field_name, field_info in model.model_fields.items():
            if _is_list_pydantic_model_static_type(field_info.annotation):
                nested_model, *_ = get_args(field_info.annotation)
                value = self._get_unique_models(
                    self._instantiate_models(df, nested_model)
                )
            elif _is_list_static_type(field_info.annotation):
                value = list(dict.fromkeys(df[alias_map[field_name]].dropna()))
            elif isinstance(nested_model := field_info.annotation, type(BaseModel)):
                first_row = df.iloc[0]
                value = self._instantiate_ungrouped_model_from_row(
                    first_row,
                    nested_model,  # type: ignore
                )
            elif _is_pydantic_model_union_static_type(field_info.annotation):
                first_row = df.iloc[0]
                value = self._get_model_union_field_value(
                    field_info=field_info, row=first_row
                )
            else:
                first_row = df.iloc[0]
                value = first_row.get(alias_map[field_name]) or field_info.default

            curried_model(**{field_name: value})

        model_instance = curried_model()
        assert isinstance(model_instance, model)  # type narrow
        return model_instance

get_models()

Run the model mapping logic against bindings and collect a list of model instances.

Source code in rdfproxy/mapper.py
def get_models(self) -> list[_TModelInstance]:
    """Run the model mapping logic against bindings and collect a list of model instances."""
    return list(self._instantiate_models(self.df, self.model))
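
A standalone sketch, subject to the caveat above that no model sanity checking runs in the mapper (note also that _ModelBindingsMapper is private API):

from pydantic import BaseModel

from rdfproxy.mapper import _ModelBindingsMapper

class Person(BaseModel):
    name: str

bindings = [{"name": "Alice"}, {"name": "Bob"}]
mapper = _ModelBindingsMapper(Person, bindings)
print(mapper.get_models())  # [Person(name='Alice'), Person(name='Bob')]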


rdfproxy.utils._types.ConfigDict

Bases: ConfigDict

pydantic.ConfigDict extension for RDFProxy model_config options.

Source code in rdfproxy/utils/_types.py
class ConfigDict(PydanticConfigDict, total=False):
    """pydantic.ConfigDict extension for RDFProxy model_config options."""

    group_by: str
    model_bool: _TModelBoolValue

rdfproxy.utils._types.SPARQLBinding

Bases: str

SPARQLBinding type for explicit SPARQL binding to model field allocation.

This type's intended use is with typing.Annotated in the context of a Pydantic field definition.

Example:

class Work(BaseModel):
    name: Annotated[str, SPARQLBinding("title")]

class Person(BaseModel):
    name: str
    work: Work

This signals to the RDFProxy SPARQL-to-model mapping logic to use the "title" SPARQL binding (not the "name" binding) to populate the Work.name field.

Source code in rdfproxy/utils/_types.py
class SPARQLBinding(str):
    """SPARQLBinding type for explicit SPARQL binding to model field allocation.

    This type's intended use is with typing.Annotated in the context of a Pydantic field definition.

    Example:

        class Work(BaseModel):
            name: Annotated[str, SPARQLBinding("title")]

        class Person(BaseModel):
            name: str
            work: Work

    This signals to the RDFProxy SPARQL-to-model mapping logic
    to use the "title" SPARQL binding (not the "name" binding) to populate the Work.name field.
    """

    ...
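
A self-contained version of the docstring example (imports added; the query is assumed to project ?name and ?title bindings):

from typing import Annotated

from pydantic import BaseModel

from rdfproxy import SPARQLBinding

class Work(BaseModel):
    name: Annotated[str, SPARQLBinding("title")]  # populated from the ?title binding

class Person(BaseModel):
    name: str  # populated from the ?name binding
    work: Work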


rdfproxy.utils.models.Page

Bases: BaseModel, Generic[_TModelInstance]

Page model for rdfproxy pagination functionality.

This model is loosely inspired by the fastapi-pagination Page class; see https://github.com/uriyyo/fastapi-pagination.

Also see https://docs.pydantic.dev/latest/concepts/models/#generic-models for Generic Pydantic models.

Source code in rdfproxy/utils/models.py
class Page(BaseModel, Generic[_TModelInstance]):
    """Page model for rdfproxy pagination functionality.

    This model is loosely inspired by the fastapi-pagination Page class,
    see https://github.com/uriyyo/fastapi-pagination.

    Also see https://docs.pydantic.dev/latest/concepts/models/#generic-models
    for Generic Pydantic models.
    """

    items: list[_TModelInstance] | dict[str, list[_TModelInstance]]
    page: int
    size: int
    total: int
    pages: int

rdfproxy.utils.models.QueryParameters

Bases: BaseModel

Query parameter model for SPARQLModelAdapter.query.

See https://fastapi.tiangolo.com/tutorial/query-param-models/

Source code in rdfproxy/utils/models.py
class QueryParameters(BaseModel):
    """Query parameter model for SPARQLModelAdapter.query.

    See https://fastapi.tiangolo.com/tutorial/query-param-models/
    """

    page: int = Field(default=1, gt=0)
    size: int = Field(default=100, ge=1)

    order_by: str | None = Field(default=None)
    desc: bool | None = Field(default=None)

    @model_validator(mode="after")
    @classmethod
    def _check_order_by_desc_dependency(cls, data: Any) -> Any:
        """Validator for checking the semantics for ordering.

        The defaults for order_by and desc should be None.
        If only order_by is defined, desc should be set to False.
        If only desc is defined, a ValueError should be raised.
        """
        match data.order_by, data.desc:
            case None, None:
                pass
            case _, None:
                data.desc = False
            case None, _:
                raise ValueError("Field 'desc' requires field 'order_by'.")

        return data

    def __class_getitem__(cls, model: type[_TModelInstance]):  # type: ignore
        _order_by_fields = [(k, k) for k in OrderableFieldsBindingsMap(model).keys()]
        OrderByEnum = StrEnum("OrderByEnum", _order_by_fields)

        return create_model(cls.__name__, order_by=(OrderByEnum, None), __base__=cls)
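
The validation semantics in a short sketch (QueryParameters is assumed to be importable from the package root; its documented path is rdfproxy.utils.models.QueryParameters):

from rdfproxy import QueryParameters

params = QueryParameters()                 # page=1, size=100, no ordering
params = QueryParameters(order_by="name")  # desc is normalized to False

try:
    QueryParameters(desc=True)
except Exception as e:  # pydantic.ValidationError
    print(e)  # Field 'desc' requires field 'order_by'.

# QueryParameters[SomeModel] (via __class_getitem__ above) returns a subclass
# whose order_by is constrained to an enum of SomeModel's orderable fields.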