Właściwy sposób tworzenia dynamicznych przepływów pracy w Airflow

98

Problem

Czy istnieje sposób w Airflow, aby utworzyć przepływ pracy, w którym liczba zadań B. * jest nieznana do zakończenia zadania A? Przyjrzałem się subdagom, ale wygląda na to, że może działać tylko ze statycznym zestawem zadań, które należy określić podczas tworzenia Dag.

Czy wyzwalacze sztyletowe zadziałają? A jeśli tak, czy mógłbyś podać przykład.

Mam problem polegający na tym, że nie można poznać liczby zadań B, które będą potrzebne do obliczenia zadania C, dopóki zadanie A nie zostanie zakończone. Obliczenie każdego zadania B. * zajmie kilka godzin i nie można ich łączyć.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Pomysł nr 1

Nie podoba mi się to rozwiązanie, ponieważ muszę utworzyć blokujący czujnik ExternalTaskSensor, a wykonanie całego zadania B. * zajmie od 2 do 24 godzin. Dlatego nie uważam tego za wykonalne rozwiązanie. Na pewno jest łatwiejszy sposób? A może Airflow nie został do tego zaprojektowany?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edycja 1:

Jak na razie to pytanie nadal nie ma świetnej odpowiedzi . Skontaktowało się ze mną kilka osób szukających rozwiązania.

costrouc
źródło
Czy wszystkie zadania B * są podobne, ponieważ można je tworzyć w pętli?
Daniel Lee,
Tak, wszystkie zadania B. * można szybko utworzyć w pętli po zakończeniu zadania A. Wykonanie zadania A zajmuje około 2 godzin.
costrouc
Czy znalazłeś rozwiązanie problemu? czy mógłbyś to opublikować?
Daniel Dubovski
3
Przydatne źródło Pomysłu nr 1: linkedin.com/pulse/ ...
Juan Riaza
1
Oto artykuł, który napisałem, wyjaśniający, jak to zrobić linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine
Kyle Bridenstine

Odpowiedzi:

33

Oto, jak to zrobiłem z podobną prośbą bez żadnych subdagów:

Najpierw utwórz metodę, która zwraca dowolne wartości

def values_function():
     return values

Następnie utwórz metodę, która będzie dynamicznie generować zlecenia:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

A następnie połącz je:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
Oleg Yamin
źródło
Gdzie definiowane są wartości?
mnichów
11
Zamiast tego for i in values_function()spodziewałbym się czegoś takiego for i in push_func_output. Problem polega na tym, że nie mogę znaleźć sposobu na dynamiczne uzyskanie tego wyniku. Wyjście PythonOperator będzie w Xcom po wykonaniu, ale nie wiem, czy mogę odwołać się do niego z definicji DAG.
Włączono
@Ena Czy znalazłeś sposób, aby to osiągnąć?
eldos,
1
@eldos zobacz moją odpowiedź poniżej
Ena
1
A co by było, gdybyśmy musieli wykonać w pętli serię zależnych od kroków kroków? Czy w funkcji byłby drugi łańcuch zależności group?
CodingInCircles
12

Wymyśliłem sposób tworzenia przepływów pracy na podstawie wyników poprzednich zadań.
Zasadniczo to, co chcesz zrobić, to mieć dwa subdagi z następującymi elementami:

  1. Xcom wypycha listę (lub cokolwiek, czego potrzebujesz później do utworzenia dynamicznego przepływu pracy) w subdagu, który jest wykonywany jako pierwszy (zobacz test1.py def return_list() )
  2. Przekaż główny obiekt dag jako parametr do drugiego subdagu
  3. Teraz, jeśli masz główny obiekt dag, możesz go użyć, aby uzyskać listę jego instancji zadań. Z tej listy instancji zadań można odfiltrować zadanie bieżącego uruchomienia przy użyciuparent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1] ), prawdopodobnie można by tu dodać więcej filtrów.
  4. W tej instancji zadania możesz użyć xcom pull, aby uzyskać potrzebną wartość, określając dag_id do jednego z pierwszych subdagów: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Użyj listy / wartości, aby dynamicznie tworzyć zadania

