Zapętlasz 16 milionów rekordów za pomocą ArcPy?

13

Mam tabelę z 8 kolumnami i ~ 16,7 miliona rekordów. Muszę uruchomić zestaw równań if-else na kolumnach. Napisałem skrypt za pomocą modułu UpdateCursor, ale po kilku milionach rekordów zabrakło mu pamięci. Zastanawiałem się, czy istnieje lepszy sposób na przetworzenie tych 16,7 miliona rekordów.

import arcpy

arcpy.TableToTable_conversion("combine_2013", "D:/mosaic.gdb", "combo_table")

c_table = "D:/mosaic.gdb/combo_table"

fields = ['dev_agg', 'herb_agg','forest_agg','wat_agg', 'cate_2']

start_time = time.time()
print "Script Started"
with arcpy.da.UpdateCursor(c_table, fields) as cursor:
    for row in cursor:
        # row's 0,1,2,3,4 = dev, herb, forest, water, category
        #classficiation water = 1; herb = 2; dev = 3; forest = 4
        if (row[3] >= 0 and row[3] > row[2]):
            row[4] = 1
        elif (row[2] >= 0 and row[2] > row[3]):
            row[4] = 4
        elif (row[1] > 180):
            row[4] = 2
        elif (row[0] > 1):
            row[4] = 3
        cursor.updateRow(row)
end_time = time.time() - start_time
print "Script Complete - " +  str(end_time) + " seconds"

AKTUALIZACJA # 1

Uruchomiłem ten sam skrypt na komputerze z 40 GB pamięci RAM (oryginalny komputer miał tylko 12 GB pamięci RAM). Pomyślnie ukończono po ~ 16 godzinach. Wydaje mi się, że 16 godzin to za długo, ale nigdy nie pracowałem z tak dużym zestawem danych, więc nie wiem, czego się spodziewać. Jedynym nowym dodatkiem do tego skryptu jest arcpy.env.parallelProcessingFactor = "100%". Próbuję dwóch sugerowanych metod (1) zrobienia 1 miliona rekordów partiami i (2) przy użyciu SearchCursor i zapisywania wyników do csv. Niedługo przedstawię sprawozdanie z postępów.

AKTUALIZACJA # 2

SearchCursor i aktualizacja CSV działały doskonale! Nie mam dokładnych czasów uruchomienia, zaktualizuję pocztę, gdy jutro będę w biurze, ale powiedziałbym, że przybliżony czas pracy wynosi ~ 5-6 minut, co jest dość imponujące. Nie spodziewałem się tego. Udostępniam mój nieoczyszczony kod wszelkie uwagi i ulepszenia są mile widziane:

import arcpy, csv, time
from arcpy import env

arcpy.env.parallelProcessingFactor = "100%"

arcpy.TableToTable_conversion("D:/mosaic.gdb/combine_2013", "D:/mosaic.gdb", "combo_table")
arcpy.AddField_management("D:/mosaic.gdb/combo_table","category","SHORT")

# Table
c_table = "D:/mosaic.gdb/combo_table"
fields = ['wat_agg', 'dev_agg', 'herb_agg','forest_agg','category', 'OBJECTID']

# CSV
c_csv = open("D:/combine.csv", "w")
c_writer = csv.writer(c_csv, delimiter= ';',lineterminator='\n')
c_writer.writerow (['OID', 'CATEGORY'])
c_reader = csv.reader(c_csv)

start_time = time.time()
with arcpy.da.SearchCursor(c_table, fields) as cursor:
    for row in cursor:
        #skip file headers
        if c_reader.line_num == 1:
            continue
        # row's 0,1,2,3,4,5 = water, dev, herb, forest, category, oid
        #classficiation water = 1; dev = 2; herb = 3; ; forest = 4
        if (row[0] >= 0 and row[0] > row[3]):
            c_writer.writerow([row[5], 1])
        elif (row[1] > 1):
            c_writer.writerow([row[5], 2])
        elif (row[2] > 180):
            c_writer.writerow([row[5], 3])
        elif (row[3] >= 0 and row[3] > row[0]):
            c_writer.writerow([row[5], 4])

