기록 블로그

프로그래머스 데브코스-데이터 엔지니어/TIL(Today I Learned)

06/07 43일차 데이터 파이프라인과 Airflow (3)

usiohc 2023. 6. 7. 17:12

DAG


주요 메모 사항


Hello World 예제 MAX님 소스코드 링크

Operators - PythonOperator

 


from airflow.operators.python import PythonOperator
load_nps = PythonOperator(
 dag=dag,
 task_id='task_id',
 python_callable=python_func,
 params={
 'table': 'delighted_nps',
 'schema': 'raw_data'
 },
)

def python_func(**cxt):
 table = cxt["params"]["table"]
 schema = cxt["params"]["schema"]
 ex_date = cxt["execution_date"]
 # do what you need to do
 ...
 

 

 

2개의 테스크로 구성된 데이터 파이프라인, DAG

  • print_hello: PythonOperator로 구성되어 있으며 먼저 실행
  • print_goodbye: PythonOperator로 구성되어 있으며 두번째로 실행

dag = DAG(
 dag_id = "helloWorld",
 start_date = datetime(2021,8,26),
 catchup=False,
 tags=['example'],
 schedule = '0 2 * * *',
 default_args=default_args
)

 

 

Airflow Decorators : 프로그래밍이 단순해짐 MAX님 소스코드 링크

맨 밑에 print_hello() >> print_goodbye() 를 보면 알겠지만, 해당 함수 이름이 테스크 ID가 된다

  • Task Decorator를 사용하면 훨씬 더 프로그램이 직관적
  • 뒤에 실습에서 두 가지 방식을 혼용해서 사용할 것
f
rom airflow.decorators import task

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"


with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
) as dag:
    # Assign the tasks to the DAG in order
    print_hello() >> print_goodbye()

 

 

중요한 DAG 파라미터 (not task parmeters!)

 
with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
) as dag:

 

max_active_runs: # of DAGs instance

max_active_tasks: # of tasks that can run in parallel

catchup: whether to backfill past runs

DAG parameters vs. Task parameters의 차이점 이해가 중요

  • 위의 파라미터들은 모두 DAG 파라미터로 DAG 객체를 만들 때 지정해주어야함

 

 

 


Colab Python 코드를 Airflow로 포팅하기 v1 소스코드

이전 포스팅에서 다뤘던 과제들임

  • 헤더가 레코드로 추가되는 문제 해결
  • Idempotent 하게 job을 만듬 (멱등성!)
    • 여러번 실행해도 동일한 결과가 나오게
    • 매번 새로 모든 데이터를 읽어오는 job라고 가정하고 구현

 

 

v1 소스코드 개선하기 -> v2 소스코드

- params를 통해 변수 넘기기

- execution_date 얻어내기

- “delete from” vs. “truncate”

  • DELETE FROM raw_data.name_gender; -- WHERE 사용 가능
  • TRUNCATE raw_data.name_gender;

 

 

v2 소스코드 개선하기 -> v3 소스코드

- Xcom 객채를 사용해서 세개의 task로 나누기

- Redshift의 스키마와 테이블 이름을 params로 넘기기

- v3 소스코드의 Line 72

 

Xcom이란?

- 태스크(Operator)들간에 데이터를 주고 받기 위한 방식

- 보통 한 Operator의 리턴값을 다른 Operator에서 읽어가는 형태가 됨

- 이 값들은 Airflow 메타 데이터 DB에 저장이 되기에 큰 데이터를 주고받는데는 사용불가

  • 보통 큰 데이터는 S3등에 로드하고 그 위치를 넘기는 것이 일반적

 

- 태스크 하나로 구성이 된 경우

 
data = extract(link)
lines = transform(data)
load(lines)

 

- 3개의 테스크로 나눈 경우

 

 

 

Connections and Variables

우리가 사용하던 get_redshift 함수에 사용했던 host라던지, 개인 id pw등이 노출되는 것을 Connections 으로 설정

S3 링크를 Variables를 사용해 암호화 하는 등

 

- Connections

  • This is used to store some connection related info such as hostname, port number, and access credential
  • Postgres connection or Redshift connection info can be stored here

