Airflow에 대한 공식문서의 설명
Airflow
는 워크플로우를 프로그래밍 방식으로 작성, 스케줄링 및 모니터링하는 플랫폼입니다.Airflow
를 사용하여 워크플로우를 비순환 그래프로 작성할 수 있습니다.Airflow
스케줄러는 지정된 종속성을 따르면서 워커에게 작업을 부여합니다.- 다양한 명령어들을 통해 복잡한 작업도 수행할 수 있습니다.
- 훌륭한 인터페이스를 통해서 운영 중인 파이프라인을 쉽게 시각화하고, 진행 상황을 모니터링하며, 필요할 때 문제를 해결할 수 있습니다.
- 워크플로우가 코드로 정의되면 유지 관리, 버전 관리, 테스트 및 협업 기능이 향상됩니다.
💡 비순환 그래프는 순환하는 싸이클이 존재하지 않고 일방향성만 가지는것을 의미합니다.
요약하면 Airflow는 개발자가 작성한 Python DAG를 읽고, 거기에 맞춰 Scheduler가 Task를 스케줄링하면, Worker가 Task를 가져가 실행합니다. Task의 실행상태는 Database에 저장되고, 사용자는 UI를 통해서 각 Task의 실행 상태, 성공 여부 등을 확인할 수 있습니다.
Workflow Mangment는 Ariflow말고 다른것들도 존재합니다.
그중에 luigi와 Airflow가 많이 쓰이고 있지만, luigi는 종속성 표현이 복잡합니다.
따라서 Task들의 종속성 설정이 쉬우면서 강력한 UI를 갖고 있는 Airflow가 사용하기 편합니다.
Basic Components
Scheduler
모든 DAG와 Task에 대하여 모니터링 및 관리하고, 실행해야할 Task를 스케줄링 해줍니다.
Web server
Airflow의 웹 UI 서버 입니다.
DAG
Directed Acyclic Graph로 개발자가 Python으로 작성한 워크플로우 입니다. Task들의 의존성과 관계로 구성됩니다.
Database
Airflow에 존재하는 DAG와 Task들의 메타데이터를 저장하는 데이터베이스입니다.
Operator(Task)
Task는 Airflow의 기본 실행 단위이며 오퍼레이터를 사용해 정의됩니다.
Task는 DAG에서 정의된 다음, 실행되어야 하는 순서를 표현하기 위해 그들 사이에 업스트림 및 다운스트림 종속성을 설정합니다.
built-in or per-installed
community Provider packages를 통해 사용할 수 있는 오퍼레이터
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
DockerOperator
HiveOperator
S3FileTransformOperator
PrestoToMySqlOperator
SlackAPIOperator
Airflow Executor 고민
Executor(worker)는 태스크를 실행하는 메커니즘입니다.
흔히 쓰는 Executor로는 CeleryExecutor
와 KubernetesExecutor
가 존재합니다.
먼저, Celery Executor는 Scheduler가 Task를 전달하고 Worker가 Task를 가져가서 실행하는 방식입니다.
Cerley Exectuor도 Kubernetes위에 배포하면 리소스 관리, 쉬운 버전 업데이트, DAG 배포 자동화, 쉬운 리소스 확장 등의 장점을 가질 수 있습니다.
하지만 Celery에 대한 의존성이 남아있기 때문에 Redis, Celery Worker에 대한 리소스를 계속 점유하고 있어야 합니다.
즉 Scale to Zero가 불가능합니다.
Kubernetes Executor는 Task가 존재할때만 pod가 생성되고 Task가 완료되면 pod가 사라지기 때문에 더 리소스를 효율적으로 사용할 수 있습니다.
물론 Kubernetes Executor의 경우 매번 pod를 새롭게 생성하기 떄문에 시작하는데 딜레이가 생기고 자료가 적고 구성이 복잡하다는 단점이 있습니다.
하지만 Celery Executor의 경우 Stateful Worker와 Redis에 대한 리소스 및 운영복잡도 증가로 인해서 Kubernetes Executor가 좀 더 사용하기 편하다고 생각합니다.
Option
schedule_interval
- crontab과 timedelta object가 사용가능합니다.
- None값을 사용시 수동으로만 실행할 수 있습니다.
depends_on_past
- True라면 이전타임의 같은 Task가 성공해야 실행합니다.
- 3시타임의 Task_A는 2시타임의 Task_A가 성공해야만 실행합니다.
wait_for_downstream
- True라면 이전타임의 모든 Task가 성공해야 실행합니다.
- 3시타임의 Task_A는 2시타임의 Task가 모두 성공해야 실행합니다.
- 이전타임의 DAG이 성공해야 한다는 뜻과 같습니다.
max_active_runs
- 같은 ID의 DAG이 최대 몇개 까지 실행될 수 있는지 설정합니다.
concurrency
- DAG에서 동시에 실행될 수 있는 Task의 최댓값을 설정합니다.
catchup
start_date
부터 현재시간까지 실행됐어야 하는 DAG들의 실행 여부를 결정합니다.- False로 설정시 가장 마지막으로 실행 됐어야 하는 DAG만 실행됩니다.
- True로 설정시 모든 DAG들이 실행됩니다.
start_date 이해하기
처음으로 DAG이 실행되는 시간은 start_date
+ schedule_interval
입니다.
혹시 여러개의 start_date
가 적용되어 있다면 그중에서 min값(최신)이 적용됩니다.
airflow 공식문서는 datetime.now()
와 같은 동적인 값을 start_date
에 사용하지 않는것을 권장합니다.
start_date
는 DAG이 처음 실행되는 시간을 결정하는 값입니다. 그런데 이 값이 동적으로 변한다는것은 혼란의 여지가 있기 때문입니다.
예를들어 start_date = datetime.now()
schedule_interval = @hourly
로 설정하면 DAG은 영원히 실행되지 않습니니다. 현재시간 보다 한시간뒤에 실행한다는 것은 일어날 수 없기 떄문입니다.
따라서 schedule_interval
의 값에 따라 고정된 start_date
를 사용하는것이 좋습니다.
@hourly
→00:00
@daily
→ 자정@monthly
→ 1일
static 한 start_date
를 정하는 것엔 2가지 방법이 있습니다.
아래 내용은 제 생각입니다.
- 배포 날짜(또는 첫 실행 시간)가 정해지면
start_date
를 수정 후 배포한다. - 과거 아무 날짜나 고정 시켜놓고 사용한다.
첫번째 방법 사용시 DAG의 첫번째 실행 시간에 맞추어 start_date
를 설정해줘야 하기 때문에 불필요한 커밋이 배포때마다 발생하게 됩니다.
두번째 방법은 배포 후 바로 활성화 시키게 되면 원치 않는 시점에 시작할 수 있기 때문에 첫 실행 시간에 대시보드에가서 수동으로 활성화 시켜줘야 합니다.
원치않는 시점이란 뜻은 예를들어 start_date
가 2021-01-01인 DAG을 2021-06-01에 배포해서 2021-06-02때 첫 실행되기를 원하다고 생각합시다. 해당 DAG이 배포 후 바로 활성화 되면 start_date
가 현재시간보다 과거에 있기 떄문에 그 즉시 실행됩니다.
배포이후 특정 시간에 활성화 시켜주는것이 더 수동적이고 코스트가 크다고 생각해 첫번째 방법으로 배포하는게 좋다고 생각합니다.
execution_date 이해하기
Airflow는 ETL을 위한 솔루션으로 개발되었습니다. 따라서 데이터를 요약하는 작업을 주로 행합니다. 만약 2016-02-19에 대한 데이터를 요약하고 싶다면 작업은 2016-02-20 자정에 이루어져야합니다.
이러한 논리로 봤을떄 Airflow 에서 execution_date
는 데이터의 출처를 의미합니다. 만약 execution_date
가 2021-5-30이라면 2021-5-30에 작업이 실행되는것이 아니고 해당 작업에 사용된 데이터는 2021-5-30부터 쌓였다 라고 이해하는것이 옳습니다.
다른말로는 max(start_date, last_run_date)
라고 생각하시면 됩니다. 첫 실행이라면 start_date
가 execution_date
가 되겠지만 여러번 실행됐다면 마지막으로 실행된 시간부터 새로운 데이터가 쌓이기 떄문에 execution_date
는 last_run_date
가 됩니다.
처음 접하면 헷갈리는 부분이니 필히 이해하는것을 추천드립니다.
Web UI에서 Run이 의미하는것이 execution_date
입니다.
Cross-DAG Dependencies
아래와 같은 경우 DAG사이에서 의존성이 필요할 수 있습니다.
- 두개의 DAG이 서로 다른 스케쥴을 갖고있을때
→ e.g. 7시마다 실행되는 DAG이 5시에 실행되는 DAG에 의존하고 있을때 - DAG의 책임이 서로다른 팀에 있어 하나의 DAG으로 만들기 어려울때
- 등등...
ExternalTaskSensor
DAG사이의 의존성을 부여하기 위해 ExternalTaskSensor
을 사용합니다.
child_task1 = ExternalTaskSensor(
task_id="child_task1",
external_dag_id=parent_dag.dag_id,
external_task_id=parent_task.task_id,
timeout=600,
allowed_states=['success'],
failed_states=['failed', 'skipped'],
mode="reschedule",
execution_delta=timedelta(hours=2),
execution_delat_fn=lamda x:x,
)
- external_dag_id → 기다리고 싶은 Task가 포함된 DAG의 id
- external_task_id → 기다리고 싶은 Task id. 입력하지 않을경우 DAG을 기다립니다.
- allowed_state → 기다리고 있는 Task를 통과했다고 판단하는 기준
- failled_states → 기다리고 있는 Task가 실패했다고 판단하는 기준
- execution_delta
→ 기다리고 있는 시간과의 execution_date 차이 (시간차가 있을경우 반드시 설정해야합니다) - execution_delta_fn
→ 첫번째 인자로 current executino date를 받는 함수로써 timedelta를 반환해야합니다.
→ execution_delta 와 execution_delta_fn 둘다 설정할수는 없습니다. - check_existence → 기다릴 Task나 DAG가 있는지 확인 후 없으면 기다리지 않습니다.
ExternalTaskMarker
부모DAG이 실패해서 자식DAG도 함께 실패했을때 부모DAG만 clear해도 자식DAG이 clear되게 하려면 ExternalTaskMarker
를 사용해야 합니다.
parent_task = ExternalTaskMarker(
task_id="parent_task",
external_dag_id="example_external_task_marker_child",
external_task_id="child_task1",
execution_date=
)
다음과 같이 marker를 지정해 놓으면 clear시 marker에 설정되어 있는 DAG도 함께 초기화 됩니다.
ExternalTaskMarker
역시 DAG사이에 시간차가 존재한다면 execution_date에 지정해줘야 합니다.
그런데 위의 ExternalTaskSensor
처럼 delta나 fn값을 받는것이 아니기 때문에 어떤 방법으로 의존성을 짝 지을 수 있는지 못찾았습니다.
짝을 짓는다는 말은 예를들어 매일 5시에 부모가 실행되고 7시에 자식이 실행될때 의존성이 부여된 것은 같은 날짜의 DAG들이지 서로 다른 날짜의 DAG끼리 엮이는것을 원하지 않는다는 뜻입니다.
그런데 현재 execution_date옵션은 하나의 datetime을 받고있기 때문에 고정된 execution_date의 Task만 clear할 수 있어서 한계가 있는것 같습니다.
따라서 아쉽게도 현재는 부모가 실패했을때 부모와 자식을 각각 초기화 해주고 있습니다.
TriggerDagRunOperator
같은 시간에 돌아야 하지만 의존성을 갖는 DAG의 경우 sensor가 아닌 TriggerDagRunOperator
를 사용해서 의존성을 유지할 수도 있습니다.
run_this = TriggerDagRunOperator(
task_id='run_this',
trigger_dag_id='my_dag',
python_callable=modify_dro,
dag=dag
)
Reference
💡 Airflow는 공식문서가 잘 되어있기 때문에 꼭 보시는것을 추천 드립니다.