Airflow 운영과 DBT
주요 메모 사항
Airflow 운영과 대안
프로덕션 사용을 위한 Airflow 환경 설정
Things to Change
airflow.cfg is in /var/lib/airflow/airflow.cfg
ㄴ Any changes here will be reflected when you restart the webserver and scheduler
ㄴ [core] 섹션의 dags_folder가 DAG들이 있는 디렉토리가 되어야함 -> /var/lib/airflow/dags
ㄴ dag_dir_list_interval: dags_folder를 Airflow가 얼마나 자주 스캔하는지 명시 (초 단위) -> default : 5분
ㄴ .airflowignore 파일 -> gitignore와 비슷
Airflow Database upgrade (Airflow 설치때 설명)
ㄴ Sqlite -> Postgres or MySQL (이 DB는 주기적으로 백업되어야함)
ㄴ sql_alchemy_conn in Core section of airflow.cfg
SequentialExecutor 사용 (Airflow 설치때 설명)
ㄴ Executor in Core section of airflow.cfg
ㄴ Single Server: from SequentialExecutor to LocalExecutor or CeleryExecutor
ㄴ Cluster: from SequentialExecutor to CeleryExecutor or KubernetesExecutor
ㄴ DB를 Sqlite로 사용할 때만 의미가 있음, 우리는 LocalExecutor을 사용했었음, CeleryExecutor을 사용하는게 가장 이상적
Enable Authentication & use a strong password
ㄴ In Airflow 2.0, authentication is ON by default
ㄴ 되도록이면VPN (Virtual Private Network) 뒤에 위치, 접근 자체를 어렵게 하는게 목
Large disk volume for logs and local data
ㄴ Logs -> /dev/airflow/logs in (Core section of airflow.cfg) 밑의 2개의 키가 존재
ㄴ base_log_folder
ㄴ child_process_log_directory
Periodic Log data cleanup
ㄴ The above folders need to be cleaned up periodically (아니면 S3와 같은 클라우드 스토리지로 복사)
ㄴ You can write a shell Operator based DAG for this purpose
From Scale Up to Scale Out
ㄴ 최대한 1대로 Scale Up을 하다가 Scale Out로 변경이 필요할 때, 웬만하면 Cloud를 사용하
ㄴ Go for Cloud Airflow options (Cloud Composer or MWAA) or Docker/K8s
Backup Airflow metadata database
ㄴ Backup variables and connections (command lines or APIs)
ㄴ airflow variables export variables.json
ㄴ airflow connections export connections.json
Add health-check monitoring
ㄴ https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/check-health.html
ㄴ API를 먼저 활성화하고 Health Check Endpoint API를 모니터링 툴과 연동
ㄴ 어느 정도 규모가 된다면 DataDog, Grafana등을 사용하는 것이 일반적 (DevOps팀과 협업)
Airflow 로그 파일 삭제하기
Airflow에서 발생되는 로그파일의 크기는 작지 않다.
이를 주기적으로 삭제하거나 백업하는 것에 대해 알아보자
Airflow 로그 위치
두 군데에 별도의 로그가 기록됨. 이를 주기적으로 삭제하거나 백업 (s3) 필요
[logging]
base_log_folder = /var/lib/airflow/logs
[scheduler]
child_process_log_directory = /var/lib/airflow/logs/scheduler
Airflow 로그 위치 - docker compose
docker compose로 실행된 경우 logs 폴더가 host volume의 형태로 유지
Airflow 메타데이터 백업하기
Airflow 메타데이터의 주기적인 백업
이 데이터베이스가 외부에 있다면 (특히 AWS RDS라면) -> 거기에 바로 주기적인 백업 셋업
Airflow와 같은 서버에 메타 데이터 DB가 있다면 (예를 들어 PostgreSQL) -> 그러면 DAG등을 이용해 주기 백업 실행 (S3로 저장)
Airflow 대안
Airflow 말고 다른 ETL 프레임웍으로 무엇이 있는지 살펴보자
Airflow 이외의 다른 데이터 파이프라인 프레임웍들
Prefect (Open Source)
Dagster (Open Source)
Airbyte (Open Source)
SaaS 형태의 데이터 통합 툴들
ㄴ FiveTran
ㄴ Stitch Data
ㄴ Segment
Database Normalization
ELT의 미래는?
ㄴ> ETL을 하는 이유는 결국 ELT를 하기 위함이며 이 때 데이터 품질 검증이 중요해짐
데이터 품질의 중요성 증대
ㄴ 입출력 체크
ㄴ 더 다양한 품질 검사
ㄴ 리니지 체크
ㄴ 데이터 히스토리 파악
데이터 품질 유지 -> 비용/노력 감소와 생산성 증대의 지름길
Database Normalization
Data Maturity Model and Reality
- AI & ML: 인공지능(AI)과 머신러닝(ML)은 데이터를 분석하고, 패턴을 찾고, 예측 모델을 생성하는 데 사용되는 기술입니다.
- BI & Analytics: 비즈니스 인텔리전스(BI)와 분석은 조직이 데이터를 사용하여 더 나은 의사 결정을 내릴 수 있도록 하는 방법입니다.
- Data Integration: 데이터 통합은 데이터를 여러 소스에서 가져와서 단일, 일관된 데이터 세트로 결합하는 프로세스입니다.
- Data Wrangling: 데이터 조작은 데이터를 정리하고, 변환하고, 모델링하는 프로세스입니다.
- Data Collection: 데이터 수집은 데이터를 수집하는 프로세스입니다. 데이터는 다양한 소스에서 수집될 수 있습니다.
2번과 3번 사이에 큰 갭이 존재
Database Normalization
데이터베이스를 좀더 조직적이고 일관된 방법으로 디자인하려는 방법
ㄴ 데이터베이스 정합성을 쉽게 유지하고 레코드들을 수정/적재/삭제를 용이하게 하는 것
이 외에는 1NF ~ 등에 관련한 내용이라 생략, 학부때 도부이결다조가 생각이 남
Slowly Changing Dimensions
DW나 DL에서는 모든 테이블들의 히스토리를 유지하는 것이 중요함
ㄴ 보통 두 개의 timestamp 필드를 갖는 것이 좋음
1. created_at (생성시간으로 한번 만들어지면 고정됨)
2. updated_at (꼭 필요 마지막 수정 시간을 나타냄)
이 경우 컬럼의 성격에 따라 어떻게 유지할지 방법이 달라짐
ㄴ DBT를 이용해서 하나 두개를 구현할 예정
ㄴ SCD Type 0 SCD Type 1 SCD Type 2 SCD Type 3 SCD Type 4
그렇다면 히스토리를 유지하기 위해 어떻게 할 것?
일부 속성들은 시간을 두고 변하게 되는데 DW Table쪽에 어떻게 반영해야하나?
ㄴ 현재 데이터만 유지 Vs 처음부터 지금까지 히스토리도 유지
SCD Type 0
ㄴ 한번 쓰고 나면 바꿀 이유가 없는 경우들
ㄴ 한번 정해지면 갱신되지 않고 고정되는 필드들
ㄴ 예) 고객 테이블이라면 회원 등록일, 제품 첫 구매일
SCD Type 1
ㄴ 데이터가 새로 생기면 덮어쓰면 되는 컬럼들
ㄴ 처음 레코드 생성시에는 존재하지 않았지만 나중에 생기면서 채우는 경우
SCD Type 2
ㄴ 특정 entity에 대한 데이터가 새로운 레코드로 추가되어야 하는 경우
ㄴ 예) 고객 테이블에서 고객의 등급 변화
ㄴ tier라는 컬럼의 값이 “regular”에서 “vip”로 변화하는 경우, 변경시간도 같이 추가되어야함
SCD Type 3
ㄴ SCD Type 2의 대안으로 특정 entity 데이터가 새로운 컬럼으로 추가되는 경우
ㄴ 예) 고객 테이블에서 tier라는 컬럼의 값이 “regular”에서 “vip”로 변화하는 경우
ㄴ previous_tier라는 컬럼 생성, 변경시간도 별도 컬럼으로 존재해야함
SCD Type 4
ㄴ 특정 entity에 대한 데이터를 새로운 Dimension 테이블에 저장하는 경우
ㄴ SCD Type 2의 변종
ㄴ 예) 별도의 테이블로 저장하고 이 경우 아예 일반화할 수도 있음
dbt (Data Build Tool)
dbt란?
ㄴ ELT용 오픈소스: In-warehouse data transformation
ㄴ dbt Labs라는 회사가 상용화 ($4.2B valuation)
ㄴ Analytics Engineer라는 말을 만들어냄
다양한 데이터 웨어하우스를 지원 - dbt가 서포트해주는 데이터 시스템
클라우드 버전도 존재 - dbt Cloud
dbt 구성 컴포넌트
데이터 모델 (models)
ㄴ 테이블들을 몇개의 티어로 관리 -> 일종의 CTAS (SELECT 문들), Lineage 트래킹
ㄴ Table, View, CTE 등등
데이터 품질 검증 (tests) - 데이터의 품질, 수준에 따라서 Tier을 나눠서 부른다!
스냅샷 (snapshots)
dbt 사용 시나리오
dbt가 어떻게 사용될 수 있는지 가상 환경을 생각
다음과 같은 요구조건을 달성해야한다면? -> DBT 사용
ㄴ 데이터 변경 사항을 이해하기 쉽고 필요하다면 롤백 가능
ㄴ 데이터간 리니지 확인 가능
ㄴ 데이터 품질 테스트 및 에러 보고
ㄴ Fact 테이블의 증분 로드 (Incremental Update)
ㄴ Dimension 테이블 변경 추적 (히스토리 테이블)
ㄴ 용이한 문서 작성
보통 사용하는 테크 스택
Redshift/Spark/Snowflake/BigQuery
|
dbt
|
Airflow
무슨 ELT 작업 예정인지?
Redshift 사용
AB 테스트 분석을 쉽게 하기 위한 ELT 테이블을 만들어보자
입력 테이블: user_event, user_variant, user_metadata
Production DB에 저장되는 정보들을 Data Warehouse로 적재했다고 가정
raw_data.user_event
ㄴ 사용자/날짜/아이템별로 impression이 있는 경우 그 정보를 기록하고 impression으로부터 클릭, 구매, 구매시 금액을 기록.
ㄴ 실제 환경에서는 이런 aggregate 정보를 로그 파일등의 소스(하나 이상의 소스가 될 수도 있음) 로부터 만들어내는 프로세스가 필요함
raw_data.user_variant
ㄴ 사용자가 소속한 AB test variant를 기록한 파일 (control vs. test)
ㄴ 보통은 experiment와 variant 테이블이 별도로 존재함
ㄴ 그리고 언제 variant_id로 소속되었는지 타임스탬프 필드가 존재하는 것이 일반적
raw_data.user_metadata
ㄴ 사용자에 관한 메타 정보가 기록된 파일 (성별, 나이 등등)
ㄴ 이를 이용해 다양한 각도에서 AB 테스트 결과를 분석해볼 수 있음
입력 데이터 요약
Fact 테이블과 Dimension 테이블
Fact 테이블: 분석의 초점이 되는 양적 정보를 포함하는 중앙 테이블
ㄴ 일반적으로 매출 수익, 판매량, 이익과 같은 측정 항목 포함. 비즈니스 결정에 사용
ㄴ Fact 테이블은 일반적으로 외래 키를 통해 여러 Dimension 테이블과 연결됨
ㄴ 보통 Fact 테이블의 크기가 훨씬 더 큼
Dimension 테이블: Fact 테이블에 대한 상세 정보를 제공하는 테이블
ㄴ 고객, 제품과 같은 테이블로 Fact 테이블에 대한 상세 정보 제공
ㄴ Fact 테이블의 데이터에 맥락을 제공하여 다양한 방식으로 분석 가능하게 해줌
ㄴ Dimension 테이블은 primary key를 가지며, fact 테이블에서 참조 (foreign key)
ㄴ 보통 Dimension 테이블의 크기는 훨씬 더 작음
-> 예전에 강의에서 설명해주셨던게 기억이 남, 만약 얼마 안된 회사라면 Dimension 테이블의 크기가 더 큰경우도 있다!
최종 생성 데이터 (ELT 테이블)
생성 테이블: Variant별 사용자별 일별 요약 테이블
ㄴ variant_id, user_id, datestamp, age, gender,
ㄴ 총 impression, 총 click, 총 purchase, 총 revenue
SELECT로 표현하면 아래와 같음
dbt 설치와 환경 설정
dbt 사용절차
1. dbt 설치
ㄴ dbt Cloud vs. dbt Core / git을 보통 사용함
2. dbt 환경설정
3. Connector 설정
ㄴ Connector가 바로 바탕이 되는 데이터 시스템 (Redshift, Spark, …)
4. 데이터 모델링 (tier)
ㄴ Raw Data -> Staging -> Core
5. 테스트 코드 작성
6. (필요하다면) Snapshot 설정
dbt 설치 옵션
ㄴ Cloud 버전: dbt Cloud
ㄴ 로컬 개발 버전: dbt Core -> 해당 방법으로 진행
dbt 설치: 로컬 버전으로 진행
위의 명령은 dbt-core 모듈도 설치해줌, 환경에 맞는 dbt connector를 설치: Redshift, BigQuery, Snowflake
dbt 환경 설정
$ dbt init learn_dbt
dbt init 완료 후 모습, 기용님은 yml에 target-path: "target"가 존재했는데 실행하면 추가되는 건지 모르겠다
dbt Models: Input
dbt Model을 사용해 입력 데이터들을 transform
Model이란?
ELT 테이블을 만듬에 있어 기본이 되는 빌딩블록
ㄴ 테이블이나 뷰나 CTE의 형태로 존재
입력,중간,최종 테이블을 정의하는 곳
ㄴ 티어 (raw, staging, core, …) -> 위에서 언급했던 Tier
ㄴ raw => staging (src) => core
Model 구성 요소
Input
ㄴ 입력(raw)과 중간(staging, src) 데이터 정의
ㄴ raw는 CTE로 정의
ㄴ staging은 View로 정의
Output
ㄴ 최종(core) 데이터 정의
ㄴ core는 Table로 정의
이 모두는 models 폴더 밑에 sql 파일로 존재
ㄴ 기본적으로는 SELECT + Jinja 템플릿과 매크로
ㄴ 다른 테이블들을 사용 가능 (reference) -> 이를 통해 리니지 파악
데이터 빌딩 프로세스
3개의 sql을 작성해서 models에 저장
Model 빌딩: dbt run (Issue)
필자의 경우 cp949를 디코드할수 없다는 에러가 발생했는데, manifest.py에 utf-8를 추가해 해결했
Model 빌딩 확인
해당 스키마 밑에 테이블 생성 여부 확인
dbt run은 프로젝트 구성 다양한 SQL 실행
ㄴ 이 SQL들은 DAG로 구성됨
dbt run은 보통 다른 더 큰 명령의 일부로 실행
ㄴ dbt test
ㄴ dbt docs generate
해당 명령어로 실행해서 Redshift에 View로 만들어졌다
View란 무엇인가?
SELECT 결과를 기반으로 만들어진 가상 테이블
ㄴ 기존 테이블의 일부 혹은 여러 테이블들을 조인한 결과를 제공함
ㄴ CREATE VIEW 이름 AS SELECT …
View의 장점
ㄴ 데이터의 추상화: 사용자는 View를 통해 필요 데이터에 직접 접근. 원본 데이터를 알 필요가 없음
ㄴ 데이터 보안: View를 통해 사용자에게 필요한 데이터만 제공. 원본 데이터 접근 불필요
ㄴ 복잡한 쿼리의 간소화: SQL(View)를 사용하면 복잡한 쿼리를 단순화
View의 단점
ㄴ 매번 쿼리가 실행되므로 시간이 걸릴 수 있음
ㄴ 원본 데이터의 변경을 모르면 실행이 실패함
dbt Models: Output
최종 출력 데이터를 만드는 과정을 살펴보자
Materialization이란?
입력 데이터(테이블)들을 연결해서 새로운 데이터(테이블) 생성하는 것
ㄴ 보통 여기서 추가 transformation이나 데이터 클린업 수행
4가지의 내장 materialization이 제공됨
파일이나 프로젝트 레벨에서 가능
역시 dbt run을 기타 파라미터를 가지고 실행
4가지의 Materialization 종류
View -> 데이터를 자주 사용하지 않는 경우
Table -> 데이터를 반복해서 자주 사용하는 경우
Incremental (Table Appends) -> Fact 테이블, 과거 레코드를 수정할 필요가 없는 경우
Ephemeral (CTE) -> 한 SELECT에서 자주 사용되는 데이터를 모듈화하는데 사용
데이터 빌딩 프로세스
Raw Data에서 Staging를 만들었는데, Staging는 보통 table로 만들지 않는다!
우리는 View로 만들었음
models 밑에 core 테이블들을 위한 dim, fact 생성
dim 폴더와 fact 폴더 생성
ㄴ dim 밑에 각각 dim_user_variant.sql과 dim_user_metadata.sql 생성
ㄴ fact 밑에 fact_user_event.sql 생성
이 모두를 physical table로 생성
dim_user_variant.sql
Jinja 템플릿과 ref 태그를 사용해서 dbt 내 다른 테이블들을 액세스
dim_user_metadata.sql
설정에 따라 view/table/CTE 등으로 만들어져서 사용됨, materialized라는 키워드로 설정
fact_user_event.sql
Incremental Table로 빌드 (materialized = 'incremental')
다음으로 model의 materialized format 결정
최종 Core 테이블들은 view가 아닌 table로 빌드
dbt_project.yml을 편집
Model 빌딩: dbt run
빌딩된 테이블 확인!!
fact_user_event.sql을 수정해서 incremental하게 레코드를 업데이트 하기
다시 빌드
raw_data.user_event에 새 레코드 추가후 dbt run 수행 (redshift에 해당 스키마에 insert 권한이 없음 기용님의 데모만 봄)
적당한 Redshift 클라이언트 툴에서 아래 수행
ㄴ INSERT INTO raw_data.user_event VALUES (100, '2023-06-10', 100, 1, 0, 0);
다음으로 dbt run을 수행
ㄴ compiled SQL을 확인해서 정말 Incremental하게 업데이트되었는지 확인
최종적으로 Redshift 클라이언트 툴에서 다시 확인
ㄴ SELECT * FROM keeyong.fact_user_event WHERE datestamp = '2023-06-10';
src 테이블들을 CTE로 변환해보기
src 테이블들을 굳이 빌드할 필요가 있나? -> dbt_project.yml 편집
src 테이블들 (View) 삭제
dbt run 실행 -> 이제 SRC 테이블들은 CTE 형태로 임베드되어서 빌드됨
최종 analytics
데이터 빌딩 프로세스
dim_user.sql
dim_user_variant와 dim_user_metadata를 조인
analytics_variant_user_daily.sql
dim_user와 fact_user_event를 조인 - analytics 폴더를 models 밑에 생성
Model 빌딩 : dbt run
공부하며 어려웠던 내용
개념 자체는 어렵지않은 것 같은데, 처음 배우다보니 전체적인 면에서 어렵게 느껴졌
'프로그래머스 데브코스-데이터 엔지니어 > TIL(Today I Learned)' 카테고리의 다른 글
06/26 ~ 06/30, 56~60일차 3번째 프로젝트 (0) | 2023.06.27 |
---|---|
06/23 55일차 DBT와 데이터 카탈로그 (2) | 2023.06.23 |
06/21 53일차 Airflow 다양한 기능 사용해보기 (0) | 2023.06.21 |
06/20 52일차 ELT 작성과 구글시트_슬랙 연동 (후반부) (0) | 2023.06.20 |
06/19 51일차 ELT 작성과 구글시트_슬랙 연동 (전반부) (0) | 2023.06.19 |