Wstaw zbiorczo z SQLAlchemy ORM

131

Czy istnieje sposób, aby SQLAlchemy wykonywać zbiorcze wstawianie zamiast wstawiania poszczególnych obiektów? to znaczy,

robić:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

zamiast:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Właśnie przekonwertowałem kod tak, aby używał sqlalchemy zamiast surowego sql i chociaż teraz praca z nim jest znacznie przyjemniejsza, wydaje się, że jest teraz wolniejsza (do współczynnika 10), zastanawiam się, czy to jest powód.

Może mógłbym poprawić sytuację, efektywniej wykorzystując sesje. W tej chwili mam autoCommit=Falsei robię session.commit()po dodaniu kilku rzeczy. Chociaż wydaje się, że dane stają się nieaktualne, jeśli baza danych zostanie zmieniona w innym miejscu, na przykład nawet jeśli wykonam nowe zapytanie, nadal otrzymuję stare wyniki?

Dzięki za pomoc!

Nick Holden
źródło
1
To może pomóc: stackoverflow.com/questions/270879/…
Sean Vieira,
1
Nick, rozumiem, że to bardzo stary post. Czy byłoby możliwe zaktualizowanie tytułu do czegoś poprawnego, np. „Wstawianie wielu rekordów za pomocą SQLAlchemy ORM”. Instrukcje wstawiania wielu rekordów, takie jak ta, którą podałeś, różnią się znacznie od operacji ładowania zbiorczego na poziomie bazy danych. Zbiorcze wstawki są przeznaczone do przesyłania 1k + danych, zwykle z dużych zbiorów danych i dokonywanych przez menedżerów aplikacji, a nie operacji REST lub kodu na poziomie aplikacji ... Używajmy właściwie naszej nomenklatury.
W4t3randWind
Dla tych, którzy natkną się na to pytanie, szukając informacji o operacjach masowych w sqlalchemy Core (nie ORM), zobacz moją odpowiedź na inne pytanie .
Nickolay

Odpowiedzi:

175

SQLAlchemy wprowadziło to w wersji 1.0.0:

Operacje zbiorcze - SQLAlchemy Docs

Dzięki tym operacjom możesz teraz wykonywać zbiorcze wstawianie lub aktualizacje!

Na przykład możesz:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Tutaj zostanie wykonana wkładka zbiorcza.

Pierre
źródło
30
Potrzebujesz również s.commit (), aby faktycznie zapisać rekordy (trochę mi zajęło zrozumienie tego).
horcle_buzz
3
Próbowałem tego z sqlachemy 1.0.11 i nadal wyświetla 3 instrukcje wstawiania. Ale jest dużo szybszy niż zwykłe operacje ORM.
zidarsk8
3
chociaż nie dotyczy kwestii PO, warto wspomnieć, że narusza to pewne cechy ORM. docs.sqlalchemy.org/en/rel_1_0/orm/ ...
dangel
@dangel tak, dziękuję za opublikowanie tego. Chociaż tytuł OP dotyczy „ładowania zbiorczego”, jego pytanie dotyczące instrukcji wstawiania wielu rekordów nie ma nic wspólnego z funkcją ładowania zbiorczego sqlalchemy.
W4t3randWind
W porównaniu do wstawiania tych samych danych z CSV z \copypsql (od tego samego klienta do tego samego serwera), widzę ogromną różnicę w wydajności po stronie serwera, co daje około 10 razy więcej wstawień / s. Najwyraźniej ładowanie zbiorcze przy użyciu \copy(lub COPYna serwerze) przy użyciu pakietu w komunikacji z klienta do serwera jest DUŻO lepsze niż używanie SQL przez SQLAlchemy. Więcej informacji: Duża masa wkładka różnica wydajności PostgreSQL vs ... .
gertvdijk
42

Dokumentacja sqlalchemy zawiera opis wydajności różnych technik, których można użyć do wstawiania zbiorczego:

ORMy zasadniczo nie są przeznaczone do wysokowydajnych wkładów zbiorczych - to jest cały powód, dla którego SQLAlchemy oferuje rdzeń oprócz ORM jako komponent pierwszej klasy.

W przypadku użycia szybkich wstawień zbiorczych, system generowania i wykonywania kodu SQL, na którym opiera się ORM, jest częścią Rdzenia. Korzystając z tego systemu bezpośrednio, możemy stworzyć INSERT, który jest konkurencyjny w stosunku do bezpośredniego korzystania z surowego interfejsu API bazy danych.

Alternatywnie, SQLAlchemy ORM oferuje zestaw metod operacji zbiorczych, które zapewniają podpięcia do podsekcji procesu jednostki pracy w celu emitowania konstrukcji INSERT i UPDATE na poziomie rdzenia z niewielkim stopniem automatyzacji opartej na ORM.

