Konfiguration von Relationship Joins

relationship() erstellt normalerweise einen Join zwischen zwei Tabellen, indem es die Fremdschlüsselbeziehung zwischen den beiden Tabellen untersucht, um zu bestimmen, welche Spalten verglichen werden sollen. Es gibt eine Vielzahl von Situationen, in denen dieses Verhalten angepasst werden muss.

Behandlung mehrerer Join-Pfade

Eine der häufigsten Situationen, mit denen man konfrontiert wird, ist, wenn es mehr als einen Fremdschlüsselpfad zwischen zwei Tabellen gibt.

Betrachten Sie eine Customer-Klasse, die zwei Fremdschlüssel zu einer Address-Klasse enthält

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address")
    shipping_address = relationship("Address")


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

Das obige Mapping wird, wenn wir versuchen, es zu verwenden, die Fehlermeldung erzeugen

sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables.  Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.

Die obige Nachricht ist ziemlich lang. Es gibt viele potenzielle Meldungen, die relationship() zurückgeben kann, die sorgfältig darauf abgestimmt sind, eine Vielzahl gängiger Konfigurationsprobleme zu erkennen; die meisten werden zusätzliche Konfigurationen vorschlagen, die zur Behebung der Mehrdeutigkeit oder anderer fehlender Informationen erforderlich sind.

In diesem Fall fordert uns die Nachricht auf, jedes relationship() zu qualifizieren, indem wir für jedes angeben, welche Fremdschlüsselspalte berücksichtigt werden soll, und die entsprechende Form ist wie folgt

class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

Oben geben wir das Argument foreign_keys an, das ein Column-Objekt oder eine Liste von Column-Objekten ist, die angeben, welche Spalten als „fremd“ betrachtet werden sollen, oder anders ausgedrückt, die Spalten, die einen Wert enthalten, der auf eine übergeordnete Tabelle verweist. Das Laden der Customer.billing_address-Beziehung von einem Customer-Objekt verwendet den Wert in billing_address_id, um die Zeile in Address zu identifizieren, die geladen werden soll; ebenso wird shipping_address_id für die shipping_address-Beziehung verwendet. Die Verknüpfung der beiden Spalten spielt auch bei der Persistenz eine Rolle; der neu generierte Primärschlüssel eines gerade eingefügten Address-Objekts wird während eines Flushes in die entsprechende Fremdschlüsselspalte eines zugehörigen Customer-Objekts kopiert.

Beim Angeben von foreign_keys mit Declarative können wir auch Zeichenkettennamen zur Angabe verwenden, jedoch ist es wichtig, dass bei Verwendung einer Liste **die Liste Teil der Zeichenkette** ist

billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")

In diesem speziellen Beispiel ist die Liste in keinem Fall notwendig, da es nur eine Column gibt, die wir benötigen

billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")

Warnung

Wenn das Argument relationship.foreign_keys als Python-auswertbare Zeichenkette übergeben wird, wird es mit der eval()-Funktion von Python interpretiert. **Geben Sie KEINE unvertrauenswürdigen Eingaben in diese Zeichenkette ein**. Siehe Auswertung von Relationship-Argumenten für Details zur deklarativen Auswertung von relationship()-Argumenten.

Angabe alternativer Join-Bedingungen

Das Standardverhalten von relationship() beim Erstellen eines Joins besteht darin, dass die Werte der Primärschlüsselspalten auf der einen Seite mit denen der Fremdschlüssel-referenzierenden Spalten auf der anderen Seite gleichgesetzt werden. Wir können dieses Kriterium mit dem Argument relationship.primaryjoin sowie dem Argument relationship.secondaryjoin ändern, wenn eine "sekundäre" Tabelle verwendet wird.

Im folgenden Beispiel, das die User-Klasse zusammen mit einer Address-Klasse verwendet, die eine Straßenadresse speichert, erstellen wir eine Beziehung boston_addresses, die nur die Address-Objekte lädt, die eine Stadt "Boston" angeben

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    boston_addresses = relationship(
        "Address",
        primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
    )


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    user_id = mapped_column(Integer, ForeignKey("user.id"))

    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

Innerhalb dieses String-SQL-Ausdrucks haben wir die Konjunktion and_() verwendet, um zwei verschiedene Prädikate für die Join-Bedingung festzulegen – sowohl die Spalten User.id und Address.user_id miteinander zu verknüpfen, als auch die Zeilen in Address auf city='Boston' zu beschränken. Bei der Verwendung von Declarative sind rudimentäre SQL-Funktionen wie and_() im ausgewerteten Namensraum eines Zeichenketten-Arguments von relationship() automatisch verfügbar.

Warnung

Wenn das Argument relationship.primaryjoin als Python-auswertbare Zeichenkette übergeben wird, wird es mit der eval()-Funktion von Python interpretiert. **Geben Sie KEINE unvertrauenswürdigen Eingaben in diese Zeichenkette ein**. Siehe Auswertung von Relationship-Argumenten für Details zur deklarativen Auswertung von relationship()-Argumenten.

