Pobieranie danych wyjściowych w czasie rzeczywistym przy użyciu podprocesu

138

Próbuję napisać skrypt opakowujący dla programu wiersza poleceń (weryfikacja svnadmin), który będzie wyświetlał ładny wskaźnik postępu operacji. Wymaga to ode mnie możliwości zobaczenia każdego wiersza wyjścia z opakowanego programu, gdy tylko zostanie on wyprowadzony.

Pomyślałem, że po prostu uruchomię program za pomocą subprocess.Popen, użyję stdout=PIPE, a następnie przeczytam każdą linię, gdy się pojawi i odpowiednio z nią postąpię. Jednak gdy uruchomiłem następujący kod, dane wyjściowe wydawały się być gdzieś buforowane, powodując, że pojawiały się w dwóch fragmentach, wierszach od 1 do 332, a następnie od 333 do 439 (ostatnia linia danych wyjściowych)

from subprocess import Popen, PIPE, STDOUT

p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE, 
        stderr = STDOUT, shell = True)
for line in p.stdout:
    print line.replace('\n', '')

Po przyjrzeniu się nieco dokumentacji podprocesu odkryłem, że bufsizeparametr to Popen, więc próbowałem ustawić rozmiar bufora na 1 (bufor każdej linii) i 0 (brak bufora), ale żadna z wartości nie wydawała się zmieniać sposobu dostarczania linii.

W tym momencie zacząłem sięgać po słomki, więc napisałem następującą pętlę wyjściową:

while True:
    try:
        print p.stdout.next().replace('\n', '')
    except StopIteration:
        break

ale uzyskał ten sam wynik.

Czy możliwe jest uzyskanie danych wyjściowych programu w czasie rzeczywistym programu wykonanego przy użyciu podprocesu? Czy jest jakaś inna opcja w Pythonie, która jest kompatybilna z przyszłością (nie exec*)?

Chris Lieb
źródło
1
Czy próbowałeś pominąć, sydout=PIPEwięc podproces zapisuje bezpośrednio na konsoli, pomijając proces nadrzędny?
S.Lott
5
Chodzi o to, że chcę przeczytać wynik. Jeśli jest wysyłany bezpośrednio do konsoli, jak mogę to zrobić? Ponadto nie chcę, aby użytkownik widział dane wyjściowe z opakowanego programu, tylko moje dane wyjściowe.
Chris Lieb
Dlaczego więc wyświetlanie „w czasie rzeczywistym”? Nie rozumiem przypadku użycia.
S.Lott
8
Nie używaj powłoki = True. Niepotrzebnie przywołuje twoją powłokę. Zamiast tego użyj p = Popen (['svnadmin', 'verify', '/ var / svn / repos / config'], stdout = PIPE, stderr = STDOUT)
nosklo
2
@ S.Lott Zasadniczo svnadmin verify drukuje wiersz danych wyjściowych dla każdej zweryfikowanej wersji. Chciałem stworzyć ładny wskaźnik postępu, który nie powodowałby nadmiernej ilości produkcji. Coś jak wget, na przykład
Chris Lieb

Odpowiedzi:

82

Próbowałem tego iz jakiegoś powodu podczas kodu

for line in p.stdout:
  ...

bufory agresywnie, wariant

while True:
  line = p.stdout.readline()
  if not line: break
  ...

nie. Najwyraźniej jest to znany błąd: http://bugs.python.org/issue3907 (problem jest teraz „zamknięty” od 29 sierpnia 2018 r.)

Dave
źródło
To nie jedyny bałagan w starych implementacjach IO Pythona. Dlatego właśnie Py2.6 i Py3k otrzymały całkowicie nową bibliotekę IO.
Tim Lin
3
Ten kod ulegnie awarii, jeśli podproces zwróci pusty wiersz. Lepszym rozwiązaniem byłoby użycie while p.poll() is Nonezamiast while Truei usunięcieif not line
exhuma
7
@exhuma: działa dobrze. readline zwraca „\ n” w pustym wierszu, co nie jest uznawane za prawdę. zwraca pusty ciąg tylko wtedy, gdy potok się zamyka, co nastąpi, gdy podproces się zakończy.
Alice Purcell,
1
@Dave Dla przyszłego ref: wypisz linie utf-8 w py2 + z print(line.decode('utf-8').rstrip()).
Jonathan Komar
3
Również, aby odczytać dane wyjściowe procesu w czasie rzeczywistym, będziesz musiał powiedzieć Pythonowi, że NIE chcesz buforować. Drogi Pythonie, po prostu daj mi bezpośrednio dane wyjściowe. A oto jak: Musisz ustawić zmienną środowiskową PYTHONUNBUFFERED=1. Jest to szczególnie przydatne w przypadku nieskończonych wyników
George Pligoropoulos
39
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
    print line,
