Próbuję przenieść pliki s3 z „nieusuwalnego” segmentu (co oznacza, że nie mogę usunąć plików) do GCS przy użyciu przepływu powietrza. Nie mogę zagwarantować, że nowe pliki będą tam codziennie, ale muszę codziennie sprawdzać, czy są nowe.
moim problemem jest dynamiczne tworzenie subdagów. Jeśli istnieją pliki, potrzebuję podrzędnych tagów. Jeśli NIE ma plików, nie potrzebuję subdagów. Mój problem dotyczy ustawień pobierania / pobierania danych. W moim kodzie wykrywa pliki, ale nie uruchamia subdagów tak, jak powinny. Coś mi brakuje.
oto mój kod:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
źródło
źródło
spark
zadania lubpython
skrypty i jakich używasz do ich uruchamianialivy
lub innych metodfiles
pusta lista?Odpowiedzi:
Poniżej znajduje się zalecany sposób utworzenia dynamicznego DAG lub sub-DAG w przepływie powietrza, chociaż są też inne sposoby, ale myślę, że w dużym stopniu dotyczy to twojego problemu.
Najpierw utwórz plik,
(yaml/csv)
który zawiera listę wszystkichs3
plików i lokalizacji, w twoim przypadku napisałeś funkcję do przechowywania ich na liście, powiedziałbym, że przechowuj je w osobnymyaml
pliku i załaduj w czasie wykonywania w env airflow, a następnie utwórz DAG.Poniżej znajduje się przykładowy
yaml
plik:dynamicDagConfigFile.yaml
Możesz zmodyfikować swoją
Check_For_Files
funkcję, aby zapisać je wyaml
pliku.Teraz możemy przejść do dynamicznego tworzenia dag:
Najpierw zdefiniuj dwa zadania za pomocą fałszywych operatorów, start początek i koniec zadania. Takie zadania są tymi, na których będziemy się opierać
DAG
, dynamicznie tworząc między nimi zadania:Dynamiczny DAG: użyjemy
PythonOperators
w przepływie powietrza. Funkcja powinna otrzymać jako argumenty identyfikator zadania; funkcja Pythona do wykonania, tj. Python_callable dla operatora Python; oraz zestaw argumentów do użycia podczas wykonywania.Dołącz argument
task id
. Możemy więc wymieniać dane między zadaniami generowanymi dynamicznie, npXCOM
. Poprzez .Możesz określić swoją funkcję operacyjną w tym dynamicznym dag jak
s3_to_gcs_op
.Wreszcie na podstawie położenia obecnego w pliku yaml możesz tworzyć dynamiczne dags, najpierw przeczytaj
yaml
plik jak poniżej i utwórz dynamiczny dag:Ostateczna definicja DAG:
Chodzi o to, że
Pełny kod przepływu powietrza w kolejności:
źródło
upload_s3_toGCS
nie będzie istnieć i wystąpi błąd przepływu powietrza.yaml
pliku po przesłaniu wszystkich plików do GCS, w ten sposób tylko nowe pliki będą obecne wyaml
pliku. A jeśli nie będzie nowych plików,yaml
plik będzie pusty i nie zostanie utworzony dynamiczny dag. Dlategoyaml
plik jest znacznie lepszą opcją niż przechowywanie plików na liście.yaml
Plik będzie również pomóc w utrzymaniu rejestrowanie plików s3 w sposób, jeśli załóżmy niektóre pliku s3 nie zostać przesłany do OWS, następnie można również utrzymać flagę odpowiadającą tym pliku, a następnie ponownie je przy następnym uruchomieniu DAG.if
warunek przed DAG, który sprawdzi, czy wyaml
plikach są nowe pliki, jeśli są nowe pliki, uruchom go, w przeciwnym razie pomiń go.