기록 블로그

프로그래머스 데브코스-데이터 엔지니어/TIL(Today I Learned)

06/19 51일차 ELT 작성과 구글시트_슬랙 연동 (전반부)

usiohc 2023. 6. 19. 21:45

Airflow 고급 기능, dbt, Data Catalog


주요 메모 사항


ELT 작성과 구글시트/슬랙 연동

 

 

Airflow Docker 환경부터 다시 설정

이번 강의를 진행하기 전에 먼저 Docker로 환경설정부터 다시 하자

 

이미 learn-airflow Github repo를 다운로드받은 상황이라면 - git pull

 

아니라면 아래를 수행

ㄴ $ git clone https://github.com/learndataeng/learn-airflow.git

ㄴ 2.5.1 이미지 관련 yml 파일 다운로드 

ㄴ $ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

 

 

 


docker-compose.yaml 수정

_PIP_ADDITIONAL_REQUIREMENTS 수정

ㄴ DAG에서 사용할 라이브러리들 설

data 폴더를 호스트 폴더에서 만들고 볼륨으로 공유: 임시 데이터를 저장할 폴더

ㄴ 이를 docker volume으로 지정해서 나중에 디버깅에 사용

ㄴ AIRFLOW_VAR_DATA_DIR: /opt/airflow/data : 컨테이너 내부의 data 위치, airlfow variable 설정하는 것 -> 다만, 웹UI에서는 안보인다는게 단점!

 

 

다음 명령을 수행. Detached 모드로 실행하려면 -d 옵션 지정 (-f 옵션도 존재)

ㄴ $ docker compose -f docker-compose.test.yaml up -d

 

포트번호가 충돌이 나는것 같다 -> pid를 kill하거나 docker ps images

 

먼저 docker 컨테이너 모두 종료

 

프로세스 확인, 아직 살아있다-> 모두 다 kill 

 

다시 up을 진행 -> 성공!

 

 

 

http://localhost:8080으로 웹 UI 로그인 id:pw -> airflow:airflow 사용

앞서 설정한 DATA_DIR이란 변수는 Admin => Variables에 안 보임.

(위에서 설정했었던 AIRFLOW_VAR_DATA_DIR)

ㄴ DAG과 Airflow 환경 정보들은 Postgres의 Named Volume으로 유지되고 있음

ㄴ 환경변수로 설정한 것들은 Web UI에서는 안 보이지만 프로그램에서는 사용가능

$ docker exec -it learn-airflow-airflow-scheduler-1 airflow variables get DATA_DIR

/opt/airflow/data

 

Variables/Connections 설정을 어떻게 관리하는 것이 좋을까?

이를 docker-compose.yaml에서 환경변수로 설정. 뒤에서 설명

 

 


고민 포인트: Airflow 실행환경 관리방안

기타 환경설정값들 (Variables, Connections 등등)을 어떻게 관리/배포할까?

ㄴ 보통 docker-compose.yml 파일에서 아래 부분에 정의

 

환경변수가 아니라 별도 credentials 전용 Secrets 백엔드라는 것을 사용하기도 함 - 소카 기술블로그 airflow관련 포스팅에서 본적이 있음!

 

x-airflow-common: &airflow-common … environment: &airflow-common-env AIRFLOW_VAR_DATA_DIR: /opt/airflow/data AIRFLOW_CONN_TEST_ID: test_connection

 

 

 

어디까지 Airflow 이미지로 관리하고 무엇을 docker-compose.yml에서 관리할지 생각

ㄴ 이는 회사마다 조금씩 다름

ㄴ Airflow 자체 이미지를 만들고 거기에 넣을지? 이 경우 환경변수를 자체 이미지에 넣고 이를 docker-compose.yaml 파일에서 사용

 

AIRFLOW_IMAGE_NAME 환경변수가 정의되어 있다면 그걸 사용하고 아니면 기본값으로 apache/airflow:2.5.1

 

아니면 docker-compose.yaml에서 환경변수를 직접 설정

 

 

 

DAG 코드도 마찬가지

ㄴ Airflow image로 DAG 코드를 복사하여 만드는 것이 좀더 깔끔

ㄴ 아니면 docker-compose에서 host volume 형태로 설정

    ㄴ 이는 개발/테스트용으로 좀더 적합

 

 

 

팁: .airflowignore

Airflow의 DAG 스캔 패턴은?

ㄴ dags_folder가 가리키는 폴더를 서브폴더들까지 다 스캔해서 DAG 모듈이 포함된 모든 파이썬 스크립트를 실행해서 새로운 DAG를 찾게 되며 이는 가끔 사고로 이어짐

 

Airflow가 의도적으로 무시해야 하는 DAG_FOLDER의 디렉터리 또는 파일을 지정

 

.airflowignore의 각 줄은 정규식 패턴으로 지정하며 매칭되는 파일들은 무시됨

