기록 블로그

프로그래머스 데브코스-데이터 엔지니어/TIL(Today I Learned)

06/22 54일차 Airflow 운영과 DBT

usiohc 2023. 6. 22. 17:37

Airflow 운영과 DBT


주요 메모 사항


Airflow 운영과 대안

프로덕션 사용을 위한 Airflow 환경 설정

Things to Change

airflow.cfg is in /var/lib/airflow/airflow.cfg

ㄴ Any changes here will be reflected when you restart the webserver and scheduler

ㄴ [core] 섹션의 dags_folder가 DAG들이 있는 디렉토리가 되어야함 -> /var/lib/airflow/dags

ㄴ dag_dir_list_interval: dags_folder를 Airflow가 얼마나 자주 스캔하는지 명시 (초 단위) -> default : 5분

ㄴ .airflowignore 파일 -> gitignore와 비슷 

 

Airflow Database upgrade (Airflow 설치때 설명)

ㄴ Sqlite -> Postgres or MySQL (이 DB는 주기적으로 백업되어야함)

ㄴ sql_alchemy_conn in Core section of airflow.cfg

 

SequentialExecutor 사용 (Airflow 설치때 설명)

ㄴ Executor in Core section of airflow.cfg

ㄴ Single Server: from SequentialExecutor to LocalExecutor or CeleryExecutor

    ㄴ Cluster: from SequentialExecutor to CeleryExecutor or KubernetesExecutor

ㄴ DB를 Sqlite로 사용할 때만 의미가 있음, 우리는 LocalExecutor을 사용했었음, CeleryExecutor을 사용하는게 가장 이상적

 

Enable Authentication & use a strong password

ㄴ In Airflow 2.0, authentication is ON by default

ㄴ 되도록이면VPN (Virtual Private Network) 뒤에 위치, 접근 자체를 어렵게 하는게 목

 

Large disk volume for logs and local data

ㄴ Logs -> /dev/airflow/logs in (Core section of airflow.cfg) 밑의 2개의 키가 존재

    ㄴ base_log_folder

    ㄴ child_process_log_directory

 

Periodic Log data cleanup

ㄴ The above folders need to be cleaned up periodically (아니면 S3와 같은 클라우드 스토리지로 복사)

ㄴ You can write a shell Operator based DAG for this purpose

 

From Scale Up to Scale Out

ㄴ 최대한 1대로 Scale Up을 하다가 Scale Out로 변경이 필요할 때, 웬만하면 Cloud를 사용하

ㄴ Go for Cloud Airflow options (Cloud Composer or MWAA) or Docker/K8s

 

Backup Airflow metadata database

ㄴ Backup variables and connections (command lines or APIs)

    ㄴ airflow variables export variables.json

    ㄴ airflow connections export connections.json

 

Add health-check monitoring

https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/check-health.html

    ㄴ API를 먼저 활성화하고 Health Check Endpoint API를 모니터링 툴과 연동

ㄴ 어느 정도 규모가 된다면 DataDog, Grafana등을 사용하는 것이 일반적 (DevOps팀과 협업)

 

 


Airflow 로그 파일 삭제하기

Airflow에서 발생되는 로그파일의 크기는 작지 않다.

이를 주기적으로 삭제하거나 백업하는 것에 대해 알아보자

Airflow 로그 위치

두 군데에 별도의 로그가 기록됨. 이를 주기적으로 삭제하거나 백업 (s3) 필요

   

[logging]

base_log_folder = /var/lib/airflow/logs

 

[scheduler]

child_process_log_directory = /var/lib/airflow/logs/scheduler

 

 

Airflow 로그 위치 - docker compose

docker compose로 실행된 경우 logs 폴더가 host volume의 형태로 유지

 


Airflow 메타데이터 백업하기

Airflow 메타데이터의 주기적인 백업

이 데이터베이스가 외부에 있다면 (특히 AWS RDS라면) -> 거기에 바로 주기적인 백업 셋업

 

