Jak mogę uruchomić zewnętrzne polecenie asynchronicznie z poziomu języka Python?

120

Muszę asynchronicznie uruchomić polecenie powłoki ze skryptu w języku Python. Rozumiem przez to, że chcę, aby mój skrypt Pythona działał dalej, podczas gdy zewnętrzne polecenie wyłącza się i robi wszystko, co musi.

Przeczytałem ten post:

Wywołanie polecenia zewnętrznego w Pythonie

Potem wyszedłem i zrobiłem kilka testów i wygląda na to, że os.system()wykona pracę, pod warunkiem, że użyję &na końcu polecenia, aby nie musieć czekać na jego powrót. Zastanawiam się, czy jest to właściwy sposób osiągnięcia czegoś takiego? Próbowałem commands.call()ale u mnie to nie zadziała bo blokuje się na zewnętrzne polecenie.

Daj mi znać, jeśli użycie os.system()w tym celu jest wskazane lub jeśli powinienem spróbować innej trasy.

Społeczność
źródło

Odpowiedzi:

135

subprocess.Popen robi dokładnie to, co chcesz.

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(Edytuj, aby uzupełnić odpowiedź z komentarzy)

Instancja Popen może robić różne inne rzeczy, na przykład poll()sprawdzać, czy nadal działa, a communicate()także wysyłać jej dane na stdin i czekać, aż się zakończy.

Ali Afshar
źródło
4
Możesz także użyć funkcji poll (), aby sprawdzić, czy proces potomny się zakończyła, lub funkcji wait (), aby zaczekać na jego zakończenie.
Adam Rosenfield,
Adam, bardzo prawda, chociaż lepiej byłoby użyć komunikacji () do czekania, ponieważ ma to lepszą obsługę buforów wejścia / wyjścia i są sytuacje, w których ich zalanie może blokować.
Ali Afshar
Adam: dokumentacja mówi: „Ostrzeżenie To zakleszczy się, jeśli proces potomny wygeneruje wystarczającą ilość danych wyjściowych do potoku stdout lub stderr, tak że blokuje oczekiwanie na przyjęcie większej ilości danych przez bufor potoku systemu operacyjnego. Aby tego uniknąć, użyj funkcji komunikacja ().”
Ali Afshar
14
Jednak komunikacja () i wait () są operacjami blokującymi. Nie będziesz zrównoleglać poleceń, tak jak wydaje się, że OP pyta, czy ich używasz.
cdleary
1
Cdleary jest absolutnie poprawne, należy wspomnieć, że komunikuj się i czekaj, blokuj, więc rób to tylko wtedy, gdy czekasz, aż coś się wyłączy. (Co naprawdę powinieneś zrobić, aby być grzecznym)
Ali Afshar
48

Jeśli chcesz uruchomić wiele procesów równolegle, a następnie obsłużyć je, gdy przyniosą wyniki, możesz użyć sondowania, jak poniżej:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
    for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None: # Process finished.
            running_procs.remove(proc)
            break
        else: # No process is done, wait a bit and check again.
            time.sleep(.1)
            continue

    # Here, `proc` has finished with return code `retcode`
    if retcode != 0:
        """Error handling."""
    handle_results(proc.stdout)

Przepływ sterowania jest trochę zawikłany, ponieważ staram się go zmniejszyć - możesz dostosować go do swojego gustu. :-)

Ma to tę zaletę, że najpierw obsługuje żądania wczesnego kończenia. Jeśli wywołasz communicatepierwszy działający proces i okaże się, że działa on najdłużej, pozostałe uruchomione procesy będą bezczynne, podczas gdy mógłbyś obsługiwać ich wyniki.

cdleary
źródło
3
@Tino Zależy to od tego, jak zdefiniujesz zajęte oczekiwanie. Zobacz Jaka jest różnica między oczekiwaniem zajętości a odpytywaniem?
Piotr Dobrogost
1
Czy istnieje sposób na odpytywanie zestawu procesów, a nie tylko jednego?
Piotr Dobrogost
1
uwaga: może się zawiesić, jeśli proces wygeneruje wystarczającą ilość danych wyjściowych. Powinieneś zużywać stdout jednocześnie, jeśli używasz PIPE (jest (za dużo, ale za mało) ostrzeżeń na ten temat w dokumentacji podprocesu).
jfs
@PiotrDobrogost: możesz użyć os.waitpidbezpośrednio, który pozwala sprawdzić, czy jakiś proces potomny zmienił swój status.
jfs
5
użyj ['/usr/bin/my_cmd', '-i', path]zamiast['/usr/bin/my_cmd', '-i %s' % path]
jfs
11

Zastanawiam się, czy to [os.system ()] jest właściwym sposobem osiągnięcia czegoś takiego?

Nie, os.system()to nie jest właściwy sposób. Dlatego wszyscy mówią, aby używać subprocess.

Aby uzyskać więcej informacji, przeczytaj http://docs.python.org/library/os.html#os.system

Moduł podprocesów zapewnia bardziej zaawansowane narzędzia do tworzenia nowych procesów i pobierania ich wyników; używanie tego modułu jest lepsze niż korzystanie z tej funkcji. Użyj modułu podprocesu. Sprawdź zwłaszcza sekcję Zastępowanie starszych funkcji sekcją modułu podprocesu.