Teraz przetestowałem to w mojej lokalnej instalacji przepływu powietrza i działa dobrze. Nie wiem, czy część ściągająca xcom będzie miała jakiekolwiek problemy, jeśli będzie więcej niż jedna instancja dag działająca w tym samym czasie, ale wtedy prawdopodobnie użyłbyś unikalnego klucza lub czegoś podobnego, aby jednoznacznie zidentyfikować xcom wartość, którą chcesz. Prawdopodobnie można by zoptymalizować 3. krok, aby mieć 100% pewności, że otrzymasz określone zadanie z bieżącego głównego dag, ale do mojego użytku działa to wystarczająco dobrze, myślę, że do użycia xcom_pull potrzebny jest tylko jeden obiekt task_instance.

Czyszczę również xcomy dla pierwszego subdagu przed każdym wykonaniem, aby upewnić się, że przypadkowo nie otrzymam złej wartości.

Jestem dość kiepski w wyjaśnianiu, więc mam nadzieję, że poniższy kod wyjaśni wszystko:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

i główny przepływ pracy:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
Christopher Beck
źródło
w Airflow 1.9 te nie ładowały się po dodaniu do folderu DAG, czy czegoś mi brakuje?
Anthony Keane,
@AnthonyKeane, czy umieściłeś testy1.py i test2.py w folderze o nazwie subdags w swoim folderze dag?
Christopher Beck
Zrobiłem tak. Skopiowałem oba pliki do subdagów i umieściłem test.py w folderze dag, nadal pojawia się ten błąd. Uszkodzony DAG: [/home/airflow/gcs/dags/test.py] Brak modułu o nazwie subdags.test1 Uwaga Używam Google Cloud Composer (zarządzany przez Google przepływ powietrza 1.9.0)
Anthony Keane
@AnthonyKeane czy to jedyny błąd, który widzisz w dziennikach? Uszkodzony DAG może być spowodowany błędem kompilacji w subdagu.
Christopher Beck,
3
Cześć @Christopher Beck, znalazłem MÓJ błąd, który musiałem dodać _ _init_ _.pydo folderu subdags. błąd debiutanta
Anthony Keane
11

Tak, jest to możliwe, stworzyłem przykład DAG, który to demonstruje.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Przed uruchomieniem DAG utwórz te trzy zmienne przepływu powietrza

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

Zobaczysz, że DAG pochodzi z tego

wprowadź opis obrazu tutaj

Do tego po tym, jak biegnie

wprowadź opis obrazu tutaj

Więcej informacji na temat tego DAG można znaleźć w moim artykule na temat tworzenia dynamicznych przepływów pracy w przepływie powietrza .

Kyle Bridenstine
źródło
1
Ale co się stanie, jeśli masz wiele DagRunów tego DAG. Czy wszystkie mają te same zmienne?
Mar-k
1
Tak, użyliby tej samej zmiennej; Odnoszę się do tego w moim artykule na samym końcu. Musisz dynamicznie utworzyć zmienną i użyć identyfikatora uruchomienia dag w nazwie zmiennej. Mój przykład jest prosty, aby zademonstrować dynamiczne możliwości, ale musisz zadbać o jakość produkcji :)
Kyle Bridenstine
Czy mosty są potrzebne podczas tworzenia dynamicznych zadań? Przeczytam Twój artykuł w pełni za chwilę, ale chciałem zapytać. W tej chwili zmagam się z utworzeniem dynamicznego zadania opartego na zadaniu nadrzędnym i zaczynam rozumieć, gdzie popełniłem błąd. Moim obecnym problemem jest to, że z jakiegoś powodu nie mogę zsynchronizować DAG z torbą DAG. Mój DAG zsynchronizował się, gdy korzystałem ze statycznej listy w module, ale zatrzymał się, gdy zmieniłem tę statyczną listę, aby została zbudowana z zadania nadrzędnego.
lucid_goose
To bardzo sprytne
jvans
1
@jvans dzięki, że jest sprytny, ale prawdopodobnie nie jakość produkcji
Kyle Bridenstine
6

OA: „Czy w Airflow istnieje sposób na utworzenie takiego przepływu pracy, w którym liczba zadań B. * jest nieznana do czasu zakończenia zadania A?”

