Reference

apis_entities

abc

E53_Place

Bases: Model

The feature_code field refers to the geonames feature codes, as listed on https://www.geonames.org/export/codes.html

Source code in apis_core/apis_entities/abc.py
class E53_Place(models.Model):
    """
    Abstract model for a place: a label, optional coordinates and a
    feature code.

    The `feature_code` field refers to the geonames feature codes, as
    listed on https://www.geonames.org/export/codes.html
    """

    label = models.CharField(
        verbose_name=_("label"),
        max_length=4096,
        default="",
        blank=True,
    )
    # Both coordinates are optional; a missing value is stored as NULL.
    latitude = models.FloatField(verbose_name=_("latitude"), null=True, blank=True)
    longitude = models.FloatField(verbose_name=_("longitude"), null=True, blank=True)
    feature_code = models.CharField(
        verbose_name=_("feature code"),
        help_text='<a href="https://www.geonames.org/export/codes.html">Geonames Feature Code List</a>',
        max_length=16,
        default="",
        blank=True,
    )

    class Meta:
        abstract = True
        verbose_name = _("place")
        verbose_name_plural = _("places")
        ordering = ["label"]

    def __str__(self):
        """Represent the place by its label."""
        return self.label

    @classmethod
    def rdf_configs(cls):
        """
        Return the paths of the triple configuration files located in the
        `triple_configs` folder next to this module.
        """
        module_dir = Path(__file__).parent
        config_files = (
            "triple_configs/E53_PlaceFromDNB.toml",
            "triple_configs/E53_PlaceFromGeonames.toml",
            "triple_configs/E53_PlaceFromWikidata.toml",
        )
        return [module_dir / config_file for config_file in config_files]

filtersets

ModelSearchFilter

Bases: CharFilter

This filter is a customized CharFilter that uses the generate_search_filter method to adapt the search filter to the model that is searched. It also sets the help text based on the fields searched.

Source code in apis_core/apis_entities/filtersets.py
class ModelSearchFilter(django_filters.CharFilter):
    """
    A CharFilter whose lookup is built by `generate_search_filter`,
    so the search adapts to the model that is being searched.
    Unless a help text is already present, one is generated that
    lists the verbose names of the fields that are searched.
    """

    def __init__(self, *args, **kwargs):
        # `model` is consumed here and not passed on to the parent class
        model = kwargs.pop("model", None)
        super().__init__(*args, **kwargs)

        if model is None or "help_text" in self.extra:
            return

        # verbose names can be lazy objects, so they are converted
        # with force_str before being joined
        verbose_names = (
            force_str(field.verbose_name) for field in default_search_fields(model)
        )
        fields = ", ".join(verbose_names)
        self.extra["help_text"] = f"Search in fields: {fields}"

    def filter(self, qs, value):
        """Filter `qs` with the search filter generated for its model."""
        search_filter = generate_search_filter(qs.model, value)
        return qs.filter(search_filter)

models

AbstractEntity

Bases: RootObject

Abstract super class which encapsulates common logic between the different entity kinds and provides various methods relating to either all or one specific entity kind.

Most of the class methods are designed to be used in the subclass as they are considering contexts which depend on the subclass entity type. So they are to be understood in that dynamic context.

Source code in apis_core/apis_entities/models.py
class AbstractEntity(RootObject):
    """
    Abstract super class which encapsulates common logic between the
    different entity kinds and provides various methods relating to either
    all or one specific entity kind.

    Most of the class methods are designed to be used in the subclass as they
    are considering contexts which depend on the subclass entity type.
    So they are to be understood in that dynamic context.
    """

    class Meta:
        abstract = True

    @classmethod
    def get_or_create_uri(cls, uri):
        """
        Look up an instance of `cls` by primary key or by URI.

        If `uri` consists only of digits it is used as primary key,
        otherwise it is matched against `uri__uri`. Despite its name this
        method never creates anything.

        Returns the instance, or `False` if the lookup failed for any reason.
        """
        uri = str(uri)
        try:
            if re.match(r"^[0-9]*$", uri):
                return cls.objects.get(pk=uri)
            return cls.objects.get(uri__uri=uri)
        except Exception as e:
            # The exception has to be formatted into the message:
            # concatenating it to a str raises a TypeError, which would
            # escape this handler instead of returning False.
            print(f"Found no object corresponding to given uri. {e}")
            return False

    # TODO
    @classmethod
    def get_entity_list_filter(cls):
        """Placeholder, currently always returns `None`."""
        return None

    @functools.cached_property
    def get_prev_id(self):
        """
        The id of the instance of the same type with the next lower id.

        `False` if `NEXT_PREV` is falsy or there is no such instance.
        The value is cached on the instance after the first access.
        """
        if NEXT_PREV:
            prev_instance = (
                type(self)
                .objects.filter(id__lt=self.id)
                .order_by("-id")
                .only("id")
                .first()
            )
            if prev_instance is not None:
                return prev_instance.id
        return False

    @functools.cached_property
    def get_next_id(self):
        """
        The id of the instance of the same type with the next higher id.

        `False` if `NEXT_PREV` is falsy or there is no such instance.
        The value is cached on the instance after the first access.
        """
        if NEXT_PREV:
            next_instance = (
                type(self)
                .objects.filter(id__gt=self.id)
                .order_by("id")
                .only("id")
                .first()
            )
            if next_instance is not None:
                return next_instance.id
        return False

tables

DuplicateColumn

Bases: ActionColumn

A column showing a duplicate button

Source code in apis_core/apis_entities/tables.py
class DuplicateColumn(ActionColumn):
    """
    An action column that is rendered using the
    `columns/duplicate.html` template.
    """

    template_name = "columns/duplicate.html"
    # presumably evaluated by ActionColumn to decide whether the column is
    # shown to the current user - confirm against ActionColumn
    permission = "create"

views

EntitiesAutocomplete

Bases: Select2QuerySetView

This endpoint allows us to use autocomplete over multiple model classes. It takes a parameter entities which is a list of ContentType natural keys and searches for the query in all instances of those entities (using generate_search_filter, which means it uses a different search approach for every model). The return values of the endpoint are then prefixed with the id of the contenttype of the results, separated by an underscore.

Example: Using this endpoint with the parameters:

?entities=apis_ontology.person&entities=apis_ontology.place&q=ammer

gives you all the persons and places that have ammer in their names and labels.

Source code in apis_core/apis_entities/views.py
class EntitiesAutocomplete(autocomplete.Select2QuerySetView):
    """
    This endpoint allows us to use autocomplete over multiple model classes.
    It takes a parameter `entities` which is a list of ContentType natural
    keys and searches for the query in all instances of those entities
    (using `generate_search_filter`, which means it uses a different search
    approach for every model).
    The return values of the endpoint are then prefixed with the id of the
    contenttype of the results, separated by an underscore.

    Example:
    Using this endpoint with the parameters:

        ?entities=apis_ontology.person&entities=apis_ontology.place&q=ammer

    gives you all the persons and places that have `ammer` in their names
    and labels.
    """

    def get_result_value(self, result) -> str:
        """Prefix the parent's result value with `<content type id>_`."""
        content_type = ContentType.objects.get_for_model(result)
        return f"{content_type.id}_" + super().get_result_value(result)

    def get_queryset(self):
        """
        Build one queryset over all requested entity types.

        Every value of the `entities` request parameter has to be of the
        form `<app_label>.<model>`. Values that do not name an existing
        content type (including values without a dot) result in a 404.
        If the parameter is missing, the content types returned by
        `get_entity_content_types` are used.
        """
        entities = []
        for entity in self.request.GET.getlist("entities"):
            # `partition` never fails to unpack: a value without a dot gives
            # an empty model name and one with several dots gives a model
            # name that cannot match, so both end in a 404 instead of an
            # unhandled ValueError.
            app_label, _sep, model = entity.partition(".")
            content_type = get_object_or_404(
                ContentType, app_label=app_label, model=model
            )
            entities.append(content_type)
        if not entities:
            entities = get_entity_content_types()

        # only needed for resolving the ancestor paths, so one is enough
        inheritance_queryset = RootObject.objects_inheritance.get_queryset()
        q = Q()
        for content_type in entities:
            model_class = content_type.model_class()
            name = inheritance_queryset._get_ancestors_path(model_class)
            # a row matches if it is an instance of the model (the link to
            # the subclass is set) and the model specific search matches
            q |= Q(**{f"{name}__isnull": False}) & generate_search_filter(
                model_class, self.q, prefix=f"{name}__"
            )
        return RootObject.objects_inheritance.select_subclasses().filter(q)

apis_metainfo

models

RootObject

Bases: GenericModel, Model

The very root thing that can exist in a given ontology. Several classes inherit from it. By having one overarching super class we gain the advantage of unique identifiers.

Source code in apis_core/apis_metainfo/models.py
class RootObject(GenericModel, models.Model):
    """
    The very root thing that can exist in a given ontology. Several classes inherit from it.
    By having one overarching super class we gain the advantage of unique identifiers.
    """

    # NOTE: keep this order - Django uses the first manager declared on a
    # model as its default manager.
    objects = models.Manager()
    # Second manager, an InheritanceManager instance.
    objects_inheritance = InheritanceManager()

viewsets

UriToObjectViewSet

Bases: ViewSet

This API route provides an endpoint for resolving URIs and forwarding them to the endpoint in the local instance. Pass a uri request parameter to resolve the uri.

Source code in apis_core/apis_metainfo/viewsets.py
class UriToObjectViewSet(viewsets.ViewSet):
    """
    This API route provides an endpoint for resolving URIs and forwarding
    them to the endpoint in the local instance. Pass a `uri` request
    parameter to resolve the uri.
    """

    # NOTE(review): the schema announces a 301, but HttpResponseRedirect
    # answers with a 302 - confirm which one is intended.
    @extend_schema(
        parameters=[
            OpenApiParameter(
                "uri", OpenApiTypes.URI, OpenApiParameter.QUERY
            ),  # path variable was overridden
        ],
        responses={301: None},
        description="This API route provides an endpoint for resolving URIs and forwarding them to the endpoint in the local instance. Pass a `uri` request parameter to resolve the uri.",
    )
    def list(self, request):
        """
        Redirect to the API detail endpoint of the object the `uri`
        request parameter belongs to.

        All other request parameters are appended to the redirect target.
        Without a `uri` parameter an empty response is returned, an
        unknown URI results in a 404.
        """
        uri = request.query_params.get("uri")
        if not uri:
            return Response()

        u = get_object_or_404(Uri, uri=uri)
        r = u.content_object.get_api_detail_endpoint()

        # Forward everything except `uri`. `QueryDict.from_keys` does not
        # exist (and `fromkeys` would drop the values), so work on a mutable
        # copy of the query dict, which also knows how to urlencode itself.
        params = request.query_params.copy()
        params.pop("uri", None)
        if params:
            r += "?" + params.urlencode()
        return HttpResponseRedirect(r)

collections

filters

CollectionsIncludeExcludeFilter

Bases: ModelMultipleChoiceFilter

The CollectionsIncludeExcludeFilter provides two ModelMultipleChoiceFilters that allow filtering model instances that are either in a collection OR (or AND) are not in a collection.

Source code in apis_core/collections/filters.py
class CollectionsIncludeExcludeFilter(django_filters.filters.ModelMultipleChoiceFilter):
    """
    The CollectionsIncludeExcludeFilter provides two ModelMultipleChoiceFilters,
    one to select collections the model instances have to be **in** and one to
    select collections the model instances must **not** be **in**.
    """

    @property
    def field(self):
        """The parent's form field twice, labeled `include` and `exclude`."""
        include_field = super().field
        exclude_field = super().field
        return RowColumnMultiValueField(
            fields=[include_field, exclude_field],
            labels=["include", "exclude"],
            required=self.extra["required"],
        )

    def filter(self, queryset, value):
        """
        Filter `queryset` using the `(include, exclude)` pair in `value`.

        If a model needed for the lookup cannot be found, this is logged and
        the queryset is returned without collection filtering.
        """
        if not value:
            return queryset

        condition = Q()
        try:
            content_type = ContentType.objects.get_for_model(queryset.model)
            through_model = apps.get_model("collections.SkosCollectionContentObject")

            def object_ids_in(collections):
                # ids of the objects of this content type in `collections`
                return through_model.objects.filter(
                    content_type=content_type, collection__in=collections
                ).values("object_id")

            include, exclude = value
            include_ids = object_ids_in(include)
            if include:
                condition &= Q(id__in=include_ids)
            exclude_ids = object_ids_in(exclude)
            if exclude:
                condition &= ~Q(id__in=exclude_ids)
        except LookupError as e:
            logger.debug("Not filtering for collections: %s", e)
        return queryset.filter(condition)

models

SkosCollection

Bases: GenericModel, Model

SKOS collections are labeled and/or ordered groups of SKOS concepts. Collections are useful where a group of concepts shares something in common, and it is convenient to group them under a common label, or where some concepts can be placed in a meaningful order.

Miles, Alistair, and Sean Bechhofer. "SKOS simple knowledge organization system reference. W3C recommendation (2009)."

Source code in apis_core/collections/models.py
class SkosCollection(GenericModel, models.Model):
    """
    SKOS collections are labeled and/or ordered groups of SKOS concepts.
    Collections are useful where a group of concepts shares something in common,
    and it is convenient to group them under a common label, or
    where some concepts can be placed in a meaningful order.

    Miles, Alistair, and Sean Bechhofer. "SKOS simple knowledge
    organization system reference. W3C recommendation (2009)."

    """

    class Meta:
        ordering = ["name"]
        constraints = [
            models.UniqueConstraint(
                fields=(
                    "name",
                    "parent",
                ),
                name="unique_name_parent",
                nulls_distinct=False,
                violation_error_message="The combination of name and parent collection must be unique",
            ),
            # the pipe is the path delimiter used by
            # `SkosCollectionManager.get_by_full_path`
            models.CheckConstraint(
                check=~models.Q(name__contains="|"),
                name="check_name_pipe",
                violation_error_message="The name must not contain the pipe symbol: |",
            ),
        ]

    parent = models.ForeignKey("self", null=True, on_delete=models.CASCADE, blank=True)
    name = models.CharField(
        max_length=300,
        verbose_name="skos:prefLabel",
        help_text="Collection label or name",
    )
    label_lang = models.CharField(
        max_length=3,
        blank=True,
        default="en",
        verbose_name="skos:prefLabel language",
        help_text="Language of preferred label given above",
    )
    # NOTE: the two help texts below used to lack a separator between their
    # sentences ("...collectionIf more..."). help_text is part of the
    # migration state, so this fix needs a (schema neutral) migration.
    creator = models.TextField(
        blank=True,
        verbose_name="dc:creator",
        help_text="Person or organisation that created this collection. "
        "If more than one list all using a semicolon ;",
    )
    contributor = models.TextField(
        blank=True,
        verbose_name="dc:contributor",
        help_text="Person or organisation that made contributions to the collection. "
        "If more than one list all using a semicolon ;",
    )
    objects = SkosCollectionManager()

    def __str__(self):
        """Represent the collection by its name."""
        return self.name

    def children(self):
        """Return a queryset of the collections whose parent is this one."""
        return SkosCollection.objects.filter(parent=self)

    def children_tree_as_list(self):
        """
        Return this collection followed by all of its descendants as a
        flat list (depth first).
        """
        childtrees = [self]
        for child in self.children():
            childtrees.extend(child.children_tree_as_list())
        return childtrees

    def add(self, instance: object):
        """Connect `instance` to this collection, unless it already is."""
        content_type = ContentType.objects.get_for_model(instance)
        SkosCollectionContentObject.objects.get_or_create(
            collection=self, content_type=content_type, object_id=instance.id
        )

    def remove(self, instance: object):
        """Delete the connection(s) between `instance` and this collection."""
        content_type = ContentType.objects.get_for_model(instance)
        SkosCollectionContentObject.objects.filter(
            collection=self, content_type=content_type, object_id=instance.id
        ).delete()

    @property
    def parent_collection(self):
        """
        The collection this collection has been added to as content object.

        Raises `SkosCollection.MultipleObjectsReturned` unless this
        collection is part of exactly one collection. The same exception
        type is used when it is part of none, so callers that catch it
        keep working; only the message differs.
        """
        content_type = ContentType.objects.get_for_model(self)
        # two rows are enough to tell "exactly one" from "more than one"
        sccos = list(
            SkosCollectionContentObject.objects.filter(
                content_type=content_type, object_id=self.id
            )[:2]
        )
        if len(sccos) == 1:
            return sccos[0].collection
        if not sccos:
            raise SkosCollection.MultipleObjectsReturned(
                f'"{self}" is not part of any collection'
            )
        raise SkosCollection.MultipleObjectsReturned(
            f'"{self}" is part of multiple collections'
        )

SkosCollectionContentObject

Bases: GenericModel, Model

Through-table data model to connect collections to arbitrary content

Source code in apis_core/collections/models.py
class SkosCollectionContentObject(GenericModel, models.Model):
    """
    *Through-table* data model to connect collections to arbitrary content
    """

    # The collection the content is part of; rows are removed together
    # with their collection.
    collection = models.ForeignKey(SkosCollection, on_delete=models.CASCADE)

    # Generic relation to the connected object: `content_object` is
    # resolved from the `content_type` and `object_id` fields.
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.PositiveIntegerField()
    content_object = GenericForeignKey("content_type", "object_id")

    def __str__(self):
        return f"{self.content_object} -> {self.collection}"

SkosCollectionManager

Bases: Manager

Source code in apis_core/collections/models.py
class SkosCollectionManager(models.Manager):
    """Manager with lookups by collection path and by collection content."""

    def get_by_full_path(self, name: str):
        """
        Return the collection identified by its full path.

        The path lists the collection names from the root collection down
        to the wanted collection, delimited by `|`. For a collection `foo`
        whose parent `bar` has no parent itself, the path is "bar|foo".
        """
        collection = None
        # descend one level per path segment, starting at the root
        for segment in name.split("|"):
            collection = self.get(parent=collection, name=segment)
        return collection

    def by_instance(self, instance):
        """Return a queryset of the collections `instance` is connected to."""
        connections = SkosCollectionContentObject.objects.filter(
            content_type=ContentType.objects.get_for_model(instance),
            object_id=instance.id,
        )
        collection_ids = connections.values_list("collection", flat=True)
        return self.get_queryset().filter(pk__in=collection_ids)
get_by_full_path(name)

Return a collection specified by its full path, from the root collection to the leaf collection, delimited by |. I.e. if there is a collection named foo and it has a parent named bar and bar does not have a parent, then you can use the string "bar|foo" to get the foo collection.