c_csv.close()
end_time =  time.time() - start_time
print str(end_time) + " - Seconds"

AKTUALIZACJA # 3 Ostatnia aktualizacja. Całkowity czas działania skryptu wynosi ~ 199,6 sekundy / 3,2 minuty.

cptpython
źródło
1
Czy używasz wersji 64-bitowej (w tle, na serwerze lub w wersji Pro)?
KHibma,
Zapomniałem wspomnieć. Używam 10.4 x64 w tle.
cptpython
Adwokat diabłów - czy próbowałeś uruchomić go na pierwszym planie lub z IDLE, ponieważ patrząc na swój skrypt nie musisz mieć otwartej ArcMap?
Hornbydd,
uruchom go jako samodzielny skrypt lub jeśli znasz SQL, prześlij
plik shapefile
1
Rozumiem, że jest to oprogramowanie typu open source, ale proces zatwierdzania zajmuje ~ 1–2 tygodnie i jest to czasochłonne, więc nie sądzę, aby było to możliwe w tym przypadku.
cptpython

Odpowiedzi:

4

Możesz zapisać Objectid i wynik obliczeń (cate_2) do pliku csv. Następnie dołącz plik csv do oryginalnego pliku, wypełnij pole, aby zachować wynik. W ten sposób nie aktualizujesz tabeli za pomocą kursora DA. Możesz użyć kursora wyszukiwania.

Klewis
źródło
Myślałem tak samo, jak tutaj jest dyskusja , a oni mówią o jeszcze większych zestawach danych.
Hornbydd,
Dzięki, Klewis. Brzmi obiecująco. Spróbuję tego wraz z sugestią FelixIP i ciekawą dyskusją, choć będę musiał to zrobić kilkadziesiąt razy.
cptpython
Działa świetnie! Zaktualizowałem pytanie o najnowszy skrypt. Dzięki!
cptpython
2

Przepraszam, jeśli wciąż ożywiam ten stary wątek. Pomysł polegał na wykonaniu instrukcji if-else na rastrze kombajnu, a następnie użyciu nowego pola w odnośniku w celu utworzenia nowego rastra. Skomplikowałem problem, eksportując dane jako tabelę i wprowadziłem nieefektywny przepływ pracy, którym zajął się @Alex Tereshenkov. Po zrozumieniu tego, co oczywiste, podzieliłem dane na 17 zapytań (po 1 milion), zgodnie z sugestią @FelixIP. Każda partia trwała średnio ~ 1,5 minuty, a całkowity czas pracy wynosił ~ 23,3 minuty. Ta metoda eliminuje potrzebę łączenia i myślę, że ta metoda najlepiej spełnia zadanie. Oto poprawiony skrypt do wykorzystania w przyszłości:

import arcpy, time
from arcpy import env

def cursor():
    combine = "D:/mosaic.gdb/combine_2013"
    #arcpy.AddField_management(combine,"cat_1","SHORT")
    fields = ['wat_agg', 'dev_agg', 'herb_agg','forest_agg', 'cat_1']
    batch = ['"OBJECTID" >= 1 AND "OBJECTID" <= 1000000', '"OBJECTID" >= 1000001 AND "OBJECTID" <= 2000000', '"OBJECTID" >= 2000001 AND "OBJECTID" <= 3000000', '"OBJECTID" >= 3000001 AND "OBJECTID" <= 4000000', '"OBJECTID" >= 4000001 AND "OBJECTID" <= 5000000', '"OBJECTID" >= 5000001 AND "OBJECTID" <= 6000000', '"OBJECTID" >= 6000001 AND "OBJECTID" <= 7000000', '"OBJECTID" >= 7000001 AND "OBJECTID" <= 8000000', '"OBJECTID" >= 8000001 AND "OBJECTID" <= 9000000', '"OBJECTID" >= 9000001 AND "OBJECTID" <= 10000000', '"OBJECTID" >= 10000001 AND "OBJECTID" <= 11000000', '"OBJECTID" >= 11000001 AND "OBJECTID" <= 12000000', '"OBJECTID" >= 12000001 AND "OBJECTID" <= 13000000', '"OBJECTID" >= 13000001 AND "OBJECTID" <= 14000000', '"OBJECTID" >= 14000001 AND "OBJECTID" <= 15000000', '"OBJECTID" >= 15000001 AND "OBJECTID" <= 16000000', '"OBJECTID" >= 16000001 AND "OBJECTID" <= 16757856']
    for i in batch:
        start_time = time.time()
        with arcpy.da.UpdateCursor(combine, fields, i) as cursor:
            for row in cursor:
            # row's 0,1,2,3,4,5 = water, dev, herb, forest, category
            #classficiation water = 1; dev = 2; herb = 3; ; forest = 4
                if (row[0] >= 0 and row[0] >= row[3]):
                    row[4] = 1
                elif (row[1] > 1):
                    row[4] = 2
                elif (row[2] > 180):
                    row[4] = 3
                elif (row[3] >= 0 and row[3] > row[0]):
                    row[4] = 4
                cursor.updateRow(row)
        end_time =  time.time() - start_time
        print str(end_time) + " - Seconds"

