Airflow 두번째 글 입니다.
Helm Chart를 이용해서 k8s위에 Airflow를 설치해보려고 합니다.
우선 많이 사용하는 차트는 아래 2가지 옵션이 있습니다.
- airflow-stable
→ Airflow community에서 개발한 차트입니다. - astronomer/airflow-chart
→ Airflow as a Service를 개발하는 astronomer에서 공개한 차트입니다.
저는 airflow-stable
을 이용해서 구현했습니다. 위에 소개된 링크를 들어가면 설명이 잘 되어있어 구현하기가 편했습니다.
이와 별개로 한달전에 apache/airflow
에서 공식버전의 Helm Chart를 발표했습니다. 그에 따라 stable-airflow
가 합쳐지길 원하는지 독자적으로 가길 원하는지에 대한 토론이 여기서 이루어지고 있습니다.
제가 읽은 바로는 나중엔 하나로 합쳐져야 하지만 아직은 공식버전에 부족한 점들이 있기 때문에 stable-airflow
가 오랫동안 유지될것 같습니다.
Kubernetes Executor
아래와 같은 세팅이 필요합니다.
# values.yaml
airflow:
executor: KubernetesExecutor
config:
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: "True"
workers:
enabled: false
flower:
enabled: false
redis:
enabled: false
Kubernetes Operator
Docker image를 컨테이너로 실행해주는 operator입니다.
options
- cmds → image의 entrypoint를 대신해서 쓸 값입니다.
- arguments → image안에서 실행할 명령어입니다.
- secrets
- image내부에서 사용할 환경변수 입니다.
airflow.kubernetes.secret
에 있는 Secret객체를 사용합니다.- e.g.
[Secret("env", "<KEY_IN_POD>", "<kubectl-secrets-name>", "<KYE_IN_KUBECTL>")]
- image_pull_secrets
- 이미지를 pull 할 때 사용할 인증 정보이며 regcred는 kubectl의 secret name에 해당합니다.
- e.g.
kubernetes.client.models.V1LocalObjectReference("regcred")
- image저장소로 docker-hub을 사용할 경우 다음과 같은 명령어를 통해 생성가능합니다.
kubectl create secret docker-registry regcred \
--docker-username=<username> \
--docker-password=<password> \
--docker-email=<email> -n airflow
DAGs
DAG을 가져오는 방법은 local volume을 mount 하는 방법과 git-sync 하는 방법이 있습니다.
Local Volume
잘 사용하지 않지만 로컬에서 테스트할 때 필요한 경우 아래와 같은 설정이 필요합니다.
저는 WSL2 환경에서 사용 중이라 extraVolumes의 hostPath가 좀 복잡하게 되어있습니다.
# values.yaml
airflow:
extraVolumeMounts:
- name: dags
mountPath: /opt/airflow/dags # location in the container it will put the directory mentioned below.
extraVolumes:
- name: dags
hostPath:
path: "/run/desktop/mnt/host/c/Users/sawaca96/dags" # wsl2 use this path
git-sync
repo가 public이라면 시크릿 없이도 sync가 잘 이루어집니다. 하지만 대부분의 경우 private으로 운영하기 때문에 시크릿을 추가해줘야 합니다.
시크릿의 경우 values에 노출시킬 수 없기 때문에 kubernetes에 등록되어 있는 시크릿을 사용할 수 있도록 만들어져 있습니다. HTTP와 SSH방식 모두 사용 가능합니다.
아래 코드를 참고하세요. documentation에도 설명이 되어있습니다.
# values.yaml
gitSync:
enabled: true
image:
repository: k8s.gcr.io/git-sync/git-sync
tag: v3.2.2
## values: Always or IfNotPresent
pullPolicy: IfNotPresent
uid: 65533
gid: 65533
repo: http://github.com/sawaca96/airflow-k8s-executor-example
repoSubPath: dags
branch: main
revision: HEAD
depth: 1
syncWait: 60
syncTimeout: 120
httpSecret: "airflow-secrets" # if repo is private
httpSecretUsernameKey: GITHUB_USERNAME # if repo is private
httpSecretPasswordKey: GITHUB_TOKEN # if repo is private
시크릿을 추가할 때 아래와 같은 yaml파일을 이용할 수 있는데 kubectl apply -f secrets.yaml -n airflow
명령어를 통해 추가할 수 있습니다.
# secrets.yaml
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
type: Opaque
data:
GITHUB_TOKEN: <personal access tokens>
GITHUB_USERNAME: <github username>
위 파일에 들어가는 value들은 전부 base64로 인코딩 되어야 합니다.
echo <value> | base64
를 통해서 인코딩 된 값을 확인할 수 있습니다.
git-sync에 쓰이는 personal access token은 Github → settings → Developer settings → Personal access tokens에서 발급받을 수 있으며 아래와 같이 repo조건만 주면 충분합니다.
Users & Connections
values에 유저나 connections를 등록할 수 있지만 보안을 강화하기 위해서는 values에는 빈 값을 주고 배포 후 cli로 생성해주는 것이 좋습니다.
connections의 경우 안 써주면 되지만 유저의 경우 기본값이 있기 때문에 아래처럼 빈 값을 써줘야 합니다.
# values.yaml
airflow:
users: []
다음과 같이 values를 세팅하고 아래 명령어를 통해 생성하는 것이 좋습니다.
위 명령어는 pod 내부의 환경변수를 사용하고 아래 명령어는 local의 환경변수를 사용합니다.
# use pod env
kubectl exec deployment/airflow-web -n airflow \
-- sh -c 'airflow users create \
--role Admin \
--username $AIRFLOW_ADMIN_USERNAME \
--firstname admin \
--lastname admin \
--email EMAIL@example.org \
--password $AIRFLOW_ADMIN_PASSWORD'
# use local env
kubectl exec deployment/airflow-web -n airflow \
-- airflow users create \
--role Admin \
--username $AIRFLOW_ADMIN_USERNAME \
--firstname admin \
--lastname admin \
--email EMAIL@example.org \
--password $AIRFLOW_ADMIN_PASSWORD
더 많은 명령어는 아래를 참고하세요.
Remote Logging
airflow:
config:
AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://<host>"
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: <conn_id>
다음 설정을 추가한 다음에 아래 명령어를 통해 connection을 추가하시면 됩니다.
kubectl exec deployment/airflow-web -n airflow \
-- airflow connections add '<conn_id>' \
--conn-type S3 \
--conn-login $AWS_ACCESS_KEY \
--conn-password $AWS_SECRET_ACCESS_KEY
extra에 시크릿 값을 넣을 수도 있지만 보안을 위해서 login과 password 값을 사용하는 것이 좋습니다.
login에는 AWS의 액세스 키를 넣고 password에는 AWS의 비밀 엑세스키를 넣어줍니다.
MS Teams webhook
보통 slack을 많이 사용하지만 저는 teams에 메시지 보내는 코드를 작성해보았습니다.
해당 repo에서 작성된 hook을 사용했습니다.
DAG의 on_failure_callback
옵션에 해당 hook을 넣어주면 teams로 alert가 가게 됩니다.
이때 hook에서 사용하는 conn_id는 airflow의 connection id입니다. 따라서 hook을 사용하기 위해서 connection을 추가해줘야 합니다
kubectl exec deployment/airflow-web -n airflow \
-- airflow connections add 'msteams_webhook_url' \
--conn-type HTTP \
--conn-host $AIRFLOW_CONN_MSTEAMS_WEBHOOK_URL_HOST \
--conn-schema https
WEBHOOK_URL은 사진에 있는 앱을 설치해서 얻을 수 있습니다.
PYTHONPATH
hook을 추가하고 import 해서 사용하려 하니 DAG에서 import error가 발생했습니다.
이는 airflow의 sys.path에 sync 받은 폴더가 포함되어 있지 않기 때문입니다.
따라서 아래 옵션을 추가해주면 import가 가능해집니다.
airflow:
config:
PYTHONPATH: $PYTHONPATH:/opt/airflow/dags/repo
AIRFLOW__WEBSERVER__BASE_URL
teams로 날아오는 메시지에 있는 버튼을 통해서 log를 확인하려면 BASE_URL을 변경해줘야 합니다.
기본값이 localhost이기 때문에 production에서는 사용하시는 airflow의 url을 넣어주시면 될 것 같습니다.
Boilerplate
매번 DAG을 작성하면 불편한 점이 존재합니다.
- 매번 중복되는 옵션을 적어야 한다
- 다른 작업자가 처음에 DAG 작성할 때 어려움을 겪는다
따라서 DAG의 기본 형태를 만들어주는 CLI가 있으면 좋겠다고 판단해 Boilerplate를 만들었습니다.
python create_dag.py
Example
자세한 구현 사항은 여기를 참고하세요
제가 예시로 만들어 놓은 DAG에 대해서 설명해드리겠습니다.
teams_alert
→ 직접 실행시켜야 하며 실패 후 아래 메시지를 받을 수 있습니다.
depends_on_past
ON상태로 2번째 태스크가 앞선 타임에서 실패했기 때문에 시작할 수 없습니다. 따라서 DAG이 running상태에서 멈추게 됩니다.
OFF상태로 앞선 타임의 결과에 관계없이 지속적으로 실행됩니다.
wait_for_downstream
ON 상태에서 depends_on_past와 다르게 모든 Task가 실행 안되는 것을 확인할 수 있습니다.
max_active_runs
ON상태로 앞선 DAG이 실행이 종료되어야 다음 DAG을 실행할 수 있습니다.
OFF상태이기 때문에 한 번에 여러 개의 DAG이 running 상태가 될 수 있습니다.
concurrency
ON상태로 설정값만큼의 Task만 queued상태가 됩니다. 그 외는 scheduled상태가 됩니다.
OFF상태로 모든 Task가 running상태에 들어갑니다.
catchup
ON상태로 실행됐어야 했던 모든 DAG이 실행됐습니다.
OFF상태로 마지막 실행만 이루어졌습니다.
cross dag dependency
child의 모습입니다. parent가 성공할 때까지 up_for_reschedule
상태가 됩니다.
Reference