Krótka odpowiedź brzmi: nie. Airflow zbuduje przepływ DAG przed uruchomieniem go.

To powiedziawszy, doszliśmy do prostego wniosku, że nie mamy takiej potrzeby. Jeśli chcesz zrównoleglenie niektórych prac, powinieneś ocenić dostępne zasoby, a nie liczbę elementów do przetworzenia.

Zrobiliśmy to w ten sposób: dynamicznie generujemy stałą liczbę zadań, powiedzmy 10, które podzielą pracę. Na przykład, jeśli musimy przetworzyć 100 plików, każde zadanie przetworzy 10 z nich. Kod prześlę dzisiaj później.

Aktualizacja

Oto kod, przepraszam za opóźnienie.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return DummyOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Objaśnienie kodu:

Tutaj mamy jedno zadanie początkowe i jedno zadanie końcowe (oba fikcyjne).

Następnie z zadania startowego z pętlą for tworzymy 10 zadań z tym samym wywoływalnym Pythonem. Zadania są tworzone w funkcji create_dynamic_task.

Do każdego wywoływanego Pythona przekazujemy jako argumenty całkowitą liczbę równoległych zadań i bieżący indeks zadań.

Załóżmy, że masz 1000 elementów do opracowania: pierwsze zadanie otrzyma dane wejściowe, że powinno opracować pierwszy fragment z 10 fragmentów. Dzieli 1000 pozycji na 10 części i opracowuje pierwszy.