cursor()
cptpython
źródło
Tak więc, aby upewnić się, że rozumiem to poprawnie. W swoim oryginalnym poście powiedziałeś, że kiedy uruchomiłeś to na komputerze z 40 GB pamięci RAM, zajęło to około 16 godzin. Ale teraz, kiedy podzieliłeś go na 17 partii i zajęło to łącznie ~ 23 minuty. Czy to jest poprawne?
ianbroad
Poprawny. Pierwsze uruchomienie zajęło ~ 16 godzin z 40 GB pamięci RAM, a drugie uruchomienie zajęło ~ 23 minut + kolejne ~ 15 minut na wykonanie Lookupi wyeksportowanie rastra z nowo zdefiniowanymi kategoriami.
cptpython
Tylko uwaga, która arcpy.env.parallelProcessingFactor = "100%"nie ma wpływu na twój skrypt. Nie widzę tam żadnych narzędzi, które mogłyby wykorzystać to środowisko.
KHibma,
Masz rację. Zmienię kod.
cptpython
1

Możesz spróbować zmienić na użycie CalculateField_management . Zapobiega to zapętlaniu się za pomocą kursorów, a po wyglądzie opcji dla wartości kategorii można ustawić to jako cztery podprocesy odradzające się sekwencyjnie. Po zakończeniu każdego podprocesu pamięć jest zwalniana przed rozpoczęciem następnego. Oddajesz małe (milisekundy) trafienie spawnujące każdy podproces.

Lub, jeśli chcesz zachować obecne podejście, skorzystaj z podprocesu, który pobiera x wierszy naraz. Przeprowadź główny proces, aby go poprowadzić i, jak poprzednio, za każdym razem, gdy się skończy, wciąż przeszukujesz swoją pamięć. Dodatkową zaletą robienia tego w ten sposób (szczególnie poprzez samodzielny proces pythona) jest to, że możesz lepiej wykorzystać wszystkie swoje rdzenie jako spawnujące podprocesy w wielowątkowości Pythona wokół GIL. Jest to możliwe dzięki ArcPy i podejściu, które stosowałem w przeszłości do masowej utraty danych. Oczywiście zmniejsz ilość danych, w przeciwnym razie skończy Ci się pamięć szybciej!

MappaGnosis
źródło
Z mojego doświadczenia wynika, że ​​używanie arcpy.da.UpdateCursor jest znacznie szybsze niż arcpy.CalculateField_management. Napisałem skrypt, który działa na 55 000 000 funkcji budynku, był około 5 razy wolniejszy dzięki narzędziu CalculateField.
offermann
Chodzi o to, aby ustawić cztery podprocesy i oczyścić pamięć, ponieważ jest to tutaj prawdziwy punkt uszczypnięcia. Jak zarysowuję w drugim akapicie, podprocesy można podzielić na wiersze, ale zarządzanie to wymaga nieco więcej zarządzania niż pojedynczego wyboru.
MappaGnosis,
1