S.Lott
źródło
8

Odniosłem dobry sukces z modułem asyncproc , który ładnie radzi sobie z danymi wyjściowymi z procesów. Na przykład:

import os
from asynproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out
Noe
źródło
czy to jest gdzieś na githubie?
Nick
To licencja gpl, więc jestem pewien, że jest tam wiele razy. Oto jeden: github.com/albertz/helpers/blob/master/asyncproc.py
Noah
Dodałem sedno z pewnymi modyfikacjami, aby działał z python3. (przeważnie zastępuje str bajtami). Zobacz gist.github.com/grandemk/cbc528719e46b5a0ffbd07e3054aab83
Tic
1
Ponadto po wyjściu z pętli należy jeszcze raz odczytać wyjście, w przeciwnym razie część wyjścia zostanie utracona.
Tic
7

Użycie pexpect z nieblokującymi readlines to kolejny sposób na zrobienie tego. Pexpect rozwiązuje problemy z zakleszczeniem, umożliwia łatwe uruchamianie procesów w tle i zapewnia łatwe sposoby wywoływania zwrotów, gdy proces wypluwa predefiniowane ciągi i ogólnie ułatwia interakcję z procesem.

Gabe
źródło
4

Biorąc pod uwagę „Nie muszę czekać, aż wróci”, jednym z najłatwiejszych rozwiązań będzie:

subprocess.Popen( \
    [path_to_executable, arg1, arg2, ... argN],
    creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

Ale ... Z tego, co przeczytałem, nie jest to „właściwy sposób osiągnięcia czegoś takiego” z powodu zagrożeń bezpieczeństwa, jakie stwarza subprocess.CREATE_NEW_CONSOLE flagę.

Najważniejsze, co się tutaj dzieje, to użycie programu subprocess.CREATE_NEW_CONSOLEdo tworzenia nowej konsoli i .pid(zwraca identyfikator procesu, abyś mógł później sprawdzić program, jeśli chcesz), aby nie czekać, aż program zakończy swoje zadanie.

Pugsley
źródło
3

Mam ten sam problem podczas próby połączenia się z terminalem 3270 za pomocą oprogramowania skryptowego s3270 w języku Python. Teraz rozwiązuję problem z podklasą Process, którą znalazłem tutaj:

http://code.activestate.com/recipes/440554/

A oto próbka pobrana z pliku:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return ''.join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()
Patrizio Rullo
źródło
3

Przyjęta odpowiedź jest bardzo stara.

Znalazłem tutaj lepszą nowoczesną odpowiedź:

https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

i wprowadził kilka zmian:

  1. spraw, aby działał w systemie Windows
  2. spraw, aby działał z wieloma poleceniami
import sys
import asyncio

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.wait(
            [
                _read_stream(process.stdout, stdout_cb),
                _read_stream(process.stderr, stderr_cb),
            ]
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(asyncio.gather(*aws))
    loop.close()
    return rc


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to excecute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        [
            "sh",
            "-c",
            """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
        ],
        [
            "bash",
            "-c",
            "echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
        ],
        [sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
    )

    print(execute(*runners(cmds)))

Jest mało prawdopodobne, aby przykładowe polecenia działały idealnie w twoim systemie i nie obsługują dziwnych błędów, ale ten kod demonstruje jeden sposób uruchamiania wielu podprocesów przy użyciu asyncio i przesyłania strumieniowego danych wyjściowych.

Terrel Shumway
źródło
Przetestowałem to na cpythonie 3.7.4 działającym pod Windows i cpythonie 3.7.3 działającym na Ubuntu WSL i natywnym Alpine Linux
Terrel Shumway,
1

Jest tutaj kilka odpowiedzi, ale żadna z nich nie spełniła moich poniższych wymagań:

  1. Nie chcę czekać na zakończenie polecenia ani zaśmiecać mojego terminala wyjściami podprocesu.

  2. Chcę uruchomić skrypt bash z przekierowaniami.

  3. Chcę obsługiwać potokowanie w moim skrypcie bash (na przykład find ... | tar ...).

Jedyną kombinacją, która spełnia powyższe wymagania, jest:

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
                 stdout=subprocess.PIPE, 
                 stderr=subprocess.PIPE,
                 shell=True)
Shital Shah
źródło
0

Jest to omówione w przykładach podprocesów w Pythonie 3 w sekcji „Czekaj, aż polecenie zakończy asynchronicznie”:

import asyncio

proc = await asyncio.create_subprocess_exec(
    'ls','-lha',
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE)

# do something else while ls is working

# if proc takes very long to complete, the CPUs are free to use cycles for 
# other processes
stdout, stderr = await proc.communicate()

Proces rozpocznie się natychmiast po await asyncio.create_subprocess_exec(...)zakończeniu. Jeśli nie zakończy się do czasu, gdy zadzwonisz await proc.communicate(), będzie tam czekał, aby przekazać ci status wyjścia. Jeśli się skończy, proc.communicate()wróci natychmiast.

Istota tutaj jest podobna do odpowiedzi Terrelsa ale myślę, że odpowiedź Terrelsa wydaje się nadmiernie skomplikować.

Zobacz, asyncio.create_subprocess_execaby uzyskać więcej informacji.

gerrit
źródło