wydajny bufor kołowy?

109

Chcę utworzyć wydajny bufor cykliczny w Pythonie (w celu pobierania średnich wartości całkowitych w buforze).

Czy jest to skuteczny sposób używania listy do zbierania wartości?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

Co byłoby bardziej wydajne (i dlaczego)?

jedierikb
źródło
Nie jest to skuteczny sposób implementacji bufora cyklicznego, ponieważ pop (0) jest operacją O (n) na liście. pop (0) usuwa pierwszy element z listy i wszystkie elementy muszą zostać przesunięte w lewo. Zamiast tego użyj collections.deque z atrybutem maxlen. deque ma operację O (1) dla dołączania i wyskakiwania.
Vlad Bezden

Odpowiedzi:

205

Chciałbym skorzystać collections.dequez maxlenarg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

W dokumentacji jest przepis na dequepodobny do tego, co chcesz. Moje twierdzenie, że jest najbardziej efektywny, opiera się całkowicie na fakcie, że jest implementowany w C przez niesamowicie wykwalifikowaną załogę, która ma zwyczaj tworzenia kodu najwyższej klasy.

aaronasterling
źródło
7
+1 Tak, to ładne baterie w zestawie. Operacje dla bufora kołowego to O (1) i, jak mówisz, dodatkowe obciążenie jest w C, więc nadal powinno być dość szybkie
John La Rooy
7
Nie podoba mi się to rozwiązanie, ponieważ dokumentacja nie gwarantuje dostępu swobodnego O (1), gdy maxlenjest zdefiniowane. O (n) jest zrozumiałe, gdy dequemoże rosnąć do nieskończoności, ale jeśli maxlenjest podane, indeksowanie elementu powinno być stałe w czasie.
lvella
1
Domyślam się, że jest zaimplementowany jako lista połączona, a nie tablica.
e-satis
1
Wydaje się słuszne, jeśli czasy w mojej odpowiedzi poniżej są prawidłowe.
djvg
13

wyskakiwanie z początku listy powoduje skopiowanie całej listy, więc jest nieefektywne

Zamiast tego powinieneś użyć listy / tablicy o stałym rozmiarze i indeksu, który przesuwa się przez bufor podczas dodawania / usuwania elementów

John La Rooy
źródło
4
Zgodzić się. Bez względu na to, jak elegancko lub nieelegancko może wyglądać lub jakikolwiek język jest używany. W rzeczywistości im mniej zawracasz sobie głowy garbage collector (lub menedżerem sterty, mechanizmami stronicowania / mapowania lub czymkolwiek, co robi magię pamięci), tym lepiej.
@RocketSurgeon To nie jest magia, po prostu jest to tablica, której pierwszy element jest usuwany. Więc dla tablicy o rozmiarze n oznacza to n-1 operacji kopiowania. Nie jest tu używany żaden odśmiecacz ani podobne urządzenie.
Christian,
3
Zgadzam się. Jest to również znacznie łatwiejsze niż niektórzy myślą. Po prostu używaj stale rosnącego licznika i używaj operatora modulo (% arraylen) podczas uzyskiwania dostępu do przedmiotu.
Andre Blum
idem, możesz sprawdzić mój post powyżej, tak to zrobiłem
MoonCactus
10

W oparciu o odpowiedź MoonCactus , oto circularlistklasa. Różnica w stosunku do jego wersji polega na tym, że tutaj c[0]zawsze będzie dawał najstarszy dołączony element, c[-1]ostatni dołączony element, c[-2]przedostatni… Jest to bardziej naturalne dla aplikacji.

c = circularlist(4)
c.append(1); print c, c[0], c[-1]    #[1]              1, 1
c.append(2); print c, c[0], c[-1]    #[1, 2]           1, 2
c.append(3); print c, c[0], c[-1]    #[1, 2, 3]        1, 3
c.append(8); print c, c[0], c[-1]    #[1, 2, 3, 8]     1, 8
c.append(10); print c, c[0], c[-1]   #[10, 2, 3, 8]    2, 10
c.append(11); print c, c[0], c[-1]   #[10, 11, 3, 8]   3, 11

Klasa:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

[Edytowano]: Dodano opcjonalny dataparametr umożliwiający inicjalizację z istniejących list, np .:

