[Airflow] Airflow 기타 기능 _TIL

Dag를 실행하는 방법

- 주기적 실행: schedule로 지정

- 다른 Dag에 의해 트리거

  • Explicit Trigger: Dag A가 Dag B를 트리거할 수 있도록 명시 (TriggerDagOperator)
  • Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)

- 알아두면 좋은 상황에 따라 다른 태스크 실행 방식들

  • 조건에 따라 다른 태스크로 분기 (BranchPythonOperator)
  • 과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)
  • 앞단 태스크들의 실행상황 (어던 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음)

 

Dag에 의한 트리거

- TriggerDagOperator

  • Dag A의 태스크를 TriggerDagRunOperator로 구현
  • TriggerDagRunOperator 예시
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_B = TriggerDagRunOperator(
	task_id = "trigger_B"
	trigger_dag_id = "트리거하려는 DAG이름"
)

Jinja Template

- Jinja Template이란?

 

  • Django 템플릿 엔진에서 영감을 받아 개발
  • Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML생성
  • Flask에서 사용됨

- 사용 예시

  • 변수는 이중 중괄호 '{{ }}'로 감싸서 사용 (ex. <h1> 안녕하세요, {{name}}님! <h1> )
  • 제어문은 퍼센트 기호{% %} 로 표시 (ex. {% for item in tiems %} <li> {{item}} </li> {% endfor %} )
  • Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의가능 (이를 통해 재사용가능하고 사용자 정의 가능한 워크플로우 생성)
  • 가능한 시스템 변수는 url 참조 (url: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html)
  • Jinja Template 예시 코드

Sensor

- Sensor는 특정 조건이 충족될 때까지 대기하는 Operator

- 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용

- Airflow는 몇 가지 내장 Sensor를 제공

  • FileSensor: 지정된 위치에 파일이 생길 때까지 대기
  • HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기
  • SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
  • TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지
  • ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기 (worker하나를 점유한 상태)

- 기본적으로 주기적으로 poke를 하는 것

  • worker를 하나 붙잡고 poke간에 sleep을 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재 (mode)
    • mode의 값은 reschedule혹은 poke가 됨

- ExternalTaskSensor

  •  DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
    • 먼저 동일한 schedule_interval을 사용
    • 이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭 안됨
    • 만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 ExternalTaskSensor는 사용불가
    • 위 처럼 사용이 까다롭기 때문에 잘 사용안함
from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
    task_id='waiting_for_end_of_dag_a'
    external_dag_id='DAG이름'
    external_task_id='end'
    timeout=5 * 60
    mode='reschedule'
)

BranchPythonOperator

- 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터

  • 미리 정해준 Operator들 중에 선택하는 형태로 돌아감

- TriggerDagOperator앞에 이 오퍼레이터를 사용하는 경우도 있음 

 

LatestOnlyOperator

- Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함

- 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 해당 Operator에서 중단됨

 

'Programmers TIL' 카테고리의 다른 글

[Spark] Spark 프로그램 구조 _TIL  (0) 2023.07.04
[Spark] Spark 데이터 시스템 구조 _TIL  (1) 2023.07.04
[K8s] Kubernetes(K8s)_TIL  (1) 2023.06.16
[Docker] Docker Compose_TIL  (1) 2023.06.15
[Docker] Docker Volume_TIL  (0) 2023.06.14