Die benutzerdefinierten Kriterien, die wir in einem relationship.primaryjoin verwenden, sind im Allgemeinen nur dann von Bedeutung, wenn SQLAlchemy SQL rendert, um diese Beziehung zu laden oder darzustellen. Das heißt, sie werden in der SQL-Anweisung verwendet, die zur Durchführung eines Lazy-Loads pro Attribut emittiert wird, oder wenn zur Abfragezeit ein Join erstellt wird, z. B. über Select.join(), oder über die Eager-Lade-Stile "joined" oder "subquery". Wenn in-memory Objekte manipuliert werden, können wir jedes Address-Objekt, das wir möchten, in die boston_addresses-Collection legen, unabhängig vom Wert des Attributs .city. Die Objekte bleiben in der Collection vorhanden, bis das Attribut abgelaufen und aus der Datenbank neu geladen wird, wo das Kriterium angewendet wird. Wenn ein Flush auftritt, werden die Objekte innerhalb von boston_addresses bedingungslos geflusht, wobei der Wert der Primärschlüsselspalte user.id für jede Zeile in die Fremdschlüssel-haltende Spalte address.user_id kopiert wird. Das Kriterium city hat hier keine Auswirkung, da der Flush-Prozess nur die Synchronisation von Primärschlüsselwerten mit referenzierenden Fremdschlüsselwerten betrifft.

Erstellung benutzerdefinierter Fremdbedingungen

Ein weiterer Bestandteil der primären Join-Bedingung ist die Art und Weise, wie die als "fremd" betrachteten Spalten bestimmt werden. Normalerweise gibt eine Teilmenge von Column-Objekten ForeignKey an oder ist Teil eines ForeignKeyConstraint, der für die Join-Bedingung relevant ist. relationship() berücksichtigt diesen Fremdschlüsselstatus, um zu entscheiden, wie Daten für diese Beziehung geladen und gespeichert werden sollen. Das Argument relationship.primaryjoin kann jedoch verwendet werden, um eine Join-Bedingung zu erstellen, die keine "Schema"-Fremdschlüssel beinhaltet. Wir können relationship.primaryjoin mit relationship.foreign_keys und relationship.remote_side explizit kombinieren, um einen solchen Join zu etablieren.

Nachfolgend verbindet sich eine Klasse HostEntry mit sich selbst und setzt die Zeichenketten-Spalte content mit der Spalte ip_address gleich, die ein PostgreSQL-Typ namens INET ist. Wir müssen cast() verwenden, um eine Seite des Joins in den Typ der anderen zu konvertieren

from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET

from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign_keys, remote_side
    parent_host = relationship(
        "HostEntry",
        primaryjoin=ip_address == cast(content, INET),
        foreign_keys=content,
        remote_side=ip_address,
    )

Die obige Beziehung erzeugt einen Join wie

SELECT host_entry.id, host_entry.ip_address, host_entry.content
FROM host_entry JOIN host_entry AS host_entry_1
ON host_entry_1.ip_address = CAST(host_entry.content AS INET)

Eine alternative Syntax zur obigen besteht darin, die foreign() und remote() Annotationen inline innerhalb des relationship.primaryjoin-Ausdrucks zu verwenden. Diese Syntax repräsentiert die Annotationen, die relationship() selbst auf die Join-Bedingung anwendet, gegeben die Argumente relationship.foreign_keys und relationship.remote_side. Diese Funktionen können prägnanter sein, wenn eine explizite Join-Bedingung vorhanden ist, und dienen zusätzlich dazu, genau die Spalte zu markieren, die "fremd" oder "remote" ist, unabhängig davon, ob diese Spalte mehrmals oder in komplexen SQL-Ausdrücken angegeben wird.

from sqlalchemy.orm import foreign, remote


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign() and remote() annotations
    # in lieu of separate arguments
    parent_host = relationship(
        "HostEntry",
        primaryjoin=remote(ip_address) == cast(foreign(content), INET),
    )

Verwendung benutzerdefinierter Operatoren in Join-Bedingungen

Ein weiterer Anwendungsfall für Beziehungen ist die Verwendung benutzerdefinierter Operatoren, wie z. B. der PostgreSQL "enthält" (<<) Operator beim Verknüpfen mit Typen wie INET und CIDR. Für benutzerdefinierte boolesche Operatoren verwenden wir die Funktion Operators.bool_op()

inet_column.bool_op("<<")(cidr_column)

Ein Vergleich wie der obige kann direkt mit relationship.primaryjoin verwendet werden, wenn eine relationship() konstruiert wird

class IPA(Base):
    __tablename__ = "ip_address"

    id = mapped_column(Integer, primary_key=True)
    v4address = mapped_column(INET)

    network = relationship(
        "Network",
        primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
        viewonly=True,
    )


