psycopg2: wstaw wiele wierszy za pomocą jednego zapytania

141

Muszę wstawić wiele wierszy za pomocą jednego zapytania (liczba wierszy nie jest stała), więc muszę wykonać zapytanie takie jak to:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

Znam tylko jeden sposób

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

ale ja chcę prostszego sposobu.

Sergey Fedoseev
źródło

Odpowiedzi:

219

Zbudowałem program, który wstawia wiele wierszy do serwera znajdującego się w innym mieście.

Dowiedziałem się, że użycie tej metody było około 10 razy szybsze niż executemany. W moim przypadku tupjest to krotka zawierająca około 2000 wierszy. Przy użyciu tej metody zajęło to około 10 sekund:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

i 2 minuty przy użyciu tej metody:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)
ant32
źródło
15
Nadal bardzo aktualne prawie dwa lata później. Dzisiejsze doświadczenie sugeruje, że wraz ze wzrostem liczby rzędów, które chcesz przesunąć, lepiej jest użyć tej executestrategii. Dzięki temu zauważyłem przyspieszenie około 100x!
Rob Watts
4
Być może executemanyuruchamia zatwierdzenie po każdym wstawieniu. Jeśli zamiast tego zawrzesz całość w transakcję, może to przyspieszy sprawę?
Richard
4
Właśnie sam potwierdziłem tę poprawę. Z tego, co przeczytałem, psycopg2 executemanynie robi nic optymalnego, po prostu zapętla się i robi wiele executeinstrukcji. Korzystając z tej metody, wstawianie 700 wierszy na zdalny serwer przeszło z 60 do <2 sekund.
Nelson
5
Może jestem paranoikiem, ale łączenie zapytania z zapytaniem +wydaje się, że może otworzyć się na wstrzyknięcie sql, czuję, że execute_values()rozwiązanie @Clodoaldo Neto jest bezpieczniejsze.
Will Munn,
26
w przypadku, gdy ktoś napotka następujący błąd: [TypeError: sequence item 0: oczekiwana instancja str, znalezione bajty] uruchom to polecenie [args_str = ','. join (cur.mogrify ("(% s,% s)", x ) .decode ("utf-8") for x in tup)]
mrt
147

Nowa execute_valuesmetoda w Psycopg 2.7:

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)

Pythonowy sposób na zrobienie tego w Psycopg 2.6:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)

Objaśnienie: Jeśli dane do wstawienia są podane jako lista krotek, jak w

data = [(1,'x'), (2,'y')]

to jest już w dokładnie wymaganym formacie, jak

  1. valuesskładnia insertklauzuli spodziewa listę rekordów, jak w

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopgdostosowuje Pythona tupledo Postgresql record.

Jedyną konieczną pracą jest dostarczenie szablonu listy rekordów do wypełnienia przez psycopg

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

i umieść go w insertzapytaniu

insert_query = 'insert into t (a, b) values {}'.format(records_list_template)

Drukowanie insert_querywyników

insert into t (a, b) values %s,%s

Teraz do zwykłego Psycopgpodstawiania argumentów

cursor.execute(insert_query, data)

Lub po prostu testowanie tego, co zostanie wysłane na serwer

print (cursor.mogrify(insert_query, data).decode('utf8'))

Wynik:

insert into t (a, b) values (1, 'x'),(2, 'y')
Clodoaldo Neto
źródło
1
Jak wypada wydajność tej metody w porównaniu z cur.copy_from?
Michael Goldshteyn
1
Oto streszczenie ze wzorcem . copy_from skaluje się do około 6,5 razy szybciej na moim komputerze z 10 milionami rekordów.
Joseph Sheedy,
Wygląda ładnie - myślę, że na końcu swojej początkowej definicji insert_query masz zbłąkaną definicję insert_query (chyba że próbowałeś zrobić z niej krotkę?) I brakuje jej jak po% for% s również w początkowej definicji insert_query.
deadcode
2
używając execute_valuesbyłem w stanie uruchomić mój system z prędkością 1 tys. rekordów na minutę do 128 tys. rekordów na minutę
Conrad.Dean
66

Aktualizacja za pomocą psycopg2 2.7:

Wersja klasyczna executemany()jest około 60 razy wolniejsza niż implementacja @ ant32 (zwana „złożoną”), jak wyjaśniono w tym wątku: https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

Ta implementacja została dodana do psycopg2 w wersji 2.7 i nazywa się execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

Poprzednia odpowiedź:

Aby wstawić wiele wierszy, użycie VALUESskładni multirow z execute()jest około 10x szybsze niż użycie psycopg2 executemany(). Rzeczywiście, executemany()po prostu uruchamia wiele indywidualnych INSERTinstrukcji.

