Jak mam się zalogować podczas korzystania z wielu procesów w Pythonie?

239

Teraz mam centralny moduł w ramy, które spawns kilka procesów z użyciem Pythona 2.6 multiprocessingmoduł . Ponieważ używa multiprocessing, istnieje moduł obsługujący wieloprocesowy dziennik na poziomie modułu LOG = multiprocessing.get_logger(). Zgodnie z dokumentami ten program rejestrujący ma blokady współużytkowane przez proces, dzięki czemu nie rozbijasz rzeczy w sys.stderr(lub jakimkolwiek uchwycie pliku) przez jednoczesne pisanie do niego wielu procesów.

Problem, który mam teraz, polega na tym, że inne moduły w frameworku nie obsługują wielu procesów. Z mojego punktu widzenia muszę uzależnić wszystkie zależności od tego modułu centralnego od rejestrowania uwzględniającego przetwarzanie wieloprocesowe. To denerwujące w ramach, nie mówiąc już o wszystkich klientach. Czy są alternatywy, o których nie myślę?

cdleary
źródło
10
Dokumenty, do których linkujesz, podają dokładne przeciwieństwo tego, co mówisz, program rejestrujący nie ma blokad współdzielonych procesów i sprawy się mylą - to też miałem problem.
Sebastian Blask,
3
zobacz przykłady w dokumentacji stdlib: Logowanie do jednego pliku z wielu procesów . Przepisy nie wymagają, aby inne moduły obsługiwały wiele procesów.
jfs
Więc do czego służy przypadek użycia multiprocessing.get_logger()? Wydaje się, że w oparciu o te inne sposoby rejestrowania funkcja rejestrowania multiprocessingma niewielką wartość.
Tim Ludwinski
4
get_logger()to rejestrator używany przez multiprocessingsam moduł. Jest to przydatne, jeśli chcesz debugować multiprocessingproblem.
jfs

Odpowiedzi:

69

Jedynym sposobem radzenia sobie z tym nieinwazyjnie jest:

  1. Odradzaj każdy proces roboczy w taki sposób, aby jego dziennik trafiał do innego deskryptora pliku (na dysk lub do potoku). Najlepiej, aby wszystkie wpisy dziennika były oznaczone znacznikiem czasu.
  2. Proces kontrolera może następnie wykonać jedną z następujących czynności:
    • Jeśli używasz plików dyskowych: Łącz pliki dziennika na końcu przebiegu, posortowane według datownika
    • W przypadku korzystania z rur (zalecane): Wpisy dziennika łączone w locie ze wszystkich rur do centralnego pliku dziennika. (Np. Okresowo na selectpodstawie deskryptorów plików potoków wykonaj sortowanie według dostępnych wpisów w dzienniku i opróżnij dziennik scentralizowany. Powtórz.)
