기록 블로그

프로그래머스 데브코스-데이터 엔지니어/TIL(Today I Learned)

06/21 53일차 Airflow 다양한 기능 사용해보기

usiohc 2023. 6. 21. 16:55

Airflow


주요 메모 사항


Dag Dependencies

 

Dag를 실행하는 방법

주기적 실행: schedule로 지정

다른 Dag에 의해 트리거

ㄴ Explicit Trigger: Dag A가 끝나면 Dag B를 트리거 (TriggerDagRunOperator)

ㄴ Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)

 

알아두면 좋은 상황에 따라 다른 태스크 실행 방식들

ㄴ 조건에 따라 다른 태스크로 분기 (BranchPythonOperator)

ㄴ 과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)

ㄴ 앞단 태스크들의 실행상황 -> 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음

 

 

Explicit trigger

TriggerDagRunOperator

DAG A가 명시적으로 DAG B를 트리거

 

Reactive trigger

ExternalTaskSensor

DAG B가 DAG A의 태스크가 끝나기를 대기 -> 이 경우 DAG A는 이 사실을 모름

 

 

 


TriggerDagRunOperator

DAG A의 태스크를 TriggerDagRunOperator로 구현

여기에서 Jinja 라고 하는 Template를 사용해야 한다

 

 

참고: Jinja Template이란?

Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진

ㄴ Django 템플릿 엔진에서 영감을 받아 개발

ㄴ Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성

ㄴ Flask에서 사용됨

 

변수는 이중 중괄호 {{ }}로 감싸서 사용

ㄴ <h1> 안녕하세요, {{ name }}님! </h1>

 

제어문은 퍼센트 기호 {% %}로 표시

ㄴ <ul>
     {% for item in items %}
     <li>{{ item }}</li>
     {% endfor %}
     </ul>

 

 

참고: Jinja Template + Airflow

Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능

ㄴ 이를 통해 재사용가능하고 사용자 정의 가능한 워크플로우 생성

 

예 1) execution_date을 코드 내에서 쉽게 사용: {{ ds }}

ㄴ 가능한 모든 시스템 변수는 여기를 참조

 

 

 

예 2) 파라미터 등으로 넘어온 변수를 쉽게 사용 가능

ㄴ Django를 공부할 때 기억이 난

 

참고: BashOperator 레퍼런스 보기

 

참고: Airflow에서 사용 가능한 Jinja 변수들 몇개 살펴보기

{{ ds }} - 현재 DAG 실행의 실행 날짜를 나타내는 문자열

 

{{ ds_nodash }} - 현재 DAG 실행의 실행 날짜를 나타내는 문자열로, 대시(-) 없이 날짜를 표시

 

{{ ts }} - 현재 Task Instance의 시작 시간을 나타내는 문자열

 

{{ dag }}

현재 DAG 객체를 나타내는 변수, DAG 객체에는 다양한 속성과 메서드가 있으며, DAG의 속성에 접근할 수 있습니다. (예: {{ dag.dag_id }})

 

{{ task }} - 현재 Task 객체를 나타내는 변수, 위의 dag와 비슷하게 사용 가능

 

{{ dag_run }} - DAG 실행에 대한 정보를 나타내는 변수

 

 

Airflow에서는 Jinja 템플릿 엔진을 사용하여 다양한 컨텍스트 변수를 활용할 수 있습니다. 또한, 사용자 정의 변수를 사용하려면 Variable 객체를 사용해야 함

 

{{ var.value }}: {{ var.value.get('my.var', 'fallback') }}

ㄴ Variable 객체를 사용하여 변수 값을 참조합니다. my.var라는 변수 값을 참조하는 예시 변수 값이 없을 경우 fallback을 사용

 

{{ var.json }}: {{ var.json.my_dict_var.key1 }}

JSON 형식의 변수 값을 참조하는 예시, my_dict_var라는 변수에서 key1의 값을 참조

 