class Network(Base):
    __tablename__ = "network"

    id = mapped_column(Integer, primary_key=True)
    v4representation = mapped_column(CIDR)

Oben, eine Abfrage wie

select(IPA).join(IPA.network)

Wird gerendert als

SELECT ip_address.id AS ip_address_id, ip_address.v4address AS ip_address_v4address
FROM ip_address JOIN network ON ip_address.v4address << network.v4representation

Benutzerdefinierte Operatoren basierend auf SQL-Funktionen

Eine Variante des Anwendungsfalls für Operators.op.is_comparison ist, wenn wir keinen Operator, sondern eine SQL-Funktion verwenden. Das typische Beispiel für diesen Anwendungsfall sind die PostgreSQL PostGIS-Funktionen, jedoch kann jede SQL-Funktion auf jeder Datenbank, die zu einer binären Bedingung aufgelöst wird, angewendet werden. Um diesem Anwendungsfall gerecht zu werden, kann die Methode FunctionElement.as_comparison() jede SQL-Funktion modifizieren, wie z. B. die aus dem func-Namespace aufgerufenen, um dem ORM anzuzeigen, dass die Funktion einen Vergleich zweier Ausdrücke erzeugt. Das folgende Beispiel veranschaulicht dies mit der Bibliothek Geoalchemy2

from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign


class Polygon(Base):
    __tablename__ = "polygon"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POLYGON", srid=4326))
    points = relationship(
        "Point",
        primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
        viewonly=True,
    )


class Point(Base):
    __tablename__ = "point"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POINT", srid=4326))

Oben zeigt FunctionElement.as_comparison() an, dass die SQL-Funktion func.ST_Contains() die Ausdrücke Polygon.geom und Point.geom vergleicht. Die Annotation foreign() gibt zusätzlich an, welche Spalte in dieser besonderen Beziehung die Rolle des „Fremdschlüssels“ übernimmt.

Neu in Version 1.3: Hinzugefügt FunctionElement.as_comparison().

Überlappende Fremdschlüssel

Ein seltenes Szenario kann auftreten, wenn zusammengesetzte Fremdschlüssel verwendet werden, so dass eine einzelne Spalte Gegenstand von mehr als einer Spalte sein kann, auf die über eine Fremdschlüsselbeschränkung verwiesen wird.

Betrachten Sie ein (zugegebenermaßen komplexes) Mapping wie das Magazine-Objekt, auf das sowohl vom Writer-Objekt als auch vom Article-Objekt über ein zusammengesetztes Primärschlüssel-Schema verwiesen wird, das für beide magazine_id enthält; dann ist, um Article auch auf Writer verweisen zu lassen, Article.magazine_id an zwei separaten Beziehungen beteiligt; Article.magazine und Article.writer

class Magazine(Base):
    __tablename__ = "magazine"

    id = mapped_column(Integer, primary_key=True)