Poniższy przykład ilustruje testy oparte na czasie dla kilku różnych metod wstawiania wierszy, od najbardziej zautomatyzowanych do najmniejszych. W cPythonie 2.7 zaobserwowano środowiska wykonawcze:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Scenariusz:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Grant Humphries
źródło
1
Dziękuję Ci. Naprawdę pomocny i dokładny.
Steve B.
Widziałem inny przykład używając bindparams. Składnia wygląda na zwięzłą, czy to dobrze?
Jay
35

O ile wiem, nie ma sposobu, aby ORM wydawał zbiorcze wkładki. Uważam, że głównym powodem jest to, że SQLAlchemy musi śledzić tożsamość każdego obiektu (tj. Nowe klucze podstawowe), a wstawianie zbiorcze koliduje z tym. Na przykład zakładając, że Twoja footabela zawiera idkolumnę i jest odwzorowana na Fooklasę:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Ponieważ SQLAlchemy pobrał wartość x.idbez wydawania kolejnego zapytania, możemy wywnioskować, że pobrał wartość bezpośrednio z INSERTinstrukcji. Jeśli nie potrzebujesz późniejszego dostępu do utworzonych obiektów za pośrednictwem tych samych instancji, możesz pominąć warstwę ORM dla swojej wstawki:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy nie może dopasować tych nowych wierszy do żadnych istniejących obiektów, więc będziesz musiał ponownie zapytać o nie w celu wykonania kolejnych operacji.

Jeśli chodzi o nieaktualne dane, warto pamiętać, że sesja nie ma wbudowanego sposobu, aby dowiedzieć się, kiedy baza danych zostanie zmieniona poza sesją. Aby uzyskać dostęp do danych zmodyfikowanych zewnętrznie za pośrednictwem istniejących wystąpień, należy je oznaczyć jako wygasłe . Dzieje się tak domyślnie session.commit(), ale można to zrobić ręcznie, dzwoniąc session.expire_all()lub session.expire(instance). Przykład (pominięty SQL):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()wygasa x, więc pierwsza instrukcja print niejawnie otwiera nową transakcję i ponownie wysyła zapytanie xdo atrybutów. Jeśli skomentujesz pierwszą instrukcję print, zauważysz, że druga pobiera teraz poprawną wartość, ponieważ nowe zapytanie jest emitowane dopiero po aktualizacji.

Ma to sens z punktu widzenia izolacji transakcyjnej - należy odbierać jedynie zewnętrzne modyfikacje pomiędzy transakcjami. Jeśli sprawia Ci to kłopoty, sugeruję wyjaśnienie lub ponowne przemyślenie granic transakcji w aplikacji zamiast natychmiastowego sięgania po session.expire_all().

dhaffey
źródło
Dziękuję za odpowiedź, spróbuję. WRT wygasający problem, to, co zobaczyłem, nie było takie samo. Używam sesji z lunetą w turbogears. Wykonanie getSession (). Query (Foo) .filter .... all () zwróciło różne rzeczy w zależności od żądania, również nie zwróciło zaktualizowanych rekordów, które znajdowały się w bazie danych, dopóki jej nie zrestartowałem. Naprawiłem ten problem, wykonując autocommit = True i dodając coś, co .remove () d sesji po zakończeniu żądania (rozumiem, że i tak masz to zrobić).
Nick Holden
Myślę, że zwrócił różne rzeczy w zależności od żądania, ponieważ miał sesję w określonym zakresie na wątek w puli, a sesje były w różnych stanach? Wydawało się jednak trochę dziwne, że sa nie otrzymają nowych danych po nowym żądaniu. Spodziewam się, że nie rozumiem, co robi autocommit = False
Nick Holden
Z autocommit=Falsewierzę, należy dzwonić session.commit()po zakończeniu żądanie (nie jestem zaznajomiony z TurboGears, więc ignorować tego, że jeśli jest obsługiwane przez ciebie na poziomie ramowej). Oprócz upewnienia się, że zmiany wprowadziły je do bazy danych, spowodowałoby to wygaśnięcie wszystkiego w sesji. Następna transakcja nie rozpocznie się do następnego użycia tej sesji, więc przyszłe żądania w tym samym wątku nie będą widzieć nieaktualnych danych.
dhaffey
10
Styl alternatywny:session.execute(Foo.__table__.insert(), values)
Joril
6
Zauważ, że nowsze wersje sqlalchemy mają możliwości wstawiania zbiorczego: docs.sqlalchemy.org/en/latest/orm/ ...
Wayne Werner
18

