Nieblokujący odczyt w podprocesie.PIPE w pythonie

506

Korzystam z modułu podprocesu, aby uruchomić podproces i połączyć się z jego strumieniem wyjściowym (standardowym wyjściem). Chcę mieć możliwość wykonywania nieblokujących odczytów na swoim standardowym wyjściu. Czy istnieje sposób, aby blokować .readline lub sprawdzić, czy w strumieniu znajdują się dane przed wywołaniem .readline? Chciałbym, żeby to było przenośne lub przynajmniej działało pod Windows i Linux.

oto jak teraz to robię (blokuje, .readlinejeśli żadne dane nie są dostępne):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Mathieu Pagé
źródło
14
(Pochodzi z Google?) Wszystkie PIPE zablokują się, gdy jeden z buforów PIPE zostanie zapełniony i nie będzie czytany. np. zakleszczenie standardowego wejścia, gdy stderr jest wypełniony. Nigdy nie przechodź RURY, której nie zamierzasz czytać.
Nasser Al-Wohaibi
@ NasserAl-Wohaibi czy to oznacza, że ​​lepiej jest zawsze tworzyć pliki?
Charlie Parker
Ciekawe, dlaczego to blokuje ... Pytam, bo widziałem komentarz:To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()
Charlie Parker
Jest „w fazie projektowania” i oczekuje na dane wejściowe.
Mathieu Pagé
powiązane: stackoverflow.com/q/19880190/240515
user240515

Odpowiedzi:

403

fcntl, select, asyncprocNie pomoże w tej sprawie.