Airflow와 같은 서버에 메타 데이터 DB가 있다면 (예를 들어 PostgreSQL) -> 그러면 DAG등을 이용해 주기 백업 실행 (S3로 저장)

 

Airflow 대안

Airflow 말고 다른 ETL 프레임웍으로 무엇이 있는지 살펴보자

Airflow 이외의 다른 데이터 파이프라인 프레임웍들

Prefect (Open Source)

Dagster (Open Source)

Airbyte (Open Source)

SaaS 형태의 데이터 통합 툴들

ㄴ FiveTran

ㄴ Stitch Data

ㄴ Segment

 

 


Database Normalization

 

 

ELT의 미래는?

ㄴ> ETL을 하는 이유는 결국 ELT를 하기 위함이며 이 때 데이터 품질 검증이 중요해짐

 

데이터 품질의 중요성 증대

ㄴ 입출력 체크

ㄴ 더 다양한 품질 검사

ㄴ 리니지 체크

ㄴ 데이터 히스토리 파악

 

데이터 품질 유지 -> 비용/노력 감소와 생산성 증대의 지름길

 

 

Database Normalization

Data Maturity Model and Reality

  1. AI & ML: 인공지능(AI)과 머신러닝(ML)은 데이터를 분석하고, 패턴을 찾고, 예측 모델을 생성하는 데 사용되는 기술입니다.
  2. BI & Analytics: 비즈니스 인텔리전스(BI)와 분석은 조직이 데이터를 사용하여 더 나은 의사 결정을 내릴 수 있도록 하는 방법입니다.
  3. Data Integration: 데이터 통합은 데이터를 여러 소스에서 가져와서 단일, 일관된 데이터 세트로 결합하는 프로세스입니다.
  4. Data Wrangling: 데이터 조작은 데이터를 정리하고, 변환하고, 모델링하는 프로세스입니다.
  5. Data Collection: 데이터 수집은 데이터를 수집하는 프로세스입니다. 데이터는 다양한 소스에서 수집될 수 있습니다.

2번과 3번 사이에 큰 갭이 존재

 

Database Normalization

데이터베이스를 좀더 조직적이고 일관된 방법으로 디자인하려는 방법

ㄴ 데이터베이스 정합성을 쉽게 유지하고 레코드들을 수정/적재/삭제를 용이하게 하는 것

 

이 외에는 1NF ~ 등에 관련한 내용이라 생략, 학부때 도부이결다조가 생각이 남

 

 

Slowly Changing Dimensions

DW나 DL에서는 모든 테이블들의 히스토리를 유지하는 것이 중요함

ㄴ 보통 두 개의 timestamp 필드를 갖는 것이 좋음

    1. created_at (생성시간으로 한번 만들어지면 고정됨)

    2. updated_at (꼭 필요 마지막 수정 시간을 나타냄)

 

이 경우 컬럼의 성격에 따라 어떻게 유지할지 방법이 달라짐

ㄴ DBT를 이용해서 하나 두개를 구현할 예정

ㄴ SCD Type 0  SCD Type 1 SCD Type 2 SCD Type 3 SCD Type 4

 

 

그렇다면 히스토리를 유지하기 위해 어떻게 할 것?

일부 속성들은 시간을 두고 변하게 되는데 DW Table쪽에 어떻게 반영해야하나?

ㄴ 현재 데이터만 유지 Vs 처음부터 지금까지 히스토리도 유지

 

 

SCD Type 0

ㄴ 한번 쓰고 나면 바꿀 이유가 없는 경우들

ㄴ 한번 정해지면 갱신되지 않고 고정되는 필드들

ㄴ 예) 고객 테이블이라면 회원 등록일, 제품 첫 구매일

 

SCD Type 1

ㄴ 데이터가 새로 생기면 덮어쓰면 되는 컬럼들

ㄴ 처음 레코드 생성시에는 존재하지 않았지만 나중에 생기면서 채우는 경우

 

 