Source code in apis_core/collections/models.py
def get_by_full_path(self, name: str):
    """
    Return the collection identified by its full path.

    The path lists the collection names from the root collection down
    to the wanted collection, delimited by `|`. For a collection `foo`
    whose parent `bar` has no parent itself, the path is "bar|foo".
    """
    collection = None
    # descend one level per path segment, starting at the root
    for segment in name.split("|"):
        collection = self.get(parent=collection, name=segment)
    return collection

signals

add_to_session_collection(sender, instance, created, raw, using, update_fields, **kwargs)

Add a created apis_core.history model instance to all the SkosCollections that are listed in the session_collections session variable. This needs the 'crum.CurrentRequestUserMiddleware' middleware to be enabled.

Source code in apis_core/collections/signals.py
@receiver(post_save)
def add_to_session_collection(
    sender, instance, created, raw, using, update_fields, **kwargs
):
    """
    Add a saved apis_core.history model instance to all the SkosCollections
    that are listed in the `session_collections` session variable.
    This needs the 'crum.CurrentRequestUserMiddleware' middleware to
    be enabled.

    Ids in the session variable that do not belong to an existing
    collection are skipped.
    """
    request = get_current_request()
    if not (isinstance(instance, APISHistoryTableBase) and request):
        return

    collection_ids = request.session.get("session_collections", [])
    if not collection_ids:
        return

    # the same for every collection, so look it up once
    content_type = ContentType.objects.get_for_model(instance)
    for pk in collection_ids:
        sc = SkosCollection.objects.filter(pk=pk).first()
        if sc is None:
            # The session can outlive a collection. Raising here would make
            # every save of a history instance fail for this session.
            continue
        SkosCollectionContentObject.objects.create(
            collection=sc,
            content_type=content_type,
            object_id=instance.history_id,
        )
        messages.info(request, f"Tagged {instance} with tag {sc}")

templatetags

collections

collection_object_collection(context, obj, skoscollectioncollectionobjects)

Provide a button to change the connection between an object and a collection to point to the collection the collection is in.

Source code in apis_core/collections/templatetags/collections.py
@register.inclusion_tag(
    "collections/collection_object_collection.html", takes_context=True
)
def collection_object_collection(context, obj, skoscollectioncollectionobjects):
    """
    Provide a button that re-points the connection between an object and a
    collection to the collection that collection itself is part of.

    With more than one connection only an error message is put into the
    context, with no connection the context stays untouched.
    """
    connection_count = len(skoscollectioncollectionobjects)
    if connection_count > 1:
        context["error"] = (
            "Multiple collections found to toggle, please check `collection_object_collection`"
        )
    elif connection_count == 1:
        connection = skoscollectioncollectionobjects.first()
        context["parent"] = connection.collection.parent_collection
        context["collectionobject"] = connection
        context["content_type"] = ContentType.objects.get_for_model(obj)
        context["object"] = obj
    return context
collection_object_collection_by_id(context, obj, *collection_ids)

Wrapper templatetag to allow using collection_object_collection with just the ids of collections.

Source code in apis_core/collections/templatetags/collections.py
@register.inclusion_tag(
    "collections/collection_object_collection.html", takes_context=True
)
def collection_object_collection_by_id(context, obj, *collection_ids):
    """
    Wrapper templatetag to allow using `collection_object_collection`
    with just the `ids` of collections: the connections between `obj`
    and those collections are looked up and passed on.
    """
    lookup = {
        "collection__in": collection_ids,
        "content_type": ContentType.objects.get_for_model(obj),
        "object_id": obj.id,
    }
    connections = SkosCollectionContentObject.objects.filter(**lookup)
    return collection_object_collection(context, obj, connections)
collection_session_toggle_by_id(context, collection_id)

Provide a checkbox to toggle if a session collection is active. The checkbox calls the CollectionSessionToggle view.

Source code in apis_core/collections/templatetags/collections.py
@register.inclusion_tag(
    "collections/collection_session_toggle.html", takes_context=True
)
def collection_session_toggle_by_id(context, collection_id):
    """
    Provide a checkbox showing whether a collection is currently listed
    in the `session_collections` session variable.
    The checkbox calls the CollectionSessionToggle view.
    """
    active_ids = context.request.session.get("session_collections", [])
    collection = SkosCollection.objects.get(pk=collection_id)
    context["collection"] = collection
    context["enabled"] = collection_id in active_ids
    return context
collection_toggle(context, obj, collection)

Provide a button to add or remove a connection between a collection and an object.

Source code in apis_core/collections/templatetags/collections.py
@register.inclusion_tag("collections/collection_toggle.html", takes_context=True)
def collection_toggle(context, obj, collection):
    """
    Provide a button to add or remove a connection between a
    collection and an object. The `exists` context variable tells
    whether the connection is currently there.
    """
    obj_content_type = ContentType.objects.get_for_model(obj)
    context["content_type"] = obj_content_type
    context["object"] = obj
    context["collection"] = collection
    connection = SkosCollectionContentObject.objects.filter(
        collection=collection, content_type=obj_content_type, object_id=obj.id
    )
    context["exists"] = connection.exists()
    return context
collection_toggle_by_id(context, obj, collectionid)

Wrapper templatetag to allow using collection_toggle with just the id of the collection.

Source code in apis_core/collections/templatetags/collections.py
@register.inclusion_tag("collections/collection_toggle.html", takes_context=True)
def collection_toggle_by_id(context, obj, collectionid):
    """
    Wrapper templatetag to allow using `collection_toggle`
    with just the `id` of the collection, which is looked up first.
    """
    return collection_toggle(
        context, obj, SkosCollection.objects.get(pk=collectionid)
    )

views

CollectionObjectCollection

Bases: LoginRequiredMixin, ContentObjectMixin, TemplateView

Change the requested CollectionObjects collection to point to the collection the current collection is in.

Source code in apis_core/collections/views.py
class CollectionObjectCollection(LoginRequiredMixin, ContentObjectMixin, TemplateView):
    """
    Re-point the requested CollectionObject to the collection its
    current collection is part of.
    """

    template_name = "collections/collection_object_collection.html"

    def get_context_data(self, *args, **kwargs):
        """Move the connection to the parent collection and expose it."""
        connection = get_object_or_404(
            SkosCollectionContentObject, pk=kwargs["collectionobject"]
        )
        # the change is saved right here, while the context is built
        connection.collection = connection.collection.parent_collection
        connection.save()
        context = super().get_context_data(*args, **kwargs)
        context["collectionobject"] = connection
        return context

CollectionSessionToggle

Bases: LoginRequiredMixin, TemplateView

Toggle the existence of an SkosCollection in the session_collections session variable. This can be used in combination with the collections.signals.add_to_session_collection signal, to add objects to a collection if the collection's id is listed in the session variable. The equivalent templatetag that calls this view is collections.templatetags.collections.collection_session_toggle_by_id

Source code in apis_core/collections/views.py
class CollectionSessionToggle(LoginRequiredMixin, TemplateView):
    """
    Toggle the existence of an SkosCollection in the `session_collections`
    session variable.
    This can be used in combination with the
    `collections.signals.add_to_session_collection` signal, to add objects
    to a collection if the collection's id is listed in the session variable.
    The equivalent templatetag that calls this view is
    `collections.templatetags.collections.collection_session_toggle_by_id`
    """

    template_name = "collections/collection_session_toggle.html"

    def get_context_data(self, *args, **kwargs):
        """Expose the collection and its state after toggling."""
        ctx = super().get_context_data(*args, **kwargs)
        ctx["collection"] = self.skoscollection
        ctx["enabled"] = self.skoscollection.id in self.session_collections
        return ctx

    def get(self, *args, **kwargs):
        """
        Toggle the collection given by the `skoscollection` URL argument.

        If the `to` request parameter holds a URL on this host, the
        response is a redirect to that URL, otherwise the template is
        rendered.
        """
        # imported locally to keep this fix self contained
        from django.utils.http import url_has_allowed_host_and_scheme

        self.skoscollection = get_object_or_404(
            SkosCollection, pk=kwargs["skoscollection"]
        )
        self.session_collections = set(
            self.request.session.get("session_collections", [])
        )
        # symmetric difference: removes the id if present, adds it if not
        self.session_collections ^= {self.skoscollection.id}
        self.request.session["session_collections"] = list(self.session_collections)

        redirect_to = self.request.GET.get("to", False)
        # `to` is user input: redirecting to it unchecked would be an open
        # redirect, so only targets on this host are followed.
        if redirect_to and url_has_allowed_host_and_scheme(
            redirect_to,
            allowed_hosts={self.request.get_host()},
            require_https=self.request.is_secure(),
        ):
            return redirect(redirect_to)
        return super().get(*args, **kwargs)

CollectionToggle

Bases: LoginRequiredMixin, ContentObjectMixin, TemplateView

Toggle a collection - if a CollectionObject connecting the requested object and collection does not exist, then create it. If it does exist, delete it.

Source code in apis_core/collections/views.py
class CollectionToggle(LoginRequiredMixin, ContentObjectMixin, TemplateView):
    """
    Toggle a collection - if a CollectionObject connecting the requested object
    and collection does not exist, then create it. If it does exist, delete it.
    """

    template_name = "collections/collection_toggle.html"

    def setup(self, *args, **kwargs):
        """Resolve the collection given by the `collection` URL argument."""
        super().setup(*args, **kwargs)
        self.collection = get_object_or_404(SkosCollection, pk=kwargs["collection"])

    def get_context_data(self, *args, **kwargs):
        """Expose the collection and whether the connection exists now."""
        context = super().get_context_data(*args, **kwargs)
        # a connection exists after the toggle only if it was just created
        context["exists"] = self.created
        context["collection"] = self.collection
        return context

    def get(self, *args, **kwargs):
        """
        Toggle the connection between the object and the collection.

        If the `to` request parameter holds a URL on this host, the
        response is a redirect to that URL, otherwise the template is
        rendered.
        """
        # imported locally to keep this fix self contained
        from django.utils.http import url_has_allowed_host_and_scheme

        scco, self.created = SkosCollectionContentObject.objects.get_or_create(
            collection=self.collection,
            content_type=self.content_type,
            object_id=self.object.id,
        )
        if not self.created:
            # it was already there, so toggling means removing it
            scco.delete()

        redirect_to = self.request.GET.get("to", False)
        # `to` is user input: redirecting to it unchecked would be an open
        # redirect, so only targets on this host are followed.
        if redirect_to and url_has_allowed_host_and_scheme(
            redirect_to,
            allowed_hosts={self.request.get_host()},
            require_https=self.request.is_secure(),
        ):
            return redirect(redirect_to)
        return super().get(*args, **kwargs)

ContentObjectMixin

Setup the ContentType and the object used by a view, based on the content_type_id and the object_id arguments passed in the URL.

Source code in apis_core/collections/views.py
class ContentObjectMixin:
    """
    View mixin that resolves the `content_type_id` and `object_id` URL
    arguments to a ContentType and an object of that type, stores them on
    the view and adds them to the template context.
    """

    def setup(self, *args, **kwargs):
        """Look up content type and object, answering 404 if one is missing."""
        super().setup(*args, **kwargs)
        content_type_pk = kwargs["content_type_id"]
        self.content_type = get_object_or_404(ContentType, pk=content_type_pk)
        model = self.content_type.model_class()
        self.object = get_object_or_404(model, pk=kwargs["object_id"])

    def get_context_data(self, *args, **kwargs):
        """Add `content_type` and `object` to the parent's context."""
        context = super().get_context_data(*args, **kwargs)
        for key in ("content_type", "object"):
            context[key] = getattr(self, key)
        return context

generic

abc

GenericModel

