기록 블로그

프로그래머스 데브코스-데이터 엔지니어/TIL(Today I Learned)

06/09 45일차 데이터 파이프라인과 Airflow (5)

usiohc 2023. 6. 9. 17:51

OLTP, Backfill, ELT


주요 메모 사항


전날 숙제 리뷰

2번째 프로젝트를 진행하면서 Cloud Functions와 Cloud Scheduler를 사용했던 것이 Airflow를 이해하는데 크게 도움이 된 것 같다. -> MAX님이 주신 Airflow 관련한 숙제를 푸는데 어려움이 없었음

 


MySQL(OLTP) 테이블 -> Redshift(OLAP) 테이블로 복사하기

- S3 connection 세팅들이 필요함, 보통 권한 등 -> IAM 다룰것

- airflow쪽에서 mysql쪽에 대한 엑세스도 필요

 

 

그럼 AWS 관련 권한 설정을 보자

- Airflow DAG에서 S3 접근 을 위한 권한 -> 이는 쓰기 권한

    a. IAM User를 만들고 S3버킷에 대한 읽기/쓰기 권한을 설정, access key와 secret key 사용

- Redshift가 S3 접근, S3에서 Redshift로 적재 하기 위한 권한 -> 이는 읽기 권한

    a. Redshift에 S3를 접근할 수 있는 역할(Role)을 만들고 이를 RedShift에 지정해야 함

 

 

AWS S3 Connections

- Access Key ID 와 Secret Access Key를 사용, 해당 AWS 작업은 우리가 실습할 수 없음

    a. 사실 루트 사용자의 키를 사용하면 편하지만, 상당한 사고 위험과 문제점을 동반 (해킹 -> AWS 자원 문제)

- 우리가 해볼 Best Practice

    a. IAM을 사용해 별도의 사용자 생성

    b. 그 사용자에게 해당 S3 Bucket을 읽고 쓸 수 있는 권한을 제공

    c. 그 사용자의 Access Key ID와 Secret Access Key를 사용

        - 해당 Key도 주기적으로 변경해서 보안을 한층 더 높일 수 있음 -> 외부의 해킹 공격시 피해 최소화

 

 

1) 유저 생성, 이름은 airflow-s3-access로 지정

 

2) 정책 추가, 2가지 방법이 존재 -> 1번으로 진행할 것

1. 정책을 S3 bucket을 Read만 해도 된다면 Readonly 정책을 선택하는 것과 같이 세부적인 권한을 조정

2. S3에 대한 모든 권한 지정 -> S3FullAccess

 

3) Custom Policy의 내용을 설정

- MAX님은 따로 몰라도 된다고 함 -> AWS의 정책 얘기라 그러신 듯, 하다보면 알게된다고 하셨음

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListAllMyBuckets"
                ],
            "Resource": "arn:aws:s3:::*"
        },
        {
        "Effect": "Allow",
        "Action": "s3:*",
        "Resource": [
            "arn:aws:s3:::grepp-data-engineering",
            "arn:aws:s3:::grepp-data-engineering/*"
            ]
        }
    ]
}

 

4) IAM -> Users -> airflow-s3-access

- 밑에 첫번째 이미지의 policies를 보면 de-s3-bucket-access는 위에서 설정한 Custom Policy

- 두번째 이미지 -> Security Credentials 에서 기존에 진행하기로한 access key를 만들 수 있음

 

5) S3 Connection 설정

- 여기에서 Connection Type을 Amazon Webservices로 선택하라고 하셨는데, 이유는 모르겠음...

 

 

MySQL 연결, 제공된 정보로 바로 Connection 작성

 

MySQL Connections 설정시 유의사항 (Docker 환경)

- 이는 Docker환경을 사용했을때 생길 수 있는 유의점으로 MAX님이 미리 얘기해주시는 것

- 필자는 Ubuntu 위에 바로 Airflow를 설치해서 실습하기 때문에 해당하지 않음

- “ModuleNotFoundError: No module named 'MySQLdb'” 에러에 해당하는 해결방

- 아래 명령을 Airflow Scheduler Docker Container에 root 유저로 로그인해서 실행

docker exec --user root -it <Airflow Scheduler의 Container Id> sh
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

 

 


2 New Connections(MySQL & AWS)

- aws_conn_id 는 자신에게 맞는 connType을 설정했으므로 S3 이거나 generic 일 수 있음

- redshift_dev_db도 마찬가지로 postgres나 amazon redshfit

 

 

MySQL의 테이블 (OLTP, Production Database)

- 이미 MySQL에 해당 테이블이 생성되어 있음 (레코드도 존재), 이를 RedShift로 복사하는 것이 진행할 실습 

CREATE TABLE prod.nps (
 id INT NOT NULL AUTO_INCREMENT primary key,
 created_at timestamp,
 score smallint
);

 

 

그러면 우리는 RedShift(OLAP, Data Warehouse)에 해당 테이블 생성

- 해당 테이블을 Redshift에 미리 만들어서 DAG를 통해 MySQL쪽 테이블로부터 Redshift 테이블로 복사할 것

CREATE TABLE (본인의스키마).nps (
 id INT NOT NULL primary key,
 created_at timestamp,
 score smallint
);

 

Colab에서 Redshift 테이블 생성 완료

 

 

 


MySQL_to_Redshift DAG의 Task 구성

SqlToS3Operator

- MySQL SQL 결과 -> S3