p.stdout.close()
p.wait()
Corey Goldberg
źródło
1
@nbro prawdopodobnie dlatego, że p.stdout.close()jest niejasne.
anatoly techtonik
2
@nbro prawdopodobnie dlatego, że kod został podany bez wyjaśnienia ...: /
Aaron Hall
4
O czym jest ten b ''?
ManuelSchneid3r
@ ManuelSchneid3r iter(<callable>, <string>)tworzy iterowalne przy użyciu każdego wyjścia <callable>, dopóki nie zwróci <string> (wywoływane sentinel). Jeśli spróbujesz uruchomić p.stdout.readlinewiele razy, zobaczysz, że gdy nie ma nic innego do wydrukowania, drukuje b'', a zatem jest to właściwy wskaźnik do użycia w tym przypadku.
Mydło
30

Dane wyjściowe podprocesu można skierować bezpośrednio do strumieni. Uproszczony przykład:

subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)
Aidan Feldman
źródło
Czy to pozwala uzyskać zawartość również po fakcie .communicate()? A może zawartość została utracona na rzecz nadrzędnych strumieni stderr / stdout?
theferrit32
Nie, nie ma communicate()metody na zwrócone CompletedProcess. Ponadto capture_outputwyklucza się wzajemnie z stdouti stderr.
Aidan Feldman
To nie jest „w czasie rzeczywistym”, o co chodzi w tym pytaniu. To czeka, aż lszakończy działanie i nie daje dostępu do jego danych wyjściowych. (Ponadto argumenty słów kluczowych stdouti stderrsą zbędne - po prostu jawnie
określasz
20

Możesz spróbować tego:

import subprocess
import sys

process = subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

while True:
    out = process.stdout.read(1)
    if out == '' and process.poll() != None:
        break
    if out != '':
        sys.stdout.write(out)
        sys.stdout.flush()

Jeśli użyjesz readline zamiast read, w niektórych przypadkach komunikat wejściowy nie zostanie wydrukowany. Wypróbuj to za pomocą polecenia, które wymaga wejścia inline i przekonaj się sam.

Nadia Alramli
źródło
Tak, użycie readline () zatrzyma drukowanie (nawet przy wywołaniu sys.stdout.flush ())
Mark Ma
3
Czy to ma wisieć w nieskończoność? Chciałbym, aby dane rozwiązanie zawierało również standardowy kod do edycji pętli po zakończeniu początkowego podprocesu. Przepraszam, nieważne, ile razy się tym zajmuję, podproces itp. Jest czymś, czego po prostu nie mogę dostać do pracy.
ThorSummoner
1
Po co testować pod kątem „”, skoro w Pythonie możemy po prostu użyć, jeśli nie out?
Greg Bell
2
jest to najlepsze rozwiązanie w przypadku długotrwałych prac. ale powinno używać wartości nie Brak i nie! = Brak. Nie powinieneś używać! = Z None.
Cari
Czy stderr jest również wyświetlany przez to?
Pieter Vogelaar
7

Żywo podproces stdin i stdout z asyncio w Pythonie blogu przez Kevin McCarthy pokazuje jak zrobić to z asyncio:

import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def run(command):
    process = await create_subprocess_exec(
        *command, stdout=PIPE, stderr=PIPE
    )

    await asyncio.wait(
        [
            _read_stream(
                process.stdout,
                lambda x: print(
                    "STDOUT: {}".format(x.decode("UTF8"))
                ),
            ),
            _read_stream(
                process.stderr,
                lambda x: print(
                    "STDERR: {}".format(x.decode("UTF8"))
                ),
            ),
        ]
    )

    await process.wait()


async def main():
    await run("docker build -t my-docker-image:latest .")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Pablo
źródło
Cześć @Jeef, czy możesz wskazać poprawkę, abym mógł zaktualizować odpowiedź?
Pablo
1
Cześć, to zadziałało, ale musiałem dodać następujące, aby pozbyć się niektórych komunikatów o błędach: import nest_asyncio; nest_asyncio.apply()i użyć polecenia powłoki, tj. process = await create_subprocess_shell(*command, stdout=PIPE, stderr=PIPE, shell=True)Zamiast process = await create_subprocess_exec(...). Twoje zdrowie!
user319436
5

W Pythonie 3.x proces może się zawiesić, ponieważ dane wyjściowe są tablicą bajtów zamiast ciągu. Upewnij się, że dekodujesz go na ciąg.

Począwszy od Pythona 3.6 można to zrobić za pomocą parametru encodingw Popen Constructor . Kompletny przykład:

process = subprocess.Popen(
    'my_command',
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    shell=True,
    encoding='utf-8',
    errors='replace'
)

while True:
    realtime_output = process.stdout.readline()

    if realtime_output == '' and process.poll() is not None:
        break

    if realtime_output:
        print(realtime_output.strip(), flush=True)

Uwaga, że ten kod przekierowuje stderr do stdouti obsługuje błędy wyjściowych .

pavelnazimok
źródło
1
Tylko ta odpowiedź działa!
Роман Сергеевич
4

Rozwiązany problem z danymi wyjściowymi w czasie rzeczywistym: napotkałem podobny problem w Pythonie podczas przechwytywania danych wyjściowych w czasie rzeczywistym z programu C. Dodałem fflush(stdout);w moim kodzie C. U mnie to zadziałało. Oto kod.

Program C:

#include <stdio.h>
void main()
{
    int count = 1;
    while (1)
    {
        printf(" Count  %d\n", count++);
        fflush(stdout);
        sleep(1);
    }
}

Program w Pythonie:

#!/usr/bin/python

import os, sys
import subprocess


procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

while procExe.poll() is None:
    line = procExe.stdout.readline()
    print("Print:" + line)

Wynik:

Print: Count  1
Print: Count  2
Print: Count  3
sairam
źródło
1
To była jedyna rzecz, która faktycznie pomogła. Użyłem tego samego kodu ( flush(stdout)) w C ++. Dzięki!
Gerhard Hagerer
Miałem ten sam problem ze skryptem w języku Python, który wywoływał inny skrypt w języku Python jako podproces. W przypadku wydruków podprocesu „flush” było konieczne (print („hello”, flush = True) w Pythonie 3). Ponadto, wiele przykładów wciąż jest (2020) python 2, to jest python 3, więc +1
smajtkst
3

Jakiś czas temu napotkałem ten sam problem. Moim rozwiązaniem było porzucenie iteracji dla readmetody, która wróci natychmiast, nawet jeśli podproces nie zostanie zakończony itp.

Eli Courtwright
źródło
3

W zależności od przypadku użycia możesz również chcieć wyłączyć buforowanie w samym podprocesie.

Jeśli podproces będzie procesem w Pythonie, możesz to zrobić przed wywołaniem:

os.environ["PYTHONUNBUFFERED"] = "1"

Lub alternatywnie przekaż to w envargumencie doPopen .

W przeciwnym razie, jeśli korzystasz z systemu Linux / Unix, możesz użyć stdbufnarzędzia. Np. Jak:

cmd = ["stdbuf", "-oL"] + cmd

Zobacz także tutaj ostdbuf lub inne opcje.

(Zobacz także tutaj, aby uzyskać tę samą odpowiedź).

Albert
źródło
2

Użyłem tego rozwiązania, aby uzyskać dane wyjściowe w czasie rzeczywistym w podprocesie. Ta pętla zostanie zatrzymana, gdy tylko proces się zakończy, pomijając potrzebę instrukcji break lub możliwej nieskończonej pętli.

sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

while sub_process.poll() is None:
    out = sub_process.stdout.read(1)
    sys.stdout.write(out)
    sys.stdout.flush()
Jason Hedlund
źródło
5
czy to możliwe, że zakończy to pętlę bez pustego bufora stdout?
jayjay
Dużo szukałem odpowiedniej odpowiedzi, która nie zawiesiła się po zakończeniu! Znalazłem to jako rozwiązanie, dodając if out=='': breakpoout = sub_process...
Sos
2

Znalazłem tę funkcję „plug-and-play” tutaj . Działał jak urok!

import subprocess

def myrun(cmd):
    """from http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
    """
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print line,
        if line == '' and p.poll() != None:
            break
    return ''.join(stdout)
Deena
źródło
1
Dodanie stderr=subprocess.STDOUTfaktycznie bardzo pomaga w przechwytywaniu danych strumieniowych. Głosuję za tym.
khan
1
Wydaje się, że główna wołowina pochodzi z zaakceptowanej odpowiedzi
tripleee
2

Możesz użyć iteratora dla każdego bajtu na wyjściu podprocesu. Pozwala to na aktualizację inline (linie kończące się '\ r' nadpisują poprzednią linię wyjściową) z podprocesu:

from subprocess import PIPE, Popen

command = ["my_command", "-my_arg"]

# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)


# read each byte of subprocess
while subprocess.poll() is None:
    for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else {}, b''):
        c = c.decode('ascii')
        sys.stdout.write(c)