{{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}

ㄴ 연결(connnection) 객체를 사용하여 연결 정보를 참조합니다. my_conn_id라는 연결 ID의 로그인(login) 및 비밀번호(password) 값을 참조하는 예시

 

 

참고: Airflow에서 Jinja 변수를 사용한 예제 코드 살펴보기

Learn_Jinja

ㄴ BashOperator 3개로 구성 -> “airflow dags test Learn_Jinja 2023-05-30”

 

 

 

TriggerDagRunOperator

airflow.cfg의 dag_run_conf_overrides_params가 True로 설정되어 있어야함 (어제 포스에서 설정했었음)

오른쪽 하단의 주석은 Jinja 템플릿을 지원하는 Operator이기 때문에 가능하다고 하는 것!

위의 TriggerDagRunOperator의 옵션값들은 이렇다!

  • trigger_dag_id (str): DAG ID를 트리거합니다. 이 값은 템플릿 문자열일 수 있습니다.
  • trigger_run_id (str | None): 트리거된 DAG 실행에 사용할 run ID입니다. 지정하지 않으면 run ID가 자동으로 생성됩니다.
  • conf (dict | None): DAG 실행의 구성입니다. 이 값은 템플릿 사전일 수 있습니다.
  • execution_date (str | datetime.datetime | None): DAG의 실행 날짜입니다. 이 값은 템플릿 문자열 또는 datetime 개체일 수 있습니다.
  • reset_dag_run (bool): 기존 DAG 실행이 이미 존재하는 경우 이를 지울지 여부입니다. 이는 백필 또는 기존 DAG 실행을 다시 실행할 때 유용합니다.
  • wait_for_completion (bool): DAG 실행이 완료될 때까지 기다릴지 여부입니다. 기본값은 False입니다.
  • poke_interval (int): wait_for_completion=True인 경우 DAG 실행의 상태를 확인할 간격입니다. 기본값은 60초입니다.

 

 


Sensor + ExternalTaskSensor

BranchPythonOperator LatestOnlyOperator Trigger Rules

 

 

 

Sensor란?

Sensor는 특정 조건이 충족될 때까지 대기하는 Operator

 

Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용

 

Airflow는 몇 가지 내장 Sensor를 제공

ㄴ FileSensor: 지정된 위치에 파일이 생길 때까지 대기

ㄴ HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기

 SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기

 TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지

ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기

 

기본적으로 주기적으로 poke를 하는 것

ㄴ worker를 하나 붙잡고 poke간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재: mode

    ㄴ mode의 값은 reschedule 혹은 poke가 됨

 

 

ExternalTaskSensor

DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함

ㄴ 먼저 동일한 schedule_interval을 사용

ㄴ 이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭이 안됨!

ㄴ 조건 자체가 까다롭고 poke를 쓰는게 일반적인데 worker 하나가 체크용으로 낭비되고 있어서, 기용님은 잘 안쓴다고 하셨음

 

 

 

만일 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면 ?

ㄴ 예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?

    ㄴ execution_delta를 사용

    ㄴ execution_date_fn을 사용하면 조금더 복잡하게 컨트롤 가능

ㄴ 만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExternalTaskSensor는 사용불가

 

 

 


BranchPythonOperator

상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터

ㄴ 미리 정해준 Operator들 중에 선택하는 형태로 돌아감 (뒤에서 예제를 볼 예정)

 

TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음

 

기용님이 사용하는 예를 하나 들어주셨는데, 만약 테스트 환경에서는 안필요하고 실제 프로덕션할때만 필요한 task가 있다고 하면, mode or dev와 같은 airflow 변수를 만들어서 접속에 따라 설정하는 등이 가능

 

 

Learn_BranchPythonOperator.py 

current_hour: 0

INFO - Following branch morning_task

Skipping tasks ['afternoon_task']

 

실행되는 Task는 다음과 같음

 

이 경우 실행되지 않은 afternoon_task의 상태는 skipped가 됨

 

UTC 기준으로 12시전이면 morning_task로 가고 아니면 afternoon_task로 브랜치

 

 

Catch을 True로 설정해서 오늘까지의 작업이 실행되었음

UTC 기준이기 때문에 현재 6시로 설정된 모습 -> 그래서 morning_task가 실행되고 arternoon-task가 skip되었음

 

 


LatestOnlyOperator

Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함

 

현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨

ㄴ t1 >> t3 >> [t2, t4]

위 소스코드

 

과거 날짜가 Catchup True로 설정되어 실행은 했으나 skip된 것을 로그를 보면 알 수 있다.

 

Trigger Rules이란?

Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면?

ㄴ 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가

 

Operator에 trigger_rule이란 파라미터로 결정 가능

ㄴ trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능

ㄴ all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success

 

Trigger Rule의 가능값 (airflow.utils.trigger_rule.TriggerRule)

  • ALL_SUCCESS: 이 트리거 규칙은 모든 부모가 성공할 때까지 대기합니다. 부모 중 하나라도 실패하면 이 트리거 규칙은 실행되지 않습니다.
  • ALL_FAILED: 이 트리거 규칙은 모든 부모가 실패할 때까지 대기합니다. 부모 중 하나라도 성공하면 이 트리거 규칙은 실행되지 않습니다.
  • ALL_DONE: 이 트리거 규칙은 모든 부모가 실행을 완료할 때까지 대기합니다. 부모 중 하나라도 실패하면 이 트리거 규칙은 실행되지 않습니다.
  • ONE_FAILED: 이 트리거 규칙은 부모 중 하나 이상이 실패하면 즉시 실행됩니다. 모든 부모가 완료될 때까지 기다리지 않습니다. 이 트리거 규칙은 부모 중 하나 이상이 성공하더라도 실행됩니다.
  • ONE_SUCCESS: 이 트리거 규칙은 부모 중 하나 이상이 성공하면 즉시 실행됩니다. 모든 부모가 완료될 때까지 기다리지 않습니다. 이 트리거 규칙은 부모 중 하나 이상이 실패하더라도 실행되지 않습니다.
  • NONE_FAILED: 이 트리거 규칙은 모든 부모가 실패하지 않았을 때 (즉, 성공하거나 건너뛰었을 때) 실행됩니다. 이 트리거 규칙은 부모 중 하나 이상이 실패하면 실행되지 않습니다.
  • NONE_FAILED_MIN_ONE_SUCCESS: 이 트리거 규칙은 하나 이상의 부모가 완료되었지만 실패하지 않았을 때 실행됩니다. 이 트리거 규칙은 모든 부모가 실패하면 실행되지 않습니다.

 

Trigger Rule 사용 예

T3는 실패하도록 만든 예제임

exit는 0을 리턴하는데 bash command에 exit 1 로 지정

위의 예제를 실행했을 떄 Web UI 결과

T3에 해당하는 exit task만 실패했으나, final_task는 성공된 것을 볼 수 있음

 

로그를 보면 bash command에 의해 실패한 것을 알 수 있음

 

 

 

 

 

팁: Airflow 메타데이터 DB 내용 살펴보기

airflow:airflow로 Postgres에 로그인 가능

 

psql 쉘에 진입

 

psql shell에서 아래 명령 수행

 

dag_run의 내용을 조금만 출력 해봄

삭제도 가능

DELETE FROM dag_run WHERE dag_id = '기록을삭제하고싶은DAG’;

 

 


Task Grouping

태스크 그룹핑의 필요성

태스크 수가 많은 DAG라면 태스크들을 성격에 따라 관리하고 싶은 니즈 존재

ㄴ SubDAG이 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세

ㄴ SubDAG를 비슷한 일을 하는 태스크들을 SubDAG라는 Child Dag로 만들어서 관리

 

다수의 파일 처리를 하는 DAG라면

ㄴ 파일 다운로드 태스크들과 파일 체크 태스크와 데이터 처리 태스크들로 구성

 

 

예제 살펴보기 - 소스코드

Learn Task Groups

ㄴ TaskGroup 안에 TaskGroup nesting 가능

ㄴ TaskGroup도 태스크처럼 실행 순서 정의 가능

 

 

ㄴ nesting을 한다면 비슷한 Task들일 확률이 높음, DAG

 

 


Dynamic Dags

Dynamic Dags를 사용해서 코드 재사용을 최대화할 수 있다!

Jinja template와 YAML을 사용해야함

 

Dynamic Dag란 무엇인가?

템플릿과 YAML을 기반으로 DAG를 동적으로 만들어보자

ㄴ Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공

 

이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지

 

DAG를 계속해서 만드는 것과 한 DAG안에서 태스크를 늘리는 것 사이의 밸런스 필요

ㄴ 오너가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음

 

 

Dynamic Dag의 기본적인 아이디어

비슷한 구조의 DAG

 

ㄴ 간단한 예제

https://github.com/learndataeng/learn-airflow/tree/main/dags/dynamic_dags

 

템플릿을 통한 최종 DAG 파일 생성

ㄴ learn-airflow % python3 dags/dynamic_dags/generator.py

 

이는 dags 폴더에 yml 파일의 수 만큼의 DAG 코드를 생성해줌

ㄴ generator 실행을 언제할지 결정이 필요, 직접 사람이 필요할 때마다 사용할 것인지 or 아예 generator 자체를 자동화할 것 인지

 

 

 

 

 


공부하며 어려웠던 내용

너무 많은 오퍼레이터 들이 존재하다보니 필요할때 적용시킬 오퍼레이터들을 조금 숙지해야 할 것 같다.