Niezawodny sposób na odczytanie strumienia bez blokowania niezależnie od systemu operacyjnego to użycie Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
jfs
źródło
6
Tak, to działa dla mnie, ale wiele usunąłem. Obejmuje dobre praktyki, ale nie zawsze konieczne. Kompatybilny z Python 3.x 2.X i close_fds może być pominięty, nadal będzie działał. Pamiętaj jednak o tym, co robi wszystko i nie kopiuj go na ślepo, nawet jeśli to po prostu działa! (Właściwie najprostszym rozwiązaniem jest użycie wątku i wykonanie readline tak jak Seb, Qeues to po prostu łatwy sposób na uzyskanie danych, są inne, wątki są odpowiedzią!)
Aki
3
Wewnątrz wątku wezwanie do out.readlinezablokowania wątku i głównego wątku, i muszę czekać, aż readline powróci, zanim wszystko inne będzie kontynuowane. Jest jakiś prosty sposób na obejście tego? (Czytam wiele wierszy z mojego procesu, który jest również innym plikiem .py, który robi DB i inne rzeczy)
Justin
3
@Justin: „out.readline” nie blokuje głównego wątku, jest on wykonywany w innym wątku.
jfs
4
co jeśli nie uda mi się zamknąć podprocesu, np. z powodu wyjątków? wątek czytnika stdout nie umrze, a python się zawiesi, nawet jeśli główny wątek zostanie zakończony, prawda? jak można to obejść? Python 2.x nie obsługuje zabijania wątków, co gorsza, nie obsługuje ich przerywania. :( (oczywiście należy poradzić sobie z wyjątkami, aby upewnić się, że podproces jest zamknięty, ale na wszelki wypadek, co można zrobić?)
n611x007
3
Stworzyłem kilka przyjaznych opakowań tego w pakiecie shelljob pypi.python.org/pypi/shelljob
edA-qa mort-ora-y
77

Często miałem podobny problem; Programy w języku Python, które piszę często muszą mieć możliwość wykonywania podstawowych funkcji, jednocześnie akceptując dane wejściowe użytkownika z wiersza poleceń (standardowe wejście). Po prostu umieszczenie funkcji obsługi danych wejściowych przez użytkownika w innym wątku nie rozwiązuje problemu, ponieważ readline()blokuje i nie ma limitu czasu. Jeśli podstawowa funkcjonalność jest kompletna i nie trzeba już czekać na dalsze dane wejściowe użytkownika, zwykle chcę, aby mój program zakończył działanie, ale nie może, ponieważ readline()nadal blokuje się w drugim wątku, czekając na linię. Rozwiązaniem, które znalazłem dla tego problemu, jest uczynienie stdin plikiem nieblokującym za pomocą modułu fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

Moim zdaniem jest to nieco czystsze niż użycie modułów select lub signal do rozwiązania tego problemu, ale znowu działa tylko w systemie UNIX ...

Jesse
źródło
1
Zgodnie z dokumentacją fcntl () może otrzymać deskryptor pliku lub obiekt, który ma metodę .fileno ().
Denilson Sá Maia
10
Odpowiedź Jessego jest nieprawidłowa. Według Guido readline nie działa poprawnie w trybie nieblokującym i nie będzie wcześniej niż Python 3000. bugs.python.org/issue1175#msg56041 Jeśli chcesz użyć fcntl, aby ustawić plik w tryb nieblokujący, musisz użyć os.read () niższego poziomu i samodzielnie oddzielić wiersze. Mieszanie fcntl z wywołaniami wysokiego poziomu wykonującymi buforowanie linii wymaga kłopotów.
anonnn
2
Używanie readline wydaje się nieprawidłowe w Pythonie 2. Zobacz odpowiedź anonnn stackoverflow.com/questions/375427/…
Catalin Iacob
10
Proszę nie używać zajętych pętli. Użyj poll () z limitem czasu, aby poczekać na dane.
Ivo Danihelka
@Stefano co to jest buffer_sizezdefiniowane?
cat
39

Python 3.4 wprowadza nowe tymczasowe API dla asynchronicznego modułu IOasyncio .

Podejście jest podobne do twistedopartej na odpowiedzi przez @Bryan Ward - zdefiniuj protokół, a jego metody będą wywoływane, gdy tylko dane będą gotowe:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Zobacz „Podproces” w dokumentacji .

Istnieje interfejs wysokiego poziomu, asyncio.create_subprocess_exec()który zwraca Processobiekty które pozwalają na asynchroniczny odczyt linii za pomocą StreamReader.readline()coroutine (ze składnią async/ awaitPython 3.5+ ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() wykonuje następujące zadania:

  • uruchom podproces, przekieruj swoje standardowe wyjście na potok
  • asynchronicznie odczytuje wiersz ze standardowego podprocesu
  • zabij podproces
  • poczekaj na zakończenie

W razie potrzeby każdy krok może być ograniczony limitem sekund.

jfs
źródło
Kiedy próbuję czegoś takiego przy użyciu coroutines w Pythonie 3.4, otrzymuję dane wyjściowe dopiero po uruchomieniu całego skryptu. Chciałbym zobaczyć wydrukowany wiersz wyniku, gdy tylko podproces wydrukuje wiersz. Oto, co mam: pastebin.com/qPssFGep .
flutefreak7
1
@ flutefreak7: problemy z buforowaniem nie są związane z bieżącym pytaniem. Kliknij link, aby znaleźć możliwe rozwiązania.
jfs
dzięki! Rozwiązałem problem z moim skryptem, po prostu używając, print(text, flush=True)aby wydrukowany tekst był natychmiast dostępny dla wywołującego obserwatora readline. Kiedy przetestowałem go z plikiem wykonywalnym opartym na Fortranie, tak naprawdę chcę go zawijać / oglądać, nie buforuje on swoich danych wyjściowych, więc zachowuje się zgodnie z oczekiwaniami.
flutefreak7
Czy można zezwolić podprocesowi na zachowanie i wykonanie dalszych operacji odczytu / zapisu. readline_and_kill, w drugim skrypcie działa bardzo podobnie subprocess.comunicate, ponieważ kończy proces po jednej operacji odczytu / zapisu. Widzę również, że używasz pojedynczego potoku stdout, który podproces obsługuje jako nieblokujący. Próbuję użyć obu stdouti stderr okazuje się, że blokuję .
Carel
@Carel kod w odpowiedzi działa zgodnie z przeznaczeniem, jak opisano wyraźnie w odpowiedzi. W razie potrzeby można zastosować inne zachowanie. Obie potoki są jednakowo nieblokujące, jeśli są używane, oto przykład, jak czytać z obu potoków jednocześnie .
jfs
19

Wypróbuj moduł asyncproc . Na przykład:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Moduł zajmuje się wszystkimi wątkami, jak sugeruje S.Lott.

Noah
źródło
1
Absolutnie genialny. Znacznie łatwiejsze niż nieprzetworzony moduł podprocesu. Działa idealnie dla mnie na Ubuntu.
Cerin,
12
asyncproc nie działa w systemie Windows, a system Windows nie obsługuje systemu OS.WNOHANG :-(
Bryan Oakley
26
asyncproc jest GPL, co dodatkowo ogranicza jego użycie :-(
Bryan Oakley
Dzięki. Jedna drobna rzecz: wydaje się, że zastąpienie tabulatorów 8 spacjami w asyncproc.py jest właściwym rozwiązaniem :)
benjaoming,
Nie wygląda na to, że można uzyskać kod powrotu procesu uruchomionego przez moduł asyncproc; tylko wynik, który wygenerował.
grayaii
17

Możesz to zrobić naprawdę łatwo w Twisted . W zależności od istniejącej bazy kodu może nie być to łatwe w użyciu, ale jeśli budujesz pokręconą aplikację, rzeczy takie jak ta stają się prawie banalne. Tworzysz ProcessProtocolklasę i zastępujesz outReceived()metodę. Skręcona (w zależności od zastosowanego reaktora) jest zwykle tylko dużą select()pętlą z zainstalowanymi wywołaniami zwrotnymi do obsługi danych z różnych deskryptorów plików (często gniazd sieciowych). Tak więc outReceived()metodą jest po prostu instalacja wywołania zwrotnego do obsługi danych pochodzących z STDOUT. Prosty przykład demonstrujący to zachowanie jest następujący:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Dokumentacja Twisted zawiera kilka dobrych informacji na ten temat.

Jeśli zbudujesz całą aplikację wokół Twisted, dzięki temu komunikacja asynchroniczna z innymi procesami, lokalnymi lub zdalnymi, będzie naprawdę elegancka. Z drugiej strony, jeśli twój program nie jest zbudowany na Twisted, to tak naprawdę nie będzie tak pomocne. Mamy nadzieję, że może to być pomocne dla innych czytelników, nawet jeśli nie dotyczy to konkretnej aplikacji.

Bryan Ward
źródło
nie dobrze. selectnie powinien działać w
systemie
2
@naxa Nie sądzę, że select()on ma na myśli to samo, co ty. Zakładam, że to Twisteddziała, ponieważ działa w
systemie
1
„Skręcona (w zależności od zastosowanego reaktora) jest zwykle tylko dużą pętlą select ()” oznacza, że ​​istnieje kilka reaktorów do wyboru. Ten select()jest najbardziej przenośny w systemach uniksowych i uniksowych, ale dla systemu Windows dostępne są również dwa reaktory: twistedmatrix.com/documents/current/core/howto/…
clacke
14

Użyj wybierz i przeczytaj (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Dla readline () - jak:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
Andy Jackson
źródło
6
nie dobrze. selectnie powinien działać w
systemie
O MÓJ BOŻE. Czytaj megabajty, a może gigabajty po jednym znaku na raz ... to najgorszy pomysł, jaki widziałem od dłuższego czasu ... nie trzeba dodawać, że ten kod nie działa, ponieważ proc.stdout.read()bez względu na to, jak mały jest argument połączenie blokujące.
wvxvw
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed
nmz787
8

Jednym z rozwiązań jest wykonanie innego procesu w celu odczytania procesu lub wykonanie wątku procesu z przekroczeniem limitu czasu.

Oto wątkowa wersja funkcji limitu czasu:

http://code.activestate.com/recipes/473878/

Czy jednak musisz przeczytać standardowe wejście? Innym rozwiązaniem może być zrzucenie danych wyjściowych do pliku i oczekiwanie na zakończenie procesu za pomocą p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
monkut
źródło
wygląda na to, że wątek recpie nie wyszedł po przekroczeniu limitu czasu, a zabicie go zależy od możliwości zabicia podprocesu (np. niezwiązane w tym względzie), czyta (coś, co powinieneś być w stanie zrobić, ale na wypadek, gdybyś nie mógł ...) .
n611x007
7

Oświadczenie: działa tylko w przypadku tornada

Możesz to zrobić, ustawiając fd na nieblokujący, a następnie użyj ioloop, aby zarejestrować wywołania zwrotne. Zapakowałem to w jajko o nazwie tornado_subprocess i możesz je zainstalować za pomocą PyPI:

easy_install tornado_subprocess

teraz możesz zrobić coś takiego:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

możesz także użyć go z RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Vukasin Toroman
źródło
Dzięki za miłą funkcję! Aby wyjaśnić, dlaczego nie możemy po prostu używać threading.Threaddo tworzenia nowych nieblokujących procesów? Użyłem go w on_messageinstancji websocket Tornado i dobrze się spisało.
VisioN
1
w tornado odradza się wątki. nadają się do małych, krótko działających funkcji. Możesz przeczytać o tym tutaj: stackoverflow.com/questions/7846323/tornado-web-and-threads github.com/facebook/tornado/wiki/Threading-and-concurrency
Vukasin Toroman
@ VukasinToroman naprawdę uratowałeś mnie tutaj. bardzo dziękuję za moduł tornado_subprocess :)
James Gentes
czy to działa w systemie Windows? (zauważ, że selectprzy deskryptorach plików tak nie jest )
n611x007
Ta biblioteka nie korzysta z selectpołączenia. Nie próbowałem tego w systemie Windows, ale prawdopodobnie miałbyś kłopoty, ponieważ lib używa fcntlmodułu. Krótko mówiąc: nie, to prawdopodobnie nie będzie działać w systemie Windows.
Vukasin Toroman,
6

Istniejące rozwiązania nie działały dla mnie (szczegóły poniżej). W końcu udało się zaimplementować readline przy użyciu read (1) (na podstawie tej odpowiedzi ). Ten ostatni nie blokuje:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Dlaczego istniejące rozwiązania nie działały:

  1. Rozwiązania wymagające readline (w tym oparte na kolejce) zawsze blokują. Trudno (niemożliwe?) Zabić wątek, który wykonuje readline. Zostaje zabity dopiero po zakończeniu procesu, który go stworzył, ale nie po zabiciu procesu produkcyjnego.
  2. Mieszanie niskopoziomowego fcntl z wysokopoziomowymi wywołaniami readline może nie działać poprawnie, jak zauważył anonnn.
  3. Korzystanie z select.poll () jest fajne, ale nie działa w systemie Windows zgodnie z dokumentacją Pythona.
  4. Korzystanie z bibliotek innych firm wydaje się przesadą w tym zadaniu i dodaje dodatkowe zależności.
Vikram Pudi
źródło
1
1. q.get_nowait()z mojej odpowiedzi nie wolno nigdy blokować, to jest sens jej użycia. 2. Wątek, który wykonuje readline ( enqueue_output()funkcję ), wychodzi na EOF, np. Łącznie z przypadkiem, w którym proces produkcji wyjściowej został zabity. Jeśli uważasz, że tak nie jest; podaj kompletny minimalny przykład kodu, który pokazuje inaczej (być może jako nowe pytanie ).
jfs
1
@sebastian Spędziłem godzinę lub dłużej próbując wymyślić minimalny przykład. Na koniec muszę zgodzić się, że twoja odpowiedź obejmuje wszystkie sprawy. Wydaje mi się, że nie działało to dla mnie wcześniej, ponieważ kiedy próbowałem zabić proces produkujący dane wyjściowe, został on już zabity i dał trudny do debugowania błąd. Godzina była dobrze spędzona, ponieważ wymyślając minimalny przykład, mogłem wymyślić prostsze rozwiązanie.
Vikram Pudi,
Czy możesz również opublikować prostsze rozwiązanie? :) (jeśli różni się od Sebastiana)
n611x007
@ niebezpieczeństwo89: Myślę dcmpid = myprocess.
ViFI,
W stanie po wywołaniu read () (zaraz po tym, gdy ma wartość True): out nigdy nie będzie pustym ciągiem, ponieważ odczytujesz przynajmniej ciąg / bajty o długości 1.
sergzach
6

Oto mój kod, używany do przechwytywania każdego wyjścia z podprocesu jak najszybciej, w tym wierszy częściowych. Pompuje jednocześnie, stdout i stderr w prawie prawidłowej kolejności.

Testowane i poprawnie działające na systemie Linux i Windows Python 2.7.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
datacompboy
źródło
Jedna z niewielu odpowiedzi, które pozwalają przeczytać rzeczy, które niekoniecznie kończą się nową linią.
totaam
5

Dodaję ten problem, aby przeczytać niektóre podprocesy. Otwórz stdout. Oto moje nieblokujące rozwiązanie odczytu:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Sebastien Claeys
źródło
5
fcntl nie działa w systemie Windows, zgodnie z dokumentacją .
n611x007
@ anatolytechtonik msvcrt.kbhit()zamiast tego użyj
cat
4

Ta wersja nieblokującego odczytu nie wymaga specjalnych modułów i będzie działać od razu po instalacji w większości dystrybucji Linuksa.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Tom Lime
źródło
3

Oto proste rozwiązanie oparte na wątkach, które:

  • działa zarówno w systemie Linux, jak i Windows (nie polega na select ).
  • czyta zarówno, jak stdoutistderr asynchronicznie.
  • nie polega na aktywnym odpytywaniu z dowolnym czasem oczekiwania (przyjazny procesorowi).
  • nie używa asyncio(co może powodować konflikt z innymi bibliotekami).
  • działa do momentu zakończenia procesu potomnego.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Olivier Michel
źródło
2

Dodając tę ​​odpowiedź tutaj, ponieważ zapewnia ona możliwość ustawiania nieblokujących potoków w systemach Windows i Unix.

Wszystkie ctypesszczegóły są dzięki odpowiedzi @ techtonik .

Istnieje nieco zmodyfikowana wersja do użytku zarówno w systemach Unix, jak i Windows.

  • Kompatybilny z Python3 (wymagana tylko niewielka zmiana) .
  • Obejmuje wersję posix i definiuje wyjątek do użycia w obu przypadkach.

W ten sposób możesz użyć tej samej funkcji i wyjątku dla kodu Unix i Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /programming/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Aby uniknąć odczytu niekompletnych danych, ostatecznie napisałem własny generator readline (który zwraca ciąg bajtów dla każdej linii).

Jest to generator, dzięki czemu możesz na przykład ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
ideasman42
źródło
(1) ten komentarz wskazuje, że readline()nie działa z potokami nieblokującymi (takimi jak ustawianie za pomocą fcntl) w Pythonie 2 - czy uważasz, że nie jest już poprawny? (moja odpowiedź zawiera link ( fcntl), który zawiera te same informacje, ale wydaje się teraz usunięty). (2) Zobacz, jak multiprocessing.connection.PipeużywaSetNamedPipeHandleState
jfs
Testowałem to tylko na Python3. Widziałem jednak te informacje i oczekuję, że pozostaną aktualne. Napisałem również własny kod do użycia zamiast readline, zaktualizowałem swoją odpowiedź, aby go uwzględnić.
ideasman42
2

Mam problem z pierwotnym pytającym, ale nie chciałem powoływać się na wątki. Zmieszałem rozwiązanie Jesse z bezpośrednim odczytem () z potoku i moim własnym programem obsługi buforów do odczytu linii (jednak mój podproces - ping - zawsze zapisywał pełne linie <rozmiar strony systemowej). Unikam zajętego oczekiwania, czytając tylko zegarek IO zarejestrowany przez gobject. Obecnie zwykle uruchamiam kod w gobject MainLoop, aby uniknąć wątków.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

Obserwatorem jest

def watch(f, *other):
print 'reading',f.read()
return True

Główny program ustawia polecenie ping, a następnie wywołuje pętlę poczty gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Wszelkie inne prace są dołączane do wywołań zwrotnych w gobject.

Dave Kitchen
źródło
2

We współczesnym Pythonie jest znacznie lepiej.

Oto prosty program potomny „hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

I program do interakcji z nim:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

To drukuje:

b'hello bob\n'
b'hello alice\n'

Zauważ, że faktyczny wzorzec, który jest również prawie wszystkimi poprzednimi odpowiedziami, zarówno tutaj, jak i w powiązanych pytaniach, polega na ustawieniu deskryptora standardowego pliku dziecka na nieblokujący, a następnie odpytaniu go w jakiejś pętli wyboru. Obecnie ta pętla jest zapewniona przez asyncio.

użytkownik240515
źródło
1

wybierz moduł pozwala określić, gdzie obok wejścia jest przydatna.

Jednak prawie zawsze jesteś szczęśliwszy dzięki oddzielnym wątkom. Jeden robi blokowanie, czytając standardowe wejście, inny robi to, gdzie nie ma potrzeby blokowania.

S.Lott
źródło
11
Myślę, że ta odpowiedź jest nieprzydatna z dwóch powodów: (a) Moduł wyboru nie będzie działał na potokach w systemie Windows (jak podano w podanym linku), co przekreśla zamiary OP dotyczące posiadania przenośnego rozwiązania. (b) Wątki asynchroniczne nie pozwalają na synchroniczny dialog między procesem nadrzędnym i potomnym. Co jeśli proces nadrzędny chce wywołać następną akcję zgodnie z następnym wierszem odczytanym z dziecka ?!
ThomasH
4
select nie jest również użyteczny, ponieważ odczyty Pythona będą blokować nawet po zaznaczeniu, ponieważ nie ma standardowej semantyki C i nie zwróci częściowych danych.
Helmut Grohne 27.01.11
Oddzielna próba czytania z wyników dziecka rozwiązała mój problem, który był podobny do tego. Jeśli potrzebujesz interakcji synchronicznej, myślę, że nie możesz użyć tego rozwiązania (chyba że wiesz, czego się spodziewać). Przyjąłbym tę odpowiedź
Emiliano
1

po co męczyć wątek i kolejkę? w przeciwieństwie do readline (), BufferedReader.read1 () nie blokuje czekania na \ r \ n, zwraca JAK NAJSZYBCIEJ, jeśli pojawi się jakikolwiek wynik.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
mfmain
źródło
Czy zwróci jak najszybciej, jeśli nic się nie pojawi? Jeśli nie, to blokuje.
Mathieu Pagé,
@ MathieuPagé ma rację. read1zablokuje się, jeśli pierwsze leżące poniżej bloki odczytu, co dzieje się, gdy potok jest nadal otwarty, ale dane wejściowe nie są dostępne.
Jack O'Connor,
1

W moim przypadku potrzebowałem modułu rejestrującego, który przechwytuje dane wyjściowe z aplikacji w tle i rozszerza je (dodając znaczniki czasu, kolory itp.).

Skończyło się na wątku w tle, który wykonuje rzeczywiste operacje we / wy. Poniższy kod dotyczy tylko platform POSIX. Pozbyłem się nieistotnych części.

Jeśli ktoś zamierza używać tej bestii na dłuższą metę, rozważ zarządzanie otwartymi deskryptorami. W moim przypadku nie był to duży problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
Dmytro
źródło
1

Mój problem jest nieco inny, ponieważ chciałem zebrać zarówno stdout, jak i stderr z uruchomionego procesu, ale ostatecznie taki sam, ponieważ chciałem renderować dane wyjściowe w widgecie po jego wygenerowaniu.

Nie chciałem uciekać się do wielu proponowanych obejść przy użyciu kolejek lub dodatkowych wątków, ponieważ nie powinny one być konieczne do wykonania tak typowego zadania, jak uruchomienie innego skryptu i zebranie jego danych wyjściowych.

Po przeczytaniu proponowanych rozwiązań i dokumentów w języku Python rozwiązałem problem z implementacją poniżej. Tak, działa tylko dla POSIX, ponieważ używam selectwywołania funkcji.

Zgadzam się, że dokumenty są mylące, a implementacja jest niezręczna w przypadku tak powszechnego zadania skryptowego. Uważam, że starsze wersje Pythona mają różne domyślne ustawienia Popeni różne wyjaśnienia, co spowodowało wiele zamieszania. Wydaje się, że działa to dobrze zarówno w Pythonie 2.7.12, jak i 3.5.2.

Kluczem było ustawienie bufsize=1buforowania linii, a następnie universal_newlines=Trueprzetwarzanie jako plik tekstowy zamiast pliku binarnego, który wydaje się być domyślny podczas ustawiania bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG i VERBOSE to po prostu makra, które wypisują dane wyjściowe na terminal.

To rozwiązanie jest skuteczne IMHO 99,99%, ponieważ nadal korzysta z readlinefunkcji blokowania , więc zakładamy, że podproces jest przyjemny i generuje pełne linie.

Czekam na opinie w celu ulepszenia rozwiązania, ponieważ wciąż jestem nowy w Pythonie.

Brooke Wallace
źródło
W tym konkretnym przypadku możesz ustawić stderr = podproces.STDOUT w konstruktorze Popen i uzyskać wszystkie dane wyjściowe z cmd.stdout.readline ().
Aaron
Niezły przykład. Miałem problem z select.select (), ale to rozwiązało to dla mnie.
maharvey67
0

Korzystając z odpowiedzi JF Sebastiana i kilku innych źródeł, stworzyłem prostego menedżera podprocesów. Zapewnia nieblokujący odczyt żądania, a także uruchamia kilka procesów równolegle. Nie używa żadnego połączenia specyficznego dla systemu operacyjnego (o czym wiem) i dlatego powinien działać wszędzie.

Jest dostępny w pypi, więc po prostu pip install shelljob. Przykłady i pełne dokumenty znajdują się na stronie projektu .

edA-qa mort-ora-y
źródło
0

EDYCJA: Ta implementacja wciąż blokuje. Zamiast tego użyj odpowiedzi JFSebastian .

Wypróbowałem najlepszą odpowiedź , ale dodatkowe ryzyko i utrzymanie kodu wątku były niepokojące.

Przeglądając moduł io (i jest ograniczony do 2.6), znalazłem BufferedReader. To jest moje bezgwintowe, nieblokujące rozwiązanie.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
romc
źródło
próbowałeś for line in iter(p.stdout.readline, ""): # do stuff with the line? Jest bezgwintowy (pojedynczy wątek) i blokuje się, gdy kod jest blokowany.
jfs,
@ jf-sebastian Tak, w końcu wróciłem do twojej odpowiedzi. Moja implementacja wciąż czasami jest blokowana. Zedytuję swoją odpowiedź, aby ostrzec innych, aby nie schodzili tą drogą.
romc
0

Niedawno natknąłem się na ten sam problem: muszę czytać jedną linię na raz ze strumienia (uruchamianie ogona w podprocesie) w trybie nieblokującym. Chciałem uniknąć kolejnych problemów: nie nagrywać procesora, nie czytać strumienia o jeden bajt ( tak jak readline) itp

Oto moja implementacja https://gist.github.com/grubberr/5501e1a9760c3eab5e0a nie obsługuje okien (ankiety), nie obsługuje EOF, ale działa dobrze dla mnie

grubberr
źródło
odpowiedź wątek oparte jest nie palić CPU (można określić arbitralne timeoutjak w roztworze) i .readline()czyta więcej niż jeden bajt na raz ( bufsize=1środki liniowych -buffered (dotyczy tylko pisemnie)). Jakie inne problemy znalazłeś? Odpowiedzi zawierające tylko łącze nie są zbyt przydatne.
jfs
0

Jest to przykład uruchomienia komendy interaktywnej w podprocesie, a standardowe wyjście jest interaktywne przy użyciu pseudo terminala. Możesz odnieść się do: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Liao
źródło
0

To rozwiązanie wykorzystuje select moduł do „odczytu dowolnych dostępnych danych” ze strumienia IO. Ta funkcja początkowo blokuje się, dopóki dane nie będą dostępne, ale następnie odczytuje tylko te dane, które są dostępne, i nie blokuje dalej.

Biorąc pod uwagę fakt, że używa on selectmodułu, działa to tylko na Uniksie.

Kod jest w pełni zgodny z PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
Bradley Odell
źródło
0

Napotkałem również problem opisany przez Jesse'a i rozwiązałem go, używając „select”, tak jak robili to Bradley , Andy i inni, ale w trybie blokowania, aby uniknąć zajętej pętli. Wykorzystuje atrapę Fajka jako fałszywego standardowego wejścia. Wybierz bloki i poczekaj, aż standardowe wejście lub rura będą gotowe. Po naciśnięciu klawisza stdin odblokowuje wybór, a wartość klucza można odczytać za pomocą read (1). Gdy do gwintu zostanie zapisany inny wątek, wówczas rura odblokowuje zaznaczenie i można to uznać za wskazówkę, że zapotrzebowanie na standardowe wyjście zostało zakończone. Oto kod referencyjny:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
gonzaedu61
źródło
UWAGA: Aby ta funkcja działała w systemie Windows, rura powinna zostać zastąpiona gniazdem. Jeszcze tego nie próbowałem, ale powinno działać zgodnie z dokumentacją.
gonzaedu61
0

Wypróbuj wexpect , który jest alternatywą systemu pexpect dla systemu Windows .

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()
betontalpfa
źródło
0

W systemach uniksowych i Python 3.5+ istnieje os.set_blockingdokładnie to, co mówi.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

To daje:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

Z os.set_blockingkomentarzem to:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
saaj
źródło
-2

Oto moduł obsługujący nieblokujące odczyty i zapisy w tle w pythonie:

https://pypi.python.org/pypi/python-nonblock

Zapewnia funkcję,

nonblock_read, który odczyta dane ze strumienia, jeśli jest dostępny, w przeciwnym razie zwróci pusty ciąg (lub Brak, jeśli strumień jest zamknięty po drugiej stronie i wszystkie możliwe dane zostały odczytane)

Możesz również rozważyć moduł python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

co dodaje moduł podprocesu. Tak więc do obiektu zwróconego z „subprocess.Popen” dodaje się dodatkową metodę runInBackground. To uruchamia wątek i zwraca obiekt, który zostanie automatycznie zapełniony, gdy rzeczy są zapisywane do stdout / stderr, bez blokowania głównego wątku.

Cieszyć się!

Tim Savannah
źródło
Chciałbym wypróbować ten moduł nieblokujący , ale jestem stosunkowo nowy w niektórych procedurach Linuksa. Dokładnie jak zainstalować te procedury? Używam Raspbian Jessie, wersji Debiana Linux dla Raspberry Pi. Próbowałem „sudo apt-get install nonblock” i python-nonblock i oba zgłosiły błąd - nie znaleziono. Pobrałem plik zip z tej strony pypi.python.org/pypi/python-nonblock , ale nie wiem co z tym zrobić. Dzięki .... RDK
RDK