Vladr
źródło
Fajnie, minęło 35 lat, zanim o tym pomyślałem (pomyślałem, że atexitużyję :-). Problem w tym, że nie da ci odczytu w czasie rzeczywistym. Może to być częścią ceny wieloprocesowości, a nie wielowątkowości.
cdleary
@cdleary, stosując podejście potokowe, byłoby to tak blisko czasu rzeczywistego, jak to tylko możliwe (szczególnie jeśli stderr nie jest buforowany w procesach
spawnowania
1
Nawiasem mówiąc, duże założenie tutaj: nie Windows. Czy korzystasz z systemu Windows?
vladr
22
Dlaczego zamiast tego nie skorzystać z funkcji multiprocessing.Queue i logowania w głównym procesie? Wydaje się prostsze.
Brandon Rhodes
1
@BrandonRhodes - Jak powiedziałem, nieinwazyjnie . Użycie multiprocessing.Queuenie będzie prostsze, jeśli do użycia będzie dużo kodu multiprocessing.Queuei / lub jeśli problem stanowi wydajność
vladr
122

Właśnie napisałem własny moduł obsługi dziennika, który przekazuje wszystko do procesu nadrzędnego za pomocą potoku. Testowałem go tylko przez dziesięć minut, ale wydaje się, że działa całkiem dobrze.

( Uwaga: To jest zakodowane na stałe RotatingFileHandler, co jest moim własnym przypadkiem użycia.)


Aktualizacja: @javier utrzymuje teraz to podejście jako pakiet dostępny na Pypi - patrz rejestrowanie wieloprocesowe na Pypi, github na https://github.com/jruere/multiprocessing-logging


Aktualizacja: wdrożenie!

To teraz używa kolejki do poprawnej obsługi współbieżności, a także poprawnie odzyskuje po błędach. Używam tego w produkcji od kilku miesięcy, a obecna wersja poniżej działa bez problemu.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
zzzeek
źródło
4
Powyższy moduł obsługi wykonuje cały zapis plików z procesu nadrzędnego i używa tylko jednego wątku do odbierania wiadomości przekazywanych z procesów potomnych. Jeśli wywołujesz sam moduł obsługi z odrodzonego procesu potomnego, oznacza to, że używa go niepoprawnie, a otrzymasz wszystkie te same problemy, co RotatingFileHandler. Z powyższego kodu korzystam od lat bez problemu.
zzzeek
9
Niestety to podejście nie działa w systemie Windows. Z docs.python.org/library/multiprocessing.html 16.6.2.12 „Pamiętaj, że w systemie Windows procesy potomne odziedziczą tylko poziom programu rejestrującego proces nadrzędny - wszelkie inne modyfikacje programu rejestrującego nie zostaną odziedziczone”. Podprocesy nie odziedziczą modułu obsługi i nie można go przekazać jawnie, ponieważ nie można go zalać.
Noah Yetter
2
Warto zauważyć, że multiprocessing.Queueużywa wątku do put(). Dlatego nie należy wywoływać put(tj. Rejestrować wiadomości przy użyciu modułu MultiProcessingLogobsługi) przed utworzeniem wszystkich podprocesów. W przeciwnym razie wątek będzie martwy w procesie potomnym. Jednym z rozwiązań jest wywołanie Queue._after_fork()na początku każdego procesu potomnego lub użycie multiprocessing.queues.SimpleQueuezamiast niego, który nie wymaga wątku, ale blokuje.
Danqi Wang
5
Czy możesz dodać prosty przykład, który pokazuje inicjalizację, a także użycie z hipotetycznego procesu potomnego? Nie jestem do końca pewien, w jaki sposób proces potomny powinien uzyskać dostęp do kolejki bez tworzenia kolejnej instancji twojej klasy.
JesseBuesking
11
@zzzeek, ​​to rozwiązanie jest dobre, ale nie mogłem znaleźć pakietu z nim lub czegoś podobnego, więc stworzyłem taki o nazwie multiprocessing-logging.
Javier
30

QueueHandlerjest natywny w Pythonie 3.2+ i robi to dokładnie. Jest łatwo replikowany w poprzednich wersjach.

Dokumenty Pythona mają dwa pełne przykłady: Logowanie się do jednego pliku z wielu procesów

Jeśli używasz Pythona <3.2, po prostu skopiuj QueueHandlerdo własnego kodu z: https://gist.github.com/vsajip/591589 lub alternatywnie zaimportuj logutils .

Każdy proces (w tym proces nadrzędny) umieszcza swoje logowanie Queue, a następnie listenerwątek lub proces (dla każdego podano jeden przykład) zbiera je i zapisuje w pliku - bez ryzyka uszkodzenia lub zakłócenia.

fantastyczny
źródło
21

Poniżej znajduje się inne rozwiązanie z naciskiem na prostotę dla każdego innego (takiego jak ja), który przybywa tutaj z Google. Logowanie powinno być łatwe! Tylko dla wersji 3.2 lub wyższej.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
użytkownik2133814
źródło
2
QueueHandlerI QueueListenerklasy mogą być wykorzystane na Python 2.7, jak również, dostępny w logutilspakiecie.
Lev Levitsky
5
Program rejestrujący główny proces powinien również używać narzędzia QueueHandler. W twoim obecnym kodzie główny proces omija kolejkę, więc mogą istnieć warunki wyścigu między głównym procesem a procesami roboczymi. Każdy powinien zalogować się do kolejki (przez QueueHandler) i tylko QueueListener powinien mieć możliwość logowania do StreamHandler.
Ismael EL ATIFI
Ponadto nie trzeba inicjować rejestratora u każdego dziecka. Wystarczy zainicjować program rejestrujący w procesie nadrzędnym i uzyskać program rejestrujący w każdym procesie podrzędnym.
okwap
20

Jeszcze inną alternatywą mogą być różne programy obsługi rejestrowania nie oparte na plikach w loggingpakiecie :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(i inni)

W ten sposób możesz łatwo mieć demona rejestrującego, w którym możesz bezpiecznie pisać i poprawnie obsługiwać wyniki. (Np. Prosty serwer z gniazdami, który po prostu usuwa wiadomość i wysyła ją do własnej obrotowej procedury obsługi plików.)

SyslogHandlerBędzie dbać o to dla ciebie. Oczywiście możesz użyć własnego wystąpienia syslog, a nie systemowego.

Ali Afshar
źródło
13

Wariant innych, który oddziela wątek rejestrowania i kolejki.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
hakowiec
źródło
Podoba mi się pomysł pobrania nazwy rejestratora z rekordu kolejki. Pozwala to na użycie konwencjonalnego fileConfig()w MainProcess i ledwo skonfigurowanego rejestratora w PoolWorkers (tylko z setLevel(logging.NOTSET)). Jak wspomniałem w innym komentarzu, korzystam z Pool, więc musiałem uzyskać kolejkę (proxy) od Managera zamiast wieloprocesowego przetwarzania, aby można było ją marynować. To pozwala mi przekazywać kolejkę do pracownika wewnątrz słownika (który w większości pochodzi z argsparse obiektu vars()). Wydaje mi się, że ostatecznie jest to najlepsze podejście dla MS Windows, które nie ma fork () i psuje rozwiązanie @zzzeak.
mlt
@mlt Wydaje mi się, że zamiast inicjowania menedżera możesz także umieścić kolejkę wieloprzetwarzania zamiast menedżera (patrz odpowiedź na stackoverflow.com/questions/25557686/... - dotyczy blokad, ale uważam, że działa również dla kolejek)
fantastyczny
@fantabolous To nie działa na MS Windows lub innej platformie, której brakuje fork. W ten sposób każdy proces będzie miał własną niezależną, bezużyteczną kolejkę. Drugie podejście w połączonych pytaniach nie działa na takich platformach. Jest to sposób na nieprzenośny kod.
mlt
@mlt Interesujące. Korzystam z systemu Windows i wydaje mi się, że działa dobrze - niedługo po tym, jak ostatnio skomentowałem, utworzyłem pulę procesów współużytkujących multiprocessing.Queuez procesem głównym i od tego czasu ciągle go używam. Nie będę twierdził, że rozumie, dlaczego to działa.
fantastyczny
10

Wszystkie obecne rozwiązania są zbyt sprzężone z konfiguracją rejestrowania za pomocą modułu obsługi. Moje rozwiązanie ma następującą architekturę i funkcje:

  • Możesz użyć dowolnej konfiguracji rejestrowania
  • Logowanie odbywa się w wątku demona
  • Bezpieczne zamknięcie demona za pomocą menedżera kontekstu
  • Komunikacja z wątkiem rejestrującym odbywa się za pomocą multiprocessing.Queue
  • W podprocesach logging.Logger(i już zdefiniowane wystąpienia) są załatane, aby wysłać wszystkie rekordy do kolejki
  • Nowość : format śledzenia i wiadomości przed wysłaniem do kolejki, aby zapobiec błędom wytrawiania

Kod z przykładem użycia i danymi wyjściowymi można znaleźć na następującej liście: https://gist.github.com/schlamar/7003737

schlamar
źródło
Chyba że jestem brakuje czegoś, to nie jest rzeczywiście demon wątek, ponieważ nigdy nie można ustawić daemon_thread.daemonna True. Musiałem to zrobić, aby mój program Python poprawnie zakończył działanie, gdy wystąpi wyjątek w menedżerze kontekstu.
blah238
Musiałem także wychwytywać, rejestrować i połykać wyjątki zgłaszane przez cel, funcw logged_callprzeciwnym razie wyjątek zostałby zniekształcony przez inne zarejestrowane dane wyjściowe. Oto moja zmodyfikowana wersja tego: gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf
blah238
8

Ponieważ możemy reprezentować rejestrowanie wieloprocesowe jak największej liczby wydawców i jednego subskrybenta (słuchacza), użycie ZeroMQ do implementacji przesyłania komunikatów PUB-SUB jest rzeczywiście opcją.

Ponadto moduł PyZMQ , powiązania Pythona dla ZMQ, implementuje PUBHandler , który jest obiektem do publikowania komunikatów rejestrujących przez gniazdo zmq.PUB.

W sieci istnieje rozwiązanie do scentralizowanego rejestrowania z aplikacji rozproszonej przy użyciu PyZMQ i PUBHandler, które można łatwo przystosować do lokalnej pracy z wieloma procesami publikacji.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
Samuel
źródło
6

Podoba mi się również odpowiedź Zzzeka, ale Andre ma rację, że kolejka jest wymagana, aby zapobiec zakłóceniom. Miałem trochę szczęścia z fajką, ale widziałem dudnienie, które jest nieco oczekiwane. Wdrożenie go okazało się trudniejsze niż myślałem, szczególnie ze względu na działanie w systemie Windows, gdzie istnieją dodatkowe ograniczenia dotyczące zmiennych globalnych i innych rzeczy (zobacz: Jak zaimplementowano wieloprocesowość Pythona w systemie Windows? )

Ale w końcu udało mi się to uruchomić. Ten przykład prawdopodobnie nie jest idealny, dlatego komentarze i sugestie są mile widziane. Nie obsługuje również ustawiania formatyzatora ani niczego innego niż główny rejestrator. Zasadniczo musisz ponownie uruchomić program rejestrujący w każdym z procesów puli w kolejce i skonfigurować inne atrybuty programu rejestrującego.

Ponownie, wszelkie sugestie dotyczące ulepszenia kodu są mile widziane. Na pewno jeszcze nie znam wszystkich sztuczek Pythona :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
Mike Miller
źródło
1
Zastanawiam się, czy if 'MainProcess' == multiprocessing.current_process().name:można go wykorzystać zamiast przejść child?
mlt
W przypadku, gdy ktoś inny próbuje użyć puli procesów zamiast oddzielnych obiektów procesów w systemie Windows, warto wspomnieć, że do przekazywania kolejki do podprocesów należy użyć Menedżera , ponieważ nie można jej bezpośrednio odebrać.
mlt
Ta implementacja działała dla mnie dobrze. Zmodyfikowałem go, aby działał z dowolną liczbą programów obsługi. W ten sposób możesz skonfigurować swój moduł obsługi root w sposób nieobsługujący wielu procesów, a następnie w miejscu, w którym można bezpiecznie utworzyć kolejkę, przekazać do niego moduły obsługi root, usunąć je i uczynić z tego jedynego modułu obsługi.
Jaxor24
3

po prostu opublikuj gdzieś swoje wystąpienie programu rejestrującego. w ten sposób inne moduły i klienci mogą korzystać z interfejsu API, aby uzyskać rejestrator bez konieczności import multiprocessing.

Javier
źródło
1
Problem polega na tym, że rejestratory wieloprocesorowe wydają się nienazwane, więc nie będzie można łatwo rozszyfrować strumienia wiadomości. Być może byłoby możliwe nadanie im nazwy po stworzeniu, co sprawiłoby, że rozsądniej byłoby na to spojrzeć.
cdleary
cóż, opublikuj jeden rejestrator dla każdego modułu, lub lepiej, wyeksportuj różne zamknięcia, które używają rejestratora z nazwą modułu. chodzi o to, aby inne moduły mogły korzystać z twojego API
Javier,
Zdecydowanie rozsądne (i +1 ode mnie!), Ale tęskniłbym za tym, że mogłem tylko import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!')z dowolnego miejsca i sprawić, by działał poprawnie.
cdleary
3
To ciekawe zjawisko, które widzę, kiedy korzystam z Pythona, że ​​jesteśmy tak przyzwyczajeni do robienia tego, co chcemy w 1 lub 2 prostych liniach, że proste i logiczne podejście w innych językach (np. Publikowanie rejestratora wieloprocesowego lub owijanie w akcesorium) nadal wydaje się ciężarem. :)
Kylotan
3