Source code in apis_core/generic/abc.py
class GenericModel:
    """
    Mixin providing URL helpers, permission names, merge logic and
    duplication for models. The methods rely on `_meta`, `objects`, `save`
    and `id`, so it is meant to be combined with a Django model class.
    """

    def __repr__(self):
        if pk := getattr(self, "id", None):
            return super().__repr__() + f" (ID: {pk})"
        return super().__repr__()

    @property
    def content_type(self):
        return ContentType.objects.get_for_model(self)

    @classmethod
    def get_listview_url(cls):
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.name.lower()}"

    def get_edit_url(self):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        return self.get_absolute_url()

    def get_update_success_url(self):
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        return permission_fullname("view", cls)

    def get_merge_charfield_value(self, other: "GenericModel", field: "CharField"):
        """
        Merge the value of a char field.

        Fields with `choices` keep the value of `self`. Otherwise a
        differing, non-empty value of `other` is appended in parentheses;
        if `self` has no value, the value of `other` is taken as is.
        """
        res = getattr(self, field.name)
        if field.choices:
            return res
        otherres = getattr(other, field.name, res)
        # nothing to add if `other` is empty or carries the same value
        if not otherres or otherres == res:
            return res
        # avoid `None + str` and results like " (value)" for empty fields
        if not res:
            return otherres
        return f"{res} ({otherres})"

    def get_merge_textfield_value(self, other: "GenericModel", field: "TextField"):
        """
        Merge the value of a text field: a non-empty value of `other` is
        appended to the value of `self`, introduced by a `Merged from` line.
        """
        res = getattr(self, field.name)
        otherres = getattr(other, field.name)
        if otherres:
            # `res` may be None for nullable fields
            res = (res or "") + "\n" + f"Merged from {other}:\n" + otherres
        return res

    # NOTE(review): this name lacks the `_value` suffix that
    # `get_field_value_after_merge` looks for, so it is never picked up for
    # boolean fields. It is left as is, because renaming it would change the
    # result of merges - confirm the intended behaviour before doing so.
    def get_merge_booleanfield(self, other: "GenericModel", field: "BooleanField"):
        return getattr(other, field.name)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`, called with `other`) and then tries to
        find a method that is specific to the type of the field
        (`get_merge_{fieldtype}_value`, called with `other` and `field`; the
        field type is the lowercased internal type of the field).
        If neither of those exists, it uses the other's field value if the
        field in self is not set, otherwise it keeps the value in self.
        """
        fieldtype = field.get_internal_type().lower()
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        name_method = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(name_method):
            return name_method(other)
        # otherwise we check if there is a method for the field type and use that one
        type_method = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(type_method):
            return type_method(other, field)
        own_value = getattr(self, field.name)
        if not own_value:
            return getattr(other, field.name)
        return own_value

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge `entities` into `self` and delete them afterwards.

        `entities` can be a list or a QuerySet of instances, a single
        instance or a single primary key. Primary keys (also inside a list)
        are resolved using the model class of `self`. `self` is dropped from
        `entities`; a list that was passed in is modified in place.

        For entities of the same class as `self`, the local many to many
        relations are copied over and the Uris pointing to the entity are
        pointed to `self`. The fields of all entities are merged using
        `merge_fields`. The `pre_merge_with` and `post_merge_with` signals
        are sent before the merge and before the deletion, respectively.
        """
        origin = self.__class__
        # normalize the argument *before* looking for `self` in it - a
        # membership test on an int or a model instance raises a TypeError
        if not isinstance(entities, (list, QuerySet)):
            entities = [entities]
        if isinstance(entities, list):
            for index, ent in enumerate(entities):
                if isinstance(ent, int):
                    # use the class of `self` instead of a ContentType lookup
                    # by model name, which is ambiguous across apps
                    entities[index] = origin.objects.get(pk=ent)
            if self in entities:
                entities.remove(self)
        elif self in entities:
            # QuerySets have no `remove`, so filter into a list
            entities = [ent for ent in entities if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        # TODO: check if these imports can be put to top of module without
        #  causing circular import issues.
        from apis_core.apis_metainfo.models import Uri

        e_a = type(self).__name__
        self_content_type = ContentType.objects.get_for_model(self)
        for ent in entities:
            e_b = type(ent).__name__
            if e_a != e_b:
                continue
            for f in ent._meta.local_many_to_many:
                if not f.name.endswith("_set"):
                    sl = list(getattr(self, f.name).all())
                    for s in getattr(ent, f.name).all():
                        if s not in sl:
                            getattr(self, f.name).add(s)
            ent_content_type = ContentType.objects.get_for_model(ent)
            Uri.objects.filter(content_type=ent_content_type, object_id=ent.id).update(
                content_type=self_content_type, object_id=self.id
            )

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create and return a copy of `self`.

        Unique fields are not copied. Foreign keys and many to many
        relations are set on the copy after it has been created. The
        `pre_duplicate` and `post_duplicate` signals are sent before and
        after the copy is made.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand  using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        unique_fields = [field for field in self._meta.fields if field.unique]
        for field in unique_fields:
            logger.info(f"Duplicating {self}: ignoring unique field {field.name}")
            objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            if type(field) is ManyToManyField:
                objfield = getattr(newobj, field.name)
                values = getattr(self, field.name).all()
                objfield.set(values)

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    duplicate.alters_data = True

    def uri_set(self):
        """
        Return the Uri instances that point to `self`.
        """
        ct = ContentType.objects.get_for_model(self)
        return (
            ContentType.objects.get(app_label="apis_metainfo", model="uri")
            .model_class()
            .objects.filter(content_type=ct, object_id=self.id)
            .all()
        )
get_field_value_after_merge(other, field)

This method finds the value of a field after merging other into self. It first tries to find a merge method that is specific to that field (get_merge_{fieldname}_value) and then tries to find a method that is specific to the type of the field (get_merge_{fieldtype}_value). If neither of those exists, it uses the other's field value if the field in self is not set, otherwise it keeps the value in self.

Source code in apis_core/generic/abc.py
def get_field_value_after_merge(self, other, field):
    """
    Work out which value `field` should have once `other` has been merged
    into `self`.
    A method named `get_merge_{fieldname}_value` takes precedence and is
    called with `other`. Failing that, a method named
    `get_merge_{fieldtype}_value` (the lowercased internal type of the
    field) is called with `other` and `field`.
    Without such a method the value of `self` wins, unless it is not set -
    then the value of `other` is used.
    """
    type_name = field.get_internal_type().lower()
    # a merge method for this specific field has the highest priority
    by_name = getattr(self, f"get_merge_{field.name}_value", None)
    if callable(by_name):
        return by_name(other)
    # next in line is a merge method for the type of the field
    by_type = getattr(self, f"get_merge_{type_name}_value", None)
    if callable(by_type):
        return by_type(other, field)
    own_value = getattr(self, field.name)
    return own_value if own_value else getattr(other, field.name)
merge_fields(other)

This method iterates through the model fields and uses the get_field_value_after_merge method to copy values from other to self. It is called by the merge_with method.

Source code in apis_core/generic/abc.py
def merge_fields(self, other):
    """
    Copy values from `other` to `self`: for every model field the merged
    value is computed with `get_field_value_after_merge` and assigned when
    it differs from the current one. `self` is saved afterwards.
    The `merge_with` method calls this for every merged entity.
    """
    for model_field in self._meta.fields:
        attr = model_field.name
        merged = self.get_field_value_after_merge(other, model_field)
        if merged != getattr(self, attr):
            setattr(self, attr, merged)
    self.save()

api_views

ModelViewSet

Bases: ModelViewSet

API ViewSet for a generic model. The queryset is overridden by the first match from the first_member_match helper. The serializer class is overridden by the first match from the first_member_match helper.

Source code in apis_core/generic/api_views.py
class ModelViewSet(viewsets.ModelViewSet):
    """
    Generic API ViewSet that works on the model of the content type passed
    in the URL.
    Both the queryset and the serializer class can be customized: the
    first match found by the `first_member_match` helper is used.
    """

    filter_backends = [GenericFilterBackend]
    schema = GenericAutoSchema()

    def dispatch(self, *args, **kwargs):
        # the model is taken from the `contenttype` URL argument
        content_type = kwargs.get("contenttype")
        self.model = content_type.model_class()
        return super().dispatch(*args, **kwargs)

    def get_queryset(self):
        """
        Return all objects of the model, passed through the first
        `...ViewSetQueryset` callable found in the `querysets` modules,
        if there is one.
        """
        candidates = module_paths(
            self.model, path="querysets", suffix="ViewSetQueryset"
        )
        transform = first_member_match(candidates)
        everything = self.model.objects.all()
        if transform:
            return transform(everything)
        return everything

    def get_serializer_class(self):
        """
        Build the serializer class for the model. Serializers specific to
        the format of the accepted renderer are looked up before the
        generic ones; the fallback is the `serializer` attribute of the
        renderer or the GenericHyperlinkedModelSerializer.
        """
        request = getattr(self, "request", {})
        renderer = getattr(request, "accepted_renderer", None)
        candidates = module_paths(self.model, path="serializers", suffix="Serializer")
        if renderer is not None:
            prefix = makeclassprefix(renderer.format)
            format_specific = module_paths(
                self.model, path="serializers", suffix=f"{prefix}Serializer"
            )
            candidates = format_specific + candidates

        fallback = getattr(renderer, "serializer", GenericHyperlinkedModelSerializer)
        serializer_class = first_member_match(candidates, fallback)
        return serializer_factory(self.model, serializer=serializer_class)

filtersets

GenericFilterSet

Bases: FilterSet

Our GenericFilterSet sets the default form to be our GenericFilterSetForm, which is set up to ignore the columns field of the form.

Source code in apis_core/generic/filtersets.py
class GenericFilterSet(FilterSet):
    """
    FilterSet that uses our GenericFilterSetForm as its default `form`;
    that form is set up to ignore the `columns` field.
    """

    class Meta:
        form = GenericFilterSetForm
        # with the UnknownFieldBehavior set to WARN the form does not
        # break if there are JSONFields
        unknown_field_behavior = django_filters.UnknownFieldBehavior.WARN

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # filters based on auto_created model fields are dropped
        for model_field in self._meta.model._meta.get_fields():
            if not getattr(model_field, "auto_created", False):
                continue
            if model_field.name in self.filters:
                del self.filters[model_field.name]
        # the collections filter is only added if the collections app is
        # available and there is at least one collection
        try:
            collection_model = apps.get_model("collections.SkosCollection")
            if collection_model.objects.exists():
                self.filters["collections"] = CollectionsIncludeExcludeFilter(
                    queryset=collection_model.objects.all(),
                )
        except LookupError as e:
            logger.debug("Not adding collections filter to form: %s", e)

forms

GenericFilterSetForm

Bases: Form

FilterSet form for generic models. Adds a submit button using the django crispy form helper. Adds a columns selector that lists all the fields from the model.

Source code in apis_core/generic/forms/__init__.py
class GenericFilterSetForm(forms.Form):
    """
    FilterSet form for generic models
    Adds a submit button using the django crispy form helper
    Adds a `columns` selector that lists all the fields from
    the model
    """

    columns_exclude = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # the form is submitted via GET and gets a submit button
        helper = FormHelper()
        helper.form_method = "GET"
        helper.add_input(Submit("submit", "Submit"))
        self.helper = helper

    def clean(self):
        # `columns` is dropped from the cleaned data
        cleaned = super().clean()
        self.cleaned_data = cleaned
        cleaned.pop("columns", None)
        return cleaned

GenericModelForm

Bases: ModelForm

Model form for generic models Adds a submit button using the django crispy form helper and sets the ModelChoiceFields and ModelMultipleChoiceFields to use autocomplete replacement fields

Source code in apis_core/generic/forms/__init__.py
class GenericModelForm(forms.ModelForm):
    """
    Model form for generic models
    Adds a submit button using the django crispy form helper
    and sets the ModelChoiceFields and ModelMultipleChoiceFields
    to use autocomplete replacement fields.
    If the collections app is available, a `collections` field is
    added that lists the collections the instance is part of.
    """

    class Meta:
        fields = "__all__"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        try:
            skoscollection = apps.get_model("collections.SkosCollection")
            self.fields["collections"] = forms.ModelMultipleChoiceField(
                required=False, queryset=skoscollection.objects.all()
            )
            if instance := kwargs.get("instance"):
                self.fields["collections"].initial = skoscollection.objects.by_instance(
                    instance
                ).values_list("pk", flat=True)
        except LookupError as e:
            logger.debug("Not adding collections to form: %s", e)

        self.helper = FormHelper(self)
        self.helper.add_input(Submit("submit", "Submit"))

        # override the fields pointing to other models,
        # to make them use the autocomplete widgets
        override_fieldtypes = {
            "ModelMultipleChoiceField": ApisModelSelect2Multiple,
            "ModelChoiceField": ApisModelSelect2,
            "ModelImportChoiceField": ApisModelSelect2,
        }
        for formfield in self.fields.values():
            widget_class = override_fieldtypes.get(formfield.__class__.__name__)
            if widget_class is None:
                continue
            ct = ContentType.objects.get_for_model(formfield._queryset.model)
            if issubclass(ct.model_class(), GenericModel):
                url = reverse("apis_core:generic:autocomplete", args=[ct])
                formfield.widget = widget_class(url, attrs={"data-html": True})
                formfield.widget.choices = formfield.choices

    def save(self, *args, **kwargs):
        """
        Save the instance and synchronize its collections with the
        `collections` field of the form: the instance is removed from
        all collections that are not selected and added to the selected
        ones.
        """
        instance = super().save(*args, **kwargs)
        try:
            skoscollection = apps.get_model("collections.SkosCollection")
            collections = self.cleaned_data.get("collections")
            # an empty selection is falsy, but it has to be processed as
            # well - otherwise it is not possible to deselect all
            # collections. Only skip if the form has no such field.
            if collections is not None:
                for collection in skoscollection.objects.exclude(pk__in=collections):
                    collection.remove(instance)
                for collection in skoscollection.objects.filter(pk__in=collections):
                    collection.add(instance)
        except LookupError as e:
            logger.debug("Not creating collections from form: %s", e)
        return instance

fields

RowColumnMultiValueField

Bases: MultiValueField

This is a custom MultiValueField that simply shows multiple form fields in a row. The form fields are passed to the constructor and the corresponding RowColumnMultiWidget simply iterates through all the fields and shows them in rows. Additionally, it is possible to pass a list of labels that are then also passed on to the widget, which uses those to add a separate label to the individual widgets.

Source code in apis_core/generic/forms/fields.py
class RowColumnMultiValueField(MultiValueField):
    """
    This is a custom MultiValueField that simply shows multiple form
    fields in a row. The form fields are passed to the constructor and
    the corresponding RowColumnMultiWidget simply iterates through all
    the fields and shows them in rows.
    Additionally it is possible to pass a list of `labels` that are then
    also passed on to the widget, which uses those to add a separate
    label to the individual widgets.
    """

    def __init__(self, fields, labels=None, *args, **kwargs):
        # `None` instead of a mutable default argument; the widget gets its
        # own copy, so the list of the caller is never modified
        widget_labels = [] if labels is None else list(labels)
        kwargs["widget"] = RowColumnMultiWidget(
            widgets=[f.widget for f in fields], labels=widget_labels
        )
        super().__init__(fields, *args, **kwargs)

    def compress(self, data_list):
        # the values of the individual fields are passed on as a list
        return data_list
RowColumnMultiWidget

Bases: MultiWidget

A custom MultiWidget that is meant to be used with the RowColumnMultiValueField. The widget takes a list of widgets as a parameter and displays those widgets in columns in one row. The labels parameter is used to add a separate label to the individual widgets.

Source code in apis_core/generic/forms/fields.py
class RowColumnMultiWidget(MultiWidget):
    """
    A custom MultiWidget that is meant to be used with the
    RowColumnMultiValueField. The widget takes a list of widgets
    as a parameter and displays those widgets in columns in one row.
    The `labels` parameter is used to add a separate label to the
    individual widgets.
    """

    template_name = "widgets/row_column_multiwidget.html"
    use_fieldset = False

    def __init__(self, widgets, labels=None, attrs=None):
        # keep a private copy: no shared mutable default argument and no
        # reference to a list owned by the caller
        self.labels = [] if labels is None else list(labels)
        super().__init__(widgets, attrs)

    def get_context(self, name, value, attrs):
        ctx = super().get_context(name, value, attrs)
        # pair the subwidgets with the labels without consuming the labels,
        # so they are still there when the widget is rendered again;
        # subwidgets without a matching label are left untouched
        for widget, label in zip(ctx["widget"]["subwidgets"], self.labels):
            widget["label"] = label
        return ctx

    def decompress(self, value):
        if value:
            return value
        return []

generators

CustomEndpointEnumerator

Bases: EndpointEnumerator

Source code in apis_core/generic/generators.py
class CustomEndpointEnumerator(EndpointEnumerator):
    """
    EndpointEnumerator that replaces the generic `{contenttype}` API
    endpoints with one list and one detail endpoint per content type
    whose model is a GenericModel.
    """

    def _generate_content_type_endpoint(
        self, content_type: ContentType, method: str = "list"
    ):
        """Create an endpoint tuple, usable by the SchemaGenerator of DRF spectacular"""
        list_path = reverse(
            "apis_core:generic:genericmodelapi-list", args=[content_type]
        )
        view_class = resolve(list_path).func.cls

        # the detail endpoint is the list endpoint plus the id placeholder
        path = list_path + "{id}/" if method == "detail" else list_path
        # the `initkwargs` argument to the `as_view` method can only
        # contain a `model` argument if the class has such an attribute,
        # therefore we add it here
        view_class.model = None
        callback = view_class.as_view(
            {"get": method}, model=content_type.model_class()
        )
        # the path also serves as regex and for now we only do "GET"
        return (path, path, "GET", callback)

    def get_api_endpoints(self, patterns=None, prefix=""):
        """
        Collect the endpoints found by the `get_api_endpoints` method of
        the EndpointEnumerator, drop the ones we want to override and
        then add our custom endpoints to the list.
        """
        inherited = super().get_api_endpoints(patterns, prefix)
        api_endpoints = []
        for endpoint in inherited:
            if endpoint[0].startswith("/apis/api/{contenttype}/"):
                continue
            api_endpoints.append(endpoint)
        for content_type in ContentType.objects.all():
            model_class = content_type.model_class()
            if model_class is None or not issubclass(model_class, GenericModel):
                continue
            api_endpoints.append(self._generate_content_type_endpoint(content_type))
            api_endpoints.append(
                self._generate_content_type_endpoint(content_type, "detail")
            )
        return api_endpoints
get_api_endpoints(patterns=None, prefix='')

Call the EndpointEnumerator's get_api_endpoints method to get all the automatically found endpoints, remove the ones that we want to override and then add our custom endpoints to this list.

Source code in apis_core/generic/generators.py
def get_api_endpoints(self, patterns=None, prefix=""):
    """
    Collect the endpoints found by the `get_api_endpoints` method of
    the EndpointEnumerator, drop the ones we want to override and
    then add our custom endpoints to the list.
    """
    inherited = super().get_api_endpoints(patterns, prefix)
    api_endpoints = []
    for endpoint in inherited:
        if endpoint[0].startswith("/apis/api/{contenttype}/"):
            continue
        api_endpoints.append(endpoint)
    for content_type in ContentType.objects.all():
        model_class = content_type.model_class()
        if model_class is None or not issubclass(model_class, GenericModel):
            continue
        api_endpoints.append(self._generate_content_type_endpoint(content_type))
        api_endpoints.append(
            self._generate_content_type_endpoint(content_type, "detail")
        )
    return api_endpoints

helpers

default_search_fields(model, field_names=None)

Retrieve the default model fields to use for a search operation By default those are all the CharFields and TextFields of a model. It is also possible to define those fields on the model using the _default_search_fields attribute. The method also takes a field_names argument to override the list of fields.

Source code in apis_core/generic/helpers.py
def default_search_fields(model, field_names=None):
    """
    Retrieve the default model fields to use for a search operation
    By default those are all the CharFields and TextFields of a model.
    It is also possible to define those fields on the model using the
    `_default_search_fields` attribute.
    The method also takes a `field_names` argument to override the list
    of fields.
    Both `field_names` and `_default_search_fields` have to be a list or
    a tuple of field names, other values are ignored.
    """
    # an explicit list of field names overrides everything else
    if isinstance(field_names, (list, tuple)):
        return [model._meta.get_field(name) for name in field_names]
    # check if the model has a `_default_search_fields`
    # list and use that as searchfields
    model_defaults = getattr(model, "_default_search_fields", None)
    if isinstance(model_defaults, (list, tuple)):
        return [model._meta.get_field(name) for name in model_defaults]
    default_types = (CharField, TextField)
    return [
        field for field in model._meta.get_fields() if isinstance(field, default_types)
    ]

generate_search_filter(model, query, fields_to_search=None, prefix='')

Generate a default search filter that searches for the query This helper can be used by autocomplete querysets if nothing fancier is needed. If the prefix is set, the field names will be prefixed with that string - this can be useful if you want to use the generate_search_filter in a Q combined query while searching over multiple models.

Source code in apis_core/generic/helpers.py
def generate_search_filter(model, query, fields_to_search=None, prefix=""):
    """
    Build a default search filter for `query`. The query is split on
    whitespace; every token has to be contained (case insensitive) in at
    least one of the search fields.
    The search fields are determined by `default_search_fields`.
    Autocomplete querysets can use this helper if nothing fancier is
    needed.
    With `prefix` set, the field names are prefixed with that string - this
    can be useful if you want to use the `generate_search_filter` in a
    `Q` combined query while searching over multiple models.
    """
    search_field_names = [
        field.name for field in default_search_fields(model, fields_to_search)
    ]

    combined = Q()
    for token in query.split():
        # a token matches if any of the fields contains it
        any_field = Q()
        for field_name in search_field_names:
            any_field = any_field | Q(**{f"{prefix}{field_name}__icontains": token})
        combined &= any_field
    return combined

mro_paths(model) cached

Create a list of MRO classes for a Django model

Source code in apis_core/generic/helpers.py
@functools.lru_cache
def mro_paths(model):
    """
    Create a list of paths for the classes in the MRO of a Django model.
    Classes that are part of the MRO of `Model` are left out. A path is
    the module of the class without its last component, followed by the
    name of the class.
    """
    base_classes = Model.mro()
    return [
        cls.__module__.split(".")[:-1] + [cls.__name__]
        for cls in model.mro()
        if cls not in base_classes
    ]

split_and_strip_parameter(params)

Clean a URI param list type This method iterates through a list of strings. It looks if the items contain a comma separated list of items and then splits those and also runs strip on all those items. So out of ["foo.bar", "bar.faz, faz.foo"] it creates a list ["foo.bar", "bar.faz", "faz.foo"]

Source code in apis_core/generic/helpers.py
def split_and_strip_parameter(params: list[str]) -> list[str]:
    """
    Clean a URI param list type
    This method iterates through a list of strings. It looks if
    the items contain a comma separated list of items and then splits
    those and also runs strip on all those items.
    So out of ["foo.bar", "bar.faz, faz.foo"] it
    creates a list ["foo.bar", "bar.faz", "faz.foo"]
    """
    return [item.strip() for param in params for item in param.split(",")]

string_to_bool(string='false')

Convert a string to a boolean representing its semantic value

Source code in apis_core/generic/helpers.py
def string_to_bool(string: str = "false") -> bool:
    """
    Interpret a string as a boolean: only "true", in any casing,
    results in True.
    """
    lowered = string.lower()
    return lowered == "true"

template_names_via_mro(model, suffix='') cached

Use the MRO to generate a list of template names for a model

Source code in apis_core/generic/helpers.py
@functools.lru_cache
def template_names_via_mro(model, suffix=""):
    """
    Generate a list of template names for a model: the paths returned by
    `mro_paths` are joined with slashes, lowercased and `suffix` is
    appended to each of them.
    """
    return [f"{'/'.join(parts).lower()}{suffix}" for parts in mro_paths(model)]

importers

GenericModelImporter

A generic importer class. It provides the standard methods for importing data from a URI and creating a model instance from it. By default it fetches a resource, first tries to parse it using our rdf parser, if that fails tries to parse it using json and then extracts the fields whose keys match the model field names. Projects can inherit from this class and override the default methods or simply write their own from scratch.

Source code in apis_core/generic/importers.py
class GenericModelImporter:
    """
    A generic importer class
    It provides the standard methods for importing data from
    a URI and creating a model instance from it.
    By default it fetches a resource, first tries to parse it using
    our rdf parser, if that fails tries to parse it using json and
    then extracts the fields whose keys match the model field names.
    Projects can inherit from this class and override the default
    methods or simply write their own from scratch.
    """

    model = None
    import_uri = None

    def __init__(self, uri, model):
        self.model = model
        self.import_uri = self.clean_uri(uri)

    @property
    def get_uri(self):
        return self.import_uri

    def clean_uri(self, uri):
        return get_normalized_uri(uri)

    def request(self, uri):
        """
        Fetch the data for `uri`. The result is stored on the importer
        instance, so every URI is only fetched once per importer.
        """
        # a per instance dict instead of `functools.cache`: a cache on the
        # method would keep every importer and its data alive forever
        results = self.__dict__.setdefault("_request_cache", {})
        if uri not in results:
            results[uri] = self._fetch(uri)
        return results[uri]

    def _fetch(self, uri):
        """Fetch and parse `uri`, return an empty dict if nothing works."""
        # we first try to use the RDF parser
        try:
            data = get_something_from_uri(
                uri,
                [self.model],
            )
            return data
        except Exception as e:
            logger.debug(e)
        # if everything else fails, try parsing json
        # if even that does not help, return an empty dict
        try:
            # the context manager closes the connection after reading
            with urllib.request.urlopen(uri) as response:
                return json.loads(response.read())
        except Exception as e:
            logger.debug(e)
        return {}

    def mangle_data(self, data):
        return data

    def get_data(self, drop_unknown_fields=True):
        """
        fetch the data using the `request` method and
        mangle the data using the `mangle_data` method.
        If the `drop_unknown_fields` argument is true,
        remove all fields from the data dict that do not
        have an equivalent field in the model.
        Raises ImproperlyConfigured if there is no data left.
        """
        data = self.request(self.import_uri)
        data = self.mangle_data(data)
        if drop_unknown_fields:
            # we are dropping all fields that are not part of the model
            modelfields = [field.name for field in self.model._meta.fields]
            data = {key: data[key] for key in data if key in modelfields}
        if not data:
            raise ImproperlyConfigured(
                f"Could not import {self.import_uri}. Data fetched was: {data}"
            )
        return data

    def import_into_instance(self, instance, fields="__all__"):
        """
        Set the attributes listed in `fields` (by default all the fetched
        ones) on `instance` using the fetched data and save the instance.
        """
        data = self.get_data()
        if fields == "__all__":
            fields = data.keys()
        for field in fields:
            if hasattr(instance, field) and field in data:
                setattr(instance, field, data[field])
        instance.save()

    def create_instance(self):
        """
        Return the instance for the import URI, together with its Uris and
        relations.
        If one of the `same_as` URIs of the fetched data is already known,
        the object it points to is reused, otherwise a new instance is
        created from the first value of every fetched model field.
        Raises IntegrityError if the `same_as` URIs point to different
        objects.
        """
        logger.debug("Create instance from URI %s", self.import_uri)
        data = self.get_data(drop_unknown_fields=False)
        instance = None
        same_as = data.get("same_as", [])
        same_as = [get_normalized_uri(uri) for uri in same_as]
        if sa := Uri.objects.filter(uri__in=same_as):
            root_set = {s.content_object for s in sa}
            if len(root_set) > 1:
                raise IntegrityError(
                    f"Multiple objects found for sameAs URIs {data['same_as']}. "
                    f"This indicates a data integrity problem as these URIs should be unique."
                )
            instance = sa.first().content_object
            logger.debug("Found existing instance %s", instance)
        if not instance:
            attributes = {}
            for field in self.model._meta.fields:
                if data.get(field.name, False):
                    attributes[field.name] = data[field.name][0]
            instance = self.model.objects.create(**attributes)
            logger.debug("Created instance %s from attributes %s", instance, attributes)
        content_type = ContentType.objects.get_for_model(instance)
        for uri in same_as:
            Uri.objects.get_or_create(
                uri=uri, content_type=content_type, object_id=instance.id
            )
        for relation, details in data.get("relations", {}).items():
            rel_app_label, rel_model = relation.split(".")
            relation_model = ContentType.objects.get_by_natural_key(
                app_label=rel_app_label, model=rel_model
            ).model_class()

            reld = details.get("obj", None) or details.get("subj", None)
            reld_app_label, reld_model = reld.split(".")
            related_content_type = ContentType.objects.get_by_natural_key(
                app_label=reld_app_label, model=reld_model
            )
            related_model = related_content_type.model_class()

            for related_uri in details["curies"]:
                related_instance = create_object_from_uri(
                    uri=related_uri, model=related_model
                )
                # with `obj` set the instance is the subject of the
                # relation, otherwise it is the object
                if details.get("obj"):
                    subj_object_id = instance.pk
                    subj_content_type = content_type
                    obj_object_id = related_instance.pk
                    obj_content_type = related_content_type
                else:
                    obj_object_id = instance.pk
                    obj_content_type = content_type
                    subj_object_id = related_instance.pk
                    subj_content_type = related_content_type
                rel, _ = relation_model.objects.get_or_create(
                    subj_object_id=subj_object_id,
                    subj_content_type=subj_content_type,
                    obj_object_id=obj_object_id,
                    obj_content_type=obj_content_type,
                )
                logger.debug(
                    "Created relation %s between %s and %s",
                    relation_model.name(),
                    rel.subj,
                    rel.obj,
                )
        return instance
get_data(drop_unknown_fields=True)

fetch the data using the request method and mangle the data using the mangle_data method. If the drop_unknown_fields argument is true, remove all fields from the data dict that do not have an equivalent field in the model.

Source code in apis_core/generic/importers.py
def get_data(self, drop_unknown_fields=True):
    """
    Fetch and post-process the data behind ``self.import_uri``.

    The raw result of ``self.request`` is passed through
    ``self.mangle_data``. When ``drop_unknown_fields`` is true, only keys
    that match the name of a field in ``self.model._meta.fields`` are kept.

    Raises ``ImproperlyConfigured`` if the resulting data is empty.
    """
    payload = self.mangle_data(self.request(self.import_uri))
    if drop_unknown_fields:
        # restrict the payload to keys that correspond to a model field
        known_names = [model_field.name for model_field in self.model._meta.fields]
        payload = {name: payload[name] for name in payload if name in known_names}
    if payload:
        return payload
    raise ImproperlyConfigured(
        f"Could not import {self.import_uri}. Data fetched was: {payload}"
    )

renderers

GenericRDFBaseRenderer

Bases: BaseRenderer

Base class to render RDF graphs to various formats. This renderer expects the serialized data to either be an rdflib graph or to contain a list of rdflib graphs. If it works with a list of graphs, those are combined into one graph. This graph is then serialized and the result is returned. The serialization format can be set using the rdflib_format attribute. If this is not set, the format attribute of the renderer is used as serialization format (this is the format as it is used by the Django Rest Framework for content negotiation).

Source code in apis_core/generic/renderers.py
class GenericRDFBaseRenderer(renderers.BaseRenderer):
    """
    Base class to render RDF graphs to various formats.
    This renderer expects the serialized data to either be a rdflib grap **or**
    to contain a list of rdflib graphs. If it works with a list of graphs, those
    are combined to one graph.
    This graph is then serialized and the result is returned. The serialization
    format can be set using the `rdflib_format` attribute. If this is not set, the
    `format` attribute of the renderer is used as serialization format (this is the
    format as it is used by the Django Rest Framework for content negotiation.
    """

    format = "ttl"
    rdflib_format = None

    def render(self, data, accepted_media_type=None, renderer_context=None):
        result = Graph()

        match data:
            case {"results": results, **rest}:  # noqa: F841
                # Handle case where data is a dict with multiple graphs
                for graph in results:
                    if isinstance(graph, Graph):
                        # Merge triples
                        for triple in graph:
                            result.add(triple)
                        # Merge namespace bindings
                        for prefix, namespace in graph.namespaces():
                            result.bind(prefix, namespace, override=False)
            case {"detail": detail}:
                raise APIException(detail)
            case Graph():
                # Handle case where data is a single graph
                result = data
                # Ensure namespaces are properly bound in the single graph case
                for prefix, namespace in data.namespaces():
                    result.bind(prefix, namespace, override=False)
            case _:
                raise ValueError(
                    "Invalid data format. Expected rdflib Graph or dict with 'results' key containing graphs"
                )
        serialization_format = self.rdflib_format or self.format
        return result.serialize(format=serialization_format)

routers

CustomAPIRootView

Bases: APIRootView

The default basic root view for CustomDefaultRouter

This view lists the output of the default APIRootView of the Django Rest Framework. In addition, it injects the routes of the genericmodelapi (those are not registered explicitly but are autogenerated based on the models that inherit from GenericModel).

Source code in apis_core/generic/routers.py
class CustomAPIRootView(APIRootView):
    """
    Root view used by the CustomDefaultRouter.

    It returns what the default APIRootView of the Django Rest Framework
    returns and additionally adds one entry per content type whose model
    class inherits from GenericModel. Those `genericmodelapi` routes are not
    registered one by one, they are derived from the existing content types.
    """

    def get(self, request, *args, **kwargs):
        response = super().get(request, *args, **kwargs)
        route = "apis_core:generic:genericmodelapi-list"
        for content_type in ContentType.objects.all():
            model_class = content_type.model_class()
            # skip content types without a model class and non-generic models
            if model_class is None or not issubclass(model_class, GenericModel):
                continue
            # the relative path (without slashes) is the key, the full URL the value
            key = reverse(route, args=[content_type]).strip("/")
            response.data[key] = reverse(
                route,
                args=[content_type],
                kwargs=kwargs,
                request=request,
                format=kwargs.get("format"),
            )
        return response

CustomDefaultRouter

Bases: DefaultRouter

The CustomDefaultRouter only diverts from the Django Rest Framework DefaultRouter by setting the APIRootView to our CustomAPIRootView.

Source code in apis_core/generic/routers.py
class CustomDefaultRouter(DefaultRouter):
    """
    The CustomDefaultRouter only diverges from the Django Rest Framework
    DefaultRouter by setting the APIRootView to our CustomAPIRootView.
    """

    # the root view class used by this router
    APIRootView = CustomAPIRootView

schema

GenericAutoSchema

Bases: AutoSchema

Add an option to the default drf_spectacular schema that allows setting the tags of a route via the model.

Source code in apis_core/generic/schema.py
class GenericAutoSchema(AutoSchema):
    """
    Extension of the default drf_spectacular schema that lets the
    model of a view define the tags of the route.
    """

    def get_tags(self) -> list[str]:
        """
        Return the tags provided by the `get_openapi_tags` method of the
        view's model; fall back to the default tags if the view has no
        model or the model has no such method.
        """
        model = getattr(self.view, "model", None)
        if not hasattr(model, "get_openapi_tags"):
            return super().get_tags()
        return model.get_openapi_tags()

tables

ActionColumn

Bases: CustomTemplateColumn

A custom template column with some additional attributes for actions.

Source code in apis_core/generic/tables.py
class ActionColumn(CustomTemplateColumn):
    """
    Template column for action buttons. It is narrow, not orderable,
    has no header text and is left out of exports.
    If the column defines a `permission` attribute, the cell is only
    rendered for users that hold that permission on the record.
    """

    attrs = {"td": {"style": "width:1%;"}}
    verbose_name = ""
    exclude_from_export = True
    orderable = False

    def render(self, record, table, *args, **kwargs):
        required = getattr(self, "permission", False)
        if required:
            full_name = permission_fullname(required, record)
            # render an empty cell if the user lacks the permission
            if not table.request.user.has_perm(full_name):
                return ""
        return super().render(record, table, *args, **kwargs)

CustomTemplateColumn

Bases: TemplateColumn

A custom template column - the tables.TemplateColumn class does not allow setting attributes via class variables. Therefore we use this CustomTemplateColumn to set some arguments based on class attributes and override the attributes in child classes.

Source code in apis_core/generic/tables.py
class CustomTemplateColumn(tables.TemplateColumn):
    """
    A custom template column - the `tables.TemplateColumn` class does not allow
    setting attributes via class variables. Therefore we use this
    CustomTemplateColumn to set some arguments based on class attributes and
    override the attributes in child classes.

    The class attributes act as defaults: a keyword argument passed
    explicitly when instantiating the column takes precedence.
    """

    template_name = None
    orderable = None
    exclude_from_export = False
    verbose_name = None

    def __init__(self, *args, **kwargs):
        # Fill in the class-level defaults only for options the caller did not
        # pass. Passing the same option both explicitly and via **kwargs would
        # raise a "multiple values for keyword argument" TypeError.
        for option in (
            "template_name",
            "orderable",
            "exclude_from_export",
            "verbose_name",
        ):
            kwargs.setdefault(option, getattr(self, option))
        super().__init__(*args, **kwargs)

DeleteColumn

Bases: ActionColumn

A column showing a delete button

Source code in apis_core/generic/tables.py
class DeleteColumn(ActionColumn):
    """
    Action column rendering a delete button; tied to the `delete` permission.
    """

    permission = "delete"
    template_name = "columns/delete.html"

DescriptionColumn

Bases: CustomTemplateColumn

A column showing a model description

Source code in apis_core/generic/tables.py
class DescriptionColumn(CustomTemplateColumn):
    """
    Non-orderable column rendering the description of a model instance.
    """

    orderable = False
    template_name = "columns/description.html"

EditColumn

Bases: ActionColumn

A column showing an edit button

Source code in apis_core/generic/tables.py
class EditColumn(ActionColumn):
    """
    Action column rendering an edit button; tied to the `change` permission.
    """

    permission = "change"
    template_name = "columns/edit.html"

GenericTable

Bases: Table

A generic table that contains an edit button column, a delete button column and a description column

Source code in apis_core/generic/tables.py
class GenericTable(tables.Table):
    """
    A generic table that contains a description column and three action
    columns: a view button, an edit button and a delete button.
    """

    # NOTE: the declaration order is kept as is; the final position of the
    # action columns is set by `Meta.sequence` below.
    edit = EditColumn()
    desc = DescriptionColumn()
    delete = DeleteColumn()
    view = ViewColumn()

    class Meta:
        # columns shown by default
        fields = ["id", "desc"]
        # "..." is the django-tables2 placeholder for all columns that are
        # not listed explicitly; the action columns are put at the end
        sequence = ("...", "view", "edit", "delete")

MoreLessColumn

Bases: TemplateColumn

Useful for displaying long fields. A preview is shown initially with a "Show more" link which is replaced with a "Show less" link when expanded.

Source code in apis_core/generic/tables.py
class MoreLessColumn(tables.TemplateColumn):
    """
    Column for long values.
    Initially only a preview is displayed together with a "Show more" link;
    once expanded, the link is replaced with a "Show less" link.

    `preview` and `fulltext` are callables that receive the record and
    return the value to put into the template context.
    """

    template_name = "columns/more-less.html"

    def __init__(self, preview, fulltext, *args, **kwargs):
        self.preview = preview
        self.fulltext = fulltext
        super().__init__(*args, template_name=self.template_name, **kwargs)

    def render(self, record, **kwargs):
        # expose both variants of the value to the column template
        context = self.extra_context
        context["preview"] = self.preview(record)
        context["fulltext"] = self.fulltext(record)
        return super().render(record, **kwargs)

ViewColumn

Bases: ActionColumn

A column showing a view button

Source code in apis_core/generic/tables.py
class ViewColumn(ActionColumn):
    """
    Action column rendering a view button; tied to the `view` permission.
    """

    permission = "view"
    template_name = "columns/view.html"

urls

ContenttypeConverter

A converter that converts from a string representation of a model (app_label.model) to the actual Django model class.

Source code in apis_core/generic/urls.py
class ContenttypeConverter:
    """
    A URL path converter that maps the string representation of a
    model (`app_label.model`) to its `ContentType` and back.
    """

    regex = r"\w+\.\w+"

    def to_python(self, value):
        """
        Return the `ContentType` for `value` (`app_label.model`).

        Raises `Http404` if there is no such content type, if the content
        type has no model class (e.g. the model was removed), or if the
        model class is not a subclass of `GenericModel`.
        """
        app_label, model = value.split(".")
        contenttype = get_object_or_404(ContentType, app_label=app_label, model=model)
        model_class = contenttype.model_class()
        # `model_class()` returns None for stale content types; without the
        # guard `issubclass` would raise a TypeError instead of a 404
        if model_class is not None and issubclass(model_class, GenericModel):
            return contenttype
        raise Http404

    def to_url(self, value):
        """
        Return the `app_label.model` string for a `ContentType`; strings
        are passed through unchanged. Any other value results in `None`.
        """
        if isinstance(value, str):
            return value
        if isinstance(value, ContentType):
            return f"{value.app_label}.{value.model}"
        return None

views

Autocomplete

Bases: GenericModelMixin, PermissionRequiredMixin, Select2QuerySetView

Autocomplete view for a generic model. Access requires the <model>_view permission. The queryset is overridden by the first match from the first_member_match helper.

Source code in apis_core/generic/views.py
class Autocomplete(
    GenericModelMixin, PermissionRequiredMixin, autocomplete.Select2QuerySetView
):
    """
    Autocomplete view for a generic model.
    Access requires the `<model>_view` permission.
    The queryset is overridden by the first match from
    the `first_member_match` helper.
    """

    permission_action_required = "view"
    template_name_suffix = "_autocomplete_result"

    def setup(self, *args, **kwargs):
        super().setup(*args, **kwargs)
        # We use a URI parameter to enable the create functionality in the
        # autocomplete dropdown. It is not important what the value of the
        # `create_field` is, because we use create_object_from_uri anyway.
        self.create_field = self.request.GET.get("create", None)
        # Use a result template if one can be found, otherwise none at all.
        try:
            template = select_template(self.get_template_names())
            self.template = template.template.name
        except TemplateDoesNotExist:
            self.template = None

    def get_queryset(self):
        """
        Use a custom `...AutocompleteQueryset` callable if one is found for
        the model, otherwise filter the model using the generic search filter.
        """
        queryset_methods = module_paths(
            self.model, path="querysets", suffix="AutocompleteQueryset"
        )
        queryset = first_member_match(queryset_methods)
        if queryset:
            return queryset(self.model, self.q)
        return self.model.objects.filter(generate_search_filter(self.model, self.q))

    def get_results(self, context):
        """
        Return the local results (skipped if the `external_only` URL kwarg is
        set), extended by the results of an `...ExternalAutocomplete` class
        if one is found for the model.
        """
        external_only = self.kwargs.get("external_only", False)
        results = [] if external_only else super().get_results(context)
        queryset_methods = module_paths(
            self.model, path="querysets", suffix="ExternalAutocomplete"
        )
        ExternalAutocomplete = first_member_match(queryset_methods)
        if ExternalAutocomplete:
            results.extend(ExternalAutocomplete().get_results(self.q))
        return results

    def create_object(self, value):
        """
        Create a model instance from the value entered in the autocomplete:
        * if the value is a valid URL, we expect it to be something that can
        be imported using one of the configured importers and pass it to
        `create_object_from_uri`.
        * otherwise we pass the value to the `create_from_string` method of
        the model. It is the model's responsibility to implement this method
        and to know how to create a model instance from the value.
        * if the model does not provide `create_from_string`, we raise an
        `ImproperlyConfigured` exception.
        """
        model = self.queryset.model
        try:
            URLValidator()(value)
            return create_object_from_uri(value, model, raise_on_fail=True)
        except ValidationError:
            pass
        # Only a *missing* method means "not configured". Looking the method
        # up before calling it keeps an AttributeError raised inside
        # `create_from_string` from being misreported.
        create_from_string = getattr(model, "create_from_string", None)
        if create_from_string is None:
            raise ImproperlyConfigured(
                f'Model "{model._meta.verbose_name}" not configured to create from string'
            )
        return create_from_string(value)

    def post(self, request, *args, **kwargs):
        # Deliberately broad: any failure while creating an object is
        # reported to the client as a JSON error message.
        try:
            return super().post(request, *args, **kwargs)
        except Exception as e:
            return http.JsonResponse({"error": str(e)})
create_object(value)

We try multiple approaches to create a model instance from a value: * we first test if the value is a URL and if so we expect it to be something that can be imported using one of the configured importers and so we pass the value to the import logic. * if the value is not a URL, we pass it to the create_from_string method of the model, if that exists. It is the model's responsibility to implement this method and the method should somehow know how to create a model instance from the value... * if the model does not provide a create_from_string method, we raise an ImproperlyConfigured exception.

Source code in apis_core/generic/views.py
def create_object(self, value):
    """
    Create a model instance from the value entered in the autocomplete:
    * if the value is a valid URL, we expect it to be something that can
    be imported using one of the configured importers and pass it to
    `create_object_from_uri`.
    * otherwise we pass the value to the `create_from_string` method of
    the model. It is the model's responsibility to implement this method
    and to know how to create a model instance from the value.
    * if the model does not provide `create_from_string`, we raise an
    `ImproperlyConfigured` exception.
    """
    model = self.queryset.model
    try:
        URLValidator()(value)
        return create_object_from_uri(value, model, raise_on_fail=True)
    except ValidationError:
        pass
    # Only a *missing* method means "not configured". Looking the method
    # up before calling it keeps an AttributeError raised inside
    # `create_from_string` from being misreported.
    create_from_string = getattr(model, "create_from_string", None)
    if create_from_string is None:
        raise ImproperlyConfigured(
            f'Model "{model._meta.verbose_name}" not configured to create from string'
        )
    return create_from_string(value)

Create

Bases: GenericModelMixin, PermissionRequiredMixin, CreateView

Create view for a generic model. Access requires the <model>_add permission. The form class is overridden by the first match from the first_member_match helper.

Source code in apis_core/generic/views.py
class Create(GenericModelMixin, PermissionRequiredMixin, CreateView):
    """
    View for creating an instance of a generic model.
    The `<model>_add` permission is required to access it.
    The form class is built from the first match returned by the
    `first_member_match` helper.
    """

    permission_action_required = "add"
    template_name = "generic/generic_form.html"

    def get_form_class(self):
        candidates = module_paths(self.model, path="forms", suffix="Form")
        base_form = first_member_match(candidates, GenericModelForm)
        return modelform_factory(self.model, base_form)

    def get_success_url(self):
        # the created object decides where to go next
        created = self.object
        return created.get_create_success_url()

Delete

Bases: GenericModelMixin, PermissionRequiredMixin, DeleteView

Delete view for a generic model. Access requires the <model>_delete permission.

Source code in apis_core/generic/views.py
class Delete(GenericModelMixin, PermissionRequiredMixin, DeleteView):
    """
    Delete view for a generic model.
    Access requires the `<model>_delete` permission.
    """

    permission_action_required = "delete"

    def get_success_url(self):
        # after deleting we go back to the list view of the model
        return reverse(
            "apis_core:generic:list",
            args=[self.request.resolver_match.kwargs["contenttype"]],
        )

    def delete(self, *args, **kwargs):
        """
        Delete the object and redirect to the list view.

        For htmx requests (`HX-Request` header) the target URL is sent in
        the `HX-Redirect` response header instead of an HTTP redirect, so
        the client navigates to the list view rather than swapping the
        redirected page into the current one.
        """
        # The htmx branch used to return a tuple holding the URL without
        # deleting anything - a view has to return an HttpResponse.
        response = super().delete(*args, **kwargs)
        if "HX-Request" in self.request.headers:
            hx_response = http.HttpResponse()
            hx_response["HX-Redirect"] = response.url
            return hx_response
        return response

Detail

Bases: GenericModelMixin, PermissionRequiredMixin, DetailView

Detail view for a generic model. Access requires the <model>_view permission.

Source code in apis_core/generic/views.py
class Detail(GenericModelMixin, PermissionRequiredMixin, DetailView):
    """
    Detail view for a generic model.
    Access requires the `<model>_view` permission, unless the
    `APIS_ANON_VIEWS_ALLOWED` setting is enabled
    (see `GenericModelMixin.get_permission_required`).
    """

    # read by GenericModelMixin.get_permission_required
    permission_action_required = "view"

Enrich

Bases: GenericModelMixin, PermissionRequiredMixin, FormView

Enrich an entity with data from an external source. It uses the proper Importer to get the data from the URI and provides the user with a form to select the fields that should be updated.

Source code in apis_core/generic/views.py
class Enrich(GenericModelMixin, PermissionRequiredMixin, FormView):
    """
    Enrich an entity with data from an external source.
    The source is given as `uri` GET parameter. The view uses the proper
    Importer to get the data from the URI and provides the user with a
    form to select the fields that should be updated.
    """

    permission_action_required = "change"
    template_name = "generic/generic_enrich.html"
    form_class = GenericEnrichForm
    importer_class = None

    def setup(self, *args, **kwargs):
        super().setup(*args, **kwargs)
        self.object = get_object_or_404(self.model, pk=self.kwargs["pk"])
        self.uri = self.request.GET.get("uri")
        if not self.uri:
            messages.error(self.request, "No uri parameter specified.")
        self.importer_class = get_importer_for_model(self.model)

    def get(self, *args, **kwargs):
        """
        Redirect to the merge form if the `uri` parameter is a number or
        belongs to another object; otherwise show the enrich form.
        """
        # Without a uri there is nothing to look up. `setup` has already
        # added an error message, so we only have to avoid calling string
        # methods on `None`.
        if self.uri:
            if self.uri.isdigit():
                return redirect(self.object.get_merge_url(self.uri))
            try:
                uriobj = Uri.objects.get(uri=self.uri)
            except Uri.DoesNotExist:
                pass
            else:
                if uriobj.object_id != self.object.id:
                    messages.info(
                        self.request,
                        f"Object with URI {self.uri} already exists, you were redirected to the merge form.",
                    )
                    return redirect(self.object.get_merge_url(uriobj.object_id))
        return super().get(*args, **kwargs)

    def get_context_data(self, **kwargs):
        ctx = super().get_context_data(**kwargs)
        ctx["object"] = self.object
        ctx["uri"] = self.uri
        return ctx

    def get_form_kwargs(self, *args, **kwargs):
        """
        Pass the object as `instance` and the data fetched by the importer
        as `data` to the form.
        """
        kwargs = super().get_form_kwargs(*args, **kwargs)
        kwargs["instance"] = self.object
        if not self.uri:
            # nothing to import; the missing uri was reported in `setup`
            return kwargs
        try:
            importer = self.importer_class(self.uri, self.model)
            kwargs["data"] = importer.get_data()
        except ImproperlyConfigured as e:
            messages.error(self.request, e)
        return kwargs

    def form_valid(self, form):
        """
        Go through all the form fields and extract the ones that
        start with `update_` and that are set (those are the checkboxes that
        select which fields to update).
        Then use the importer's `import_into_instance` method to set those
        fields' values on the model instance.
        """
        update_fields = [
            key.removeprefix("update_")
            for (key, value) in self.request.POST.items()
            if key.startswith("update_") and value
        ]
        importer = self.importer_class(self.uri, self.model)
        importer.import_into_instance(self.object, fields=update_fields)
        messages.info(self.request, f"Updated fields {update_fields}")
        # make sure the uri is attached to the enriched object
        content_type = ContentType.objects.get_for_model(self.model)
        uri, created = Uri.objects.get_or_create(
            uri=self.uri,
            content_type=content_type,
            object_id=self.object.id,
        )
        if created:
            messages.info(self.request, f"Added uri {self.uri} to {self.object}")
        return super().form_valid(form)

    def get_success_url(self):
        return self.object.get_absolute_url()
form_valid(form)

Go through all the form fields and extract the ones that start with update_ and that are set (those are the checkboxes that select which fields to update). Then use the importers import_into_instance method to set those fields values on the model instance.

Source code in apis_core/generic/views.py
def form_valid(self, form):
    """
    Collect the names of the fields the user chose to update: these are
    the POST keys that start with `update_` and carry a value (the
    checkboxes of the form). The importer's `import_into_instance` method
    then writes those fields to the model instance. Finally the uri is
    attached to the instance, unless it already is.
    """
    selected = []
    for key, value in self.request.POST.items():
        if value and key.startswith("update_"):
            selected.append(key.removeprefix("update_"))
    self.importer_class(self.uri, self.model).import_into_instance(
        self.object, fields=selected
    )
    messages.info(self.request, f"Updated fields {selected}")
    _, created = Uri.objects.get_or_create(
        uri=self.uri,
        content_type=ContentType.objects.get_for_model(self.model),
        object_id=self.object.id,
    )
    if created:
        messages.info(self.request, f"Added uri {self.uri} to {self.object}")
    return super().form_valid(form)

GenericModelMixin

A mixin providing the common functionality for all the views working with generic models - that is models that are accessed via the contenttype framework (using app_label.model). It sets the .model of the view and generates a list of possible template names (based on the MRO of the model). If the view has a permission_action_required attribute, this is used to set the permission required to access the view for this specific model.

Source code in apis_core/generic/views.py
class GenericModelMixin:
    """
    Shared behaviour of all views that operate on `generic` models, i.e.
    models that are addressed via the contenttype framework
    (`app_label.model`).
    The mixin sets `.model` (and `.queryset`) of the view from the
    `contenttype` URL argument and assembles the list of candidate template
    names (based on the MRO of the model).
    If the view defines a `permission_action_required` attribute, it
    determines the permission needed to access the view for the model.
    """

    def setup(self, *args, **kwargs):
        super().setup(*args, **kwargs)
        contenttype = kwargs.get("contenttype")
        if not contenttype:
            return
        self.model = contenttype.model_class()
        self.queryset = self.model.objects.all()

    def get_template_names(self):
        names = []
        if hasattr(super(), "get_template_names"):
            # Not every parent class can provide template names: some
            # bring their own, others insist on a `.template_name`
            # attribute. A missing `.template_name` is tolerated here.
            try:
                names = super().get_template_names()
            except ImproperlyConfigured:
                pass
        suffix = getattr(self, "template_name_suffix", "") + ".html"
        candidates = [
            *template_names_via_mro(self.model, suffix),
            f"generic/generic{suffix}",
        ]
        # append every candidate that is not in the list yet
        for candidate in candidates:
            if candidate not in names:
                names.append(candidate)
        return names

    def get_permission_required(self):
        if not hasattr(self, "permission_action_required"):
            return []
        action = self.permission_action_required
        # viewing may be open to everybody, depending on the settings
        if action == "view" and getattr(settings, "APIS_ANON_VIEWS_ALLOWED", False):
            return []
        return [permission_fullname(action, self.model)]

List

Bases: GenericModelMixin, PermissionRequiredMixin, ExportMixin, SingleTableMixin, FilterView

List view for a generic model. Access requires the <model>_view permission. It is based on django-filters FilterView and django-tables SingleTableMixin. The table class is overridden by the first match from the first_member_match helper. The filterset class is overridden by the first match from the first_member_match helper. The queryset is overridden by the first match from the first_member_match helper.

Source code in apis_core/generic/views.py
class List(
    GenericModelMixin,
    PermissionRequiredMixin,
    ExportMixin,
    SingleTableMixin,
    FilterView,
):
    """
    List view for a generic model.
    Access requires the `<model>_view` permission.
    It is based on django-filters FilterView and django-tables SingleTableMixin.
    The table class is overridden by the first match from
    the `first_member_match` helper.
    The filterset class is overridden by the first match from
    the `first_member_match` helper.
    The queryset is overridden by the first match from
    the `first_member_match` helper.
    """

    template_name_suffix = "_list"
    permission_action_required = "view"

    def get_table_class(self):
        """
        Build the table class for the model using `table_factory`, based on
        the first match among the model's table modules.
        """
        table_modules = module_paths(self.model, path="tables", suffix="Table")
        # GenericTable is passed as second argument - presumably the fallback
        table_class = first_member_match(table_modules, GenericTable)
        return table_factory(self.model, table_class)

    # export formats can be configured using the EXPORT_FORMATS setting
    export_formats = getattr(settings, "EXPORT_FORMATS", ["csv", "json"])

    def get_export_filename(self, extension):
        """
        Use the `export_filename` attribute of the table class as base name
        of the exported file, if the table class defines one.
        """
        table_class = self.get_table_class()
        if hasattr(table_class, "export_filename"):
            return f"{table_class.export_filename}.{extension}"

        return super().get_export_filename(extension)

    def get_table_kwargs(self):
        """
        Extend the table kwargs with `exclude` and `extra_columns`, so the
        table shows the columns selected via the `columns` GET parameter
        (defaulting to the initial value of the `columns` form field).
        """
        kwargs = super().get_table_kwargs()

        # we look at the selected columns and exclude
        # all modelfields that are not part of that list
        selected_columns = self.request.GET.getlist(
            "columns",
            self.get_filterset(self.get_filterset_class()).form["columns"].initial,
        )
        modelfields = self.model._meta.get_fields()
        kwargs["exclude"] = [
            field.name for field in modelfields if field.name not in selected_columns
        ]

        # now we look at the selected columns and
        # add all modelfields and annotated fields that
        # are part of the selected columns to the extra_columns
        annotationfields = list()
        for key, value in self.object_list.query.annotations.items():
            # we have to use copy, so we don't edit the original field
            fake_field = copy(getattr(value, "field", value.output_field))
            setattr(fake_field, "name", key)
            annotationfields.append(fake_field)
        extra_fields = list(
            filter(
                lambda x: x.name in selected_columns,
                modelfields + tuple(annotationfields),
            )
        )
        # columns that the table class already defines are not added again
        kwargs["extra_columns"] = [
            (field.name, library.column_for_field(field, accessor=field.name))
            for field in extra_fields
            if field.name not in self.get_table_class().base_columns
        ]

        return kwargs

    def get_filterset_class(self):
        """
        Build the filterset class for the model using `filterset_factory`,
        based on the first match among the model's filterset modules.
        """
        filterset_modules = module_paths(
            self.model, path="filtersets", suffix="FilterSet"
        )
        # GenericFilterSet is passed as second argument - presumably the fallback
        filterset_class = first_member_match(filterset_modules, GenericFilterSet)
        return filterset_factory(self.model, filterset_class)

    def _get_columns_choices(self, columns_exclude):
        """
        Return the `(name, label)` choices of the `columns` selector,
        without the names listed in `columns_exclude`.
        """
        # we start with the model fields
        choices = [
            (field.name, pretty_name(getattr(field, "verbose_name", field.name)))
            for field in self.model._meta.get_fields()
            if not getattr(field, "auto_created", False)
            and not isinstance(field, ManyToManyRel)
        ]
        # we add any annotated fields to that
        choices += [(key, key) for key in self.get_queryset().query.annotations.keys()]
        # now we drop all the choices that are listed in columns_exclude
        choices = list(filter(lambda x: x[0] not in columns_exclude, choices))
        return choices

    def _get_columns_initial(self, columns_exclude):
        """
        Return the names of the table's columns, without the names listed
        in `columns_exclude`; used as initial value of the `columns` selector.
        """
        return [
            field
            for field in self.get_table().columns.names()
            if field not in columns_exclude
        ]

    def get_filterset(self, filterset_class):
        """
        We override the `get_filterset` method, so we can inject a
        `columns` selector into the form
        """
        filterset = super().get_filterset(filterset_class)
        columns_exclude = filterset.form.columns_exclude

        # we inject a `columns` selector in the beginning of the form
        columns = forms.MultipleChoiceField(
            required=False,
            choices=self._get_columns_choices(columns_exclude),
            initial=self._get_columns_initial(columns_exclude),
        )
        # building a new dict puts `columns` before all the other fields
        filterset.form.fields = {**{"columns": columns}, **filterset.form.fields}

        return filterset

    def get_queryset(self):
        """
        Return all objects of the model, passed through a custom
        `...ListViewQueryset` callable if one is found for the model.
        """
        queryset_methods = module_paths(
            self.model, path="querysets", suffix="ListViewQueryset"
        )
        # without a custom callable the queryset is used unchanged
        queryset = first_member_match(queryset_methods) or (lambda x: x)
        return queryset(self.model.objects.all())

    def get_table_pagination(self, table):
        """
        Override `get_table_pagination` from the tables2 TableMixinBase,
        so we can set the paginate_by and the table_pagination value as attribute of the table.
        """
        self.paginate_by = getattr(table, "paginate_by", None)
        self.table_pagination = getattr(table, "table_pagination", None)
        return super().get_table_pagination(table)
get_filterset(filterset_class)

We override the get_filterset method, so we can inject a columns selector into the form

Source code in apis_core/generic/views.py
def get_filterset(self, filterset_class):
    """
    Extend the filterset created by the parent class with a
    `columns` selector that is placed at the beginning of the
    filterset form.
    """
    filterset = super().get_filterset(filterset_class)
    excluded = filterset.form.columns_exclude

    columns_field = forms.MultipleChoiceField(
        required=False,
        choices=self._get_columns_choices(excluded),
        initial=self._get_columns_initial(excluded),
    )
    # dicts keep insertion order, so listing `columns` first puts the
    # selector in front of all the other form fields
    filterset.form.fields = {"columns": columns_field, **filterset.form.fields}

    return filterset
get_table_pagination(table)

Override get_table_pagination from the tables2 TableMixinBase, so we can set the paginate_by and the table_pagination value as attribute of the table.

Source code in apis_core/generic/views.py
def get_table_pagination(self, table):
    """
    Override `get_table_pagination` from the tables2 TableMixinBase:
    copy the `paginate_by` and `table_pagination` attributes of the
    table (or `None` if the table does not define them) to the view
    before delegating to the parent implementation.
    """
    for attribute in ("paginate_by", "table_pagination"):
        setattr(self, attribute, getattr(table, attribute, None))
    return super().get_table_pagination(table)

MergeWith

Bases: GenericModelMixin, PermissionRequiredMixin, FormView

Generic merge view.

Source code in apis_core/generic/views.py
class MergeWith(GenericModelMixin, PermissionRequiredMixin, FormView):
    """
    Generic view for merging one model instance (`other`) into
    another one (`object`).
    """

    template_name = "generic/generic_merge.html"
    form_class = GenericMergeWithForm
    permission_action_required = "change"

    def setup(self, *args, **kwargs):
        """
        Fetch both instances that take part in the merge, using the
        `pk` and `otherpk` url kwargs. A missing instance leads to a 404.
        """
        super().setup(*args, **kwargs)
        url_kwargs = self.kwargs
        self.object = get_object_or_404(self.model, pk=url_kwargs["pk"])
        self.other = get_object_or_404(self.model, pk=url_kwargs["otherpk"])

    def get_context_data(self, **kwargs):
        """
        Add the two objects that are merged and a list of changes to
        the context. There is one change per model field, made up of
        the verbose name of the field, its current value and the value
        it would have after the merge. The view presents those changes
        as a table with diffs.
        """
        Change = namedtuple("Change", "field old new")

        def change_for(model_field):
            merged_value = self.object.get_field_value_after_merge(
                self.other, model_field
            )
            current_value = getattr(self.object, model_field.name)
            return Change(model_field.verbose_name, current_value, merged_value)

        context = super().get_context_data(**kwargs)
        context["changes"] = [
            change_for(model_field) for model_field in self.object._meta.fields
        ]
        context["object"] = self.object
        context["other"] = self.other
        return context

    def form_valid(self, form):
        """
        Perform the merge, tell the user about it and continue with the
        default handling of a valid form.
        """
        target, source = self.object, self.other
        target.merge_with([source])
        messages.info(self.request, f"Merged values of {self.other} into {self.object}")
        return super().form_valid(form)

    def get_success_url(self):
        """After the merge we go to the page of the object that was merged into."""
        return self.object.get_absolute_url()
get_context_data(**kwargs)

The context consists of the two objects that are merged as well as a list of changes. Those changes are presented in the view as a table with diffs

Source code in apis_core/generic/views.py
def get_context_data(self, **kwargs):
    """
    Add the two objects that are merged and a list of changes to
    the context. There is one change per model field, made up of
    the verbose name of the field, its current value and the value
    it would have after the merge. The view presents those changes
    as a table with diffs.
    """
    Change = namedtuple("Change", "field old new")

    def change_for(model_field):
        merged_value = self.object.get_field_value_after_merge(
            self.other, model_field
        )
        current_value = getattr(self.object, model_field.name)
        return Change(model_field.verbose_name, current_value, merged_value)

    context = super().get_context_data(**kwargs)
    context["changes"] = [
        change_for(model_field) for model_field in self.object._meta.fields
    ]
    context["object"] = self.object
    context["other"] = self.other
    return context

SelectMergeOrEnrich

Bases: GenericModelMixin, PermissionRequiredMixin, FormView

This view provides a simple form for selecting other entities (also from external sources, if set up); on form submit it redirects to the Enrich view.

Source code in apis_core/generic/views.py
class SelectMergeOrEnrich(GenericModelMixin, PermissionRequiredMixin, FormView):
    """
    This view provides a simple form for selecting other entities (also from
    external sources, if set up); on form submit it redirects to the Enrich view.
    """

    form_class = GenericSelectMergeOrEnrichForm
    permission_action_required = "create"
    template_name_suffix = "_selectmergeorenrich"

    def get_object(self, *args, **kwargs):
        """Return the instance given by the `pk` url kwarg or raise a 404."""
        primary_key = self.kwargs.get("pk")
        return get_object_or_404(self.model, pk=primary_key)

    def get_context_data(self, *args, **kwargs):
        """Make the instance available in the template as `object`."""
        ctx = super().get_context_data(*args, **kwargs)
        ctx.update(object=self.get_object())
        return ctx

    def get_form_kwargs(self, *args, **kwargs):
        """Pass the instance on to the form as `instance`."""
        form_kwargs = super().get_form_kwargs(*args, **kwargs)
        form_kwargs.update(instance=self.get_object())
        return form_kwargs

Update

Bases: GenericModelMixin, PermissionRequiredMixin, UpdateView

Update view for a generic model. Access requires the <model>_change permission. The form class is overridden by the first match from the first_member_match helper.

Source code in apis_core/generic/views.py
class Update(GenericModelMixin, PermissionRequiredMixin, UpdateView):
    """
    Update view for a generic model.
    Access requires the `<model>_change` permission.
    The form class is the first match returned by the
    `first_member_match` helper, with `GenericModelForm` passed
    as the second argument.
    """

    permission_action_required = "change"

    def get_form_class(self):
        """
        Build the model form for `self.model`, based on the form class
        that `first_member_match` picks from the `forms` module paths.
        """
        candidates = module_paths(self.model, path="forms", suffix="Form")
        base_form = first_member_match(candidates, GenericModelForm)
        return modelform_factory(self.model, base_form)

    def get_success_url(self):
        """The object itself decides where to go after a successful update."""
        return self.object.get_update_success_url()

history

models

APISHistoricalRecords

Bases: HistoricalRecords, GenericModel

Source code in apis_core/history/models.py
class APISHistoricalRecords(HistoricalRecords, GenericModel):
    """
    Customized `HistoricalRecords` that also tracks the many-to-many
    fields of a model.
    """

    def get_m2m_fields_from_model(self, model):
        """
        Change the original simple history function to also return m2m fields.

        Collects the `field` of every member of `model` that is a
        `ManyToManyDescriptor`.
        """
        m2m_fields = []
        try:
            for name, member in inspect.getmembers(model):
                if isinstance(
                    member,
                    django.db.models.fields.related_descriptors.ManyToManyDescriptor,
                ):
                    m2m_fields.append(getattr(model, name).field)
        except AppRegistryNotReady:
            # deliberately best effort: if the app registry is not ready
            # yet, return whatever has been collected so far
            pass
        return m2m_fields

    def get_prev_record(self):
        """
        Get the previous history record for the instance. `None` if first.
        """
        history = utils.get_history_manager_from_history(self)
        # The records are sorted from oldest to newest, so the *last* one
        # of those older than this record is its direct predecessor
        # (`.first()` would always return the oldest record instead).
        return (
            history.filter(history_date__lt=self.history_date)
            .order_by("history_date")
            .last()
        )
get_prev_record()

Get the previous history record for the instance. None if first.

Source code in apis_core/history/models.py
def get_prev_record(self):
    """
    Get the previous history record for the instance. `None` if first.
    """
    history = utils.get_history_manager_from_history(self)
    # The records are sorted from oldest to newest, so the *last* one
    # of those older than this record is its direct predecessor
    # (`.first()` would always return the oldest record instead).
    return (
        history.filter(history_date__lt=self.history_date)
        .order_by("history_date")
        .last()
    )

tables

DescriptionColumnHistory

Bases: CustomTemplateColumn

A column showing a model description

Source code in apis_core/history/tables.py
class DescriptionColumnHistory(CustomTemplateColumn):
    """
    Column that shows a model description. It is rendered using the
    `history/columns/description.html` template and can not be ordered.
    """

    orderable = False
    template_name = "history/columns/description.html"

OriginalIDColumn

Bases: CustomTemplateColumn

A column showing the original id of a model instance

Source code in apis_core/history/tables.py
class OriginalIDColumn(CustomTemplateColumn):
    """
    Column that shows the original id of a model instance. It is
    rendered using the `history/columns/original_id.html` template
    and can not be ordered.
    """

    verbose_name = "Original ID"
    orderable = False
    template_name = "history/columns/original_id.html"

tests

test_simple_history

SimpleHistoryTestCase

Bases: TestCase

Test of the simple_history package using the demo project

Source code in apis_core/history/tests/test_simple_history.py
class SimpleHistoryTestCase(TestCase):
    """Test of the simple_history package using the demo project"""

    def setUp(self):
        """Create one person and one place (with a history date 50 minutes in the past)."""
        self.Person = Person
        self.Place = Place
        self.Profession = Profession

        Person.objects.create(forename="John", surname="Doe")
        Place.objects.create(
            label="Steyr", _history_date=datetime.now() - timedelta(hours=0, minutes=50)
        )

    def test_history(self):
        """Tests the simple version of attributes changed on a models instance."""
        pers = self.Person.objects.get(forename="John")
        pers.forename = "Jane"
        pers.save()
        self.assertEqual(pers.forename, "Jane")
        pers_history = pers.history.all()
        # one history entry for the creation and one for the change
        self.assertEqual(len(pers_history), 2)
        self.assertEqual("Jane", pers.history.most_recent().forename)
        self.assertEqual("John", pers.history.earliest().forename)

    def test_history_tag(self):
        """Tests the version tag function."""
        pers = self.Person.objects.get(forename="John")
        pers.history.latest().set_version_tag("test_tag")
        self.assertEqual(pers.history.latest().version_tag, "test_tag")
        pers_history = pers.history.all()
        self.assertEqual(pers_history.count(), 1)
        self.assertEqual(pers_history[0].version_tag, "test_tag")
        # same check with a freshly created person
        pers2 = self.Person.objects.create(forename="Jane", surname="Doe")
        pers2.history.latest().set_version_tag("test_tag")
        self.assertEqual(pers2.history.latest().version_tag, "test_tag")

    def test_history_delete_entry(self):
        """Tests the deletion of an entry."""
        pers = self.Person.objects.get(forename="John")
        pers.delete()
        # one history entry for the creation and one for the deletion
        self.assertEqual(len(self.Person.history.all()), 2)

    def test_history_merge(self):
        """Tests the merge function of the Place model. This is still expected to fail."""
        pl1 = self.Place.objects.first()
        pl2 = self.Place.objects.create(
            label="Test", _history_date=datetime.now() - timedelta(hours=0, minutes=10)
        )
        # there are no assertions yet, this only checks that merging does not raise
        pl1.merge_with([pl2])

    def test_m2m_save(self):
        """Test if m2m profession is saved correctly."""
        pers = self.Person.objects.create(
            forename="John",
            _history_date=datetime.now() - timedelta(hours=0, minutes=10),
        )
        self.assertEqual(pers.forename, "John")
        pers_history = pers.history.all()
        self.assertEqual(len(pers_history), 1)
        prof = self.Profession.objects.create(name="Test")
        pers.profession.add(prof)
        pers.forename = "Jane"
        pers.save()
        self.assertEqual(len(pers.profession.all()), 1)
        # the profession only shows up in the latest history entry
        self.assertEqual(len(pers.history.latest().profession.all()), 1)
        self.assertEqual(len(pers.history.earliest().profession.all()), 0)

    def test_history_date(self):
        """Test that history is set correctly when not manually set."""
        pers = self.Person.objects.all().first()
        pers.forename = "Jane"
        pers.save()
        self.assertLess(
            pers.history.earliest().history_date, pers.history.latest().history_date
        )
test_history()

Tests the simple version of attributes changed on a models instance.

Source code in apis_core/history/tests/test_simple_history.py
def test_history(self):
    """Tests the simple version of attributes changed on a models instance."""
    pers = self.Person.objects.get(forename="John")
    pers.forename = "Jane"
    pers.save()
    self.assertEqual(pers.forename, "Jane")
    pers_history = pers.history.all()
    # one history entry for the creation and one for the change
    self.assertEqual(len(pers_history), 2)
    self.assertEqual("Jane", pers.history.most_recent().forename)
    self.assertEqual("John", pers.history.earliest().forename)
test_history_date()

Test that history is set correctly when not manually set.

Source code in apis_core/history/tests/test_simple_history.py
def test_history_date(self):
    """Test that history is set correctly when not manually set."""
    pers = self.Person.objects.all().first()
    pers.forename = "Jane"
    pers.save()
    self.assertLess(
        pers.history.earliest().history_date, pers.history.latest().history_date
    )
test_history_delete_entry()

Tests the deletion of an entry.

Source code in apis_core/history/tests/test_simple_history.py
def test_history_delete_entry(self):
    """Tests the deletion of an entry."""
    pers = self.Person.objects.get(forename="John")
    pers.delete()
    # one history entry for the creation and one for the deletion
    self.assertEqual(len(self.Person.history.all()), 2)
test_history_merge()

Tests the merge function of the Place model. This is still expected to fail.

Source code in apis_core/history/tests/test_simple_history.py
def test_history_merge(self):
    """Tests the merge function of the Place model. This is still expected to fail."""
    pl1 = self.Place.objects.first()
    pl2 = self.Place.objects.create(
        label="Test", _history_date=datetime.now() - timedelta(hours=0, minutes=10)
    )
    # there are no assertions yet, this only checks that merging does not raise
    pl1.merge_with([pl2])
test_history_tag()

Tests the version tag function.

Source code in apis_core/history/tests/test_simple_history.py
def test_history_tag(self):
    """Check that a version tag set on the latest history entry can be read back."""
    tag = "test_tag"

    john = self.Person.objects.get(forename="John")
    john.history.latest().set_version_tag(tag)
    self.assertEqual(john.history.latest().version_tag, tag)
    john_history = john.history.all()
    self.assertEqual(john_history.count(), 1)
    self.assertEqual(john_history[0].version_tag, tag)

    # same check with a freshly created person
    jane = self.Person.objects.create(forename="Jane", surname="Doe")
    jane.history.latest().set_version_tag(tag)
    self.assertEqual(jane.history.latest().version_tag, tag)
test_m2m_save()

Test if m2m profession is saved correctly.

Source code in apis_core/history/tests/test_simple_history.py
def test_m2m_save(self):
    """Test if m2m profession is saved correctly."""
    pers = self.Person.objects.create(
        forename="John",
        _history_date=datetime.now() - timedelta(hours=0, minutes=10),
    )
    self.assertEqual(pers.forename, "John")
    pers_history = pers.history.all()
    self.assertEqual(len(pers_history), 1)
    prof = self.Profession.objects.create(name="Test")
    pers.profession.add(prof)
    pers.forename = "Jane"
    pers.save()
    self.assertEqual(len(pers.profession.all()), 1)
    # the profession only shows up in the latest history entry
    self.assertEqual(len(pers.history.latest().profession.all()), 1)
    self.assertEqual(len(pers.history.earliest().profession.all()), 0)

views

create_new_version(request, contenttype, pk)

Gets the version of the history instance and creates a new version.

Source code in apis_core/history/views.py
def create_new_version(request, contenttype, pk):
    """
    Gets the version of the history instance and creates a new version.

    The latest history entry of the instance given by `contenttype` and
    `pk` is saved again as a new history entry, dated now, and tagged
    with `v<N>`. `N` is one more than the highest number found in the
    existing version tags, or 1 if there is none. Version tags that do
    not contain a number (free text, empty strings) are ignored.
    Returns a redirect to the generic detail view of the new history entry.
    """
    model = contenttype.model_class()
    instance = model.objects.get(id=pk)
    history_latest = instance.history.latest()
    version_tags = instance.history.filter(version_tag__isnull=False).values_list(
        "version_tag", flat=True
    )
    version_numbers = []
    for tag in version_tags:
        try:
            version_numbers.append(int(tag.replace("v", "")))
        except ValueError:
            # not a `v<number>` tag, so it does not take part in the numbering
            continue
    # NOTE(review): clearing the id is the usual Django way to make
    # `save()` insert a copy instead of updating the existing row
    history_latest.history_id = None
    history_latest.history_date = timezone.now()
    history_latest.save()
    latest_version = max(version_numbers, default=0)
    history_latest.set_version_tag(f"v{latest_version + 1}")
    return redirect(
        reverse(
            "apis_core:generic:detail",
            args=[
                ContentType.objects.get_for_model(history_latest.__class__),
                history_latest.history_id,
            ],
        )
    )

relations

filtersets

EntityFilter

Bases: CharFilter

Custom CharFilter that uses the generate_search_filter helper to search in all instances inheriting from RootObject and then uses those results to only list relations that point to one of the results.

Source code in apis_core/relations/filtersets.py
class EntityFilter(CharFilter):
    """
    Custom CharFilter that uses the generate_search_filter helper
    to search in all instances inheriting from RootObject and then
    uses those results to only list relations that point to one of
    the results.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.extra.update(help_text="Searches in subclasses of RootObject")

    def _search_all_entities(self, value) -> list[str]:
        """
        Return the primary keys of all the RootObject instances that
        match `value` in any of the content types returned by
        `get_all_relation_subj_and_obj`.
        """
        combined = Q()
        for ct in get_all_relation_subj_and_obj():
            model_name = ct.model
            # only look at root objects that have a child of this type ...
            has_child = Q(**{f"{model_name}__isnull": False})
            # ... and search in the fields of that child
            matches = generate_search_filter(
                ct.model_class(), value, prefix=f"{model_name}__"
            )
            combined = combined | (has_child & matches)
        found = RootObject.objects_inheritance.filter(combined)
        return found.values_list("pk", flat=True)

    def filter(self, qs, value):
        """
        Limit `qs` to the relations whose `<field_name>_object_id` is
        one of the search results. An empty `value` leaves `qs` untouched.
        """
        if not value:
            return qs
        entity_ids = self._search_all_entities(value)
        lookup = f"{self.field_name}_object_id__in"
        return qs.filter(**{lookup: entity_ids})

RelationFilterSet

Bases: GenericFilterSet

Override the GenericFilterSet that is created for the Relation model. It does not really make sense to filter for content type id and object id, so we exclude those. Instead, we add a multiple choice filter for object and subject class, that only lists those choices that actually exist (meaning the classes that are actually set as subj or obj in some relation). Additionally, we add a search filter that searches in instances connected to relations (this only works for instances inheriting from RootObject).

Source code in apis_core/relations/filtersets.py
class RelationFilterSet(GenericFilterSet):
    """
    Override the GenericFilterSet that is created for the Relation model.
    It does not really make sense to filter for content type id and object id,
    so we exclude those.
    Instead, we add a multiple choice filter for object and subject class, that
    only lists those choices that actually exist (meaning the classes that are
    actually set as subj or obj in some relation).
    Additionally, we add a search filter that searches in instances connected
    to relations (this only works for instances inheriting from RootObject).
    """

    subj_search = EntityFilter(field_name="subj", label="Subject search")
    obj_search = EntityFilter(field_name="obj", label="Object search")

    class Meta:
        form = RelationFilterSetForm
        exclude = [
            "subj_object_id",
            "subj_content_type",
            "obj_object_id",
            "obj_content_type",
        ]

    def __init__(self, *args, **kwargs):
        """
        Add the `subj_class` and `obj_class` filters, if the filterset
        is bound to a model. The choices come from the `subj_model` and
        `obj_model` attributes of the model, falling back to all the
        models returned by `get_all_relation_subj_and_obj`. A filter is
        only added if those choices are a list.
        """
        super().__init__(*args, **kwargs)
        model = getattr(self.Meta, "model", False)
        if not model:
            return

        fallback = [ct.model_class() for ct in get_all_relation_subj_and_obj()]
        sides = (
            ("subj_class", "subj", "Subject class", "subj_model"),
            ("obj_class", "obj", "Object Class", "obj_model"),
        )
        for filter_name, field_name, label, attribute in sides:
            candidates = getattr(model, attribute, fallback)
            if isinstance(candidates, list):
                self.filters[filter_name] = SubjObjClassFilter(
                    field_name=field_name, label=label, models=candidates
                )

SubjObjClassFilter

Bases: MultipleChoiceFilter

Custom MultipleChoiceFilter:

* it lists model classes as choices, that are in some way connected to a relation, either as subj or as obj
* it filters relations by the content types of the connected subjects and objects

Source code in apis_core/relations/filtersets.py
class SubjObjClassFilter(MultipleChoiceFilter):
    """
    Custom MultipleChoiceFilter
    * it offers the content types of the given `models` as choices
    * it filters relations by the content types of the connected
      subjects and objects
    """

    def __init__(self, models, *args, **kwargs):
        super().__init__(*args, **kwargs)
        choices = []
        for model in models:
            content_type = ContentType.objects.get_for_model(model)
            choices.append((content_type.id, content_type.name))
        self.extra["choices"] = choices

    def filter(self, qs, value: list[str] | None):
        """
        Limit `qs` to the relations whose `<field_name>_content_type` is
        one of the content type ids in `value`. An empty `value` leaves
        `qs` untouched.
        """
        if not value:
            return qs
        lookup = f"{self.field_name}_content_type__in"
        return qs.filter(Q(**{lookup: value}))

forms

CustomSelect2ListChoiceField

Bases: Select2ListChoiceField

We use a custom Select2ListChoiceField in our Relation form, because we don't want the form value to be validated. The field uses the choices setting as a basis for validating, but our choices span over multiple querysets, so it's easier to simply not validate (some validation happens in the clean method anyway).

Source code in apis_core/relations/forms/__init__.py
class CustomSelect2ListChoiceField(autocomplete.Select2ListChoiceField):
    """
    We use a custom Select2ListChoiceField in our Relation form,
    because we don't want the form value to be validated against
    the `choices` of the field: our choices span over multiple
    querysets, so it's easier to only check the shape of the value
    (some validation happens in the `clean` method anyway).
    """

    def validate(self, value):
        """Accept every value that contains an underscore."""
        if "_" in value:
            return
        raise ValidationError("please choose a correct value")

RelationFilterSetForm

Bases: GenericFilterSetForm

FilterSet form for relations based on GenericFilterSetForm. Excludes relation_ptr from the columns selector

Source code in apis_core/relations/forms/__init__.py
class RelationFilterSetForm(GenericFilterSetForm):
    """
    FilterSet form for relations based on GenericFilterSetForm.
    Excludes `relation_ptr` from the columns selector
    """

    # names listed here are left out of the `columns` selector
    columns_exclude = ["relation_ptr"]

RelationForm

Bases: GenericModelForm

This form overrides the generic form for relation editing. Relations have generic relations to subj and obj, but we hide those ForeignKey form fields and instead show autocomplete choice fields. In addition, one can pass a hx_post_route argument to the form to make the form set the hx-post attribute to the given value. We also pass a reverse boolean, which gets passed on to the htmx POST endpoint using url parameters (the endpoint can then select the success_url based on the reverse state).

Source code in apis_core/relations/forms/__init__.py
class RelationForm(GenericModelForm):
    """
    This form overrides the generic form for relation editing.
    Relations have generic relations to subj and obj, but we
    hide those ForeignKey form fields and instead show
    autocomplete choice fields.
    The form takes an optional `params` dict as keyword argument:
    if it contains a `hx_post_route`, the form sets its `hx-post`
    attribute to that value.
    All the `params` (e.g. a `reverse` boolean) get passed on
    to the htmx POST endpoint using url parameters (the endpoint
    can then select the success_url based on the `reverse` state).
    """

    class Media:
        js = ["js/relation_dialog.js"]

    class Meta:
        fields = "__all__"
        widgets = {
            "subj_content_type": forms.HiddenInput(),
            "subj_object_id": forms.HiddenInput(),
            "obj_content_type": forms.HiddenInput(),
            "obj_object_id": forms.HiddenInput(),
        }

    def __entities_autocomplete_with_params(self, content_types=()) -> str:
        """
        helper method to generate the entities autocomplete url with
        content type parameters: every content type is added as an
        `entities=<app_label>.<model>` query parameter
        """
        url = reverse("apis_core:apis_entities:autocomplete")
        params = [f"entities={ct.app_label}.{ct.model}" for ct in content_types]
        return url + "?" + "&".join(params)

    def __subj_autocomplete_url(self) -> str:
        """generate the autocomplete url for the subj field, using the subject
        types configured in the relation"""
        subj_content_types = [
            ContentType.objects.get_for_model(model)
            for model in self.Meta.model.subj_list()
        ]
        return self.__entities_autocomplete_with_params(subj_content_types)

    def __obj_autocomplete_url(self) -> str:
        """generate the autocomplete url for the obj field, using the object
        types configured in the relation"""
        obj_content_types = [
            ContentType.objects.get_for_model(model)
            for model in self.Meta.model.obj_list()
        ]
        return self.__entities_autocomplete_with_params(obj_content_types)

    def __init__(self, *args, **kwargs):
        """
        Initialize the form and add the `subj` and `obj` fields using the
        generic apis_entities autocomplete with the correct parameters.

        Besides the arguments of the parent form, a `params` dict can
        be passed as keyword argument (see the class documentation).
        """

        self.params = kwargs.pop("params", {})
        # the form can also be instantiated without (or with an empty)
        # `initial`, so we can not rely on the key being there
        initial = kwargs.get("initial") or {}
        kwargs["initial"] = initial
        # workaround: if there is only one possible subj or obj type
        # we use that as subj_content_type or obj_content_type, which
        # lets us use another endpoint for autocomplete
        # this can be removed when we stop allowing multiple types
        # for relation subjects or objects
        if (
            initial.get("subj_content_type", None) is None
            and len(self.Meta.model.subj_list()) == 1
        ):
            initial["subj_content_type"] = ContentType.objects.get_for_model(
                self.Meta.model.subj_list()[0]
            ).id
        if (
            initial.get("obj_content_type", None) is None
            and len(self.Meta.model.obj_list()) == 1
        ):
            initial["obj_content_type"] = ContentType.objects.get_for_model(
                self.Meta.model.obj_list()[0]
            ).id
        super().__init__(*args, **kwargs)
        subj_content_type = initial.get("subj_content_type", None)
        subj_object_id = initial.get("subj_object_id", None)
        obj_content_type = initial.get("obj_content_type", None)
        obj_object_id = initial.get("obj_object_id", None)

        self.subj_instance, self.obj_instance = None, None
        if instance := kwargs.get("instance"):
            # editing an existing relation: subj and obj are already known
            self.subj_instance = instance.subj
            self.obj_instance = instance.obj
            subj_object_id = instance.subj.id
            obj_object_id = instance.obj.id
        else:
            # new relation: look up subj and obj if the initial data
            # completely identifies them
            if subj_content_type and subj_object_id:
                model = get_object_or_404(ContentType, pk=subj_content_type)
                self.subj_instance = get_object_or_404(
                    model.model_class(), pk=subj_object_id
                )
            if obj_content_type and obj_object_id:
                model = get_object_or_404(ContentType, pk=obj_content_type)
                self.obj_instance = get_object_or_404(
                    model.model_class(), pk=obj_object_id
                )

        # the hidden fields are filled in `clean`, so they must not be required
        self.fields["subj_object_id"].required = False
        self.fields["subj_content_type"].required = False
        if not subj_object_id:
            if subj_content_type:
                # If we know the content type the subject will have, we
                # use another autocomplete field that allows us to paste links
                # and provides external autocomplete results.
                ct = ContentType.objects.get(pk=subj_content_type)
                self.fields["subj"] = ModelImportChoiceField(
                    queryset=ct.model_class().objects.all()
                )
                self.fields["subj"].widget = ApisListSelect2(
                    attrs={"data-html": True},
                    url=reverse("apis_core:generic:autocomplete", args=[ct])
                    + "?create=True",
                )
                self.fields["subj"].widget.choices = self.fields["subj"].choices
            else:
                # If we don't know the content type, we use a generic autocomplete
                # field that autocompletes any content type the relation can have as a
                # subject.
                self.fields["subj_ct_and_id"] = CustomSelect2ListChoiceField()
                self.fields["subj_ct_and_id"].widget = ApisListSelect2(
                    attrs={"data-html": True},
                    url=self.__subj_autocomplete_url(),
                )
                if self.subj_instance:
                    content_type = ContentType.objects.get_for_model(self.subj_instance)
                    select_identifier = f"{content_type.id}_{self.subj_instance.id}"
                    self.fields["subj_ct_and_id"].initial = select_identifier
                    self.fields["subj_ct_and_id"].choices = [
                        (select_identifier, self.subj_instance)
                    ]

        self.fields["obj_object_id"].required = False
        self.fields["obj_content_type"].required = False
        if not obj_object_id:
            if obj_content_type:
                # If we know the content type the object will have, we
                # use another autocomplete field that allows us to paste links
                # and provides external autocomplete results.
                ct = ContentType.objects.get(pk=obj_content_type)
                self.fields["obj"] = ModelImportChoiceField(
                    queryset=ct.model_class().objects.all()
                )
                self.fields["obj"].widget = ApisListSelect2(
                    attrs={"data-html": True},
                    url=reverse("apis_core:generic:autocomplete", args=[ct])
                    + "?create=True",
                )
                self.fields["obj"].widget.choices = self.fields["obj"].choices
            else:
                # If we don't know the content type, we use a generic autocomplete
                # field that autocompletes any content type the relation can have as an
                # object.
                self.fields["obj_ct_and_id"] = CustomSelect2ListChoiceField()
                self.fields["obj_ct_and_id"].widget = ApisListSelect2(
                    attrs={"data-html": True},
                    url=self.__obj_autocomplete_url(),
                )
                if self.obj_instance:
                    content_type = ContentType.objects.get_for_model(self.obj_instance)
                    select_identifier = f"{content_type.id}_{self.obj_instance.id}"
                    self.fields["obj_ct_and_id"].initial = select_identifier
                    self.fields["obj_ct_and_id"].choices = [
                        (select_identifier, self.obj_instance)
                    ]

        self.order_fields(self.field_order)
        self.helper = FormHelper(self)
        model_ct = ContentType.objects.get_for_model(self.Meta.model)
        self.helper.form_id = f"relation_{model_ct.model}_form"
        self.helper.add_input(Submit("submit", "Submit"))

        if hx_post_route := self.params.get("hx_post_route", False):
            # submit the form using htmx and replace the form with the response
            self.helper.attrs = {
                "hx-post": hx_post_route + "?" + urlencode(self.params, doseq=True),
                "hx-swap": "outerHTML",
                "hx-target": f"#{self.helper.form_id}",
            }

    def clean(self) -> dict:
        """
        We check if there are `subj` or `obj` fields in the form data
        and if so, we use the data to create objects for the real fields of
        the Relation
        """
        cleaned_data = super().clean()
        # NOTE(review): the `*_ct_and_id` values are only checked to contain
        # an underscore - a value with more than one underscore or an unknown
        # content type id raises here instead of becoming a form error
        if "subj_ct_and_id" in cleaned_data:
            subj_content_type, subj_object_id = cleaned_data["subj_ct_and_id"].split(
                "_"
            )
            cleaned_data["subj_content_type"] = ContentType.objects.get(
                pk=subj_content_type
            )
            cleaned_data["subj_object_id"] = subj_object_id
            del cleaned_data["subj_ct_and_id"]
        if "obj_ct_and_id" in cleaned_data:
            obj_content_type, obj_object_id = cleaned_data["obj_ct_and_id"].split("_")
            cleaned_data["obj_content_type"] = ContentType.objects.get(
                pk=obj_content_type
            )
            cleaned_data["obj_object_id"] = obj_object_id
            del cleaned_data["obj_ct_and_id"]
        if "subj" in cleaned_data:
            cleaned_data["subj_object_id"] = cleaned_data.pop("subj").id
        if "obj" in cleaned_data:
            cleaned_data["obj_object_id"] = cleaned_data.pop("obj").id
        return cleaned_data

    @property
    def relation_name(self) -> str:
        """
        A helper method to access the correct name of the relation:
        the `reverse_name` of the model if the `reverse` param is set,
        its `name` otherwise (also when there is no `reverse` param).
        """
        if self.params.get("reverse"):
            return self._meta.model.reverse_name
        return self._meta.model.name
relation_name property

A helper method to access the correct name of the relation

__entities_autocomplete_with_params(content_types=[ContentType])

Helper method to generate the entities autocomplete url with content type parameters

Source code in apis_core/relations/forms/__init__.py
def __entities_autocomplete_with_params(self, content_types=[ContentType]) -> str:
    """
    Build the entities autocomplete url, adding one
    ``entities=<app_label>.<model>`` query parameter per content type.
    """
    # NOTE(review): the default value is a list holding the ContentType class
    # itself, which looks like a misplaced type hint - confirm before relying on it
    base_url = reverse("apis_core:apis_entities:autocomplete")
    query = "&".join(
        f"entities={content_type.app_label}.{content_type.model}"
        for content_type in content_types
    )
    return base_url + "?" + query
__init__(*args, **kwargs)

Initialize the form and add the subj and obj fields using the generic apis_entities autocomplete with the correct parameters.

Source code in apis_core/relations/forms/__init__.py
def __init__(self, *args, **kwargs):
    """
    Initialize the form and add the `subj` and `obj` fields using the
    generic apis_entities autocomplete with the correct parameters.

    The `params` keyword argument is removed from `kwargs` and stored in
    `self.params`; the remaining arguments are passed on to the parent
    form. `kwargs["initial"]` is read unconditionally, so the caller has
    to provide an `initial` dict.

    For both sides of the relation one of the following happens:
    - if an object id is known (from `initial` or from `instance`), no
      additional field is added
    - if only the content type is known, a `subj` / `obj` field using the
      generic autocomplete of that content type is added
    - otherwise a `subj_ct_and_id` / `obj_ct_and_id` field using the
      entities autocomplete is added
    Those additional fields are translated back to the real fields in
    `clean`.
    """

    self.params = kwargs.pop("params", {})
    # workaround: if there is only one possible subj or obj type
    # we use that as subj_content_type or obj_content_type, which
    # lets us use another endpoint for autocomplete
    # this can be removed when we stop allowing multiple types
    # for relation subjects or objects
    initial = kwargs["initial"]
    if (
        initial.get("subj_content_type", None) is None
        and len(self.Meta.model.subj_list()) == 1
    ):
        kwargs["initial"]["subj_content_type"] = ContentType.objects.get_for_model(
            self.Meta.model.subj_list()[0]
        ).id
    if (
        initial.get("obj_content_type", None) is None
        and len(self.Meta.model.obj_list()) == 1
    ):
        kwargs["initial"]["obj_content_type"] = ContentType.objects.get_for_model(
            self.Meta.model.obj_list()[0]
        ).id
    super().__init__(*args, **kwargs)
    # read the (possibly amended) initial values that describe both sides
    subj_content_type = kwargs["initial"].get("subj_content_type", None)
    subj_object_id = kwargs["initial"].get("subj_object_id", None)
    obj_content_type = kwargs["initial"].get("obj_content_type", None)
    obj_object_id = kwargs["initial"].get("obj_object_id", None)

    self.subj_instance, self.obj_instance = None, None
    if instance := kwargs.get("instance"):
        # an existing relation: both sides are taken from the instance
        self.subj_instance = instance.subj
        self.obj_instance = instance.obj
        subj_object_id = instance.subj.id
        obj_object_id = instance.obj.id
    else:
        # a new relation: look up whatever side is fully specified in
        # `initial`; unknown content types or object ids end in a 404
        if subj_content_type and subj_object_id:
            model = get_object_or_404(ContentType, pk=subj_content_type)
            self.subj_instance = get_object_or_404(
                model.model_class(), pk=subj_object_id
            )
        if obj_content_type and obj_object_id:
            model = get_object_or_404(ContentType, pk=obj_content_type)
            self.obj_instance = get_object_or_404(
                model.model_class(), pk=obj_object_id
            )

    # the real subject fields are optional in the form, their values can be
    # provided by the helper fields added below (see `clean`)
    self.fields["subj_object_id"].required = False
    self.fields["subj_content_type"].required = False
    if not subj_object_id:
        if subj_content_type:
            """ If we know the content type the subject will have, we
            use another autocomplete field that allows us to paste links
            and provides external autocomplete results.
            """
            ct = ContentType.objects.get(pk=subj_content_type)
            self.fields["subj"] = ModelImportChoiceField(
                queryset=ct.model_class().objects.all()
            )
            self.fields["subj"].widget = ApisListSelect2(
                attrs={"data-html": True},
                url=reverse("apis_core:generic:autocomplete", args=[ct])
                + "?create=True",
            )
            # the widget was replaced after the field was created, so the
            # choices of the field are handed to the new widget
            self.fields["subj"].widget.choices = self.fields["subj"].choices
        else:
            """ If we don't know the content type, we use a generic autocomplete
            field that autocompletes any content type the relation can have as a
            subject.
            """
            self.fields["subj_ct_and_id"] = CustomSelect2ListChoiceField()
            self.fields["subj_ct_and_id"].widget = ApisListSelect2(
                attrs={"data-html": True},
                url=self.__subj_autocomplete_url(),
            )
            if self.subj_instance:
                # preselect the known subject, using the same
                # "<content type id>_<object id>" format `clean` splits
                content_type = ContentType.objects.get_for_model(self.subj_instance)
                select_identifier = f"{content_type.id}_{self.subj_instance.id}"
                self.fields["subj_ct_and_id"].initial = select_identifier
                self.fields["subj_ct_and_id"].choices = [
                    (select_identifier, self.subj_instance)
                ]

    # same procedure for the object side of the relation
    self.fields["obj_object_id"].required = False
    self.fields["obj_content_type"].required = False
    if not obj_object_id:
        if obj_content_type:
            """ If we know the content type the object will have, we
            use another autocomplete field that allows us to paste links
            and provides external autocomplete results.
            """
            ct = ContentType.objects.get(pk=obj_content_type)
            self.fields["obj"] = ModelImportChoiceField(
                queryset=ct.model_class().objects.all()
            )
            self.fields["obj"].widget = ApisListSelect2(
                attrs={"data-html": True},
                url=reverse("apis_core:generic:autocomplete", args=[ct])
                + "?create=True",
            )
            self.fields["obj"].widget.choices = self.fields["obj"].choices
        else:
            """ If we don't know the content type, we use a generic autocomplete
            field that autocompletes any content type the relation can have as a
            object.
            """
            self.fields["obj_ct_and_id"] = CustomSelect2ListChoiceField()
            self.fields["obj_ct_and_id"].widget = ApisListSelect2(
                attrs={"data-html": True},
                url=self.__obj_autocomplete_url(),
            )
            if self.obj_instance:
                content_type = ContentType.objects.get_for_model(self.obj_instance)
                select_identifier = f"{content_type.id}_{self.obj_instance.id}"
                self.fields["obj_ct_and_id"].initial = select_identifier
                self.fields["obj_ct_and_id"].choices = [
                    (select_identifier, self.obj_instance)
                ]

    # fields were added above, so the configured field order is applied again
    self.order_fields(self.field_order)
    self.helper = FormHelper(self)
    model_ct = ContentType.objects.get_for_model(self.Meta.model)
    self.helper.form_id = f"relation_{model_ct.model}_form"
    self.helper.add_input(Submit("submit", "Submit"))

    # if a `hx_post_route` is given in the params, the form is posted via
    # htmx to that route (with all params as query string) and the response
    # replaces the form itself
    if hx_post_route := self.params.get("hx_post_route", False):
        self.helper.attrs = {
            "hx-post": hx_post_route + "?" + urlencode(self.params, doseq=True),
            "hx-swap": "outerHTML",
            "hx-target": f"#{self.helper.form_id}",
        }
__obj_autocomplete_url()

generate the autocomplete url for the obj field, using the object types configured in the relation

Source code in apis_core/relations/forms/__init__.py
def __obj_autocomplete_url(self) -> str:
    """
    Return the autocomplete url for the obj field, restricted to the
    object types listed by the relation model's `obj_list()`.
    """
    content_types = []
    for object_model in self.Meta.model.obj_list():
        content_types.append(ContentType.objects.get_for_model(object_model))
    return self.__entities_autocomplete_with_params(content_types)
__subj_autocomplete_url()

generate the autocomplete url for the subj field, using the subject types configured in the relation

Source code in apis_core/relations/forms/__init__.py
def __subj_autocomplete_url(self) -> str:
    """
    Return the autocomplete url for the subj field, restricted to the
    subject types listed by the relation model's `subj_list()`.
    """
    content_types = []
    for subject_model in self.Meta.model.subj_list():
        content_types.append(ContentType.objects.get_for_model(subject_model))
    return self.__entities_autocomplete_with_params(content_types)
clean()

We check if there are subj or obj fields in the form data and if so, we use the data to create objects for the real fields of the Relation

Source code in apis_core/relations/forms/__init__.py
def clean(self) -> dict:
    """
    Translate the helper fields of the form into the real fields of
    the Relation.

    A combined ``<role>_ct_and_id`` value (``"<content type pk>_<object pk>"``)
    is split into ``<role>_content_type`` and ``<role>_object_id``; a
    ``subj`` / ``obj`` value is replaced by its ``id`` in
    ``<role>_object_id``. The helper keys are removed from the result.
    """
    data = super().clean()
    roles = ("subj", "obj")

    # first pass: the generic autocomplete fields carrying "ct_id" values
    for role in roles:
        combined_key = f"{role}_ct_and_id"
        if combined_key not in data:
            continue
        content_type_pk, object_pk = data[combined_key].split("_")
        data[f"{role}_content_type"] = ContentType.objects.get(pk=content_type_pk)
        data[f"{role}_object_id"] = object_pk
        del data[combined_key]

    # second pass: the typed autocomplete fields carrying model instances
    for role in roles:
        if role in data:
            data[f"{role}_object_id"] = data.pop(role).id

    return data

utils

get_all_relation_subj_and_obj() cached

Return the content types of any model that is in some way connected to a relation - either as obj or as subj.

Returns: list[ContentType]: A list of unique ContentTypes for related models.

Source code in apis_core/relations/utils.py
@functools.cache
def get_all_relation_subj_and_obj() -> list[ContentType]:
    """
    Collect every model that takes part in a relation - either as subj
    or as obj - and return the content types of those models.
    The result is cached for the lifetime of the process.

    Returns:
    list[ContentType]: A list of unique ContentTypes for related models.
    """
    participants = set()
    for relation_ct in relation_content_types():
        relation_model = relation_ct.model_class()
        participants.update(relation_model.subj_list())
        participants.update(relation_model.obj_list())
    return list(map(ContentType.objects.get_for_model, participants))

relation_match_target(relation, target)

Test if a relation points to a target. This function should not be cached, because the forward attribute is an annotation that does not seem to be part of the relation; if cached, the method could be called with another forward value and return the wrong result.

Source code in apis_core/relations/utils.py
def relation_match_target(relation, target: ContentType) -> bool:
    """
    Tell whether `relation` points to `target`: a forward relation is
    compared by its `obj_content_type`, a reverse one by its
    `subj_content_type`.

    Do not cache this function: `forward` is an annotation that does not
    seem to be part of the relation itself, so a cached result could be
    returned for a call with a different `forward` value.
    """
    if relation.forward:
        far_end = relation.obj_content_type
    else:
        far_end = relation.subj_content_type
    return bool(far_end == target)

utils

authentication

TokenAuthSupportQueryString

Bases: TokenAuthentication

Extend the TokenAuthentication class to support querystring authentication in the form of "http://www.example.com/?api_key=<token>"

Source code in apis_core/utils/authentication.py
class TokenAuthSupportQueryString(TokenAuthentication):
    """
    Token authentication that additionally accepts the token as
    `api_key` query string parameter, i.e. in the form of
    "http://www.example.com/?api_key=<token>"
    """

    def authenticate(self, request):
        # The 'Authorization' header takes precedence: the query string is
        # only consulted if 'api_key' is present and no header was sent.
        if (
            "api_key" not in request.query_params
            or "HTTP_AUTHORIZATION" in request.META
        ):
            return super().authenticate(request)
        token = request.query_params.get("api_key")
        return self.authenticate_credentials(token)

autocomplete

ExternalAutocomplete

This is a helper base class for implementing external autocomplete classes. <Modelname>ExternalAutocomplete classes are expected to have a get_results(self, q) method that returns a list of results usable by the autocomplete view. This base class implements this get_results method in a way that you can inherit from it and just define a list of adapters. Those adapters are then used one by one to add external autocomplete search results.

Source code in apis_core/utils/autocomplete.py
class ExternalAutocomplete:
    """
    Helper base class for external autocomplete classes.
    <Modelname>ExternalAutocomplete classes are expected to provide a
    `get_results(self, q)` method returning a list of results usable
    by the autocomplete view. Subclasses of this class only have to
    define a list of `adapters`: `get_results` asks them one by one
    and concatenates their results.
    """

    # one session, defined on the class, is handed to every adapter
    session = requests.Session()
    adapters = []

    def get_results(self, q):
        return [
            hit
            for adapter in self.adapters
            for hit in adapter.get_results(q, self.session)
        ]

ExternalAutocompleteAdapter

Base class for ExternalAutocompleteAdapters. It provides the methods used for templating the autocomplete results. You can pass a template name to initialization, which is then used to style the results.

Source code in apis_core/utils/autocomplete.py
class ExternalAutocompleteAdapter:
    """
    Base class for ExternalAutocompleteAdapters, providing the
    methods that turn an autocomplete result into its label.
    A `template` name can be passed on initialization; if set, the
    label is rendered with that template (the result is available as
    `result` in the template context), otherwise `default_template`
    is used.
    """

    template = None

    def __init__(self, *args, **kwargs):
        self.template = kwargs.get("template")

    def default_template(self, result):
        label, identifier = result["label"], result["id"]
        return f'{label} <a href="{identifier}">{identifier}</a>'

    def get_result_label(self, result):
        if not self.template:
            return self.default_template(result)
        return render_to_string(self.template, {"result": result})

LobidAutocompleteAdapter

Bases: ExternalAutocompleteAdapter

This autocomplete adapter queries the lobid autocomplete APIs. See https://lobid.org/gnd/api for details. You can pass a params dict which will then be used as GET request parameters.

Source code in apis_core/utils/autocomplete.py
class LobidAutocompleteAdapter(ExternalAutocompleteAdapter):
    """
    This autocomplete adapter queries the lobid autocomplete APIs.
    See https://lobid.org/gnd/api for details.
    You can pass a `params` dict on initialization, which will then be
    used as GET request parameters. The search term is added to those
    parameters as `q` for every request.
    """

    params = {}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.params = kwargs.get("params", {})

    def extract(self, res):
        """Convert a single lobid result into an autocomplete result dict."""
        identifier = res["id"]
        # render the label only once, it is used for both text attributes
        label = self.get_result_label(res)
        return {
            "id": identifier,
            "text": label,
            "selected_text": label,
        }

    def get_results(self, q, session=requests.Session()):
        """
        Search lobid for `q` and return a list of autocomplete results.
        An empty list is returned if the response is falsy.

        The default `session` is created once, when the method is
        defined, and is therefore shared by all calls not passing one.
        """
        endpoint = "https://lobid.org/gnd/search?"
        # build the request parameters in a new dict: `self.params` is the
        # dict passed in by the caller and must not be modified per request
        params = {**self.params, "q": q}
        res = session.get(endpoint, params=params)
        if res:
            return list(filter(bool, map(self.extract, res.json())))
        return []

TypeSenseAutocompleteAdapter

Bases: ExternalAutocompleteAdapter

This autocomplete adapter queries typesense collections on a typesense server. The collections variable can either be a string or a list - if it's a string, that collection is queried directly; if it's a list, the adapter uses the typesense multi_search endpoint.

Source code in apis_core/utils/autocomplete.py
class TypeSenseAutocompleteAdapter(ExternalAutocompleteAdapter):
    """
    This autocomplete adapters queries typesense collections on a
    typesense server. The `collections` variable can either be a
    string or a list - if its a string, that collection is queried
    directly, if its a list, the adapter uses typesense `multi_search`
    endpoint.
    """

    collections = None
    token = None
    server = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.collections = kwargs.get("collections", None)
        self.token = kwargs.get("token", None)
        self.server = kwargs.get("server", None)

    def default_template(self, result):
        return super().default_template(result["document"])

    def extract(self, res):
        if res.get("document"):
            return {
                "id": res["document"]["id"],
                "text": self.get_result_label(res),
                "selected_text": self.get_result_label(res),
            }
        logger.error(
            "Could not parse result from typesense collection %s: %s",
            self.collections,
            res,
        )
        return False

    def get_results(self, q, session=requests.Session()):
        headers = {"X-TYPESENSE-API-KEY": self.token}
        res = None
        if self.token and self.server:
            match self.collections:
                # if there is only on collection configured, we hit that collection directly
                case str() as collection:
                    url = f"{self.server}/collections/{collection}/documents/search?q={q}&query_by=description&query_by=label"
                    res = session.get(url, headers=headers)
                # if there are multiple collections configured, we use the `multi_search` endpoint
                case list() as collectionlist:
                    url = f"{self.server}/multi_search?q={q}&query_by=description&query_by=label"
                    data = {"searches": []}
                    for collection in collectionlist:
                        data["searches"].append({"collection": collection})
                    res = session.post(url, data=json.dumps(data), headers=headers)
                case unknown:
                    logger.error("Don't know what to do with collection %s", unknown)

            if res:
                data = res.json()
                hits = data.get("hits", [])
                for result in data.get("results", []):
                    hits.extend(result["hits"])
                return list(filter(bool, map(self.extract, hits)))
        return []

fields

NewlineSeparatedListField

Bases: TextField

This field is basically a textfield with a custom widget. It uses the NewlineSeparatedListWidget to provide a simple way for users to enter multiple values without having to think about the separator.

Source code in apis_core/utils/fields.py
class NewlineSeparatedListField(models.TextField):
    """
    A textfield that is rendered with the NewlineSeparatedListWidget,
    which gives users a simple way to enter multiple values without
    having to think about the separator.
    """

    def formfield(self, form_class=None, choices_form_class=None, **kwargs):
        # the widget is always set here, replacing any widget passed by the caller
        kwargs.update(widget=NewlineSeparatedListWidget(attrs={"class": "mb-1"}))
        return super().formfield(
            form_class=form_class,
            choices_form_class=choices_form_class,
            **kwargs,
        )

helpers

construct_lookup(value)

Helper method to parse input values and construct field lookups (https://docs.djangoproject.com/en/4.2/ref/models/querysets/#field-lookups) Parses user input for wildcards and returns a tuple containing the interpreted django lookup string and the trimmed value E.g.

  • example -> ('__icontains', 'example')
  • *example* -> ('__icontains', 'example')
  • *example -> ('__iendswith', 'example')
  • example* -> ('__istartswith', 'example')
  • "example" -> ('__iexact', 'example')

:param str value: text to be parsed for * :return: a tuple containing the lookup type and the value without modifiers

Source code in apis_core/utils/helpers.py
def construct_lookup(value: str) -> tuple[str, str]:
    """
    Parse a user supplied search value for modifiers and translate it
    into a django field lookup
    (https://docs.djangoproject.com/en/4.2/ref/models/querysets/#field-lookups)
    Returns a tuple of the lookup string and the value without modifiers
    E.g.

    - ``example`` -> ``('__icontains', 'example')``
    - ``*example*`` -> ``('__icontains', 'example')``
    - ``*example`` -> ``('__iendswith', 'example')``
    - ``example*`` -> ``('__istartswith', 'example')``
    - ``"example"`` -> ``('__iexact', 'example')``

    :param str value: text to be parsed for ``*`` and surrounding ``"``
    :return: a tuple containing the lookup type and the value without modifiers
    """
    leading_wildcard = value.startswith("*")
    trailing_wildcard = value.endswith("*")

    if leading_wildcard and trailing_wildcard:
        return "__icontains", value[1:-1]
    if leading_wildcard:
        return "__iendswith", value[1:]
    if trailing_wildcard:
        return "__istartswith", value[:-1]
    if value.startswith('"') and value.endswith('"'):
        return "__iexact", value[1:-1]
    return "__icontains", value

datadump_get_queryset(additional_app_labels=[])

This method is loosely based on the dumpdata admin command. It iterates through the relevant app models and exports them using a serializer and natural foreign keys. Data exported this way can be reimported into a newly created Django APIS app.

Source code in apis_core/utils/helpers.py
def datadump_get_queryset(additional_app_labels: list = []):
    """
    This method is loosely based on the `dumpdata` admin command.
    It collects the relevant apps - the APIS base apps, the apps passed
    in `additional_app_labels` and every app containing a model that
    inherits from an APIS base model - sorts their models by dependency
    and yields whatever `datadump_get_objects` produces for them.
    Data exported this way can be reimported into a newly created Django APIS app
    """

    # the APIS base apps and the models they define
    base_labels = ["apis_relations", "apis_metainfo"]
    base_models = [
        candidate
        for candidate in apps.get_models()
        if candidate._meta.app_label in base_labels
    ]

    # the labels of all apps to export
    # this allows to extend the apps via the ?app_labels= parameter
    app_labels = set(base_labels)
    app_labels |= set(additional_app_labels)

    # apps with models inheriting from APIS base models are exported as well
    for candidate in apps.get_models():
        if any(issubclass(candidate, base) for base in base_models):
            app_labels.add(candidate._meta.app_label)

    # map every app config to None, the input passed to `sort_dependencies`
    app_list = {apps.get_app_config(label): None for label in app_labels}

    models = serializers.sort_dependencies(app_list.items(), allow_cycles=True)

    yield from datadump_get_objects(models)

get_html_diff(a, b, show_a=True, show_b=True, shorten=0)

Create a colorized HTML representation of the difference between two values a and b. If show_a is True, colorize deletions in a. If show_b is True, colorize insertions in b. The value of shorten defines whether long parts of strings that contain no change should be shortened.

Source code in apis_core/utils/helpers.py
def get_html_diff(a, b, show_a=True, show_b=True, shorten=0):
    """
    Create a colorized html representation of the difference of two values a and b
    If `show_a` is True, deletions in `a` are wrapped in a `diff-remove` span
    If `show_b` is True, insertions in `b` are wrapped in a `diff-insert` span
    If `shorten` is set, unchanged parts longer than `shorten` characters are
    reduced to their first 5 and last 10 characters
    """

    def removed(text):
        return f"<span class='diff-remove'>{text}</span>"

    def inserted(text):
        return f"<span class='diff-insert'>{text}</span>"

    blank = ["", None]
    a_blank = a in blank
    b_blank = b in blank

    # if one side is empty there is nothing to compare
    if a_blank and b_blank:
        return ""
    if a_blank:
        return inserted(b) if show_b else ""
    if b_blank:
        return removed(a) if show_a else ""

    old, new = str(a), str(b)
    pieces = []
    matcher = difflib.SequenceMatcher(None, old, new)
    for tag, old_from, old_to, new_from, new_to in matcher.get_opcodes():
        if tag == "equal":
            unchanged = old[old_from:old_to]
            if shorten and len(unchanged) > shorten:
                unchanged = unchanged[:5] + " ... " + unchanged[-10:]
            pieces.append(unchanged)
            continue
        # a replacement is an insertion followed by a deletion
        if show_b and tag in ("insert", "replace"):
            pieces.append(inserted(new[new_from:new_to]))
        if show_a and tag in ("delete", "replace"):
            pieces.append(removed(old[old_from:old_to]))
    return "".join(pieces)

rdf

build_sparql_query(curie)

Build a SPARQL query with language preferences.

Parameters:

Name Type Description Default
curie str

predicate to filter on as defined in the toml. needs to include the predicate and optionally a lang tag to filter for separated with a comma. Eg "wdt:P122,en".

required

Returns:

Type Description
str

A SPARQL query string

Source code in apis_core/utils/rdf.py
def build_sparql_query(curie: str) -> str:
    """
    Build a SPARQL query with language preferences.

    Values that already are a query (starting with "select" or
    "prefix", ignoring case and surrounding whitespace) are returned
    unchanged.

    Args:
        curie: predicate to filter on as defined in the toml.
                needs to include the predicate and optionally
                a lang tag to filter for separated with a comma.
                Eg "wdt:P122,en".

    Returns:
        A SPARQL query string
    """
    if curie.lower().strip().startswith(("select", "prefix")):
        return curie

    # everything after the first comma is the language tag
    predicate, comma, language = curie.partition(",")
    language_filter = ""
    if comma:
        language_filter = f'FILTER LANGMATCHES(LANG(?object), "{language}")'
    query = f"""
            SELECT ?object 
            WHERE {{ 
                ?subject {predicate} ?object {language_filter}
            }}
        """

    logger.debug("Generated SPARQL query: %s", query)
    return query

resolve(obj, graph)

Look at the value of object and return the parsed value. If the value starts and ends with angle brackets, we interpret it as a URI and transform it to a URIRef. If the value is simple text, we interpret it as a curie and expand it using the graph's namespace manager. Otherwise we simply return the value.

Source code in apis_core/utils/rdf.py
def resolve(obj, graph):
    """
    Look at the value of object and return the parsed value.
    Values that are not strings are returned as they are.
    A string that starts and ends with angle brackets is
    interpreted as a URI and returned as URIRef (without the
    brackets). Any other string is interpreted as a curie and
    expanded using the graph's namespace manager.
    """
    if not isinstance(obj, str):
        return obj
    in_angle_brackets = obj.startswith("<") and obj.endswith(">")
    if in_angle_brackets:
        return URIRef(obj[1:-1])
    return graph.namespace_manager.expand_curie(obj)

settings

get_entity_settings_by_modelname(entity=None)

return the settings for a specific entity or the dict for all entities if no entity is given

Source code in apis_core/utils/settings.py
def get_entity_settings_by_modelname(entity: str = None) -> dict:
    """
    Read the `APIS_ENTITIES` setting (an empty dict if it is not set)
    and return the settings of `entity`, or the whole dict if no entity
    is given. The entity is looked up by the given name first and by
    its capitalized name second; an empty dict is returned if neither
    is configured.
    """
    entity_settings = getattr(settings, "APIS_ENTITIES", {})
    if not entity:
        return entity_settings
    by_capitalized_name = entity_settings.get(entity.capitalize(), {})
    return entity_settings.get(entity, by_capitalized_name)