Kod @ ant32 działa doskonale w Pythonie 2. Ale w Pythonie 3 cursor.mogrify()zwraca bajty, cursor.execute()pobiera bajty lub łańcuchy i ','.join()oczekuje strwystąpienia.

Dlatego w Pythonie 3 może zajść potrzeba zmodyfikowania kodu @ ant32, dodając .decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

Lub używając tylko bajtów (z b''lub b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 
Antoine Dusséaux
źródło
26

kursor.copy_from to zdecydowanie najszybsze rozwiązanie, jakie znalazłem dla wstawiania zbiorczego. Oto streszczenie, które stworzyłem, zawierające klasę o nazwie IteratorFile, która umożliwia iteratorowi, który generuje ciągi, odczytywanie jak pliku. Możemy przekonwertować każdy rekord wejściowy na łańcuch za pomocą wyrażenia generatora. Więc rozwiązaniem byłoby

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

W przypadku tego trywialnego rozmiaru argumentów nie spowoduje to dużej różnicy w szybkości, ale widzę duże przyspieszenia w przypadku tysięcy + wierszy. Będzie to również bardziej wydajne w pamięci niż tworzenie gigantycznego ciągu zapytania. Iterator może przechowywać tylko jeden rekord wejściowy w pamięci naraz, gdzie w pewnym momencie zabraknie pamięci w procesie Pythona lub w Postgres, budując ciąg zapytania.

Joseph Sheedy
źródło
3
Oto test porównawczy porównujący copy_from / IteratorFile z rozwiązaniem do tworzenia zapytań. copy_from skaluje się do około 6,5 razy szybciej na moim komputerze z 10 milionami rekordów.
Joseph Sheedy,
3
czy musisz się bawić z uciekającymi ciągami znaków i znacznikami czasu itp.?
CpILL,
Tak, musisz się upewnić, że masz dobrze uformowane rekordy TSV.
Joseph Sheedy,
24

Fragment ze strony samouczka Psycopg2 na Postgresql.org (patrz na dole) :

Ostatnią rzeczą, którą chciałbym wam pokazać, jest wstawianie wielu wierszy za pomocą słownika. Gdybyś miał:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

Możesz łatwo wstawić wszystkie trzy wiersze w słowniku, używając:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

Nie oszczędza dużo kodu, ale zdecydowanie wygląda lepiej.

ptrn
źródło
35
Spowoduje to uruchomienie wielu indywidualnych INSERTinstrukcji. Przydatne, ale nie to samo, co pojedyncza VALUEwkładka multi- d.
Craig Ringer
7

Wszystkie te techniki w terminologii Postgres nazywane są „Extended Inserts”, a od 24 listopada 2016 r. Są one wciąż o tonę szybsze niż executemany () psychopg2 i wszystkie inne metody wymienione w tym wątku (które wypróbowałem przed przejściem do tego odpowiedź).

Oto kod, który nie używa cur.mogrify i jest przyjemny i prosty do zrozumienia:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

Ale należy zauważyć, że jeśli możesz użyć copy_from (), powinieneś użyć copy_from;)

JJ
źródło
Wychowywanie z martwych, ale co się dzieje w sytuacji z kilku ostatnich rzędów? Zakładam, że faktycznie uruchomiłeś tę ostatnią klauzulę ponownie w ostatnich pozostałych wierszach, w przypadku, gdy masz parzystą liczbę wierszy?
Mcpeterson
Prawidłowo, przepraszam, że musiałem o tym zapomnieć, kiedy pisałem przykład - to dość głupie z mojej strony. Nie zrobienie tego nie dałoby ludziom błędu, co sprawia, że ​​martwię się, ile osób skopiowało / wkleiło rozwiązanie i zajęło się swoimi sprawami… W każdym razie, bardzo wdzięczny mcpeterson - dziękuję!
JJ
2

Od kilku lat korzystam z powyższej odpowiedzi ant32. Jednak odkryłem, że jest to błąd w Pythonie 3, ponieważ mogrifyzwraca ciąg bajtów.

Konwersja jawna do ciągów bajtowych jest prostym rozwiązaniem umożliwiającym dostosowanie kodu do języka Python 3.

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)
jprockbelly
źródło
1

Innym przyjemnym i wydajnym podejściem jest przekazywanie wierszy do wstawienia jako 1 argument, czyli tablica obiektów json.

Np. Przekazujesz argument:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

Jest to tablica, która może zawierać dowolną ilość obiektów wewnątrz. Wtedy twój SQL wygląda tak:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

Uwaga: Twój postgress musi być wystarczająco nowy, aby obsługiwał json

Daniel Garmoshka
źródło
1

