próba utworzenia dynamicznych subdagów z nadrzędnego dag na podstawie tablicy nazw plików

10

Próbuję przenieść pliki s3 z „nieusuwalnego” segmentu (co oznacza, że ​​nie mogę usunąć plików) do GCS przy użyciu przepływu powietrza. Nie mogę zagwarantować, że nowe pliki będą tam codziennie, ale muszę codziennie sprawdzać, czy są nowe.

moim problemem jest dynamiczne tworzenie subdagów. Jeśli istnieją pliki, potrzebuję podrzędnych tagów. Jeśli NIE ma plików, nie potrzebuję subdagów. Mój problem dotyczy ustawień pobierania / pobierania danych. W moim kodzie wykrywa pliki, ale nie uruchamia subdagów tak, jak powinny. Coś mi brakuje.

oto mój kod:

from airflow import models
from  airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging

args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': True,
}

bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []

parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)

def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ##proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'

def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)

    # dag
    subdag = models.DAG(dag_id=dag_id_child,
              default_args=args,
              schedule_interval=None)

    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)


    return subdag

def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)

if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished


check_for_files >> finished
arcee123
źródło
Jakie zadania działają na zapleczu tych DAGS, są to sparkzadania lub pythonskrypty i jakich używasz do ich uruchamiania livylub innych metod
ashwin agrawal
Przepraszam, nie rozumiem pytania. czy możesz to powtórzyć?
arcee123
Mam na myśli, że używasz tylko prostych skryptów Pythona i nie używasz żadnej pracy Spark, prawda?
ashwin agrawal
Tak. proste operatory, które są domyślnie w przepływie powietrza. Chcę dodawać istniejących operatorów z dynamiczną szybkością na podstawie oznaczonych plików w S3 Chcę wprowadzić do GCS.
arcee123
Dlaczego filespusta lista?
Oluwafemi Sule

Odpowiedzi:

3

Poniżej znajduje się zalecany sposób utworzenia dynamicznego DAG lub sub-DAG w przepływie powietrza, chociaż są też inne sposoby, ale myślę, że w dużym stopniu dotyczy to twojego problemu.

Najpierw utwórz plik, (yaml/csv)który zawiera listę wszystkich s3plików i lokalizacji, w twoim przypadku napisałeś funkcję do przechowywania ich na liście, powiedziałbym, że przechowuj je w osobnym yamlpliku i załaduj w czasie wykonywania w env airflow, a następnie utwórz DAG.

Poniżej znajduje się przykładowy yamlplik: dynamicDagConfigFile.yaml

job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
    - File1: 'S3Loc1'
    - File2: 'S3Loc2'
    - File3: 'S3Loc3'

Możesz zmodyfikować swoją Check_For_Filesfunkcję, aby zapisać je w yamlpliku.

Teraz możemy przejść do dynamicznego tworzenia dag:

Najpierw zdefiniuj dwa zadania za pomocą fałszywych operatorów, start początek i koniec zadania. Takie zadania są tymi, na których będziemy się opierać DAG, dynamicznie tworząc między nimi zadania:

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

Dynamiczny DAG: użyjemy PythonOperatorsw przepływie powietrza. Funkcja powinna otrzymać jako argumenty identyfikator zadania; funkcja Pythona do wykonania, tj. Python_callable dla operatora Python; oraz zestaw argumentów do użycia podczas wykonywania.

Dołącz argument task id. Możemy więc wymieniać dane między zadaniami generowanymi dynamicznie, np XCOM. Poprzez .

Możesz określić swoją funkcję operacyjną w tym dynamicznym dag jak s3_to_gcs_op.

def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Wreszcie na podstawie położenia obecnego w pliku yaml możesz tworzyć dynamiczne dags, najpierw przeczytaj yamlplik jak poniżej i utwórz dynamiczny dag:

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.

Ostateczna definicja DAG:

Chodzi o to, że

#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks. 
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end

Pełny kod przepływu powietrza w kolejności:

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)



with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.


start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
ashwin agrawal
źródło
Dziękuję bardzo. więc jednym z problemów, które miałem, było to, co się stanie, jeśli nie będzie nowych plików? Jednym z problemów, z którymi się zmagam, jest to, że zawsze będą w tym miejscu pliki, ale nie ma gwarancji, że NOWE pliki do pobrania, co oznacza, że ​​sekcja upload_s3_toGCSnie będzie istnieć i wystąpi błąd przepływu powietrza.
arcee123
Możesz rozwiązać problem, usuwając pliki z yamlpliku po przesłaniu wszystkich plików do GCS, w ten sposób tylko nowe pliki będą obecne w yamlpliku. A jeśli nie będzie nowych plików, yamlplik będzie pusty i nie zostanie utworzony dynamiczny dag. Dlatego yamlplik jest znacznie lepszą opcją niż przechowywanie plików na liście.
ashwin agrawal
yamlPlik będzie również pomóc w utrzymaniu rejestrowanie plików s3 w sposób, jeśli załóżmy niektóre pliku s3 nie zostać przesłany do OWS, następnie można również utrzymać flagę odpowiadającą tym pliku, a następnie ponownie je przy następnym uruchomieniu DAG.
ashwin agrawal
A jeśli nie ma nowych plików, możesz postawić ifwarunek przed DAG, który sprawdzi, czy w yamlplikach są nowe pliki, jeśli są nowe pliki, uruchom go, w przeciwnym razie pomiń go.
ashwin agrawal
problem polega na tym, że strumienie downstream są ustawione. jeśli strumienie downstream są ustawione bez rzeczywistych zadań (ponieważ nie istnieją żadne pliki), wystąpi błąd.
arcee123