sys.stdout.flush()

if subprocess.returncode != 0:
    raise Exception("The subprocess did not terminate correctly.")
rhyno183
źródło
1

Kompletne rozwiązanie:

import contextlib
import subprocess

# Unix, Windows and old Macintosh end-of-line
newlines = ['\n', '\r\n', '\r']
def unbuffered(proc, stream='stdout'):
    stream = getattr(proc, stream)
    with contextlib.closing(stream):
        while True:
            out = []
            last = stream.read(1)
            # Don't loop forever
            if last == '' and proc.poll() is not None:
                break
            while last not in newlines:
                # Don't loop forever
                if last == '' and proc.poll() is not None:
                    break
                out.append(last)
                last = stream.read(1)
            out = ''.join(out)
            yield out

def example():
    cmd = ['ls', '-l', '/']
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Make all end-of-lines '\n'
        universal_newlines=True,
    )
    for line in unbuffered(proc):
        print line

example()
Andres Restrepo
źródło
1
Ponieważ korzystasz universal_newlines=Truez Popen()połączenia, prawdopodobnie nie musisz też samodzielnie obsługiwać ich - o to chodzi w tej opcji.
martineau
1
wydaje się to niepotrzebnie skomplikowane. Nie rozwiązuje problemów z buforowaniem. Zobacz linki w mojej odpowiedzi .
jfs
Tylko w ten sposób mogę uzyskać dane wyjściowe postępu rsync w czasie rzeczywistym (- outbuf = L)! dzięki
Mohammadhzp
1