circularlist(4, [1, 2, 3, 4, 5])      #  [2, 3, 4, 5] (4 items)
circularlist(4, set([1, 2, 3, 4, 5])) #  [2, 3, 4, 5] (4 items)
circularlist(4, (1, 2, 3, 4, 5))      #  [2, 3, 4, 5] (4 items)
Basj
źródło
Dobry dodatek. Wykazy Python pozwalają już ujemnych indeksów, ale (-1), np nie zwróci wartość oczekiwaną raz bufor kołowy jest pełna, ponieważ „ostatni” Dodanie do listy kończy się w obrębie listy.
MoonCactus,
1
To działa @MoonCactus, zobacz 6 przykładów, które podałem na początku odpowiedzi; w ostatnich widać c[-1]zawsze właściwy element. __getitem__robi to dobrze.
Basj
o tak, mam na myśli, że mój zawiódł, nie twój, przepraszam: D sprawię, że mój komentarz będzie jaśniejszy! - Och, nie mogę, komentarz jest za stary.
MoonCactus,
ładne proste rozwiązanie. Dodałem opcjonalny argument, aby umożliwić inicjalizację listy z istniejących danych, w ten sposób jest bardziej Pythonpatyczny.
Orwellophile
9

Deque Pythona jest powolny. Możesz także użyć numpy.roll. Jak obrócić liczby w tablicy numpy o kształcie (n,) lub (n, 1)?

W tym teście deque wynosi 448 ms. Numpy.roll to 29 ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

Orvar Korvar
źródło
1
Ale numpy.rollzwraca kopię tablicy, prawda?
djvg
3
Ta odpowiedź jest bardzo myląca - deque w Pythonie wydaje się być dość szybki, ale konwersja zi do tablic numpy znacznie spowalnia ją w testach porównawczych, do których prowadzi łącze.
xitrium
7

ok z użyciem klasy deque, ale dla wymagań pytania (średnia) jest moje rozwiązanie:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
SmartElectron
źródło
Otrzymuję TypeError: 'numpy.float64' object is not callable, próbując wywołać averagemetodę
scls
Tak ... w rzeczywistości wydaje mi się, że deque wewnętrznie używa tablic numpy (po usunięciu @property działa dobrze)
scls
17
Gwarantuję, że deque nie używa wewnętrznie tablic numpy. collectionsjest częścią biblioteki standardowej, numpynie jest. Zależności od bibliotek stron trzecich byłyby okropną biblioteką standardową.
6

Chociaż jest tu już wiele świetnych odpowiedzi, nie mogłem znaleźć żadnego bezpośredniego porównania czasów dla wymienionych opcji. Dlatego proszę, znajdź moją skromną próbę porównania poniżej.

Wyłącznie do celów testowych, klasa może przełączać się między listbuforem collections.dequeopartym na a , buforze a i Numpy.rollbuforze opartym na a .

Zauważ, że updatemetoda dodaje tylko jedną wartość naraz, aby była prosta.

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

W moim systemie daje to:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
djvg
źródło
4

A co z rozwiązaniem z książki kucharskiej języka Python , w tym ponownej klasyfikacji instancji bufora pierścieniowego, gdy się zapełni?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

Godnym uwagi wyborem projektowym w implementacji jest to, że ponieważ obiekty te przechodzą nieodwracalną zmianę stanu w pewnym momencie swojego życia - od niepełnego bufora do pełnego bufora (i zmian zachowania w tym momencie) - modelowałem to przez zmianę self.__class__. Działa to nawet w Pythonie 2.2, o ile obie klasy mają te same gniazda (na przykład działa dobrze w przypadku dwóch klas klasycznych, takich jak RingBuffer i__Full tym przepisie).

Zmiana klasy instancji może być dziwna w wielu językach, ale jest Pythonową alternatywą dla innych sposobów przedstawiania sporadycznych, masowych, nieodwracalnych i dyskretnych zmian stanu, które mają ogromny wpływ na zachowanie, jak w tym przepisie. Dobrze, że Python obsługuje go dla wszystkich rodzajów klas.

Kredyt: Sébastien Keim

d8aninja
źródło
Zrobiłem kilka testów szybkości tego vs deque. To około 7 razy wolniej niż deque.
PolyMesh
@PolyMesh super, powinieneś powiadomić autora!
d8aninja
1
jaki byłby w tym sens? To stary opublikowany dokument. Celem mojego komentarza jest poinformowanie innych, że ta odpowiedź jest nieaktualna i zamiast tego użyć deque.
PolyMesh
@PolyMesh prawdopodobnie był jeszcze wolniejszy, kiedy to opublikował; instrukcje dotyczące kontaktu z autorem znajdują się we wstępie do książki. Przedstawiam tylko jedną, możliwą alternatywę. Ponadto „Gdyby tylko prędkość była najlepszą miarą; niestety może być tylko dobra”.
d8aninja
3

Możesz również zobaczyć ten dość stary przepis w Pythonie .

Oto moja własna wersja z tablicą NumPy:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value
scls
źródło
4
+1 za używanie numpy, ale -1 za nie implementację bufora cyklicznego. Sposób, w jaki to zaimplementowałeś, przesuwa wszystkie dane za każdym razem, gdy dodajesz pojedynczy element, to kosztuje O(n)czas. Aby zaimplementować odpowiedni bufor cykliczny , powinieneś mieć zarówno indeks, jak i zmienną rozmiaru, i musisz poprawnie obsłużyć przypadek, gdy dane „zawijają się” na końcu bufora. Podczas pobierania danych może być konieczne połączenie dwóch sekcji na początku i na końcu bufora.
Bas Swinckels
2

