Pandy: Zygzakowata segmentacja danych na podstawie lokalnych minimów i maksimów

10

Mam dane z timeseries. Generowanie danych

date_rng = pd.date_range('2019-01-01', freq='s', periods=400)
df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)
s = df['data1']

Chcę utworzyć linię zygzakowatą łączącą lokalne maksima i lokalne minima, które spełniają warunek, że na osi y |highest - lowest value|każdej linii zygzakowatej musi przekraczać procent (powiedzmy 20%) odległości poprzedniej linia zygzakowata ORAZ z góry określona wartość k (powiedzmy 1.2)

Mogę znaleźć lokalną ekstrema za pomocą tego kodu:

# Find peaks(max).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]

# Find valleys(min).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]
# Merge peaks and valleys data points using pandas.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)

# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

ale nie wiem, jak zastosować do tego warunek progowy. Proszę mi doradzić, jak zastosować taki warunek.

Ponieważ dane mogą zawierać miliony znaczników czasu, zaleca się wydajne obliczenia

Dla jaśniejszego opisu: wprowadź opis zdjęcia tutaj

Przykładowe dane wyjściowe z moich danych:

 # Instantiate axes.
(fig, ax) = plt.subplots()
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                        color='red', label="Zigzag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')

wprowadź opis zdjęcia tutaj

Moje pożądane wyjście (coś podobnego do tego, zygzak łączy tylko znaczące segmenty) wprowadź opis zdjęcia tutaj

Thanh Nguyen
źródło

Odpowiedzi:

3

Odpowiedziałem na moje najlepsze zrozumienie pytania. Jednak nie jest jasne, w jaki sposób zmienna K wpływa na filtr.

Chcesz odfiltrować ekstrema na podstawie warunków działania. Zakładam, że chcesz zaznaczyć wszystkie ekstremy, których względna odległość do ostatniego zaznaczonego ekstremum jest większa niż p%. Ponadto zakładam, że zawsze uznajesz pierwszy element szeregu czasowego za ważny / istotny punkt.

Zaimplementowałem to za pomocą następującej funkcji filtrowania:

def filter(values, percentage):
    previous = values[0] 
    mask = [True]
    for value in values[1:]: 
        relative_difference = np.abs(value - previous)/previous
        if relative_difference > percentage:
            previous = value
            mask.append(True)
        else:
            mask.append(False)
    return mask

Aby uruchomić kod, najpierw importuję zależności:

from scipy import signal
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

Aby kod był odtwarzalny, naprawiam losowe ziarno:

np.random.seed(0)

Reszta stąd to copypasta. Zauważ, że zmniejszyłem ilość próbki, aby wynik był jasny.

date_rng = pd.date_range('2019-01-01', freq='s', periods=30)
df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)
s = df['data1']
# Find peaks(max).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]
# Find valleys(min).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]
# Merge peaks and valleys data points using pandas.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)
# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

Następnie używamy funkcji filtrowania:

p = 0.2 # 20% 
filter_mask = filter(df_peaks_valleys.zigzag_y, p)
filtered = df_peaks_valleys[filter_mask]

I wykreśl tak, jak zrobiłeś to zarówno poprzedni wykres, jak i nowo odfiltrowaną ekstrema:

 # Instantiate axes.