SCD Type 2

ㄴ 특정 entity에 대한 데이터가 새로운 레코드로 추가되어야 하는 경우

ㄴ 예) 고객 테이블에서 고객의 등급 변화

    ㄴ tier라는 컬럼의 값이 “regular”에서 “vip”로 변화하는 경우, 변경시간도 같이 추가되어야함

 

SCD Type 3

ㄴ SCD Type 2의 대안으로 특정 entity 데이터가 새로운 컬럼으로 추가되는 경우

ㄴ 예) 고객 테이블에서 tier라는 컬럼의 값이 “regular”에서 “vip”로 변화하는 경우

    ㄴ previous_tier라는 컬럼 생성, 변경시간도 별도 컬럼으로 존재해야함

SCD Type 4

ㄴ 특정 entity에 대한 데이터를 새로운 Dimension 테이블에 저장하는 경우

ㄴ SCD Type 2의 변종

ㄴ 예) 별도의 테이블로 저장하고 이 경우 아예 일반화할 수도 있음

 

 

 


dbt (Data Build Tool)

dbt란?

ㄴ ELT용 오픈소스: In-warehouse data transformation

ㄴ dbt Labs라는 회사가 상용화 ($4.2B valuation)

ㄴ Analytics Engineer라는 말을 만들어냄

 

다양한 데이터 웨어하우스를 지원 - dbt가 서포트해주는 데이터 시스템

 

클라우드 버전도 존재 - dbt Cloud

 

 

dbt 구성 컴포넌트

데이터 모델 (models)

ㄴ 테이블들을 몇개의 티어로 관리 -> 일종의 CTAS (SELECT 문들), Lineage 트래킹

ㄴ Table, View, CTE 등등

데이터 품질 검증 (tests) - 데이터의 품질, 수준에 따라서 Tier을 나눠서 부른다!

스냅샷 (snapshots)

 

dbt 사용 시나리오

dbt가 어떻게 사용될 수 있는지 가상 환경을 생각

 

다음과 같은 요구조건을 달성해야한다면? -> DBT 사용

ㄴ 데이터 변경 사항을 이해하기 쉽고 필요하다면 롤백 가능

ㄴ 데이터간 리니지 확인 가능

ㄴ 데이터 품질 테스트 및 에러 보고

ㄴ Fact 테이블의 증분 로드 (Incremental Update)

ㄴ Dimension 테이블 변경 추적 (히스토리 테이블)

ㄴ 용이한 문서 작성

 

보통 사용하는 테크 스택

Redshift/Spark/Snowflake/BigQuery

    |

dbt

    |

Airflow

 

 

 

 

 

무슨 ELT 작업 예정인지?

Redshift 사용

AB 테스트 분석을 쉽게 하기 위한 ELT 테이블을 만들어보자

 

입력 테이블: user_event, user_variant, user_metadata

Production DB에 저장되는 정보들을 Data Warehouse로 적재했다고 가정

raw_data.user_event

ㄴ 사용자/날짜/아이템별로 impression이 있는 경우 그 정보를 기록하고 impression으로부터 클릭, 구매, 구매시 금액을 기록.

ㄴ 실제 환경에서는 이런 aggregate 정보를 로그 파일등의 소스(하나 이상의 소스가 될 수도 있음) 로부터 만들어내는 프로세스가 필요함

 

 

raw_data.user_variant

ㄴ 사용자가 소속한 AB test variant를 기록한 파일 (control vs. test)

ㄴ 보통은 experiment와 variant 테이블이 별도로 존재함

ㄴ 그리고 언제 variant_id로 소속되었는지 타임스탬프 필드가 존재하는 것이 일반적

 

 

raw_data.user_metadata

ㄴ 사용자에 관한 메타 정보가 기록된 파일 (성별, 나이 등등)

ㄴ 이를 이용해 다양한 각도에서 AB 테스트 결과를 분석해볼 수 있음

입력 데이터 요약