- Variables

  • Used to store API keys or some configuration info
  • Use “access” or “secret” in the name if you want its value to be encrypted
  • We will practice this

 

Web UI로 설정 가능

 

 

Variable를 이용해 CSV parameter 넘기기

 

 

 

v3 소스코드 개선하기 -> v4 소스코드

- Redshift Connection 사용하기

- schema 변경 잊지 말기!

 

Redshift Connection 설정 (Data Warehouse)

Web UI에서 설정 가능

 

 

 

v4 소스코드 개선하기 -> v5 소스코드

from airflow.decorators import task

  • task decorator를 사용
  • 이 경우 xcom을 사용할 필요가 없음
  • 기본적으로 PythonOperator 대신에 airflow.decorators.task를 사용

 

실습 환경을 준비했다면 링크 를 clone 해서 실습


Yahoo Finance API DAG 작성

Yahoo Finance API를 호출해서 애플 주식을 읽어오는 Full Refresh 기반의 DAG

소스코드 git 링크

 

구현 DAG의 세부 사항 - Full Refresh로 구현

1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)

2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재

 

 

 

Extract/Transform: Yahoo Finance API 호출 # git sourcecode Line 20

Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱

-> 기본으로 지난 한달의 주식 가격을 리턴해줌

 

 

 

Load: Redshift의 테이블을 업데이트

- Full Refresh로 구현 - 매번 테이블을 새로 만드는 형태로 구성

- 트랜잭션 형태로 구성 (NameGender DAG와 동일)

 

 

 


구현 DAG의 세부 사항 - Incremental Update로 구현 git 링크

1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)

2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거 - 매일 하루치의 데이터씩 늘어남

 

Extract/Transform: Yahoo Finance API 호출 -> 동일

 

Load: Redshift의 테이블을 업데이트

- Incremental Update로 구현

  • 임시 테이블 생성하면서 현재 테이블의 레코드를 복사 (CREATE TEMP TABLE … AS SELECT)
  • 임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
  • 원본 테이블을 삭제하고 새로 생성
  • 원본 테이블에 임시 테이블의 내용을 복사 (이 때 SELECT DISTINCT *를 사용하여 중복 제거)

- 트랜잭션 형태로 구성 (NameGender DAG와 동일)

 

 


Q&A

PostgresHook의 autocommit 파라미터

- Default 값은 False로 주어짐

- False, 이 경우 BEGIN은 아무런 영향이 없음 (no-operation)

 

 

DAG에서 task를 어느 정도로 분리하는 것이 좋을까?

아마도 엔지니어로써의 경험이 필요할 

  • task를 많이 만들면 전체 DAG이 실행되는데 오래 걸리고 스케줄러에 부하가 감 
  • task를 너무 적게 만들면 모듈화가 안되고 실패시 재실행을 시간이 오래 걸림
  • 오래 걸리는 DAG이라는 실패시 재실행이 쉽게 다수의 task로 나누는 것이 좋음

 

 

 

Airflow의 Variable 관리 vs. 코드 관리

장점: 코드 푸시의 필요성이 없음

단점: 관리나 테스트가 안되어서 사고로 이어질 가능성이 있음

 

 

 

 

 

숙제

다음 포스팅에서 다룸

airflow.cfg

1. DAGs 폴더는 어디에 지정되는가?

2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?

3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?

4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? 

5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?

6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?

 

 

 

 

세계 나라 정보 API 사용 DAG 작성

Full Refresh로 구현해서 매번 국가 정보를 읽어오게 할 것!

API 결과에서 아래 3개의 정보를 추출하여 Redshift에 각자 스키마 밑에 테이블 생성

  • country -> [“name”][“official”]
  • population -> [“population”]
  • area -> [“area”]

단 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것!

숙제는 개인 github에 repo를 만든 후 제출할 것! 

 

 

 


공부하며 어려웠던 내용

실습 환경을 준비해야한다. -> 라즈베리파이로 불가능 할 것 같아서 개인 데스크탑 WSL2를 사용해 Ubuntu에서 실습