Jak mogę szybciej zgarniać

16

Praca tutaj jest zeskrobać API witrynę, która rozpoczyna się od https://xxx.xxx.xxx/xxx/1.jsoncelu https://xxx.xxx.xxx/xxx/1417749.jsoni zapisz go dokładnie MongoDB. Do tego mam następujący kod:

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
write_log = open("logging.log", "a")
min = 1
max = 1417749
for n in range(min, max):
    response = requests.get("https:/xx.xxx.xxx/{}.json".format(str(n)))
    if response.status_code == 200:
        parsed = json.loads(response.text)
        inserted = com.insert_one(parsed)
        write_log.write(str(n) + "\t" + str(inserted) + "\n")
        print(str(n) + "\t" + str(inserted) + "\n")
write_log.close()

Ale wykonanie zadania zajmuje dużo czasu. Pytanie brzmi, jak mogę przyspieszyć ten proces.

Tek Nath
źródło
Czy po raz pierwszy próbowałeś porównać, ile czasu zajmuje przetworzenie pojedynczego jsona? Zakładając, że zajmuje to 300 ms na rekord, możesz przetworzyć wszystkie te rekordy sekwencyjnie w ciągu około 5 dni.
tuxdna,

Odpowiedzi:

5

asyncio jest również rozwiązaniem, jeśli nie chcesz używać wielowątkowości

import time
import pymongo
import json
import asyncio
from aiohttp import ClientSession


async def get_url(url, session):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()


async def create_task(sem, url, session):
    async with sem:
        response = await get_url(url, session)
        if response:
            parsed = json.loads(response)
            n = url.rsplit('/', 1)[1]
            inserted = com.insert_one(parsed)
            write_log.write(str(n) + "\t" + str(inserted) + "\n")
            print(str(n) + "\t" + str(inserted) + "\n")


async def run(minimum, maximum):
    url = 'https:/xx.xxx.xxx/{}.json'
    tasks = []
    sem = asyncio.Semaphore(1000)   # Maximize the concurrent sessions to 1000, stay below the max open sockets allowed
    async with ClientSession() as session:
        for n in range(minimum, maximum):
            task = asyncio.ensure_future(create_task(sem, url.format(n), session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses


client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
write_log = open("logging.log", "a")
min_item = 1
max_item = 100

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(min_item, max_item))
loop.run_until_complete(future)
write_log.close()
Frans
źródło
1
Korzystanie z asynchronizacji działało szybciej niż wielowątkowość.
Tek Nath
Dzięki za opinie. Ciekawy wynik.
Frans
10

Istnieje kilka rzeczy, które możesz zrobić:

  1. Użyj ponownie połączenia. Zgodnie z poniższym testem jest około 3 razy szybszy
  2. Możesz zgarniać wiele procesów równolegle

Kod równoległy stąd

from threading import Thread
from Queue import Queue
q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

Czasy od tego pytania do połączenia wielokrotnego użytku

>>> timeit.timeit('_ = requests.get("https://www.wikipedia.org")', 'import requests', number=100)
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
...
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
52.74904417991638
>>> timeit.timeit('_ = session.get("https://www.wikipedia.org")', 'import requests; session = requests.Session()', number=100)
Starting new HTTPS connection (1): www.wikipedia.org
15.770191192626953
keiv.fly
źródło
4

Prawdopodobnie szukasz zgarniania asynchronicznego. Poleciłbym ci stworzyć kilka partii adresów URL, tj. 5 adresów URL (staraj się nie niszczyć strony) i zeskrobać je asynchronicznie. Jeśli nie wiesz zbyt wiele o asynchronizacji, przejdź do Google asyncio libary. Mam nadzieję, że mogę ci pomóc :)

T Piper
źródło
1
Czy możesz dodać więcej szczegółów.
Tek Nath,
3