Fact 테이블과 Dimension 테이블

Fact 테이블: 분석의 초점이 되는 양적 정보를 포함하는 중앙 테이블

ㄴ 일반적으로 매출 수익, 판매량, 이익과 같은 측정 항목 포함. 비즈니스 결정에 사용

ㄴ Fact 테이블은 일반적으로 외래 키를 통해 여러 Dimension 테이블과 연결됨

ㄴ 보통 Fact 테이블의 크기가 훨씬 더 큼

 

Dimension 테이블: Fact 테이블에 대한 상세 정보를 제공하는 테이블

ㄴ 고객, 제품과 같은 테이블로 Fact 테이블에 대한 상세 정보 제공

ㄴ Fact 테이블의 데이터에 맥락을 제공하여 다양한 방식으로 분석 가능하게 해줌

ㄴ Dimension 테이블은 primary key를 가지며, fact 테이블에서 참조 (foreign key)

ㄴ 보통 Dimension 테이블의 크기는 훨씬 더 작음

 

-> 예전에 강의에서 설명해주셨던게 기억이 남, 만약 얼마 안된 회사라면 Dimension 테이블의 크기가 더 큰경우도 있다!

 

 

 

 

최종 생성 데이터 (ELT 테이블)

생성 테이블: Variant별 사용자별 일별 요약 테이블

ㄴ variant_id, user_id, datestamp, age, gender,

ㄴ 총 impression, 총 click, 총 purchase, 총 revenue

 

SELECT로 표현하면 아래와 같음

 


dbt 설치와 환경 설정

dbt 사용절차

1. dbt 설치

ㄴ dbt Cloud vs. dbt Core / git을 보통 사용함

2. dbt 환경설정

3. Connector 설정

ㄴ Connector가 바로 바탕이 되는 데이터 시스템 (Redshift, Spark, …)

4. 데이터 모델링 (tier)

ㄴ Raw Data -> Staging -> Core

5. 테스트 코드 작성

6. (필요하다면) Snapshot 설정

 

 

 

 

dbt 설치 옵션

ㄴ Cloud 버전: dbt Cloud

ㄴ 로컬 개발 버전: dbt Core -> 해당 방법으로 진행

 

 

 

dbt 설치: 로컬 버전으로 진행

위의 명령은 dbt-core 모듈도 설치해줌, 환경에 맞는 dbt connector를 설치: Redshift, BigQuery, Snowflake

 

 

 

 

dbt 환경 설정

$ dbt init learn_dbt

 

 

dbt init 완료 후 모습, 기용님은 yml에 target-path: "target"가 존재했는데 실행하면 추가되는 건지 모르겠다

 

 

dbt Models: Input

dbt Model을 사용해 입력 데이터들을 transform

 

 

Model이란?

ELT 테이블을 만듬에 있어 기본이 되는 빌딩블록

ㄴ 테이블이나 뷰나 CTE의 형태로 존재

입력,중간,최종 테이블을 정의하는 곳

ㄴ 티어 (raw, staging, core, …) -> 위에서 언급했던 Tier

ㄴ raw => staging (src) => core

 

Model 구성 요소

Input

ㄴ 입력(raw)과 중간(staging, src) 데이터 정의

ㄴ raw는 CTE로 정의

ㄴ staging은 View로 정의

 

Output

ㄴ 최종(core) 데이터 정의

ㄴ core는 Table로 정의

 

이 모두는 models 폴더 밑에 sql 파일로 존재

ㄴ 기본적으로는 SELECT + Jinja 템플릿과 매크로

ㄴ 다른 테이블들을 사용 가능 (reference) -> 이를 통해 리니지 파악

 

 

데이터 빌딩 프로세스

 

 

3개의 sql을 작성해서 models에 저장

 

 

Model 빌딩: dbt run (Issue)

필자의 경우 cp949를 디코드할수 없다는 에러가 발생했는데, manifest.py에 utf-8를 추가해 해결했

 

 

 