class Article(Base):
    __tablename__ = "article"

    article_id = mapped_column(Integer)
    magazine_id = mapped_column(ForeignKey("magazine.id"))
    writer_id = mapped_column()

    magazine = relationship("Magazine")
    writer = relationship("Writer")

    __table_args__ = (
        PrimaryKeyConstraint("article_id", "magazine_id"),
        ForeignKeyConstraint(
            ["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
        ),
    )


class Writer(Base):
    __tablename__ = "writer"

    id = mapped_column(Integer, primary_key=True)
    magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
    magazine = relationship("Magazine")

Wenn das obige Mapping konfiguriert ist, wird diese Warnung ausgegeben

SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.

Dies bezieht sich darauf, dass Article.magazine_id Gegenstand zweier verschiedener Fremdschlüsselbeschränkungen ist; es verweist direkt auf Magazine.id als Quellspalte, verweist aber auch auf Writer.magazine_id als Quellspalte im Kontext des zusammengesetzten Schlüssels zu Writer. Wenn wir einen Article mit einem bestimmten Magazine verknüpfen, aber dann den Article mit einem Writer verknüpfen, der mit einem *anderen* Magazine verknüpft ist, überschreibt die ORM Article.magazine_id nicht-deterministisch, ändert stillschweigend, auf welches Magazin wir verweisen; sie kann auch versuchen, NULL in diese Spalte zu setzen, wenn wir einen Writer von einem Article trennen. Die Warnung teilt uns mit, dass dies der Fall ist.

Um dies zu lösen, müssen wir das Verhalten von Article aufbrechen, um alle drei der folgenden Merkmale einzuschließen

  1. Article schreibt in erster Linie in Article.magazine_id basierend auf Daten, die in der Article.magazine-Beziehung gespeichert sind, das heißt, ein Wert, der von Magazine.id kopiert wird.

  2. Article kann im Auftrag von Daten, die in der Article.writer-Beziehung gespeichert sind, in Article.writer_id schreiben, aber nur die Spalte Writer.id; die Spalte Writer.magazine_id sollte nicht in Article.magazine_id geschrieben werden, da sie letztendlich von Magazine.id stammt.

  3. Article berücksichtigt Article.magazine_id beim Laden von Article.writer, auch wenn es für diese Beziehung nicht hineinschreibt.

Um nur #1 und #2 zu erhalten, könnten wir nur Article.writer_id als "Fremdschlüssel" für Article.writer angeben

class Article(Base):
    # ...

    writer = relationship("Writer", foreign_keys="Article.writer_id")

Dies hat jedoch zur Folge, dass Article.writer Article.magazine_id bei Abfragen gegen Writer nicht berücksichtigt.

SELECT article.article_id AS article_article_id,
    article.magazine_id AS article_magazine_id,
    article.writer_id AS article_writer_id
FROM article
JOIN writer ON writer.id = article.writer_id

Daher, um alle #1, #2 und #3 zu erreichen, drücken wir die Join-Bedingung sowie die Spalten, die geschrieben werden sollen, aus, indem wir relationship.primaryjoin vollständig verwenden, zusammen mit entweder dem Argument relationship.foreign_keys oder prägnanter durch Annotation mit foreign()

class Article(Base):
    # ...

    writer = relationship(
        "Writer",
        primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
        "Writer.magazine_id == Article.magazine_id)",
    )

Nicht-relationale Vergleiche / Materialized Path

Warnung

Dieser Abschnitt beschreibt ein experimentelles Feature.

Die Verwendung benutzerdefinierter Ausdrücke ermöglicht es uns, unorthodoxe Join-Bedingungen zu erzeugen, die nicht dem üblichen Primär-/Fremdschlüsselmodell folgen. Ein solches Beispiel ist das Materialized-Path-Muster, bei dem wir Zeichenketten auf überlappende Pfad-Tokens vergleichen, um eine Baumstruktur zu erzeugen.

Durch sorgfältige Verwendung von foreign() und remote() können wir eine Beziehung aufbauen, die effektiv ein rudimentäres Materialized-Path-System erzeugt. Im Wesentlichen, wenn foreign() und remote() auf der *gleichen* Seite des Vergleichsausdrucks stehen, wird die Beziehung als "eins zu viele" betrachtet; wenn sie auf *unterschiedlichen* Seiten stehen, wird die Beziehung als "viele zu eins" betrachtet. Für den Vergleich, den wir hier verwenden werden, werden wir mit Collections arbeiten, also behalten wir die Konfiguration als "eins zu viele" bei.

class Element(Base):
    __tablename__ = "element"

    path = mapped_column(String, primary_key=True)

    descendants = relationship(
        "Element",
        primaryjoin=remote(foreign(path)).like(path.concat("/%")),
        viewonly=True,
        order_by=path,
    )

Oben, wenn ein Element-Objekt mit einem Pfadattribut von "/foo/bar2" gegeben ist, suchen wir für das Laden von Element.descendants nach Folgendem

SELECT element.path AS element_path
FROM element
WHERE element.path LIKE ('/foo/bar2' || '/%') ORDER BY element.path

Selbstreferenzielle Many-to-Many Beziehung

Siehe auch

Dieser Abschnitt dokumentiert eine Zwei-Tabellen-Variante des "Adjazenzlisten"-Musters, das unter Adjazenzlisten-Beziehungen dokumentiert ist. Beachten Sie die selbstreferenziellen Abfragemuster in den Unterabschnitten Selbstreferenzielle Abfragestrategien und Konfiguration des selbstreferenziellen Eager-Loadings, die gleichermaßen für das hier diskutierte Mapping-Muster gelten.

Many-to-Many-Beziehungen können durch einen oder beide der Parameter relationship.primaryjoin und relationship.secondaryjoin angepasst werden – letzteres ist wichtig für eine Beziehung, die eine Many-to-Many-Referenz über das Argument relationship.secondary angibt. Eine häufige Situation, die die Verwendung von relationship.primaryjoin und relationship.secondaryjoin beinhaltet, ist die Einrichtung einer Many-to-Many-Beziehung von einer Klasse zu sich selbst, wie unten gezeigt.

from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship


class Base(DeclarativeBase):
    pass


node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)


class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )

Wo SQLAlchemy oben automatisch nicht wissen kann, welche Spalten für die Beziehungen right_nodes und left_nodes miteinander verbunden werden sollen. Die Argumente relationship.primaryjoin und relationship.secondaryjoin legen fest, wie wir die Assoziationstabelle verknüpfen möchten. In der obigen Deklarativen Form, da wir diese Bedingungen innerhalb des Python-Blocks deklarieren, der der Node-Klasse entspricht, ist die Variable id direkt als das Column-Objekt verfügbar, mit dem wir verknüpfen möchten.

