Jak uruchomić zadanie przepływu powietrza tylko wtedy, gdy nowa partycja / dane są dostępne w tabeli AWS athena przy użyciu DAG w pythonie?

9

Mam scenerię jak poniżej:

  1. Naciśnij Task 1iTask 2 tylko wtedy, gdy nowe dane są dla nich dostępne w tabeli źródłowej (Athena). Wyzwalacz dla Zadania 1 i Zadania 2 powinien się zdarzyć, gdy nowa parowanie danych nastąpi w ciągu jednego dnia.
  2. Wyzwalacz Task 3 tylko po zakończeniu Task 1iTask 2
  3. Uruchom Task 4tylko zakończenieTask 3

wprowadź opis zdjęcia tutaj

Mój kod

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

Jaki jest najlepszy optymalny sposób na osiągnięcie tego?

pankaj
źródło
masz jakieś problemy z tym rozwiązaniem?
Bernardo Stearns zmartwychwstał
@ Bernardostearnsreisen, Czasem Task1i Task2idzie w pętli. Dla mnie dane są ładowane do tabeli źródłowej Athena 10 AM CET.
pankaj
masz na myśli pętlę, przepływ powietrza próbuje wiele razy Zadanie 1 i Zadanie 2, dopóki się nie powiedzie?
Bernardo Stearns zmartwychwstał
@Bernardostearnsreisen, yup dokładnie
pankaj
1
@Bernardostearnsreisen, nie wiedziałem, jak przyznać nagrodę :)
pankaj

Odpowiedzi:

1

Uważam, że twoje pytanie dotyczy dwóch głównych problemów:

  1. zapomnienie skonfigurować schedule_interval w sposób jawny, więc @daily konfiguruje coś, czego się nie spodziewasz.
  2. Jak poprawnie uruchomić i ponowić wykonanie dag, gdy zależy od zdarzenia zewnętrznego, aby zakończyć wykonanie

krótka odpowiedź: ustaw jawnie swój rozkład_czasu za pomocą formatu zadania cron i używaj operatorów czujników do sprawdzania od czasu do czasu

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

gdzie startimejest godzina rozpoczęcia codziennego zadania, endtimejaka jest ostatnia pora dnia, aby sprawdzić, czy zdarzenie zostało wykonane przed oznaczeniem go jako nieudanego, oraz poke_timeprzedział czasu sensor_operator, w którym sprawdzisz, czy zdarzenie miało miejsce.

Jak jawnie zaadresować zadanie crona za każdym razem, gdy ustawiasz swój dag@dailytak, jak zrobiłeś:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

z dokumentów widać, że faktycznie robisz: @daily - Run once a day at midnight

Co teraz ma sens, dlaczego pojawia się błąd przekroczenia limitu czasu i kończy się niepowodzeniem po 5 minutach, ponieważ ustawiono 'retries': 1i'retry_delay': timedelta(minutes=5) . Próbuje uruchomić sztylet o północy, ale się nie udaje. ponawia próbę 5 minut później i ponownie kończy się niepowodzeniem, więc oznacza to, że nie powiodło się.

Więc w zasadzie @ codzienne uruchamianie ustawia ukryte zadanie cron:

@daily -> Run once a day at midnight -> 0 0 * * *

Format zadania cron ma format poniżej i ustawiasz tę wartość, *ilekroć chcesz powiedzieć „wszystkie”.

Minute Hour Day_of_Month Month Day_of_Week

Więc @daily zasadniczo mówi, uruchamiaj to co: minuta 0 godzina 0 wszystkich dni miesiąca dla wszystkich miesięcy wszystkich dni tygodnia

Tak więc twoja sprawa jest uruchamiana co: minuta 0 godzina 10 wszystkich dni miesiąca wszystkich dni miesiąca wszystkich dni tygodnia. Przekłada się to na format zadania cron na:

0 10 * * *

Jak poprawnie uruchomić i ponowić wykonanie dag, gdy zależy od zdarzenia zewnętrznego, aby zakończyć wykonanie

  1. możesz wywołać sztylet w przepływie powietrza ze zdarzenia zewnętrznego za pomocą polecenia airflow trigger_dag. byłoby to możliwe, gdybyś w jakiś sposób mógł uruchomić skrypt lambda / skrypt Pythona, aby celować w Twoją instancję przepływu powietrza.

  2. Jeśli nie możesz uruchomić sztyletu na zewnątrz, użyj operatora czujnika takiego jak OP, ustaw dla niego poke_time i ustaw rozsądną liczbę ponownych prób.

Bernardo Stearns Reisen
źródło
Dzięki za to. Również jeśli chcę uruchomić zadania na podstawie zdarzenia, a nie czasu, tj. Jak tylko nowa partycja danych będzie dostępna w źródłowym `AWS Athena Tables`, powinno zostać uruchomione następne zadanie. Jak mam zaplanować. Czy mój obecny kod jest wystarczający.
pankaj
@pankaj, widzę tylko dwie alternatywy. Nie wiem wiele o aws athena, ale możesz wywołać sztylet w przepływie powietrza ze zdarzenia zewnętrznego za pomocą polecenia airflow trigger_dag. byłoby to możliwe, gdybyś w jakiś sposób mógł uruchomić skrypt lambda / skrypt Pythona, aby celować w twoją instancję przepływu powietrza.
Bernardo Stearns powstaje
drugą alternatywą jest mniej więcej to, co robisz, ponieważ nie masz wyzwalacza opartego na zdarzeniu, musisz okresowo sprawdzać, czy to zdarzenie miało miejsce. Tak więc użycie tego obecnego rozwiązania byłoby ustawione na crona na kilka godzin pracy dag z wysoką częstotliwością minut ... wiele się nie powiedzie, ale będzie w stanie dość szybko złapać po zdarzeniu
Bernardo stearns reisen
@Bernado, wymyśliłem pakiet wywołany przez Airflow AwsGlueCatalogPartitionSensorwraz z poleceniem airflow {{ds_nodash}}dla wyjść z partycji. Moje pytanie, jak to zaplanować.
pankaj
@Benado, czy możesz spojrzeć na mój kod, w którym zaimplementowałem wyżej wymieniony czek i podać swoje dane wejściowe
pankaj