Model 빌딩 확인

해당 스키마 밑에 테이블 생성 여부 확인

 

dbt run은 프로젝트 구성 다양한 SQL 실행

ㄴ 이 SQL들은 DAG로 구성됨

 

dbt run은 보통 다른 더 큰 명령의 일부로 실행

ㄴ dbt test

ㄴ dbt docs generate

해당 명령어로 실행해서 Redshift에 View로 만들어졌다

 

 

 

View란 무엇인가?

SELECT 결과를 기반으로 만들어진 가상 테이블

ㄴ 기존 테이블의 일부 혹은 여러 테이블들을 조인한 결과를 제공함

ㄴ CREATE VIEW 이름 AS SELECT …

 

View의 장점

ㄴ 데이터의 추상화: 사용자는 View를 통해 필요 데이터에 직접 접근. 원본 데이터를 알 필요가 없음

ㄴ 데이터 보안: View를 통해 사용자에게 필요한 데이터만 제공. 원본 데이터 접근 불필요

ㄴ 복잡한 쿼리의 간소화: SQL(View)를 사용하면 복잡한 쿼리를 단순화

 

View의 단점

ㄴ 매번 쿼리가 실행되므로 시간이 걸릴 수 있음

ㄴ 원본 데이터의 변경을 모르면 실행이 실패함

 

 

 


dbt Models: Output

최종 출력 데이터를 만드는 과정을 살펴보자

 

Materialization이란?

입력 데이터(테이블)들을 연결해서 새로운 데이터(테이블) 생성하는 것

ㄴ 보통 여기서 추가 transformation이나 데이터 클린업 수행

 

4가지의 내장 materialization이 제공됨

 

파일이나 프로젝트 레벨에서 가능

 

역시 dbt run을 기타 파라미터를 가지고 실행

 

4가지의 Materialization 종류

View -> 데이터를 자주 사용하지 않는 경우

Table -> 데이터를 반복해서 자주 사용하는 경우

Incremental (Table Appends) -> Fact 테이블, 과거 레코드를 수정할 필요가 없는 경우

Ephemeral (CTE) -> 한 SELECT에서 자주 사용되는 데이터를 모듈화하는데 사용

 

 

데이터 빌딩 프로세스

Raw Data에서 Staging를 만들었는데, Staging는 보통 table로 만들지 않는다!

우리는 View로 만들었음

 

 

 

models 밑에 core 테이블들을 위한 dim, fact 생성

dim 폴더와 fact 폴더 생성

ㄴ dim 밑에 각각 dim_user_variant.sql과 dim_user_metadata.sql 생성

ㄴ fact 밑에 fact_user_event.sql 생성

이 모두를 physical table로 생성

 

dim_user_variant.sql

Jinja 템플릿과 ref 태그를 사용해서 dbt 내 다른 테이블들을 액세스

WITH src_user_variant AS (
 SELECT * FROM {{ ref('src_user_variant') }}
)
SELECT      ''' 필요한 경우 여기에서 transformation을 수행 '''
 user_id,
 variant_id
FROM
 src_user_variant

 

 

dim_user_metadata.sql

설정에 따라 view/table/CTE 등으로 만들어져서 사용됨, materialized라는 키워드로 설정

WITH src_user_metadata AS (
 SELECT * FROM {{ ref('src_user_metadata') }}
)
SELECT
 user_id,
 age,
 gender,
 updated_at
FROM
 src_user_metadata

 

 

fact_user_event.sql

Incremental Table로 빌드 (materialized = 'incremental') 

{{
 config(
    materialized = 'incremental',
    '''
    incremental_strategy도 사용가능, default는 append -> 항상 새로운 레코드를 만듬
    - append
    - merge
    - insert_overwrite
    이 경우 unique_key와 merge_update_columns
    필드를 사용하기도 함
    '''

    on_schema_change='fail'
    '''
    스키마가 바뀐 경우 대응 방법 지정
    - append_new_columns
    - ignore
    - sync_all_columns
    - fail
    '''
 )
}}
WITH src_user_event AS (
 SELECT * FROM {{ ref("src_user_event") }}
)
SELECT
 user_id,
 datestamp,
 item_id,
 clicked,
 purchased,
 paidamount