Spróbuj podzielić żądania i skorzystać z operacji zapisu zbiorczego MongoDB.

  • zgrupuj wnioski (100 żądań na grupę)
  • Iteruj po grupach
  • Użyj modelu żądań asynchronicznych, aby pobrać dane (adres URL w grupie)
  • Zaktualizuj bazę danych po zakończeniu grupy (operacja zapisu zbiorczego)

Może to zaoszczędzić dużo czasu na następujące sposoby * Opóźnienie zapisu MongoDB * Opóźnienie synchronicznego połączenia sieciowego

Ale nie zwiększaj liczby żądań równoległych (wielkość porcji), zwiększy to obciążenie sieci serwera i serwer może uznać to za atak DDoS.

  1. https://api.mongodb.com/python/current/examples/bulk.html
thuva4
źródło
1
Czy możesz pomóc z kodem do grupowania żądań i pobierania grupy
Tek Nath,
3

Zakładając, że nie zostaniesz zablokowany przez API i że nie ma żadnych limitów prędkości, ten kod powinien przyspieszyć proces 50 razy (być może więcej, ponieważ wszystkie żądania są teraz wysyłane przy użyciu tej samej sesji).

import pymongo
import threading

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
logs=[]

number_of_json_objects=1417750
number_of_threads=50

session=requests.session()

def scrap_write_log(session,start,end):
    for n in range(start, end):
        response = session.get("https:/xx.xxx.xxx/{}.json".format(n))
        if response.status_code == 200:
            try:
                logs.append(str(n) + "\t" + str(com.insert_one(json.loads(response.text))) + "\n")
                print(str(n) + "\t" + str(inserted) + "\n")
            except:
                logs.append(str(n) + "\t" + "Failed to insert" + "\n")
                print(str(n) + "\t" + "Failed to insert" + "\n")

thread_ranges=[[x,x+number_of_json_objects//number_of_threads] for x in range(0,number_of_json_objects,number_of_json_objects//number_of_threads)]

threads=[threading.Thread(target=scrap_write_log, args=(session,start_and_end[0],start_and_end[1])) for start_and_end in thread_ranges]

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

with open("logging.log", "a") as f:
    for line in logs:
        f.write(line)
Ibrahim Dar
źródło
2

Zdarzyło mi się mieć to samo pytanie wiele lat temu. Nigdy nie jestem zadowolony z odpowiedzi opartych na pythonie, które są dość powolne lub zbyt skomplikowane. Po przejściu na inne dojrzałe narzędzia prędkość jest duża i nigdy nie wracam.

Ostatnio wykonuję takie kroki, aby przyspieszyć proces w następujący sposób.

  1. wygeneruj kilka adresów URL w txt
  2. użyj, aria2c -x16 -d ~/Downloads -i /path/to/urls.txtaby pobrać te pliki
  3. parsuj lokalnie

To najszybszy jak do tej pory proces.

Jeśli chodzi o skrobanie stron internetowych, pobieram nawet niezbędne * .html, zamiast odwiedzać stronę raz na raz, co w rzeczywistości nie ma znaczenia. Kiedy trafisz odwiedzenia strony, za pomocą narzędzi takich jak Pythona requestslub scrapyczy urllib, to nadal cache i pobrać całą zawartość internetową dla Ciebie.

anonimowy
źródło
1

Najpierw utwórz listę wszystkich linków, ponieważ wszystkie są takie same, po prostu zmień iterację.

list_of_links=[]
for i in range(1,1417749):
    list_of_links.append("https:/xx.xxx.xxx/{}.json".format(str(i)))

t_no=2
for i in range(0, len(list_of_links), t_no):
    all_t = []
    twenty_links = list_of_links[i:i + t_no]
    for link in twenty_links:
        obj_new = Demo(link,)
        t = threading.Thread(target=obj_new.get_json)
        t.start()
        all_t.append(t)
    for t in all_t:
        t.join()

class Demo:
    def __init__(self, url):
        self.json_url = url

def get_json(self):
    try:
       your logic
    except Exception as e:
       print(e)

Po prostu zwiększając lub zmniejszając t_no możesz zmienić liczbę wątków ..

mobin alhassan
źródło