To jest podstawowy szkielet, którego zawsze używam do tego. Ułatwia wdrażanie limitów czasu i jest w stanie poradzić sobie z nieuniknionymi zawieszonymi procesami.

import subprocess
import threading
import Queue

def t_read_stdout(process, queue):
    """Read from stdout"""

    for output in iter(process.stdout.readline, b''):
        queue.put(output)

    return

process = subprocess.Popen(['dir'],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           bufsize=1,
                           cwd='C:\\',
                           shell=True)

queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()

while process.poll() is None or not queue.empty():
    try:
        output = queue.get(timeout=.5)

    except Queue.Empty:
        continue

    if not output:
        continue

    print(output),

t_stdout.join()
Badslacks
źródło
1

Użycie pexpect z nieblokującymi readlines rozwiąże ten problem. Wynika to z faktu, że potoki są buforowane, a więc wyjście aplikacji jest buforowane przez potok, dlatego nie możesz dostać się do tego wyjścia, dopóki bufor się nie zapełni lub proces nie umrze.

Gabe
źródło
0

(To rozwiązanie zostało przetestowane w Pythonie 2.7.15)
Wystarczy sys.stdout.flush () po każdym czytaniu / zapisie linii:

while proc.poll() is None:
    line = proc.stdout.readline()
    sys.stdout.write(line)
    # or print(line.strip()), you still need to force the flush.
    sys.stdout.flush()
dan
źródło
0

Kilka odpowiedzi sugerujących python 3.x lub pthon 2.x, poniższy kod będzie działał w obu przypadkach.

 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
    stdout = []
    while True:
        line = p.stdout.readline()
        if not isinstance(line, (str)):
            line = line.decode('utf-8')
        stdout.append(line)
        print (line)
        if (line == '' and p.poll() != None):
            break
Djai
źródło
0

jeśli chcesz tylko przekazać dziennik do konsoli w czasie rzeczywistym

Poniższy kod będzie działał dla obu

 p = subprocess.Popen(cmd,
                         shell=True,
                         cwd=work_dir,
                         bufsize=1,
                         stdin=subprocess.PIPE,
                         stderr=sys.stderr,
                         stdout=sys.stdout)
zegar
źródło