Alternativ können wir die Argumente relationship.primaryjoin und relationship.secondaryjoin mithilfe von Zeichenketten definieren, was in dem Fall geeignet ist, dass unsere Konfiguration entweder das Spaltenobjekt Node.id noch nicht verfügbar hat oder die Tabelle node_to_node vielleicht noch nicht verfügbar ist. Wenn wir auf ein einfaches Table-Objekt in einer Deklarativen Zeichenkette verweisen, verwenden wir den Namen der Tabelle, wie er in den MetaData vorhanden ist.

class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )

Warnung

Wenn die Argumente relationship.primaryjoin und relationship.secondaryjoin als Python-auswertbare Zeichenkette übergeben werden, werden sie mit der eval()-Funktion von Python interpretiert. **Geben Sie KEINE unvertrauenswürdigen Eingaben in diese Zeichenketten ein**. Siehe Auswertung von Relationship-Argumenten für Details zur deklarativen Auswertung von relationship()-Argumenten.

Eine klassische Mapping-Situation hier ist ähnlich, bei der node_to_node mit node.c.id verknüpft werden kann.

from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)


class Node:
    pass


mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)

Beachten Sie, dass in beiden Beispielen das Keyword relationship.backref eine left_nodes Backref spezifiziert – wenn relationship() die zweite Beziehung in umgekehrter Richtung erstellt, ist sie intelligent genug, die Argumente relationship.primaryjoin und relationship.secondaryjoin umzukehren.

Siehe auch

Kompositierte „sekundäre“ Joins

Hinweis

Dieser Abschnitt behandelt Randfälle, die von SQLAlchemy einigermaßen unterstützt werden. Es wird jedoch empfohlen, solche Probleme wann immer möglich auf einfachere Weise zu lösen, indem vernünftige relationale Layouts und/oder In-Python-Attribute verwendet werden.

Manchmal, wenn man eine relationship() zwischen zwei Tabellen aufbauen möchte, sind mehr als nur zwei oder drei Tabellen beteiligt, um sie zu verbinden. Dies ist ein Bereich von relationship(), in dem man die Grenzen des Möglichen ausloten möchte, und oft muss die endgültige Lösung für viele dieser exotischen Anwendungsfälle in der SQLAlchemy-Mailingliste ausgearbeitet werden.

In neueren Versionen von SQLAlchemy kann der Parameter relationship.secondary in einigen dieser Fälle verwendet werden, um ein zusammengesetztes Ziel bereitzustellen, das aus mehreren Tabellen besteht. Nachfolgend finden Sie ein Beispiel für eine solche Join-Bedingung (erfordert mindestens Version 0.9.2, um wie gezeigt zu funktionieren)

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

    d = relationship(
        "D",
        secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
        primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
        secondaryjoin="D.id == B.d_id",
        uselist=False,
        viewonly=True,
    )


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)
    d_id = mapped_column(ForeignKey("d.id"))


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    d_id = mapped_column(ForeignKey("d.id"))


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)

Im obigen Beispiel stellen wir alle drei bereit: relationship.secondary, relationship.primaryjoin und relationship.secondaryjoin im deklarativen Stil, wobei direkt auf die benannten Tabellen a, b, c, d Bezug genommen wird. Eine Abfrage von A nach D sieht wie folgt aus:

sess.scalars(select(A).join(A.d)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN ( b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id JOIN c AS c_1 ON c_1.d_id = d_1.id) ON a.b_id = b_1.id AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id

Im obigen Beispiel nutzen wir die Möglichkeit, mehrere Tabellen in einen „sekundären“ Container zu packen, sodass wir über viele Tabellen hinweg joinen können, während wir die Dinge für relationship() „einfach“ halten, in dem Sinne, dass es auf beiden Seiten, der „linken“ und der „rechten“, nur „eine“ Tabelle gibt; die Komplexität bleibt in der Mitte.

Warnung

Eine Beziehung wie die obige wird typischerweise als viewonly=True markiert, unter Verwendung von relationship.viewonly, und sollte als schreibgeschützt betrachtet werden. Obwohl es manchmal Wege gibt, Beziehungen wie die obige schreibbar zu machen, ist dies im Allgemeinen kompliziert und fehleranfällig.

Beziehung zu einer alias-Klasse

Im vorherigen Abschnitt haben wir eine Technik veranschaulicht, bei der wir relationship.secondary verwendet haben, um zusätzliche Tabellen in eine Join-Bedingung aufzunehmen. Es gibt einen komplexen Join-Fall, bei dem selbst diese Technik nicht ausreicht; wenn wir von A nach B joinen wollen und dabei eine beliebige Anzahl von C, D usw. dazwischen verwenden, es jedoch auch Join-Bedingungen zwischen A und B *direkt* gibt. In diesem Fall kann der Join von A nach B schwierig zu formulieren sein, nur mit einer komplexen relationship.primaryjoin-Bedingung, da die dazwischen liegenden Tabellen spezielle Behandlung erfordern können, und es ist auch nicht mit einem relationship.secondary-Objekt ausdrückbar, da das Muster A->secondary->B keine direkten Bezüge zwischen A und B unterstützt. Wenn dieser **extrem fortgeschrittene** Fall auftritt, können wir darauf zurückgreifen, ein zweites Mapping als Ziel für die Beziehung zu erstellen. Hier verwenden wir AliasedClass, um ein Mapping zu einer Klasse zu erstellen, die alle zusätzlichen Tabellen enthält, die wir für diesen Join benötigen. Um diesen Mapper als „alternatives“ Mapping für unsere Klasse zu erzeugen, verwenden wir die Funktion aliased(), um das neue Konstrukt zu erzeugen, und verwenden dann relationship() auf das Objekt, als wäre es eine einfache abgebildete Klasse.

Unten wird eine relationship() mit einem einfachen Join von A nach B veranschaulicht, wobei die primaryjoin-Bedingung um zwei zusätzliche Entitäten C und D erweitert wird, die ebenfalls Zeilen haben müssen, die mit den Zeilen in sowohl A als auch B gleichzeitig übereinstimmen.

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

    some_c_value = mapped_column(String)


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)
    c_id = mapped_column(ForeignKey("c.id"))
    b_id = mapped_column(ForeignKey("b.id"))

    some_d_value = mapped_column(String)


# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)

# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)

A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

Mit dem obigen Mapping sieht ein einfacher Join so aus:

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) ON a.b_id = b.id

Integration von AliasedClass-Mappings mit Typing und Vermeidung früher Mapper-Konfiguration

Die Erstellung des aliased()-Konstrukts auf eine abgebildete Klasse erzwingt den Schritt configure_mappers(), der alle aktuellen Klassen und ihre Beziehungen auflöst. Dies kann problematisch sein, wenn andere abgebildete Klassen, die von den aktuellen Mappings benötigt werden, noch nicht deklariert wurden, oder wenn die Konfiguration der Beziehung selbst Zugriff auf noch nicht deklarierte Klassen benötigt. Darüber hinaus funktioniert Saltakeys Declarative Pattern am besten mit Python-Typisierung, wenn Beziehungen im Voraus deklariert werden.

Um die Erstellung der Beziehung so zu organisieren, dass sie mit diesen Problemen funktioniert, kann ein Konfigurationsebene-Event-Hook wie MapperEvents.before_mapper_configured() verwendet werden, der den Konfigurationscode nur dann aufruft, wenn alle Mappings zur Konfiguration bereit sind.

from sqlalchemy import event


class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook

    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

Oben wird die Funktion _configure_ab_relationship() nur dann aufgerufen, wenn eine vollständig konfigurierte Version von A angefordert wird, zu welchem Zeitpunkt die Klassen B, D und C verfügbar wären.

Für einen Ansatz, der sich mit Inline-Typisierung integriert, kann eine ähnliche Technik verwendet werden, um effektiv ein „Singleton“-Erstellungsmuster für die alias-Klasse zu generieren, bei dem sie als globale Variable verzögert initialisiert wird, die dann inline in der Beziehung verwendet werden kann.

from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )


# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.

    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)

Verwendung des AliasedClass-Ziels in Abfragen

Im vorherigen Beispiel verweist die Beziehung A.b auf die Entität B_viacd als Ziel und **nicht** direkt auf die Klasse B. Um zusätzliche Kriterien, die die Beziehung A.b betreffen, hinzuzufügen, ist es typischerweise notwendig, direkt auf B_viacd zu verweisen, anstatt B zu verwenden, insbesondere in Fällen, in denen die Zielentität von A.b in einen Alias oder eine Unterabfrage umgewandelt werden soll. Unten wird dieselbe Beziehung veranschaulicht, die eine Unterabfrage anstelle eines Joins verwendet.

subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()

B_viacd_subquery = aliased(B, subq)

A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)

Eine Abfrage, die die obige Beziehung A.b verwendet, rendert eine Unterabfrage.

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id

Wenn wir zusätzliche Kriterien basierend auf dem Join A.b hinzufügen möchten, müssen wir dies in Bezug auf B_viacd_subquery und nicht direkt in Bezug auf B tun.

sess.scalars(
    select(A)
    .join(A.b)
    .where(B_viacd_subquery.some_b_column == "some b")
    .order_by(B_viacd_subquery.id)
).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id WHERE anon_1.some_b_column = ? ORDER BY anon_1.id

Zeilenbegrenzte Beziehungen mit Fensterfunktionen

Ein weiterer interessanter Anwendungsfall für Beziehungen zu AliasedClass-Objekten sind Situationen, in denen die Beziehung zu einer spezialisierten SELECT-Abfrage beliebiger Form joinen muss. Ein Szenario ist die Verwendung einer Fensterfunktion, z. B. um zu begrenzen, wie viele Zeilen für eine Beziehung zurückgegeben werden sollen. Das folgende Beispiel veranschaulicht eine nicht primäre Mapper-Beziehung, die die ersten zehn Elemente für jede Sammlung lädt.

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)