Zwykle robię to za pomocą add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
reubano
źródło
2
Czy na pewno to działa? To nie jest równoznaczne z wprowadzaniem .addich na sesję pojedynczo?
Alec
Byłoby to sprzeczne z intuicją, biorąc pod uwagę nazwę metody, dokumentacja nie zawiera szczegółowych informacji: Add the given collection of instances to this Session.Czy masz jakiś powód, by sądzić, że nie wykonuje ona zbiorczego wstawiania?
reubano
3
Nie sądzę, że jest to zbyt sprzeczne z intuicją - w rzeczywistości dodaje wszystkie rzeczy, o które go prosisz. Wydaje się, że nic z dodawania wszystkich rzeczy do sesji nie sugeruje, jakie podstawowe instrukcje SQL zostaną wydane. Patrząc na źródło: github.com/zzzeek/sqlalchemy/blob/… w rzeczywistości wydaje się, że .addkażdy element jest osobno.
Alec
Działa dobrze, w porównaniu do bulk_save_objects(), flush()gdy możemy uzyskać identyfikator obiektu, ale bulk_save_objects()nie możemy (zdarzenie z flush()wywołane).
coanor
14

Bezpośrednie wsparcie zostało dodane do SQLAlchemy w wersji 0.8

Zgodnie z dokumentacją , connection.execute(table.insert().values(data))powinno załatwić sprawę. (Zwróć uwagę, że to nie to samo, connection.execute(table.insert(), data)co powoduje wstawianie wielu pojedynczych wierszy przez wywołanie executemany). W przypadku wszystkich połączeń innych niż lokalne różnica w wydajności może być ogromna.

user3805082
źródło
Czy mógłbyś wyjaśnić, który z nich jest bardziej skuteczny?
Jacob Lee
10

SQLAlchemy wprowadziło to w wersji 1.0.0:

Operacje zbiorcze - SQLAlchemy Docs

Dzięki tym operacjom możesz teraz wykonywać zbiorcze wstawianie lub aktualizacje!

Na przykład (jeśli chcesz najmniejszego narzutu dla prostych wstawek w tabeli), możesz użyć Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Lub, jeśli chcesz, pomiń loadmekrotki i napisz słowniki bezpośrednio do dicts(ale łatwiej mi jest pozostawić całą rozmowę z danymi i załadować listę słowników w pętli).

juanitogan
źródło
7

Odpowiedź Piere'a jest poprawna, ale jedną kwestią jest to, że bulk_save_objectsdomyślnie nie zwraca kluczy głównych obiektów, jeśli cię to interesuje. Ustaw, return_defaultsaby Trueuzyskać takie zachowanie.

Dokumentacja jest tutaj .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Matthew Moisen
źródło
2
Należy zachować ostrożność przy fladze. Wstawi on po kolei jeden obiekt na raz, a znaczący wzrost wydajności może nie występować [1]. W moim przypadku wydajność pogorszyła się, co podejrzewałem z powodu narzutu. [1]: docs.sqlalchemy.org/en/13/orm/…
dhfromkorea
6

Wszystkie drogi prowadzą do Rzymu , ale niektóre z nich przecinają góry, wymagają promów, ale jeśli chcesz się tam szybko dostać, po prostu jedź autostradą.


W tym przypadku autostrady jest użycie execute_batch () cechę psycopg2 . Dokumentacja mówi to najlepiej:

Obecna implementacja executemany()(używając niezwykle charytatywnego niedomówienia) nie jest szczególnie skuteczna. Funkcje te mogą służyć do przyspieszenia powtarzania instrukcji dla zestawu parametrów. Zmniejszając liczbę obiegów serwera w obie strony, wydajność może być o rząd wielkości lepsza niż przy użyciu executemany().

W moim własnym teście execute_batch()jest około dwa razy szybszy niż executemany()i daje możliwość skonfigurowania page_size do dalszych poprawek (jeśli chcesz wycisnąć ostatnie 2-3% wydajności ze sterownika).

Tę samą funkcję można łatwo włączyć, jeśli używasz SQLAlchemy, ustawiając use_batch_mode=Truejako parametr podczas tworzenia wystąpienia silnika za pomocącreate_engine()

chjortlund
źródło
Uwaga: psycopg2 execute_valuesjest szybszy niż psycopg2 execute_batchpodczas wstawiania zbiorczego!
Fierr
5

To jest sposób:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

To wstawi w ten sposób:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Odniesienie: FAQ SQLAlchemy zawiera testy porównawcze dla różnych metod zatwierdzania.

Eefret
źródło
3

Najlepszą odpowiedzią, jaką do tej pory znalazłem, była dokumentacja sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Istnieje kompletny przykład wzorców możliwych rozwiązań.

Jak pokazano w dokumentacji:

bulk_save_objects nie jest najlepszym rozwiązaniem, ale jego wydajność jest poprawna.

Wydaje mi się, że drugą najlepszą implementacją pod względem czytelności był rdzeń SQLAlchemy:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

Kontekst tej funkcji podano w artykule dokumentacji.

lelabo_m
źródło