ㄴ 예시) 5분마다 DAG를 찾는 작업에서 제외하고자 하는 DAG들을 지정!

 
 .airflowignore

 project_a
 tenant_[\d]
 

위의 경우 아래 파일들이 무시됨

ㄴ project_a_dag_1.py, TESTING_project_a.py, tenant_1.py, project_a/dag_1.py

 


Summary table: 간단한 DAG 구현을 살펴보기

Build_Summary.py: MAU 요약 테이블을 만들어보자

이 부분을 dbt로 구현하는 회사들도 많음 (Analytics Engineer) 별도 강의에서 다룰 예정

https://www.getdbt.com/

 

 

Summary table: 이번에 사용자별 channel 정보를 요약해보자

앞서와 비슷하게 PythonOperator를 만들고 아래처럼 params 파라미터를 설정

execsql = PythonOperator(
    task_id = 'mau_summary',
    python_callable = execSQL,
    params = {
        'schema' : 'usiohc',
        'table': 'mau_summary',
        'sql' : """SELECT
                TO_CHAR(A.ts, 'YYYY-MM') AS month,
                COUNT(DISTINCT B.userid) AS mau
                FROM raw_data.session_timestamp A
                JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
                GROUP BY 1
                ;"""
    },
    dag = dag
)

CTAS 부분을 아예 별도의 환경설정 파일로 떼어내면 어떨까?

환경 설정 중심의 접근 방식

config 폴더를 생성, 그 안에 써머리 테이블별로 하나의 환경설정 파일 생성

ㄴ 파이썬 dictionary 형태로 유지할 것이라 .py 확장자를 가져야함

 

이렇게 하면 비개발자들이 사용할 때 어려움을 덜 느끼게 됨

그러면서 더 다양한 테스트를 추가

 

 


NPS 써머리 테이블을 만들어 보자

NPS란? Net Promoter Score

ㄴ 10점 만점으로 '주변에 추천하겠는가?'라는 질문을 기반으로 고객 만족도를 계산

ㄴ 10, 9점 추천하겠다는 고객(promoter)의 비율에서 0-6점의 불평고객(detractor)의 비율을 뺀 것이 NPS

     ㄴ 7, 8점은 아예 계산에 안 들어감

 

 

각자스키마.nps 테이블 혹은 raw_data.nps 테이블 기준으로 일별 nps 써머리 생성

 

일별 NPS 계산 SQL

SELECT LEFT(created_at, 10) AS date,
       ROUND(
       SUM(
       CASE
       WHEN score >= 9 THEN 1
       WHEN score <= 6 THEN -1
       END
       )::float*100/COUNT(1), 2
       ) nps
FROM asltn99.nps
GROUP BY 1
ORDER BY 1;

 

 

NPS Summary를 주기적으로 요약 테이블로 만들기

CTAS 부분을 아예 별도의 파일로 떼어내면 어떨까?

ㄴ 환경 설정 중심의 접근 방식

https://github.com/learndataeng/learn-airflow/blob/main/dags/config/nps_summary.py

 

 

새로운 Custom Operator와 helper 함수 구현

RedshiftSummaryOperator

build_summary_table

 

다른 방법은 dbt 사용하기 - Analytics Engineering (ELT)

 

Build_summary_v2.py

 

 

 


Slack 연동하기

 

DAG가 실패하면 Slack으로 에러를 보내보자

 

DAG 실행 중에 에러가 발생하면 그걸 지정된 슬랙 workspace의 채널로 보내기

이를 위해서 해당 슬랙 workspace에 App 설정이 필요

다음으로 연동을 위한 함수를 하나 만들고 (plugins/slack.py)

이를 태스크에 적용되는 default_args의 on_failure_callback에 지정

 

DAG_ID = "Build_Summary_v2"
dag = DAG(
    DAG_ID,
    schedule_interval="25 13 * * *",
    max_active_runs=1,
    concurrency=1,
    catchup=False,
    start_date=datetime(2021, 9, 17),
    default_args= {
        'on_failure_callback': slack.on_failure_callback,
        ''
        ''
    }
)

 

 

먼저 어느 Workspace의 어느 Channel로 보낼 것인지 결정

테스트할 workspace를 하나 만들고, Channel을 하나 개설

그리고 App을 하나 만들어서 해당 채널에 메세지를 보낼 수 있게 할 것

여기에서 진행 https://api.slack.com/messaging/webhooks

 

자신의 Webhook URL로 메세지를 보내보자 (윈도우는 bash 를 사용할 수 있는 터미널에서 진행해야 함)

$ curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' https://hooks.slack.com/services/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

 

https://hooks.slack.com/services/ 뒤의 값을 Airflow Variable에 “slack_url”로 저장할 것!

 

slack에 에러 메세지를 보내는 별도 모듈로 개발

 

이를 DAG 인스턴스를 만들 때 에러 콜백으로 지정, 예제: NameGenderCSVtoRedshift_v4.py

 

 

 

 

 


공부하며 어려웠던 내용