(fig, ax) = plt.subplots(figsize=(10,10))
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                        color='red', label="Extrema")
# Plot zigzag trendline.
ax.plot(filtered['date'].values, filtered['zigzag_y'].values, 
                                                        color='blue', label="ZigZag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')

wprowadź opis zdjęcia tutaj

EDYCJA :

Jeśli chcesz zarówno uznać pierwszy, jak i ostatni punkt za poprawne, możesz dostosować funkcję filtrowania w następujący sposób:

def filter(values, percentage):
    # the first value is always valid
    previous = values[0] 
    mask = [True]
    # evaluate all points from the second to (n-1)th
    for value in values[1:-1]: 
        relative_difference = np.abs(value - previous)/previous
        if relative_difference > percentage:
            previous = value
            mask.append(True)
        else:
            mask.append(False)
    # the last value is always valid
    mask.append(True)
    return mask
Nikolas Rieble
źródło
cześć, dzięki za świetną odpowiedź. Tak, twoje założenie jest słuszne „zaznacz wszystkie ekstremy, których względna odległość do ostatniego zaznaczonego ekstremum jest większa niż p%.”, I zawsze należy wziąć pod uwagę zarówno pierwszy, jak i ostatni punkt. Sprawdziłem twoją odpowiedź, czasem brakuje jej ostatniego punktu, czy możesz mi w tym pomóc?
Thanh Nguyen
3

Możesz użyć funkcji Pandas rolling, aby stworzyć lokalną ekstrema. To trochę upraszcza kod w porównaniu do twojego podejścia Scipy.

Funkcje znajdowania ekstremów:

def islocalmax(x):
    """Both neighbors are lower,
    assumes a centered window of size 3"""
    return (x[0] < x[1]) & (x[2] < x[1])

def islocalmin(x):
    """Both neighbors are higher,
    assumes a centered window of size 3"""
    return (x[0] > x[1]) & (x[2] > x[1])

def isextrema(x):
    return islocalmax(x) or islocalmin(x)

Funkcja tworzenia zygzaka, można ją zastosować jednocześnie do ramki danych (nad każdą kolumną), ale wprowadzi to NaN, ponieważ zwracane znaczniki czasu będą różne dla każdej kolumny. Możesz łatwo upuścić je później, jak pokazano w poniższym przykładzie, lub po prostu zastosować funkcję w jednej kolumnie w ramce danych.

Zauważ, że odkomentowałem test względem progu k, nie jestem pewien, czy w pełni rozumiem tę część poprawnie. Możesz to uwzględnić, jeśli absolutna różnica między poprzednim a obecnym ekstremum musi być większa niż k:& (ext_val.diff().abs() > k)

Nie jestem również pewien, czy ostateczny zygzak powinien zawsze przejść z oryginalnego wysokiego do niskiego lub odwrotnie. Założyłem, że tak, w przeciwnym razie możesz usunąć drugie wyszukiwanie ekstremów na końcu funkcji.

def create_zigzag(col, p=0.2, k=1.2):

    # Find the local min/max
    # converting to bool converts NaN to True, which makes it include the endpoints    
    ext_loc = col.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)

    # extract values at local min/max
    ext_val = col[ext_loc]

    # filter locations based on threshold
    thres_ext_loc = (ext_val.diff().abs() > (ext_val.shift(-1).abs() * p)) #& (ext_val.diff().abs() > k)

    # Keep the endpoints
    thres_ext_loc.iloc[0] = True
    thres_ext_loc.iloc[-1] = True

    thres_ext_loc = thres_ext_loc[thres_ext_loc]

    # extract values at filtered locations 
    thres_ext_val = col.loc[thres_ext_loc.index]

    # again search the extrema to force the zigzag to always go from high > low or vice versa,
    # never low > low, or high > high
    ext_loc = thres_ext_val.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)
    thres_ext_val  =thres_ext_val[ext_loc]

    return thres_ext_val

Wygeneruj przykładowe dane:

date_rng = pd.date_range('2019-01-01', freq='s', periods=35)

df = pd.DataFrame(np.random.randn(len(date_rng), 3),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)

df = df.cumsum()

Zastosuj funkcję i wyodrębnij wynik dla kolumny „data1”:

dfzigzag = df.apply(create_zigzag)
data1_zigzag = dfzigzag['data1'].dropna()

Wizualizuj wynik:

fig, axs = plt.subplots(figsize=(10, 3))

axs.plot(df.data1, 'ko-', ms=4, label='original')
axs.plot(data1_zigzag, 'ro-', ms=4, label='zigzag')
axs.legend()

wprowadź opis zdjęcia tutaj

Rutger Kassies
źródło
dzięki za odpowiedź. Chcę zapytać o tę linię (ext_val.diff().abs() > (ext_val.shift(-1).abs() * p)), jak rozumiem, porównujesz odległość między dwoma punktami a p%ostatnim punktem, prawda? Ponieważ chcę porównać każdy zygzakowaty segment z poprzednim segmentem i powtarzać, aż warunek zostanie spełniony.
Thanh Nguyen,