Logikę manipulacji danymi można zapisać jako instrukcję SQL UPDATE przy użyciu wyrażenia CASE, którą można wykonać za pomocą GDAL / OGR, np. Przez OSGeo4W z gdal-filegdbzainstalowanym programem .

Oto przepływ pracy, który wykorzystuje osgeo.ogrzamiast arcpy:

import time
from osgeo import ogr

ds = ogr.Open('D:/mosaic.gdb', 1)
if ds is None:
    raise ValueError("You don't have a 'FileGDB' driver, or the dataset doesn't exist")
sql = '''\
UPDATE combo_table SET cate_2 = CASE
    WHEN wat_agg >= 0 AND wat_agg > forest_agg THEN 1
    WHEN dev_agg > 1 THEN 2
    WHEN herb_agg > 180 THEN 3
    WHEN forest_agg >= 0 AND forest_agg > wat_agg THEN 4
    END
'''
start_time = time.time()
ds.ExecuteSQL(sql, dialect='sqlite')
ds = None  # save, close
end_time =  time.time() - start_time
print("that took %.1f seconds" % end_time)

Na podobnej tabeli z nieco ponad 1 milionem rekordów zapytanie zajęło 18 minut. Przetwarzanie 16 milionów rekordów może potrwać około 4–5 godzin.

Mike T.
źródło
Niestety skrypt jest częścią większego skryptu napisanego przy użyciu, arcpyale doceniam odpowiedź. Powoli próbuję więcej używać GDAL.
cptpython
1

Aktualizacja kodu w sekcji 2 w twoim pytaniu nie pokazuje, w jaki sposób łączysz .csvplik z powrotem do oryginalnej tabeli w geobazie plików. Mówisz, że uruchomienie skryptu zajęło ~ 5 minut. Brzmi to uczciwie, jeśli .csvplik został wyeksportowany tylko bez wykonywania połączeń. Gdy spróbujesz przywrócić .csvplik do ArcGIS, napotkasz problemy z wydajnością.

1) Nie można wykonywać połączeń bezpośrednio z .csvtabeli geobazy, ponieważ .csvplik nie ma identyfikatora OID (pole obliczone przy użyciu unikalnych wartości nie pomoże, ponieważ nadal trzeba będzie przekonwertować .csvplik na tabelę geobazy). Tak więc kilka minut na Table To Tablenarzędzie GP (możesz użyć in_memoryobszaru roboczego do utworzenia tabeli tymczasowej, będzie nieco szybsze).

2) Po załadowaniu .csvdo tabeli geobazy, chciałbyś zbudować indeks na polu, w którym wykonasz łączenie (w twoim przypadku, wartość źródłowa objectidz .csvpliku. Zajmie to kilka minut na tabeli wierszy 16mln.

3) Następnie musisz użyć narzędzi Add Joinlub Join FieldGP. Żadne z nich nie będzie dobrze działać na dużych stołach.

4) Następnie musisz wykonać Calculate Fieldnarzędzie GP, aby obliczyć nowo połączone pola (pola). Wiele minut tutaj; co więcej, obliczanie pól zajmuje więcej czasu, gdy pola biorące udział w obliczeniach pochodzą z połączonej tabeli.

Jednym słowem, nie dostaniesz niczego, co wspominasz o 5 minutach. Jeśli zrobisz to za godzinę, będę pod wrażeniem.

Aby uniknąć przetwarzania dużych zestawów danych w ArcGIS, sugeruję przeniesienie danych poza ArcGIS do pandasramki danych i wykonanie tam wszystkich obliczeń. Kiedy skończysz, po prostu zapisz wiersze ramki danych z powrotem w nowej tabeli geobazy za pomocą da.InsertCursor(lub możesz obciąć istniejącą tabelę i zapisać wiersze w źródłowej).

Pełny kod, który napisałem w celu przeprowadzenia tego testu, znajduje się poniżej:

import time
from functools import wraps
import arcpy
import pandas as pd

def report_time(func):
    '''Decorator reporting the execution time'''
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(func.__name__, round(end-start,3))
        return result
    return wrapper