Podobała mi się odpowiedź Zzzeka. Po prostu zastąpiłbym potok kolejką, ponieważ jeśli wiele wątków / procesów używa tego samego końca potoku do generowania komunikatów dziennika, zostaną one zniekształcone.

André Cruz
źródło
Miałem pewne problemy z modułem obsługi, chociaż nie było tak, że wiadomości były zniekształcone, po prostu wszystko przestało działać. Zmieniłem Pipe na kolejkę, ponieważ jest to bardziej odpowiednie. Jednak błędy, które otrzymywałem, nie zostały przez to rozwiązane - ostatecznie dodałem metodę try / else do metody receive () - bardzo rzadko próba zarejestrowania wyjątków zakończy się niepowodzeniem i zostanie tam złapana. Po dodaniu try / wyjątek działa bez problemu przez kilka tygodni, a plik standarderr pobierze około dwóch błędnych wyjątków tygodniowo.
zzzeek
2

Co powiesz na delegowanie całego logowania do innego procesu, który odczytuje wszystkie wpisy dziennika z kolejki?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Po prostu udostępnij LOG_QUEUE za pośrednictwem dowolnego mechanizmu wieloprocesowego lub nawet dziedziczenia, a wszystko działa dobrze!

Sawan
źródło
1

Mam rozwiązanie podobne do ironhackera, z tym wyjątkiem, że używam rejestrowania. Wyjątek w niektórych moich kodach i stwierdziłem, że muszę sformatować wyjątek przed przekazaniem go z powrotem do kolejki, ponieważ śledzenia nie można ustawić:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
Richard Jones
źródło
Tutaj znalazłem pełny przykład zgodny z tymi zasadami .
Aryeh Leib Taurog
1

