Mam scenerię jak poniżej:
- Naciśnij
Task 1
iTask 2
tylko wtedy, gdy nowe dane są dla nich dostępne w tabeli źródłowej (Athena). Wyzwalacz dla Zadania 1 i Zadania 2 powinien się zdarzyć, gdy nowa parowanie danych nastąpi w ciągu jednego dnia. - Wyzwalacz
Task 3
tylko po zakończeniuTask 1
iTask 2
- Uruchom
Task 4
tylko zakończenieTask 3
Mój kod
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
Jaki jest najlepszy optymalny sposób na osiągnięcie tego?
Task1
iTask2
idzie w pętli. Dla mnie dane są ładowane do tabeli źródłowej Athena 10 AM CET.Odpowiedzi:
Uważam, że twoje pytanie dotyczy dwóch głównych problemów:
schedule_interval
w sposób jawny, więc @daily konfiguruje coś, czego się nie spodziewasz.krótka odpowiedź: ustaw jawnie swój rozkład_czasu za pomocą formatu zadania cron i używaj operatorów czujników do sprawdzania od czasu do czasu
gdzie
startime
jest godzina rozpoczęcia codziennego zadania,endtime
jaka jest ostatnia pora dnia, aby sprawdzić, czy zdarzenie zostało wykonane przed oznaczeniem go jako nieudanego, orazpoke_time
przedział czasusensor_operator
, w którym sprawdzisz, czy zdarzenie miało miejsce.Jak jawnie zaadresować zadanie crona za każdym razem, gdy ustawiasz swój dag
@daily
tak, jak zrobiłeś:z dokumentów widać, że faktycznie robisz:
@daily - Run once a day at midnight
Co teraz ma sens, dlaczego pojawia się błąd przekroczenia limitu czasu i kończy się niepowodzeniem po 5 minutach, ponieważ ustawiono
'retries': 1
i'retry_delay': timedelta(minutes=5)
. Próbuje uruchomić sztylet o północy, ale się nie udaje. ponawia próbę 5 minut później i ponownie kończy się niepowodzeniem, więc oznacza to, że nie powiodło się.Więc w zasadzie @ codzienne uruchamianie ustawia ukryte zadanie cron:
Format zadania cron ma format poniżej i ustawiasz tę wartość,
*
ilekroć chcesz powiedzieć „wszystkie”.Minute Hour Day_of_Month Month Day_of_Week
Więc @daily zasadniczo mówi, uruchamiaj to co: minuta 0 godzina 0 wszystkich dni miesiąca dla wszystkich miesięcy wszystkich dni tygodnia
Tak więc twoja sprawa jest uruchamiana co: minuta 0 godzina 10 wszystkich dni miesiąca wszystkich dni miesiąca wszystkich dni tygodnia. Przekłada się to na format zadania cron na:
Jak poprawnie uruchomić i ponowić wykonanie dag, gdy zależy od zdarzenia zewnętrznego, aby zakończyć wykonanie
możesz wywołać sztylet w przepływie powietrza ze zdarzenia zewnętrznego za pomocą polecenia
airflow trigger_dag
. byłoby to możliwe, gdybyś w jakiś sposób mógł uruchomić skrypt lambda / skrypt Pythona, aby celować w Twoją instancję przepływu powietrza.Jeśli nie możesz uruchomić sztyletu na zewnątrz, użyj operatora czujnika takiego jak OP, ustaw dla niego poke_time i ustaw rozsądną liczbę ponownych prób.
źródło
airflow trigger_dag
. byłoby to możliwe, gdybyś w jakiś sposób mógł uruchomić skrypt lambda / skrypt Pythona, aby celować w twoją instancję przepływu powietrza.AwsGlueCatalogPartitionSensor
wraz z poleceniem airflow{{ds_nodash}}
dla wyjść z partycji. Moje pytanie, jak to zaplanować.