#----------------------------------------------------------------------
@report_time
def make_df(in_table,limit):
    columns = [f.name for f in arcpy.ListFields(in_table) if f.name != 'OBJECTID']
    cur = arcpy.da.SearchCursor(in_table,columns,'OBJECTID < {}'.format(limit))
    rows = (row for row in cur)
    df = pd.DataFrame(rows,columns=columns)
    return df

#----------------------------------------------------------------------
@report_time
def calculate_field(df):
    df.ix[(df['DataField2'] % 2 == 0), 'Category'] = 'two'
    df.ix[(df['DataField2'] % 4 == 0), 'Category'] = 'four'
    df.ix[(df['DataField2'] % 5 == 0), 'Category'] = 'five'
    df.ix[(df['DataField2'] % 10 == 0), 'Category'] = 'ten'
    df['Category'].fillna('other', inplace=True)
    return df

#----------------------------------------------------------------------
@report_time
def save_gdb_table(df,out_table):
    rows_to_write = [tuple(r[1:]) for r in df.itertuples()]
    with arcpy.da.InsertCursor(out_table,df.columns) as ins_cur:
        for row in rows_to_write:
            ins_cur.insertRow(row)

#run for tables of various sizes
for limit in [100000,500000,1000000,5000000,15000000]:
    print '{:,}'.format(limit).center(50,'-')

    in_table = r'C:\ArcGIS\scratch.gdb\BigTraffic'
    out_table = r'C:\ArcGIS\scratch.gdb\BigTrafficUpdated'
    if arcpy.Exists(out_table):
        arcpy.TruncateTable_management(out_table)

    df = make_df(in_table,limit=limit)
    df = calculate_field(df)
    save_gdb_table(df, out_table)
    print

Poniżej przedstawiono dane wyjściowe debugowania we / wy (podana liczba to liczba użytych wierszy w tabeli) z informacją o czasie wykonania poszczególnych funkcji:

---------------------100,000----------------------
('make_df', 1.141)
('calculate_field', 0.042)
('save_gdb_table', 1.788)

---------------------500,000----------------------
('make_df', 4.733)
('calculate_field', 0.197)
('save_gdb_table', 8.84)

--------------------1,000,000---------------------
('make_df', 9.315)
('calculate_field', 0.392)
('save_gdb_table', 17.605)

--------------------5,000,000---------------------
('make_df', 45.371)
('calculate_field', 1.903)
('save_gdb_table', 90.797)

--------------------15,000,000--------------------
('make_df', 136.935)
('calculate_field', 5.551)
('save_gdb_table', 275.176)

Wstawienie wiersza z da.InsertCursorzajmuje stały czas, to znaczy, jeśli wstawienie 1 rzędu zajmie, powiedzmy, 0,1 sekundy, wstawienie 100 rzędów zajmie 10 sekund. Niestety, ponad 95% całkowitego czasu wykonania spędza na czytaniu tabeli geobazy, a następnie wstawianiu wierszy z powrotem do geobazy.

To samo dotyczy tworzenia pandasramki danych z da.SearchCursorgeneratora i obliczania pola (pól). Wraz ze podwojeniem liczby wierszy w źródłowej tabeli geobazy, czas wykonania powyższego skryptu również się wydłuża. Oczywiście nadal musisz używać 64-bitowego Pythona, ponieważ podczas wykonywania niektóre większe struktury danych będą obsługiwane w pamięci.

Alex Tereshenkov
źródło
Właściwie zamierzałem zadać kolejne pytanie, które mówiłoby o ograniczeniach zastosowanej przeze mnie metody, ponieważ natrafiłem na problemy, które rozwiązałeś powyżej, więc dziękuję! Co próbuję osiągnąć: połącz cztery rastry, a następnie wykonaj instrukcję if-else na podstawie kolumn i wypisz dane wyjściowe do nowej kolumny, a na końcu wykonaj, Lookupaby utworzyć raster na podstawie wartości w nowej kolumnie. Moja metoda miała wiele niepotrzebnych kroków i nieefektywnego przepływu pracy, powinienem był o tym wspomnieć w moim pierwotnym pytaniu. Żyj i ucz się. Wypróbuję twój skrypt później w tym tygodniu.
cptpython