class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))


partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()

partitioned_b = aliased(B, partition)

A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)

Wir können die obige partitioned_bs-Beziehung mit den meisten Lade-Strategien verwenden, wie z. B. selectinload().

for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects

Wo die „selectinload“-Abfrage oben so aussieht:

SELECT
    a_1.id AS a_1_id, anon_1.id AS anon_1_id, anon_1.a_id AS anon_1_a_id,
    anon_1.data AS anon_1_data, anon_1.index AS anon_1_index
FROM a AS a_1
JOIN (
    SELECT b.id AS id, b.a_id AS a_id, b.data AS data,
    row_number() OVER (PARTITION BY b.a_id ORDER BY b.id) AS index
    FROM b) AS anon_1
ON anon_1.a_id = a_1.id AND anon_1.index < %(index_1)s
WHERE a_1.id IN ( ... primary key collection ...)
ORDER BY a_1.id

Oben erhalten wir für jeden übereinstimmenden Primärschlüssel in „a“ die ersten zehn „bs“, sortiert nach „b.id“. Durch die Partitionierung nach „a_id“ stellen wir sicher, dass jede „Zeilennummer“ lokal zum übergeordneten „a_id“ ist.

Ein solches Mapping würde normalerweise auch eine „normale“ Beziehung von „A“ zu „B“ für Persistenzoperationen sowie für den Fall enthalten, dass die vollständige Menge von „B“-Objekten pro „A“ gewünscht wird.

Erstellen von abfragefähigen Eigenschaften

Sehr ehrgeizige benutzerdefinierte Join-Bedingungen können möglicherweise nicht direkt persistent gemacht werden und in einigen Fällen nicht einmal korrekt geladen werden. Um den Persistenzteil der Gleichung zu entfernen, verwenden Sie das Flag relationship.viewonly auf der relationship(), was sie als schreibgeschütztes Attribut festlegt (in die Sammlung geschriebene Daten werden beim Flush ignoriert). In extremen Fällen sollten Sie jedoch erwägen, eine normale Python-Eigenschaft in Verbindung mit Query wie folgt zu verwenden:

class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)

    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()

In anderen Fällen kann der Deskriptor so aufgebaut werden, dass er vorhandene In-Python-Daten nutzt. Siehe den Abschnitt über Verwendung von Deskriptoren und Hybriden für eine allgemeinere Diskussion über spezielle Python-Attribute.

Hinweise zur Verwendung des viewonly-Beziehungsparameters

Der Parameter relationship.viewonly, wenn er auf ein relationship()-Konstrukt angewendet wird, zeigt an, dass diese relationship() nicht an ORM Unit of Work-Operationen teilnimmt und dass das Attribut auch keine Änderungen in Python an der dargestellten Sammlung erwartet. Das bedeutet, dass Änderungen an dieser Liste oder diesem Set, wie sie auf einer abgebildeten Instanz vorhanden sind, **keine Auswirkung** auf den ORM-Flush-Prozess haben, auch wenn die Viewonly-Beziehung auf eine mutable Python-Sammlung wie eine Liste oder ein Set verweisen mag.

Betrachten Sie zum Erkunden dieses Szenarios dieses Mapping:

from __future__ import annotations

import datetime

from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship()

    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="current_week_tasks")

Die folgenden Abschnitte werden verschiedene Aspekte dieser Konfiguration erläutern.

In-Python-Mutationen, einschließlich Backrefs, sind mit viewonly=True nicht angemessen

Das obige Mapping zielt auf die User.current_week_tasks Viewonly-Beziehung als Ziel der Backref des Task.user-Attributs ab. Dies wird derzeit nicht von Saltakeys ORM-Konfigurationsprozess markiert, ist aber ein Konfigurationsfehler. Das Ändern des .user-Attributs eines Task hat keine Auswirkungen auf das .current_week_tasks-Attribut.

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

Es gibt einen weiteren Parameter namens relationship.sync_backrefs, der hier eingeschaltet werden kann, um .current_week_tasks in diesem Fall zu mutieren, dies wird jedoch nicht als Best Practice bei einer Viewonly-Beziehung angesehen, die stattdessen nicht für In-Python-Mutationen herangezogen werden sollte.

In diesem Mapping können Backrefs zwischen User.all_tasks und Task.user konfiguriert werden, da diese beide nicht Viewonly sind und normal synchronisiert werden.

Abgesehen vom Problem, dass Backref-Mutationen für Viewonly-Beziehungen deaktiviert sind, werden auch einfache Änderungen an der User.all_tasks-Sammlung in Python nicht in der User.current_week_tasks-Sammlung widergespiegelt, bis Änderungen in die Datenbank übertragen wurden.

