Problem
Czy istnieje sposób w Airflow, aby utworzyć przepływ pracy, w którym liczba zadań B. * jest nieznana do zakończenia zadania A? Przyjrzałem się subdagom, ale wygląda na to, że może działać tylko ze statycznym zestawem zadań, które należy określić podczas tworzenia Dag.
Czy wyzwalacze sztyletowe zadziałają? A jeśli tak, czy mógłbyś podać przykład.
Mam problem polegający na tym, że nie można poznać liczby zadań B, które będą potrzebne do obliczenia zadania C, dopóki zadanie A nie zostanie zakończone. Obliczenie każdego zadania B. * zajmie kilka godzin i nie można ich łączyć.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Pomysł nr 1
Nie podoba mi się to rozwiązanie, ponieważ muszę utworzyć blokujący czujnik ExternalTaskSensor, a wykonanie całego zadania B. * zajmie od 2 do 24 godzin. Dlatego nie uważam tego za wykonalne rozwiązanie. Na pewno jest łatwiejszy sposób? A może Airflow nie został do tego zaprojektowany?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Edycja 1:
Jak na razie to pytanie nadal nie ma świetnej odpowiedzi . Skontaktowało się ze mną kilka osób szukających rozwiązania.
Odpowiedzi:
Oto, jak to zrobiłem z podobną prośbą bez żadnych subdagów:
Najpierw utwórz metodę, która zwraca dowolne wartości
def values_function(): return values
Następnie utwórz metodę, która będzie dynamicznie generować zlecenia:
def group(number, **kwargs): #load the values if needed in the command you plan to execute dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}" return BashOperator( task_id='JOB_NAME_{}'.format(number), bash_command='script.sh {} {}'.format(dyn_value, number), dag=dag)
A następnie połącz je:
push_func = PythonOperator( task_id='push_func', provide_context=True, python_callable=values_function, dag=dag) complete = DummyOperator( task_id='All_jobs_completed', dag=dag) for i in values_function(): push_func >> group(i) >> complete
źródło
for i in values_function()
spodziewałbym się czegoś takiegofor i in push_func_output
. Problem polega na tym, że nie mogę znaleźć sposobu na dynamiczne uzyskanie tego wyniku. Wyjście PythonOperator będzie w Xcom po wykonaniu, ale nie wiem, czy mogę odwołać się do niego z definicji DAG.group
?Wymyśliłem sposób tworzenia przepływów pracy na podstawie wyników poprzednich zadań.
Zasadniczo to, co chcesz zrobić, to mieć dwa subdagi z następującymi elementami:
def return_list()
)parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), prawdopodobnie można by tu dodać więcej filtrów.dag_id='%s.%s' % (parent_dag_name, 'test1')
Teraz przetestowałem to w mojej lokalnej instalacji przepływu powietrza i działa dobrze. Nie wiem, czy część ściągająca xcom będzie miała jakiekolwiek problemy, jeśli będzie więcej niż jedna instancja dag działająca w tym samym czasie, ale wtedy prawdopodobnie użyłbyś unikalnego klucza lub czegoś podobnego, aby jednoznacznie zidentyfikować xcom wartość, którą chcesz. Prawdopodobnie można by zoptymalizować 3. krok, aby mieć 100% pewności, że otrzymasz określone zadanie z bieżącego głównego dag, ale do mojego użytku działa to wystarczająco dobrze, myślę, że do użycia xcom_pull potrzebny jest tylko jeden obiekt task_instance.
Czyszczę również xcomy dla pierwszego subdagu przed każdym wykonaniem, aby upewnić się, że przypadkowo nie otrzymam złej wartości.
Jestem dość kiepski w wyjaśnianiu, więc mam nadzieję, że poniższy kod wyjaśni wszystko:
test1.py
from airflow.models import DAG import logging from airflow.operators.python_operator import PythonOperator from airflow.operators.postgres_operator import PostgresOperator log = logging.getLogger(__name__) def test1(parent_dag_name, start_date, schedule_interval): dag = DAG( '%s.test1' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date, ) def return_list(): return ['test1', 'test2'] list_extract_folder = PythonOperator( task_id='list', dag=dag, python_callable=return_list ) clean_xcoms = PostgresOperator( task_id='clean_xcoms', postgres_conn_id='airflow_db', sql="delete from xcom where dag_id='{{ dag.dag_id }}'", dag=dag) clean_xcoms >> list_extract_folder return dag
test2.py
from airflow.models import DAG, settings import logging from airflow.operators.dummy_operator import DummyOperator log = logging.getLogger(__name__) def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None): dag = DAG( '%s.test2' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date ) if len(parent_dag.get_active_runs()) > 0: test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull( dag_id='%s.%s' % (parent_dag_name, 'test1'), task_ids='list') if test_list: for i in test_list: test = DummyOperator( task_id=i, dag=dag ) return dag
i główny przepływ pracy:
test.py
from datetime import datetime from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from subdags.test1 import test1 from subdags.test2 import test2 DAG_NAME = 'test-dag' dag = DAG(DAG_NAME, description='Test workflow', catchup=False, schedule_interval='0 0 * * *', start_date=datetime(2018, 8, 24)) test1 = SubDagOperator( subdag=test1(DAG_NAME, dag.start_date, dag.schedule_interval), task_id='test1', dag=dag ) test2 = SubDagOperator( subdag=test2(DAG_NAME, dag.start_date, dag.schedule_interval, parent_dag=dag), task_id='test2', dag=dag ) test1 >> test2
źródło
_ _init_ _.py
do folderu subdags. błąd debiutantaTak, jest to możliwe, stworzyłem przykład DAG, który to demonstruje.
import airflow from airflow.operators.python_operator import PythonOperator import os from airflow.models import Variable import logging from airflow import configuration as conf from airflow.models import DagBag, TaskInstance from airflow import DAG, settings from airflow.operators.bash_operator import BashOperator main_dag_id = 'DynamicWorkflow2' args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2), 'provide_context': True } dag = DAG( main_dag_id, schedule_interval="@once", default_args=args) def start(*args, **kwargs): value = Variable.get("DynamicWorkflow_Group1") logging.info("Current DynamicWorkflow_Group1 value is " + str(value)) def resetTasksStatus(task_id, execution_date): logging.info("Resetting: " + task_id + " " + execution_date) dag_folder = conf.get('core', 'DAGS_FOLDER') dagbag = DagBag(dag_folder) check_dag = dagbag.dags[main_dag_id] session = settings.Session() my_task = check_dag.get_task(task_id) ti = TaskInstance(my_task, execution_date) state = ti.current_state() logging.info("Current state of " + task_id + " is " + str(state)) ti.set_state(None, session) state = ti.current_state() logging.info("Updated state of " + task_id + " is " + str(state)) def bridge1(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 2 variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue)) os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date'])) def bridge2(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 3 variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue)) os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date'])) def end(*args, **kwargs): logging.info("Ending") def doSomeWork(name, index, *args, **kwargs): # Do whatever work you need to do # Here I will just create a new file os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt') starting_task = PythonOperator( task_id='start', dag=dag, provide_context=True, python_callable=start, op_args=[]) # Used to connect the stream in the event that the range is zero bridge1_task = PythonOperator( task_id='bridge1', dag=dag, provide_context=True, python_callable=bridge1, op_args=[]) DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1") logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1)) for index in range(int(DynamicWorkflow_Group1)): dynamicTask = PythonOperator( task_id='firstGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['firstGroup', index]) starting_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge1_task) # Used to connect the stream in the event that the range is zero bridge2_task = PythonOperator( task_id='bridge2', dag=dag, provide_context=True, python_callable=bridge2, op_args=[]) DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2)) for index in range(int(DynamicWorkflow_Group2)): dynamicTask = PythonOperator( task_id='secondGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['secondGroup', index]) bridge1_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge2_task) ending_task = PythonOperator( task_id='end', dag=dag, provide_context=True, python_callable=end, op_args=[]) DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3)) for index in range(int(DynamicWorkflow_Group3)): # You can make this logic anything you'd like # I chose to use the PythonOperator for all tasks # except the last task will use the BashOperator if index < (int(DynamicWorkflow_Group3) - 1): dynamicTask = PythonOperator( task_id='thirdGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['thirdGroup', index]) else: dynamicTask = BashOperator( task_id='thirdGroup_' + str(index), bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt', dag=dag) bridge2_task.set_downstream(dynamicTask) dynamicTask.set_downstream(ending_task) # If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream # and your tasks will run simultaneously instead of in your desired stream order. starting_task.set_downstream(bridge1_task) bridge1_task.set_downstream(bridge2_task) bridge2_task.set_downstream(ending_task)
Przed uruchomieniem DAG utwórz te trzy zmienne przepływu powietrza
airflow variables --set DynamicWorkflow_Group1 1 airflow variables --set DynamicWorkflow_Group2 0 airflow variables --set DynamicWorkflow_Group3 0
Zobaczysz, że DAG pochodzi z tego
Do tego po tym, jak biegnie
Więcej informacji na temat tego DAG można znaleźć w moim artykule na temat tworzenia dynamicznych przepływów pracy w przepływie powietrza .
źródło
OA: „Czy w Airflow istnieje sposób na utworzenie takiego przepływu pracy, w którym liczba zadań B. * jest nieznana do czasu zakończenia zadania A?”
Krótka odpowiedź brzmi: nie. Airflow zbuduje przepływ DAG przed uruchomieniem go.
To powiedziawszy, doszliśmy do prostego wniosku, że nie mamy takiej potrzeby. Jeśli chcesz zrównoleglenie niektórych prac, powinieneś ocenić dostępne zasoby, a nie liczbę elementów do przetworzenia.
Zrobiliśmy to w ten sposób: dynamicznie generujemy stałą liczbę zadań, powiedzmy 10, które podzielą pracę. Na przykład, jeśli musimy przetworzyć 100 plików, każde zadanie przetworzy 10 z nich. Kod prześlę dzisiaj później.
Aktualizacja
Oto kod, przepraszam za opóźnienie.
from datetime import datetime, timedelta import airflow from airflow.operators.dummy_operator import DummyOperator args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 8), 'email': ['[email protected]'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(seconds=5) } dag = airflow.DAG( 'parallel_tasks_v1', schedule_interval="@daily", catchup=False, default_args=args) # You can read this from variables parallel_tasks_total_number = 10 start_task = DummyOperator( task_id='start_task', dag=dag ) # Creates the tasks dynamically. # Each one will elaborate one chunk of data. def create_dynamic_task(current_task_number): return DummyOperator( provide_context=True, task_id='parallel_task_' + str(current_task_number), python_callable=parallelTask, # your task will take as input the total number and the current number to elaborate a chunk of total elements op_args=[current_task_number, int(parallel_tasks_total_number)], dag=dag) end = DummyOperator( task_id='end', dag=dag) for page in range(int(parallel_tasks_total_number)): created_task = create_dynamic_task(page) start_task >> created_task created_task >> end
Objaśnienie kodu:
Tutaj mamy jedno zadanie początkowe i jedno zadanie końcowe (oba fikcyjne).
Następnie z zadania startowego z pętlą for tworzymy 10 zadań z tym samym wywoływalnym Pythonem. Zadania są tworzone w funkcji create_dynamic_task.
Do każdego wywoływanego Pythona przekazujemy jako argumenty całkowitą liczbę równoległych zadań i bieżący indeks zadań.
Załóżmy, że masz 1000 elementów do opracowania: pierwsze zadanie otrzyma dane wejściowe, że powinno opracować pierwszy fragment z 10 fragmentów. Dzieli 1000 pozycji na 10 części i opracowuje pierwszy.
źródło
parallelTask
nie jest zdefiniowana: czy coś mi brakuje?Myślę, że to, czego szukasz, to dynamiczne tworzenie DAG. Kilka dni temu spotkałem się z tego typu sytuacjami, po kilku poszukiwaniach znalazłem tego bloga .
Dynamiczne generowanie zadań
start = DummyOperator( task_id='start', dag=dag ) end = DummyOperator( task_id='end', dag=dag) def createDynamicETL(task_id, callableFunction, args): task = PythonOperator( task_id = task_id, provide_context=True, #Eval is used since the callableFunction var is of type string #while the python_callable argument for PythonOperators only receives objects of type callable not strings. python_callable = eval(callableFunction), op_kwargs = args, xcom_push = True, dag = dag, ) return task
Ustawianie przepływu pracy DAG
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f: # Use safe_load instead to load the YAML file configFile = yaml.safe_load(f) # Extract table names and fields to be processed tables = configFile['tables'] # In this loop tasks are created for each table defined in the YAML file for table in tables: for table, fieldName in table.items(): # In our example, first step in the workflow for each table is to get SQL data from db. # Remember task id is provided in order to exchange data among tasks generated in dynamic way. get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table), 'getSQLData', {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass', 'dbname': configFile['dbname']}) # Second step is upload data to s3 upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table), 'uploadDataToS3', {'previous_task_id': '{}-getSQLData'.format(table), 'bucket_name': configFile['bucket_name'], 'prefix': configFile['prefix']}) # This is where the magic lies. The idea is that # once tasks are generated they should linked with the # dummy operators generated in the start and end tasks. # Then you are done! start >> get_sql_data_task get_sql_data_task >> upload_to_s3_task upload_to_s3_task >> end
Tak wygląda nasz DAG po złożeniu kodu
import yaml import airflow from airflow import DAG from datetime import datetime, timedelta, time from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator start = DummyOperator( task_id='start', dag=dag ) def createDynamicETL(task_id, callableFunction, args): task = PythonOperator( task_id=task_id, provide_context=True, # Eval is used since the callableFunction var is of type string # while the python_callable argument for PythonOperators only receives objects of type callable not strings. python_callable=eval(callableFunction), op_kwargs=args, xcom_push=True, dag=dag, ) return task end = DummyOperator( task_id='end', dag=dag) with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f: # use safe_load instead to load the YAML file configFile = yaml.safe_load(f) # Extract table names and fields to be processed tables = configFile['tables'] # In this loop tasks are created for each table defined in the YAML file for table in tables: for table, fieldName in table.items(): # In our example, first step in the workflow for each table is to get SQL data from db. # Remember task id is provided in order to exchange data among tasks generated in dynamic way. get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table), 'getSQLData', {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass', 'dbname': configFile['dbname']}) # Second step is upload data to s3 upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table), 'uploadDataToS3', {'previous_task_id': '{}-getSQLData'.format(table), 'bucket_name': configFile['bucket_name'], 'prefix': configFile['prefix']}) # This is where the magic lies. The idea is that # once tasks are generated they should linked with the # dummy operators generated in the start and end tasks. # Then you are done! start >> get_sql_data_task get_sql_data_task >> upload_to_s3_task upload_to_s3_task >> end
To było bardzo pomocne, pełne nadziei. Pomoże też komuś innemu
źródło
Myślę, że znalazłem lepsze rozwiązanie tego problemu na https://github.com/mastak/airflow_multi_dagrun , który wykorzystuje proste kolejkowanie DagRuns poprzez wyzwalanie wielu dagrunów, podobnie jak TriggerDagRuns . Większość kredytów trafia na https://github.com/mastak , chociaż musiałem załatać kilka szczegółów aby działały z najnowszym przepływem powietrza.
Rozwiązanie wykorzystuje niestandardowego operatora, który uruchamia kilka DagRunów :
from airflow import settings from airflow.models import DagBag from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from airflow.utils import timezone class TriggerMultiDagRunOperator(TriggerDagRunOperator): CREATED_DAGRUN_KEY = 'created_dagrun_key' @apply_defaults def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs): super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs) self.op_args = op_args or [] self.op_kwargs = op_kwargs or {} def execute(self, context): context.update(self.op_kwargs) session = settings.Session() created_dr_ids = [] for dro in self.python_callable(*self.op_args, **context): if not dro: break if not isinstance(dro, DagRunOrder): dro = DagRunOrder(payload=dro) now = timezone.utcnow() if dro.run_id is None: dro.run_id = 'trig__' + now.isoformat() dbag = DagBag(settings.DAGS_FOLDER) trigger_dag = dbag.get_dag(self.trigger_dag_id) dr = trigger_dag.create_dagrun( run_id=dro.run_id, execution_date=now, state=State.RUNNING, conf=dro.payload, external_trigger=True, ) created_dr_ids.append(dr.id) self.log.info("Created DagRun %s, %s", dr, now) if created_dr_ids: session.commit() context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids) else: self.log.info("No DagRun created") session.close()
Następnie możesz przesłać kilka dagrunów z funkcji wywoływalnej w swoim PythonOperator, na przykład:
from airflow.operators.dagrun_operator import DagRunOrder from airflow.models import DAG from airflow.operators import TriggerMultiDagRunOperator from airflow.utils.dates import days_ago def generate_dag_run(**kwargs): for i in range(10): order = DagRunOrder(payload={'my_variable': i}) yield order args = { 'start_date': days_ago(1), 'owner': 'airflow', } dag = DAG( dag_id='simple_trigger', max_active_runs=1, schedule_interval='@hourly', default_args=args, ) gen_target_dag_run = TriggerMultiDagRunOperator( task_id='gen_target_dag_run', dag=dag, trigger_dag_id='common_target', python_callable=generate_dag_run )
Stworzyłem fork z kodem na https://github.com/flinz/airflow_multi_dagrun
źródło
Wykres zadań nie jest generowany w czasie wykonywania. Wykres jest raczej tworzony, gdy jest pobierany przez Airflow z folderu dags. Dlatego tak naprawdę nie będzie możliwe posiadanie innego wykresu dla zadania za każdym razem, gdy jest ono uruchamiane. Możesz skonfigurować zadanie, aby zbudować wykres na podstawie zapytania w czasie ładowania . Ten wykres pozostanie taki sam dla każdego kolejnego przebiegu, co prawdopodobnie nie jest zbyt przydatne.
Za pomocą operatora gałęzi można zaprojektować wykres, który wykonuje różne zadania przy każdym uruchomieniu na podstawie wyników zapytania.
To, co zrobiłem, to wstępne skonfigurowanie zestawu zadań, a następnie pobranie wyników zapytania i rozprowadzenie ich między zadaniami. Jest to prawdopodobnie lepsze w każdym razie, ponieważ jeśli zapytanie zwraca dużo wyników, prawdopodobnie i tak nie chcesz zapełniać harmonogramu wieloma współbieżnymi zadaniami. Aby być jeszcze bezpieczniejszym, użyłem również puli, aby upewnić się, że moja współbieżność nie wymknie się spod kontroli przy nieoczekiwanie dużym zapytaniu.
""" - This is an idea for how to invoke multiple tasks based on the query results """ import logging from datetime import datetime from airflow import DAG from airflow.hooks.postgres_hook import PostgresHook from airflow.operators.mysql_operator import MySqlOperator from airflow.operators.python_operator import PythonOperator, BranchPythonOperator from include.run_celery_task import runCeleryTask ######################################################################## default_args = { 'owner': 'airflow', 'catchup': False, 'depends_on_past': False, 'start_date': datetime(2019, 7, 2, 19, 50, 00), 'email': ['rotten@stackoverflow'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 0, 'max_active_runs': 1 } dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None) totalBuckets = 5 get_orders_query = """ select o.id, o.customer from orders o where o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval and o.is_test = false and o.is_processed = false """ ########################################################################################################### # Generate a set of tasks so we can parallelize the results def createOrderProcessingTask(bucket_number): return PythonOperator( task_id=f'order_processing_task_{bucket_number}', python_callable=runOrderProcessing, pool='order_processing_pool', op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'}, provide_context=True, dag=dag ) # Fetch the order arguments from xcom and doStuff() to them def runOrderProcessing(task_bucket, **context): orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket) if orderList is not None: for order in orderList: logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}") doStuff(**op_kwargs) # Discover the orders we need to run and group them into buckets for processing def getOpenOrders(**context): myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id') # initialize the task list buckets tasks = {} for task_number in range(0, totalBuckets): tasks[f'order_processing_task_{task_number}'] = [] # populate the task list buckets # distribute them evenly across the set of buckets resultCounter = 0 for record in myDatabaseHook.get_records(get_orders_query): resultCounter += 1 bucket = (resultCounter % totalBuckets) tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])}) # push the order lists into xcom for task in tasks: if len(tasks[task]) > 0: logging.info(f'Task {task} has {len(tasks[task])} orders.') context['ti'].xcom_push(key=task, value=tasks[task]) else: # if we didn't have enough tasks for every bucket # don't bother running that task - remove it from the list logging.info(f"Task {task} doesn't have any orders.") del(tasks[task]) return list(tasks.keys()) ################################################################################################### # this just makes sure that there aren't any dangling xcom values in the database from a crashed dag clean_xcoms = MySqlOperator( task_id='clean_xcoms', mysql_conn_id='airflow_db', sql="delete from xcom where dag_id='{{ dag.dag_id }}'", dag=dag) # Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our # query returns fewer results than we have buckets, we don't try to run them all. # Unfortunately I couldn't get BranchPythonOperator to take a list of results like the # documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now. get_orders_task = PythonOperator( task_id='get_orders', python_callable=getOpenOrders, provide_context=True, dag=dag ) get_orders_task.set_upstream(clean_xcoms) # set up the parallel tasks -- these are configured at compile time, not at run time: for bucketNumber in range(0, totalBuckets): taskBucket = createOrderProcessingTask(bucketNumber) taskBucket.set_upstream(get_orders_task) ###################################################################################################
źródło
for tasks in tasks
pętli w moim przykładzie usuwam obiekt, nad którym iteruję. To zły pomysł. Zamiast tego pobierz listę kluczy i powtórz ją - lub pomiń usuwanie. Podobnie, jeśli xcom_pull zwraca None (zamiast listy lub pustej listy), pętla for również zawodzi. Można by chcieć uruchomić xcom_pull przed „for”, a następnie sprawdzić, czy jest to None - lub upewnić się, że jest tam przynajmniej pusta lista. YMMV. Powodzenia!open_order_task
?Nie rozumiesz, na czym polega problem?
Oto standardowy przykład. Teraz, jeśli w funkcji subdag zastąpić
for i in range(5):
zefor i in range(random.randint(0, 10)):
wtedy wszystko będzie działać. Teraz wyobraź sobie, że operator „start” umieszcza dane w pliku i zamiast przypadkowej wartości funkcja odczyta te dane. Wtedy „start” operatora wpłynie na liczbę zadań.Problem będzie widoczny tylko w interfejsie użytkownika, ponieważ po wejściu w subdag liczba zadań będzie równa ostatniemu odczytowi z pliku / bazy danych / XCom w tej chwili. Co automatycznie nakłada ograniczenie na kilka uruchomień jednego dnia na raz.
źródło
Znalazłem post na Medium, który jest bardzo podobny do tego pytania. Jest jednak pełen literówek i nie działa, gdy próbowałem go zaimplementować.
Moja odpowiedź na powyższe jest następująca:
Jeśli tworzysz zadania dynamicznie, musisz to zrobić poprzez iterację po czymś, co nie jest tworzone przez zadanie nadrzędne lub może być zdefiniowane niezależnie od tego zadania. Dowiedziałem się, że nie można przekazywać dat wykonania ani innych zmiennych przepływu powietrza do czegoś poza szablonem (np. Zadania), jak wielu innych wskazywało wcześniej. Zobacz także ten post .
źródło