Ten nie wymaga żadnej biblioteki. Tworzy listę, a następnie przechodzi do niej według indeksu.

Ślad jest bardzo mały (brak biblioteki) i działa przynajmniej dwa razy szybciej niż usuwanie z kolejki. Rzeczywiście dobrze jest obliczyć średnie kroczące, ale należy pamiętać, że elementy nie są sortowane według wieku, jak powyżej.

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index= 0
        self.size= size
        self._data = []

    def record(self, value):
        """append an element"""
        if len(self._data) == self.size:
            self._data[self.index]= value
        else:
            self._data.append(value)
        self.index= (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

    def get_all(self):
        """return a list of all the elements"""
        return(self._data)

Aby uzyskać średnią wartość, np .:

q= CircularBuffer(1000000);
for i in range(40000):
    q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

Prowadzi do:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

To około 1/3 czasu odpowiednika z kolejką.

MoonCactus
źródło
1
Nie powinieneś __getitem__być trochę mocniejszy self._data[(key + self._index + 1) % self._size]:?
Mateen Ulhaq
Dlaczego chcesz przesunąć o +1? Teraz tak, zobacz wariant Basj poniżej dla pomysłu
MoonCactus
1

Miałem ten problem przed programowaniem szeregowym. Nieco ponad rok temu też nie mogłem znaleźć żadnej wydajnej implementacji, więc napisałem jedną jako rozszerzenie C i jest ona również dostępna na pypi na licencji MIT. Jest super podstawowy, obsługuje tylko bufory 8-bitowych znaków ze znakiem, ale ma elastyczną długość, więc możesz użyć Struct lub czegoś na nim, jeśli potrzebujesz czegoś innego niż znaki. Widzę teraz w wyszukiwarce Google, że obecnie jest kilka opcji, więc warto też je przejrzeć.

sirlark
źródło
1

Twoja odpowiedź jest nieprawidłowa. Główny bufor kołowy ma dwie zasady (https://en.wikipedia.org/wiki/Circular_buffer )

  1. Ustawiana jest jedna część bufora;
  2. Pierwszy wchodzi pierwszy wychodzi;
  3. Podczas dodawania lub usuwania elementu inne elementy nie powinny zmieniać swojej pozycji

twój kod poniżej:

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

Rozważmy sytuację, w której lista jest pełna, używając swojego kodu:

self.mylist = [1, 2, 3, 4, 5]

teraz dodajemy 6, lista zmienia się na

self.mylist = [2, 3, 4, 5, 6]

pozycje oczekujące na 1 na liście zmieniły swoją pozycję

Twój kod jest kolejką, a nie buforem koła.

Myślę, że odpowiedź Basja jest najbardziej skuteczna.

Nawiasem mówiąc, bufor koła może poprawić wydajność operacji dodania elementu.

Johnny Wong
źródło
1

Z Githuba:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

Ijaz Ahmad Khan
źródło
0

Pierwotne pytanie brzmiało: „ wydajny ” bufor cykliczny. Zgodnie z tą żądaną skutecznością, odpowiedź udzielona przez aaronasterling wydaje się być definitywnie poprawna. Użycie dedykowanej klasy zaprogramowanej w Pythonie i porównanie przetwarzania czasu z collections.deque pokazuje przyspieszenie x5,2 razy z deque! Oto bardzo prosty kod do przetestowania tego:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

Aby przekształcić deque w listę, po prostu użyj:

my_list = [v for v in my_deque]

Otrzymasz wtedy O (1) losowy dostęp do przedmiotów Deque. Oczywiście jest to cenne tylko wtedy, gdy musisz wykonać wiele losowych dostępów do deque po jednokrotnym ustawieniu.

Schmouk
źródło
0

Powoduje to zastosowanie tej samej zasady do niektórych buforów przeznaczonych do przechowywania najnowszych wiadomości tekstowych.

import time
import datetime
import sys, getopt

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '\n' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 10 secounds write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  
David Torrens
źródło
0

Możesz sprawdzić ten cykliczny bufor oparty na predefiniowanej tablicy numpy o rozmiarze. Pomysł polega na tym, że tworzysz bufor (przydzielasz pamięć dla tablicy numpy), a następnie dodajesz do niego. Wprowadzanie danych i wyszukiwanie jest bardzo szybkie. Stworzyłem ten moduł w podobnym celu, jakiego potrzebujesz. W moim przypadku mam urządzenie, które generuje dane całkowite. Przeczytałem dane i umieściłem je w buforze okrężnym w celu późniejszej analizy i przetwarzania.

Valentyn
źródło