Insgesamt ist für einen Anwendungsfall, bei dem eine benutzerdefinierte Sammlung sofort auf In-Python-Mutationen reagieren soll, die Viewonly-Beziehung im Allgemeinen nicht geeignet. Ein besserer Ansatz ist die Verwendung der Hybrid Attributes-Funktion von Saltakeys oder, für rein instanzbezogene Fälle, die Verwendung eines Python-@property, bei dem eine benutzerdefinierte Sammlung, die sich aus der aktuellen Python-Instanz ergibt, implementiert werden kann. Um unser Beispiel so zu ändern, dass es auf diese Weise funktioniert, reparieren wir den Parameter relationship.back_populates auf Task.user, um auf User.all_tasks zu verweisen, und veranschaulichen dann eine einfache @property, die Ergebnisse in Bezug auf die unmittelbare User.all_tasks-Sammlung liefert.

class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="all_tasks")

Durch die Verwendung einer In-Python-Sammlung, die bei jeder Ausführung on-the-fly berechnet wird, haben wir jederzeit garantiert die richtige Antwort, ohne die Datenbank überhaupt verwenden zu müssen.

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]

Viewonly=True-Sammlungen/Attribute werden nicht erneut abgefragt, bis sie abgelaufen sind

Wenn wir mit dem ursprünglichen Viewonly-Attribut fortfahren und Änderungen an der User.all_tasks-Sammlung eines persistenten Objekts vornehmen, kann die Viewonly-Sammlung das Nettoergebnis dieser Änderung erst anzeigen, nachdem **zwei** Dinge passiert sind. Das erste ist, dass die Änderung an User.all_tasks gefusht wird, sodass die neuen Daten in der Datenbank verfügbar sind, zumindest im Geltungsbereich der lokalen Transaktion. Das zweite ist, dass das Attribut User.current_week_tasks abgelaufen ist und über eine neue SQL-Abfrage an die Datenbank neu geladen wird.

Um diese Anforderung zu unterstützen, ist der einfachste zu verwendende Ablauf der, bei dem die **Viewonly-Beziehung nur in Operationen verwendet wird, die von vornherein hauptsächlich schreibgeschützt sind**. Wie unten gezeigt, wenn wir einen User frisch aus der Datenbank abrufen, ist die Sammlung aktuell.

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

Wenn wir Änderungen an u1.all_tasks vornehmen und diese Änderungen in der Viewonly-Beziehung u1.current_week_tasks sehen möchten, müssen diese Änderungen gefusht werden und das Attribut u1.current_week_tasks muss abgelaufen sein, damit es beim nächsten Zugriff Lazy Load durchführt. Der einfachste Ansatz dafür ist die Verwendung von Session.commit(), wobei der Parameter Session.expire_on_commit auf seinem Standardwert von True belassen wird.

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

Oben hat der Aufruf von Session.commit() die Änderungen an u1.all_tasks an die Datenbank gefusht und dann alle Objekte abgelaufen, sodass beim Zugriff auf u1.current_week_tasks ein :term:`Lazy Load` stattfand, der die Inhalte für dieses Attribut frisch aus der Datenbank abrief.

Um Operationen abzufangen, ohne die Transaktion tatsächlich zu committen, muss das Attribut zuerst explizit abgelaufen werden. Eine einfache Möglichkeit, dies zu tun, ist, es direkt aufzurufen. Im folgenden Beispiel sendet Session.flush() ausstehende Änderungen an die Datenbank, dann wird Session.expire() verwendet, um die u1.current_week_tasks-Sammlung abzulaufen, damit sie beim nächsten Zugriff neu geladen wird.

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

Wir können den Aufruf von Session.flush() überspringen, vorausgesetzt, eine Session, die Session.autoflush auf seinem Standardwert von True belässt, da das abgelaufene Attribut current_week_tasks bei Zugriff nach Ablauf einen Autoflush auslöst.

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

Wenn wir mit dem obigen Ansatz zu etwas Ausgefeilterem übergehen, können wir die Ablaufprogrammierung abfangen, wenn die zugehörige User.all_tasks-Sammlung geändert wird, indem wir Event Hooks verwenden. Dies ist eine **fortgeschrittene Technik**, bei der einfachere Architekturen wie @property oder die Beschränkung auf schreibgeschützte Anwendungsfälle zuerst geprüft werden sollten. In unserem einfachen Beispiel würde dies wie folgt konfiguriert:

from sqlalchemy import event, inspect


@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

Mit den obigen Hooks werden Mutationsoperationen abgefangen und führen dazu, dass die User.current_week_tasks-Sammlung automatisch abgelaufen wird.

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

Die oben verwendeten AttributeEvents-Event-Hooks werden auch durch Backref-Mutationen ausgelöst, sodass mit den obigen Hooks eine Änderung an Task.user ebenfalls abgefangen wird.

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]