Włącz
źródło
1
Jest to dobre rozwiązanie, o ile nie potrzebujesz konkretnego zadania dla elementu (takiego jak postęp, wynik, sukces /
porażka
@Ena parallelTasknie jest zdefiniowana: czy coś mi brakuje?
Anthony Keane,
2
@AnthonyKeane Jest to funkcja Pythona, którą powinieneś wywołać, aby coś zrobić. Jak skomentowano w kodzie, jako dane wejściowe przyjmie całkowitą liczbę i bieżącą liczbę, aby opracować porcję wszystkich elementów.
Ena
4

Myślę, że to, czego szukasz, to dynamiczne tworzenie DAG. Kilka dni temu spotkałem się z tego typu sytuacjami, po kilku poszukiwaniach znalazłem tego bloga .

Dynamiczne generowanie zadań

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Ustawianie przepływu pracy DAG

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

Tak wygląda nasz DAG po złożeniu kodu wprowadź opis obrazu tutaj

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

To było bardzo pomocne, pełne nadziei. Pomoże też komuś innemu

Muhammad Bin Ali
źródło
Czy osiągnąłeś to sam? Próbowałem. Ale zawiodłem.
Newt
Tak, na mnie zadziałało. Jaki masz problem?
Muhammad Bin Ali
1
Mam to. Mój problem został rozwiązany. Dzięki. Po prostu nie udało mi się odczytać zmiennych środowiskowych w obrazach dockera.
Newt
1
co się stanie, jeśli elementy tabeli mogą się zmienić, więc nie możemy ich umieścić w statycznym pliku yaml?
FrankZhu
To naprawdę zależy, gdzie go używasz. Chociaż byłbym zainteresowany tym, co sugerujesz. @FrankZhu, jak należy to zrobić poprawnie?
Muhammad Bin Ali
3

Myślę, że znalazłem lepsze rozwiązanie tego problemu na https://github.com/mastak/airflow_multi_dagrun , który wykorzystuje proste kolejkowanie DagRuns poprzez wyzwalanie wielu dagrunów, podobnie jak TriggerDagRuns . Większość kredytów trafia na https://github.com/mastak , chociaż musiałem załatać kilka szczegółów aby działały z najnowszym przepływem powietrza.

Rozwiązanie wykorzystuje niestandardowego operatora, który uruchamia kilka DagRunów :

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

Następnie możesz przesłać kilka dagrunów z funkcji wywoływalnej w swoim PythonOperator, na przykład:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

Stworzyłem fork z kodem na https://github.com/flinz/airflow_multi_dagrun

flinz
źródło
3

Wykres zadań nie jest generowany w czasie wykonywania. Wykres jest raczej tworzony, gdy jest pobierany przez Airflow z folderu dags. Dlatego tak naprawdę nie będzie możliwe posiadanie innego wykresu dla zadania za każdym razem, gdy jest ono uruchamiane. Możesz skonfigurować zadanie, aby zbudować wykres na podstawie zapytania w czasie ładowania . Ten wykres pozostanie taki sam dla każdego kolejnego przebiegu, co prawdopodobnie nie jest zbyt przydatne.

Za pomocą operatora gałęzi można zaprojektować wykres, który wykonuje różne zadania przy każdym uruchomieniu na podstawie wyników zapytania.

To, co zrobiłem, to wstępne skonfigurowanie zestawu zadań, a następnie pobranie wyników zapytania i rozprowadzenie ich między zadaniami. Jest to prawdopodobnie lepsze w każdym razie, ponieważ jeśli zapytanie zwraca dużo wyników, prawdopodobnie i tak nie chcesz zapełniać harmonogramu wieloma współbieżnymi zadaniami. Aby być jeszcze bezpieczniejszym, użyłem również puli, aby upewnić się, że moja współbieżność nie wymknie się spod kontroli przy nieoczekiwanie dużym zapytaniu.

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
            doStuff(**op_kwargs)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    for task in tasks:
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
zgniły
źródło
Zauważ, że wydaje się, że może być możliwe tworzenie subdagów w locie w wyniku zadania, jednak większość dokumentacji na temat subdagów, którą znalazłem, zdecydowanie zaleca trzymanie się z daleka od tej funkcji, ponieważ powoduje ona więcej problemów niż rozwiązuje w większości przypadków. Widziałem sugestie, że subdagi mogą zostać wkrótce usunięte jako funkcja wbudowana.
zgniły
Zwróć również uwagę, że w for tasks in taskspętli w moim przykładzie usuwam obiekt, nad którym iteruję. To zły pomysł. Zamiast tego pobierz listę kluczy i powtórz ją - lub pomiń usuwanie. Podobnie, jeśli xcom_pull zwraca None (zamiast listy lub pustej listy), pętla for również zawodzi. Można by chcieć uruchomić xcom_pull przed „for”, a następnie sprawdzić, czy jest to None - lub upewnić się, że jest tam przynajmniej pusta lista. YMMV. Powodzenia!
zgniły
1
co jest w open_order_task?
alltej
Masz rację, to jest literówka w moim przykładzie. Powinien to być get_orders_task.set_upstream (). Naprawię to.
zgniły
0

Nie rozumiesz, na czym polega problem?

Oto standardowy przykład. Teraz, jeśli w funkcji subdag zastąpić for i in range(5):ze for i in range(random.randint(0, 10)):wtedy wszystko będzie działać. Teraz wyobraź sobie, że operator „start” umieszcza dane w pliku i zamiast przypadkowej wartości funkcja odczyta te dane. Wtedy „start” operatora wpłynie na liczbę zadań.

Problem będzie widoczny tylko w interfejsie użytkownika, ponieważ po wejściu w subdag liczba zadań będzie równa ostatniemu odczytowi z pliku / bazy danych / XCom w tej chwili. Co automatycznie nakłada ograniczenie na kilka uruchomień jednego dnia na raz.

Denis Shcheglov
źródło
-1

Znalazłem post na Medium, który jest bardzo podobny do tego pytania. Jest jednak pełen literówek i nie działa, gdy próbowałem go zaimplementować.

Moja odpowiedź na powyższe jest następująca:

Jeśli tworzysz zadania dynamicznie, musisz to zrobić poprzez iterację po czymś, co nie jest tworzone przez zadanie nadrzędne lub może być zdefiniowane niezależnie od tego zadania. Dowiedziałem się, że nie można przekazywać dat wykonania ani innych zmiennych przepływu powietrza do czegoś poza szablonem (np. Zadania), jak wielu innych wskazywało wcześniej. Zobacz także ten post .

MarMat
źródło
Jeśli spojrzysz na mój komentarz, zobaczysz, że w rzeczywistości możliwe jest tworzenie zadań na podstawie wyników zadań nadrzędnych.
Christopher Beck