- (s3://grepp-data-engineering/{본인ID}-nps)

- s3://s3_bucket/s3_key

 

 

S3ToRedshiftOperator

- S3 -> Redshift 테이블

- (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)

- COPY command is used

 

 

 

MySQL_to_Redshift.py 

2개의 Operator를 사용해서 구현

- SqlToS3Operator

- S3ToRedshiftOperator

 

MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사

- S3를 경유해서 COPY 명령으로 복사

 


MySQL 테이블의 Incremental Update 방식

MySQL/PostgreSQL 테이블이라면 다음을 만족해야함

- created (timestamp): Optional

- modified (timestamp)

- deleted (boolean): 레코드를 삭제하지 않고 deleted를 True로 설정

 

Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면?

1) ROW_NUMBER로 직접 구현하는 경우

- 먼저 Redshift의 A 테이블의 내용을 temp_A로 복사

- MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사

     - 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행

SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)

-> temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사

 

 

2) S3ToRedshiftOperator로 구현하는 경우

query 파라미터로 아래를 지정

SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)

method 파라미터로 “UPSERT”를 지정

upsert_keys 파라미터로 Primary key를 지정 -> "id" columns

- 앞서 nps 테이블이라면 “id” 필드를 사용

 

 

 

MySQL_to_Redshift_v2.py

MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사

이 작업이 성공하려면 Redshift가 S3 버킷에 대한 액세스 권한을 갖고 있어야함 -> 위에서 설명

 

2개의 Operator를 사용해서 구현

- SqlToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜

- S3ToRedshiftOperator

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv', "IGNOREHEADER 1"],
    redshift_conn_id = "redshift_dev_db",
    method = "UPSERT",
    upsert_keys = ["id"],
    dag = dag
)

 

SQL문의 {{ ~ }} 은 Airflow에서 자동으로 system variable 로 연결해주는 문법이라고 한다

sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"

 

 


Backfill 실행해보기

Daily Incremental DAG에서 2018년 7월달 데이터를 다 다시 읽어와야 한다면

Airflow에서 추천하는 방식으로 Icremental Update를 구현했다면? -> Backfill이 쉬워짐

하지만 이를 어떻게 실행?

- 하루씩 31번 실행하나?

     - airflow dags test MySQL_to_Redshift_v2 2023-07-01

     - ...

     - airflow dags test MySQL_to_Redshift_v2 2023-07-31

 

- 한번에 여러날짜를 동시에 실행? -> 가능, 다만 항상 가능한 것은 아님 실행순서의 문

     - 구현방법에 따라서 한번에 하나씩 실행하는 것이 안전할 수 있음

     - 이를 제어해주는 DAG 파라미터가 max_active_runs

 

 

Backfill CLI

airflow dags backfill dag_id -s 2018- 07- 01 -e 2018- 08- 01

 

This assumes the followings:

- catchUp이 True로 설정되어 있음

- execution_date을 사용해서 Incremental update가 구현되어 있음

 

start_date부터 시작하지만 end_date은 포함하지 않음

 

실행순서는 날짜/시간순은 아니고 랜덤. 만일 날짜순으로 하고 싶다면

- DAG default_args의 depends_on_past를 True로 설정

    - default_args = { 'depends_on_past': True,

 

 

 

How to Make Your DAG Backfill ready

먼저 모든 DAG가 backfill을 필요로 하지는 않음

- Full Refresh를 한다면 backfill은 의미가 없음

 

여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함

- 마지막 업데이트 시간 기준 backfill을 하는 경우라면 (Data Warehouse 테이블에 기록된 시간 기준) 이런 경우에도 execution_date을 이용한 backfill은 필요하지 않음

 

데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수

- airflow가 큰 도움이 됨

- 하지만 데이터 소스의 도움 없이는 불가능

 

->어떻게 backfill로 구현할 것인가

제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야함

“execution_date”을 사용해서 업데이트할 데이터 결정

“catchup” 필드를 True로 설정

start_date/end_date을 backfill하려는 날짜로 설정

다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함

 

 


최종정리

 

Airflow란

Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임웍

- 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍
- Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름

 

Airflow의 장점

- 데이터 파이프라인을 세밀하게 제어 가능
- 다양한 데이터 소스와 데이터 웨어하우스를 지원
- 백필(Backfill)이 쉬움

 

Airflow 관련 중요 용어/개념

- start_date, execution_date, catchup

 

스케일링 방식

- Scale Up vs. Scale Out vs. 클라우드 버전 vs. K8s 사용

 

 

 

데이터 파이프라인 작성시 기억할 점

데이터 파이프라인에 관한 정보를 수집하는 것이 중요

- 비지니스 오너와 데이터 리니지에 주의할 것
- 결국 데이터 카탈로그가 필요

 

데이터 품질 체크

 - 입력 데이터와 출력 데이터

 

코드 실패를 어설프게 복구하려는 것보다는 깔끔하게 실패하는 것이 좋음
가능하면 Full Refresh -> 데이터가 작은경우, 항상 올바른 방법이 아

 - Incremental Update를 쓸 수 밖에 없다면 Backfill 방식을 먼저 생각해둘 것 -> Airflow가 필요한 이유


주기적인 청소 (데이터, 테이블, Dag)

 

 

다음 스텝

Airflow 고도화 강의에서 더 많은 내용을 공부할 예정

    - 운영, 고급 기능 (다양한 방식의 DAG 트리거), 구글 스프레드시트 연동, 슬랙 연동

컨테이너 기술 공부 (Docker와 K8s)

Spark에 대해 학습하여 빅데이터 처리

배치가 아닌 리얼타임 기준으로 스트리밍 데이터 처리 (Kafka, Kinesis) -> 이부분은 많이 궁금하다

 


공부하며 어려웠던 내용