Rozwiązanie kursor.copyfrom dostarczone przez @ jopseph.sheedy ( https://stackoverflow.com/users/958118/joseph-sheedy ) powyżej ( https://stackoverflow.com/a/30721460/11100064 ) jest rzeczywiście błyskawiczne.

Jednak podany przez niego przykład nie jest generalnie użyteczny dla rekordu z dowolną liczbą pól i zajęło mi trochę czasu, aby dowiedzieć się, jak go poprawnie użyć.

IteratorFile musi być rutworzony z polami oddzielonymi tabulatorami, takimi jak to ( jest to lista dykt, gdzie każdy dykt jest rekordem):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

Aby uogólnić dla dowolnej liczby pól, najpierw utworzymy ciąg linii z odpowiednią liczbą tabulatorów i symboli zastępczych: "{}\t{}\t{}....\t{}"a następnie użyjemy .format()do wypełnienia wartości pól *list(r.values())) for r in records:

        line = "\t".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

pełna funkcja w skrócie tutaj .

Bart Jonk
źródło
0

Jeśli używasz SQLAlchemy, nie musisz mieszać z ręcznym tworzeniem ciągu, ponieważ SQLAlchemy obsługuje generowanie wielowierszowej VALUESklauzuli dla pojedynczej INSERTinstrukcji :

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)
Jeff Widman
źródło
Pod maską SQLAlchemy używa metody executemany () psychopg2 dla wywołań takich jak to i dlatego ta odpowiedź będzie miała poważne problemy z wydajnością dla dużych zapytań. Zobacz metodę wykonywania docs.sqlalchemy.org/en/latest/orm/session_api.html .
sage88,
2
Nie sądzę, żeby tak było. Minęło trochę czasu, odkąd to obejrzałem, ale IIRC, to właściwie buduje pojedynczą instrukcję wstawiania w insert_querylinii. Następnie session.execute()wywołuje po prostu execute()instrukcję psycopg2 z pojedynczym ogromnym ciągiem. Tak więc „sztuczka” polega na zbudowaniu najpierw całego obiektu instrukcji wstawiania. Używam tego do wstawiania 200 000 wierszy naraz i zauważyłem ogromny wzrost wydajności przy użyciu tego kodu w porównaniu do normalnego executemany().
Jeff Widman,
1
Dokument SQLAlchemy, do którego utworzono łącze, zawiera sekcję, która dokładnie pokazuje, jak to działa, a nawet mówi: „Należy pamiętać, że przekazywanie wielu wartości NIE jest tym samym, co używanie tradycyjnej formy executemany ()”. Dlatego wyraźnie stwierdza, że ​​to działa.
Jeff Widman,
1
Poprawiono mnie. Nie zauważyłem użycia metody values ​​() (bez niej SQLAlchemy robi po prostu executemany). Powiedziałbym, że edytuj odpowiedź, aby dołączyć link do tego dokumentu, abym mógł zmienić mój głos, ale oczywiście już go uwzględniłeś. Być może wspomnij, że to nie to samo, co wywołanie metody insert () za pomocą funkcji execute () z listą poleceń?
sage88,
jak to działa w porównaniu z execute_values?
MrR
0

execute_batch zostało dodane do psycopg2 od czasu wysłania tego pytania.

Jest wolniejszy niż execute_values, ale prostszy w użyciu.

gerardw
źródło
2
Zobacz inne komentarze. Metoda psycopg2 execute_valuesjest szybsza niżexecute_batch
Fierr
0

executemany akceptuje tablicę krotek

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
Grigorij
źródło
-1

Jeśli chcesz wstawić wiele wierszy w ramach jednej statystyki wstawiania (zakładając, że nie używasz ORM), najłatwiejszym sposobem byłoby dla mnie jak dotąd użycie listy słowników. Oto przykład:

 t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
      {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
      {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

Jak widać, zostanie wykonane tylko jedno zapytanie:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT
Alex
źródło
Pokazywanie logowania z silnika sqlalchemy NIE jest demonstracją wykonania tylko jednego zapytania, oznacza po prostu, że silnik sqlalchemy wykonał jedną komendę. Pod maską jest to użycie executemany psychopg2, co jest bardzo nieefektywne. Zobacz metodę wykonywania docs.sqlalchemy.org/en/latest/orm/session_api.html .
sage88,
-3

Korzystanie z aiopg - poniższy fragment działa doskonale

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)
Nihal Sharma
źródło
-4

Wreszcie w wersji SQLalchemy1.2, ta nowa implementacja jest dodawana do używania psycopg2.extras.execute_batch () zamiast executemany podczas inicjalizacji silnika za pomocą use_batch_mode = True, na przykład:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

Wtedy ktoś musiałby używać SQLalchmey, nie zawracałby sobie głowy próbowaniem różnych kombinacji sqla i psycopg2 oraz bezpośredniego SQL.

user2189731
źródło