OLTP, Backfill, ELT
주요 메모 사항
전날 숙제 리뷰
2번째 프로젝트를 진행하면서 Cloud Functions와 Cloud Scheduler를 사용했던 것이 Airflow를 이해하는데 크게 도움이 된 것 같다. -> MAX님이 주신 Airflow 관련한 숙제를 푸는데 어려움이 없었음
MySQL(OLTP) 테이블 -> Redshift(OLAP) 테이블로 복사하기
- S3 connection 세팅들이 필요함, 보통 권한 등 -> IAM 다룰것
- airflow쪽에서 mysql쪽에 대한 엑세스도 필요
그럼 AWS 관련 권한 설정을 보자
- Airflow DAG에서 S3 접근 을 위한 권한 -> 이는 쓰기 권한
a. IAM User를 만들고 S3버킷에 대한 읽기/쓰기 권한을 설정, access key와 secret key 사용
- Redshift가 S3 접근, S3에서 Redshift로 적재 하기 위한 권한 -> 이는 읽기 권한
a. Redshift에 S3를 접근할 수 있는 역할(Role)을 만들고 이를 RedShift에 지정해야 함
AWS S3 Connections
- Access Key ID 와 Secret Access Key를 사용, 해당 AWS 작업은 우리가 실습할 수 없음
a. 사실 루트 사용자의 키를 사용하면 편하지만, 상당한 사고 위험과 문제점을 동반 (해킹 -> AWS 자원 문제)
- 우리가 해볼 Best Practice
a. IAM을 사용해 별도의 사용자 생성
b. 그 사용자에게 해당 S3 Bucket을 읽고 쓸 수 있는 권한을 제공
c. 그 사용자의 Access Key ID와 Secret Access Key를 사용
- 해당 Key도 주기적으로 변경해서 보안을 한층 더 높일 수 있음 -> 외부의 해킹 공격시 피해 최소화
1) 유저 생성, 이름은 airflow-s3-access로 지정
2) 정책 추가, 2가지 방법이 존재 -> 1번으로 진행할 것
1. 정책을 S3 bucket을 Read만 해도 된다면 Readonly 정책을 선택하는 것과 같이 세부적인 권한을 조정
2. S3에 대한 모든 권한 지정 -> S3FullAccess
3) Custom Policy의 내용을 설정
- MAX님은 따로 몰라도 된다고 함 -> AWS의 정책 얘기라 그러신 듯, 하다보면 알게된다고 하셨음
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListAllMyBuckets"
],
"Resource": "arn:aws:s3:::*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::grepp-data-engineering",
"arn:aws:s3:::grepp-data-engineering/*"
]
}
]
}
4) IAM -> Users -> airflow-s3-access
- 밑에 첫번째 이미지의 policies를 보면 de-s3-bucket-access는 위에서 설정한 Custom Policy
- 두번째 이미지 -> Security Credentials 에서 기존에 진행하기로한 access key를 만들 수 있음
5) S3 Connection 설정
- 여기에서 Connection Type을 Amazon Webservices로 선택하라고 하셨는데, 이유는 모르겠음...
MySQL 연결, 제공된 정보로 바로 Connection 작성
MySQL Connections 설정시 유의사항 (Docker 환경)
- 이는 Docker환경을 사용했을때 생길 수 있는 유의점으로 MAX님이 미리 얘기해주시는 것
- 필자는 Ubuntu 위에 바로 Airflow를 설치해서 실습하기 때문에 해당하지 않음
- “ModuleNotFoundError: No module named 'MySQLdb'” 에러에 해당하는 해결방
- 아래 명령을 Airflow Scheduler Docker Container에 root 유저로 로그인해서 실행
docker exec --user root -it <Airflow Scheduler의 Container Id> sh
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
2 New Connections(MySQL & AWS)
- aws_conn_id 는 자신에게 맞는 connType을 설정했으므로 S3 이거나 generic 일 수 있음
- redshift_dev_db도 마찬가지로 postgres나 amazon redshfit
MySQL의 테이블 (OLTP, Production Database)
- 이미 MySQL에 해당 테이블이 생성되어 있음 (레코드도 존재), 이를 RedShift로 복사하는 것이 진행할 실습
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
그러면 우리는 RedShift(OLAP, Data Warehouse)에 해당 테이블 생성
- 해당 테이블을 Redshift에 미리 만들어서 DAG를 통해 MySQL쪽 테이블로부터 Redshift 테이블로 복사할 것
CREATE TABLE (본인의스키마).nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
Colab에서 Redshift 테이블 생성 완료
MySQL_to_Redshift DAG의 Task 구성
SqlToS3Operator
- MySQL SQL 결과 -> S3
- (s3://grepp-data-engineering/{본인ID}-nps)
- s3://s3_bucket/s3_key
S3ToRedshiftOperator
- S3 -> Redshift 테이블
- (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)
- COPY command is used
MySQL_to_Redshift.py
2개의 Operator를 사용해서 구현
- SqlToS3Operator
- S3ToRedshiftOperator
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
- S3를 경유해서 COPY 명령으로 복사
MySQL 테이블의 Incremental Update 방식
MySQL/PostgreSQL 테이블이라면 다음을 만족해야함
- created (timestamp): Optional
- modified (timestamp)
- deleted (boolean): 레코드를 삭제하지 않고 deleted를 True로 설정
Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면?
1) ROW_NUMBER로 직접 구현하는 경우
- 먼저 Redshift의 A 테이블의 내용을 temp_A로 복사
- MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사
- 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
-> temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사
2) S3ToRedshiftOperator로 구현하는 경우
query 파라미터로 아래를 지정
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
method 파라미터로 “UPSERT”를 지정
upsert_keys 파라미터로 Primary key를 지정 -> "id" columns
- 앞서 nps 테이블이라면 “id” 필드를 사용
MySQL_to_Redshift_v2.py
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
이 작업이 성공하려면 Redshift가 S3 버킷에 대한 액세스 권한을 갖고 있어야함 -> 위에서 설명
2개의 Operator를 사용해서 구현
- SqlToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜
- S3ToRedshiftOperator
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv', "IGNOREHEADER 1"],
redshift_conn_id = "redshift_dev_db",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
SQL문의 {{ ~ }} 은 Airflow에서 자동으로 system variable 로 연결해주는 문법이라고 한다
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
Backfill 실행해보기
Daily Incremental DAG에서 2018년 7월달 데이터를 다 다시 읽어와야 한다면
Airflow에서 추천하는 방식으로 Icremental Update를 구현했다면? -> Backfill이 쉬워짐
하지만 이를 어떻게 실행?
- 하루씩 31번 실행하나?
- airflow dags test MySQL_to_Redshift_v2 2023-07-01
- ...
- airflow dags test MySQL_to_Redshift_v2 2023-07-31
- 한번에 여러날짜를 동시에 실행? -> 가능, 다만 항상 가능한 것은 아님 실행순서의 문
- 구현방법에 따라서 한번에 하나씩 실행하는 것이 안전할 수 있음
- 이를 제어해주는 DAG 파라미터가 max_active_runs
Backfill CLI
airflow dags backfill dag_id -s 2018- 07- 01 -e 2018- 08- 01
This assumes the followings:
- catchUp이 True로 설정되어 있음
- execution_date을 사용해서 Incremental update가 구현되어 있음
start_date부터 시작하지만 end_date은 포함하지 않음
실행순서는 날짜/시간순은 아니고 랜덤. 만일 날짜순으로 하고 싶다면
- DAG default_args의 depends_on_past를 True로 설정
- default_args = { 'depends_on_past': True,
How to Make Your DAG Backfill ready
먼저 모든 DAG가 backfill을 필요로 하지는 않음
- Full Refresh를 한다면 backfill은 의미가 없음
여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함
- 마지막 업데이트 시간 기준 backfill을 하는 경우라면 (Data Warehouse 테이블에 기록된 시간 기준) 이런 경우에도 execution_date을 이용한 backfill은 필요하지 않음
데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수
- airflow가 큰 도움이 됨
- 하지만 데이터 소스의 도움 없이는 불가능
->어떻게 backfill로 구현할 것인가
제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야함
“execution_date”을 사용해서 업데이트할 데이터 결정
“catchup” 필드를 True로 설정
start_date/end_date을 backfill하려는 날짜로 설정
다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함
최종정리
Airflow란
Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임웍
- 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍
- Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
Airflow의 장점
- 데이터 파이프라인을 세밀하게 제어 가능
- 다양한 데이터 소스와 데이터 웨어하우스를 지원
- 백필(Backfill)이 쉬움
Airflow 관련 중요 용어/개념
- start_date, execution_date, catchup
스케일링 방식
- Scale Up vs. Scale Out vs. 클라우드 버전 vs. K8s 사용
데이터 파이프라인 작성시 기억할 점
데이터 파이프라인에 관한 정보를 수집하는 것이 중요
- 비지니스 오너와 데이터 리니지에 주의할 것
- 결국 데이터 카탈로그가 필요
데이터 품질 체크
- 입력 데이터와 출력 데이터
코드 실패를 어설프게 복구하려는 것보다는 깔끔하게 실패하는 것이 좋음
가능하면 Full Refresh -> 데이터가 작은경우, 항상 올바른 방법이 아
- Incremental Update를 쓸 수 밖에 없다면 Backfill 방식을 먼저 생각해둘 것 -> Airflow가 필요한 이유
주기적인 청소 (데이터, 테이블, Dag)
다음 스텝
Airflow 고도화 강의에서 더 많은 내용을 공부할 예정
- 운영, 고급 기능 (다양한 방식의 DAG 트리거), 구글 스프레드시트 연동, 슬랙 연동
컨테이너 기술 공부 (Docker와 K8s)
Spark에 대해 학습하여 빅데이터 처리
배치가 아닌 리얼타임 기준으로 스트리밍 데이터 처리 (Kafka, Kinesis) -> 이부분은 많이 궁금하다
공부하며 어려웠던 내용
'프로그래머스 데브코스-데이터 엔지니어 > TIL(Today I Learned)' 카테고리의 다른 글
06/13 47일차 개발환경 구축을 위한 Docker & K8s (2) (0) | 2023.06.13 |
---|---|
06/12 46일차 개발환경 구축을 위한 Docker & K8s (1) (0) | 2023.06.12 |
06/08 44일차 데이터 파이프라인과 Airflow (4) (0) | 2023.06.08 |
06/07 43일차 데이터 파이프라인과 Airflow (3) (1) | 2023.06.07 |
06/06 42일차 데이터 파이프라인과 Airflow (2) (0) | 2023.06.06 |