FROM
 src_user_event

 

 

다음으로 model의 materialized format 결정

최종 Core 테이블들은 view가 아닌 table로 빌드

dbt_project.yml을 편집

 

Model 빌딩: dbt run 

 

빌딩된 테이블 확인!!


fact_user_event.sql을 수정해서 incremental하게 레코드를 업데이트 하기

{{
    config(
        materialized = 'incremental',
        on_schema_change='fail'
    )
}}
WITH src_user_event AS (
 SELECT * FROM {{ ref("src_user_event") }}
)
SELECT
 user_id,
 datestamp,
 item_id,
 clicked,
 purchased,
 paidamount
FROM
 src_user_event
 
''' 아래 추 '''
WHERE datestamp is not NULL
{% if is_incremental() %}
 AND datestamp > (SELECT max(datestamp) from {{ this }})
{% endif %}

 

다시 빌드

 

 

raw_data.user_event에 새 레코드 추가후 dbt run 수행 (redshift에 해당 스키마에 insert 권한이 없음 기용님의 데모만 봄)

적당한 Redshift 클라이언트 툴에서 아래 수행

ㄴ INSERT INTO raw_data.user_event VALUES (100, '2023-06-10', 100, 1, 0, 0);

 

다음으로 dbt run을 수행

ㄴ compiled SQL을 확인해서 정말 Incremental하게 업데이트되었는지 확인

 

최종적으로 Redshift 클라이언트 툴에서 다시 확인

ㄴ SELECT * FROM keeyong.fact_user_event WHERE datestamp = '2023-06-10';

 

 

 

 

 

src 테이블들을 CTE로 변환해보기

src 테이블들을 굳이 빌드할 필요가 있나? -> dbt_project.yml 편집

models:
  learn_dbt:
    +materialized: view

    dim:
      +materialized: table
    src:
      +materialized: ephemeral

 

src 테이블들 (View) 삭제

drop view asltn99.src_user_variant;
drop view asltn99.src_user_metadata;
drop view asltn99.src_user_event;

 

dbt run 실행 -> 이제 SRC 테이블들은 CTE 형태로 임베드되어서 빌드됨

 


최종 analytics

데이터 빌딩 프로세스

 

 

dim_user.sql

dim_user_variant와 dim_user_metadata를 조인

WITH um AS (
 SELECT * FROM {{ ref("dim_user_metadata") }}
), uv AS (
 SELECT * FROM {{ ref("dim_user_variant") }}
)
SELECT
 uv.user_id,
 uv.variant_id,
 um.age,
 um.gender
FROM uv
LEFT JOIN um ON uv.user_id = um.user_id

 

 

analytics_variant_user_daily.sql

dim_user와 fact_user_event를 조인 - analytics 폴더를 models 밑에 생성

WITH u AS (
 SELECT * FROM {{ ref("dim_user") }}
), ue AS (
 SELECT * FROM {{ ref("fact_user_event") }}
)
SELECT
 variant_id,
 ue.user_id,
 datestamp,
 age,
 gender,
 COUNT(DISTINCT item_id) num_of_items, -- 총 impression
 COUNT(DISTINCT CASE WHEN clicked THEN item_id END) num_of_clicks, -- 총 click
 SUM(purchased) num_of_purchases, -- 총 purchase
 SUM(paidamount) revenue -- 총 revenue
FROM ue LEFT JOIN u ON ue.user_id = u.user_id
GROUP by 1, 2, 3, 4, 5

 

Model 빌딩 : dbt run

 

 


공부하며 어려웠던 내용

개념 자체는 어렵지않은 것 같은데, 처음 배우다보니 전체적인 면에서 어렵게 느껴졌