Poniżej znajduje się klasa, która może być używana w środowisku Windows, wymaga ActivePython. Możesz także dziedziczyć dla innych programów rejestrujących (StreamHandler itp.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

A oto przykład, który pokazuje użycie:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
użytkownik6336812
źródło
Prawdopodobnie użycie multiprocessing.Lock()zamiast Windows Mutex uczyniłoby to rozwiązanie przenośnym.
xmedeko,
1

Oto mój prosty hack / obejście ... nie jest to najbardziej kompleksowe, ale łatwo modyfikowalne i łatwiejsze do odczytania i zrozumienia, myślę, niż jakiekolwiek inne odpowiedzi, które znalazłem przed napisaniem tego:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
nmz787
źródło
1

Jest ten świetny pakiet

Pakiet: https://pypi.python.org/pypi/multiprocessing-logging/

kod: https://github.com/jruere/multiprocessing-logging

Zainstalować:

pip install multiprocessing-logging

Następnie dodaj:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
juan Isaza
źródło
3
Ta biblioteka jest dosłownie oparta na innym komentarzu do bieżącego postu SO: stackoverflow.com/a/894284/1698058 .
Chris Hunt
Origins: stackoverflow.com/a/894284/1663382 Doceniam przykładowe użycie modułu, oprócz dokumentacji na stronie głównej.
Liquidgenius
0

Jedną z alternatyw jest zapisanie dziennika przetwarzania wielokrotnego do znanego pliku i zarejestrowanie modułu atexitobsługi, aby dołączył do tych procesów i odczytał go ponownie na stderr; jednak w ten sposób nie uzyskasz przepływu w czasie rzeczywistym do komunikatów wyjściowych na stderr.

cdleary
źródło
podejście, które proponujesz poniżej, jest identyczne z podejściem z twojego komentarza tutaj stackoverflow.com/questions/641420/...
iruvar
0

Jeśli masz zakleszczenia występujące w kombinacji blokad, wątków i widelców w loggingmodule, jest to zgłaszane w raporcie o błędzie 6721 (patrz także powiązane pytanie SO ).

Jest mała rozwiązanie fixup pisał tutaj .

To jednak naprawi wszelkie potencjalne impasy logging. To nie naprawi, że rzeczy mogą być zniekształcone. Zobacz inne odpowiedzi przedstawione tutaj.

Albert
źródło
0

Najprostszy pomysł, jak wspomniano:

  • Pobierz nazwę pliku i identyfikator bieżącego procesu.
  • Skonfiguruj a [WatchedFileHandler][1]. Powody tej procedury obsługi zostały szczegółowo omówione tutaj , ale w skrócie istnieją inne gorsze warunki wyścigu z innymi przewodnikami. Ten ma najkrótsze okno na warunki wyścigu.
    • Wybierz ścieżkę, aby zapisać dzienniki, takie jak / var / log / ...
użytkownik 1460675
źródło
0

Dla każdego, kto może tego potrzebować, napisałem dekorator dla pakietu multiprocessing_logging, który dodaje bieżącą nazwę procesu do dzienników, aby stało się jasne, kto co loguje.

Działa również install_mp_handler (), więc uruchomienie go przed utworzeniem puli staje się bezużyteczne.

To pozwala mi zobaczyć, który pracownik tworzy, które rejestruje wiadomości.

Oto plan z przykładem:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')
Orsiris de Jong
źródło
-5

Moim dzieciom, które od dziesięcioleci spotykają ten sam problem i znalazłem to pytanie na tej stronie, zostawiam tę odpowiedź.

Prostota kontra nadmierna komplikacja. Po prostu użyj innych narzędzi. Python jest niesamowity, ale nie został zaprojektowany do robienia niektórych rzeczy.

Poniższy fragment kodu dla demona logrotate działa dla mnie i nie komplikuje rzeczy. Zaplanuj, aby działał co godzinę i

/var/log/mylogfile.log {
    size 1
    copytruncate
    create
    rotate 10
    missingok
    postrotate
        timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"`
        mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log
    endscript
}

Tak go instaluję (dowiązania symboliczne nie działają dla logrotate):

sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname
sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate
baldr
źródło