유시동물 대시보드 고도화
주요 메모 사항
지난 프로젝트의 결과물
보고서 PPT
보고서 DOCS
회고록 포스팅
1일차
사실 5일 정도의 세미 프로젝트여서 하루가 중요했었는데, 고도화하는 과정에서는 먼저 목표와 문제정의가 필요했다.
-> 이는 결국 소통을 많이 해야한다고 생각.
성현님이 만나서 회의하자고 제안해주셔서 팀원들과 모두 만나서 회의했다.
문제정의와 목표 설정
사실 기존에 작성했던 Cloud Functions을 그대로 사용하는게 비용적인 측면에서 유리하고, 서버리스하게 잘 구성된 아키텍처는 맞다.
ㄴ> 다만, 이번 프로젝트를 시작하기 전에 배웠던 Airflow와 Docker, CI/CD를 사용해보기 위해 아키텍처 구성을 변경하고자 했다.
결론은 GCP에서 Compute Engine를 하나 할당해 Docker Compose로 airflow를 서비스하기로 했다.
CI/CD가 완성되기 이전에는 서버에 직접 접근해야 했기에, 중앙 집중 VCS를 구성하고자 했고 1일차에 바로 Ubuntu VM을 할당받았다.
ㄴ 대충 방화벽 세팅과 ssh server 세팅을 하고 docker compose - airflow 공식 이미지를 서버에 올려놨다.
2일차
기존에 작성했던 Cloud Functions 함수를 Airflow DAG로 실행하기 위해 코드를 수정하는 작업을 진행했다.
먼저 docker-compose.yaml 에서 Airflow config를 수정했다.
enviroment:에서 data와 key 폴더의 PATH를 지정해주고, install할 패키지들을 선언해줬다.
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
AIRFLOW_VAR_KEY_DIR: /opt/airflow/keys
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- google-cloud-storage pendulum pandas numpy}
volumes에 proj_dir과 mount해줬다.
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
- ${AIRFLOW_PROJ_DIR:-.}/keys:/opt/airflow/keys
그리고 DAG를 작성 -> 테스트 -> 수정을 거쳤다.
OPENAPI_URL = Variable.get("OpenAPI_URL")
OPENAPI_KEY = Variable.get("OpenAPI_KEY")
DAG_ID = "Daily_OpenAPI_30days"
dag = DAG(
dag_id=DAG_ID,
schedule_interval="1 15 * * *",
start_date=datetime(2023, 6, 25),
catchup=False,
)
해당 DAG를 작성할때 생각보다 변경해야 될 것이 많았다.
pytz 모듈을 사용해서 datetime을 Asia/Seoul로 명시적으로 선언했고, 그냥 Exception과 Airflow에 raise할 Exception을 구분하는 등의 작업이 필요했다.
from pytz import timezone
raise AirflowException("10회 이상 추출 실패 데이터, OpenAPI 서버 문제로 추출 중단")
raise Exception("공공데이터 OpenAPI Server error: SERVICE_KEY_IS_NOT_REGISTERED_ERROR -> 등록되지 않은 서비스키")
3일차
2일차에 작성했던 DAG는 KST 기준 00시 05분에 실행된다.
전날부터 전날-1달 만큼의 데이터를 수집해오는 task가 먼저 실행이 되고 Cloud Storage에 업로드하는 task가 실행이 되어야 했는데, 첫번째 task에서 에러가 발생해 적재가 안되는 문제가 생겼었다.
사실 해당 Error은 아직도 이해가 가지 않는다... 아무리 소스를 봐도 문제가 없었는데, docker compose를 다시 up한 뒤로는 정상적으로 작동했었다. 아마도 소스의 문제가 아니라 yaml에서의 문제가 아니였나 싶다. (여기서 삽질을 좀 했다)
그리고 새로운 DAG를 작성했다.
지난 프로젝트 기간에는 위에서 말한 것 처럼 1달동안의 데이터를 하루에 1번 가져왔는데, 당일날의 데이터를 15분마다 추출해 tracking 되는 느낌으로 오늘의 유기동물 수를 보여주고자 했다.
사실 소스코드가 크게 바뀐 것은 없었다. 그저 스케줄 인터벌이 15분 단위이고, 당일의 데이터만 가져와서 서버에는 quarter_yyyymmdd-hhmm으로 저장하고 Cloud Stroage에는 quarter_yyyymmdd 로 override 되도록 수정했다.
DAG_ID = "Quarter_OpenAPI_today"
dag = DAG(
DAG_ID,
schedule_interval="*/15 * * * *",
start_date=datetime(2023, 6, 25),
catchup=False,
)
그리고 계속해서 쌓이게 되는 /data/*.csv와 /logs/dag_id/task_id=* 를 삭제하는 dag가 필요하겠다고 생각했다.
- 모든 DAG에서 나온 log들을 스토리지에 적재
- .yaml에 AIRFLOW__LOGGING__REMOTE_LOGGING: True 로 GCS에 자동으로 적재가 된다.
- airflow 내 로그 파일 청소하기 ( 주기 : 약 3일 )
- dags/clean_data_log.py
- data
- strayanimal_30days_data → 2일이 지나면
- strayanimal_today_data → 당일 데이터만 보존
- logs
- $ find $AIRFLOW_HOME/logs/ -maxdepth 2 -name "run_id=*" -type d -mtime +0 -exec rm -rf {} +
- run_id= 로 시작하는 /logs/run_id=~~/ 위의 폴더들을 모두 하루지나면 삭제
- data
- dags/clean_data_log.py
위에서 정의한대로 DAG를 작성해 data와 log 들을 cleanup하게 했다.
그리고 하루 작업 상황을 공유하는 스크럼을 진행하는데, 발견했던 Issue를 공유했다. (서버에 여러번 CD 되는 Issue)
CICD 작업을 맡으신 분이 수정 작업을 바로 시작해주셨다.
담당하신 팀원이 작성해두신 해결방안이다.

4일차
LOG 관련해서 기존에 로컬에 남는 로그들을 dag로 작성해서 삭제했었다.
ㄴ> Airflow에서 로컬에 남는 로그를 핸들링되게 지원해줘서 적용했다. -> 실패
ㄴ> 핸들링 하는 환경변수가 작동을 안해서 그냥 DELETE 하게했다. (주석문)
docker-compose.yaml
# Airflow config [Logging]
AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: 'gcp_conn_service'
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: 'gs://strayanimal-bucket/dag-logging-test/'
# AIRFLOW__LOGGING__DELETE_LOCAL_LOGS: 'true'
AIRFLOW__LOGGING__REMOTE_TASK_HANDLER_KWARGS: '{{"delete_local_copy": true}}'
airflow는 2.6.2고 공식 docs에서도 REMOTE_TASK_HANDLER_KWARGS 가 2.6.0부터 추가된 기능인데 왜 이런 오류가 발생하는지 모르겠다
ㄴ 그래서 4번째 라인인 DELETE_LOCAL_LOGS를 사용했다.
ㄴ 해당 방법을 사용하게 되면 서버에는 로그가 아예 남지 않는다.
ㄴ flow : container에 로그가 적재 -> GCS에 적재 -> container 로그 삭제
Issue Timezone
이슈 해결을 위해 작성하면서 진행, notion 링크
LOG가 남는 시간이 UTC 기준으로 작성되는 것을 눈치챘다.
1. DAG가 실행되고 남겨지는 logs/dag_id=~/run_id= ~~__datetime 을 KST로 저장하려고 했다.
그래서 먼저 yaml에서 Timezone 관련 environment 수정 -> Airflow와 container들의 Timezone
AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Seoul
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: Asia/Seoul
# AIRFLOW__CORE__LOG_TIMEZONE: Asia/Seoul # bard가 알려줬으나, 작동 안함
# 상속하는 모든 컨테이너 TimeZone 변경
TZ: 'Asia/Seoul'
모든 airflow 컨테이너에서 $ date 출력시에 KST로 나오는게 확인이 되고, dag에서 pythonOperator로 datetime 모듈을 출력해도 KST로 출력이 되는 상태임!!
ㄴ> 노션에 기록되어 있음!
DAG를 manual 하게 실행 했을때 Airflow logger가 찍어주는 시간은 KST
ㄴ 근데 scheduled에 의해 실행되면 UTC로 찍힘... -> 멘토님께 질문해봤으나, 멘토님도 무슨 문제인지 모르겠다고하심.. ㅜ
현재 logging에 remote logging을 GCS로 연결해서 자동으로 적재가 되고, 로컬에서는 삭제가 되도록 했는데 logs 내부 폴더의 이름이 utc로 적용되서 보기가 어렵다는게 문제점
1. airflow를 커스텀으로 작성한 이미지가 아니라 공식 페이지에서 가져온 이미지라서 그런지, airflow의 timezone이 aware datetime objects를 사용하는데 utc로 하드코딩되어 있는게 있었음.
2. 그래서 스택 트레이스 따라가면서 모두 다 kst로 변경했었는데, 이 경우에 로그 폴더의 이름이 manual로 실행되면 kst로 나오고 scheduled로 실행되면 utc로 나오는 현상이 생겨서 그만 뒀음... (airflow에서 내부적으로 한차례 더 UTC로 변환한다는 얘기가 있었는데 너무 깊어진 것 같아서 시간 소요가 너무 심할 것 같았다)
ㄴ 이 문제는 로그파일 내의 Timestamp가 아닌 log 폴더의 이름을 얘기하는 것임!!
그래서 container Timezone은 KST로 설정해서 python Datetime 모듈에서는 KST 시간이 출력되게 하고, LOG 관련한 모든 TimeStamp는 UTC로 통합하기로 했다.
ㄴ 해당 이슈를 경험해본 사람이 있다면 꼭 물어보고 싶다...
5일차
4일차에 Timezone 관련 삽질을 너무 오랫동안 했더니 하루가 다 지나고 5일차가 되버렸다..
팀원분들이 하는 작업에 들어가기엔 너무 늦은 감이 있어서, 1달 동안의 데이터를 가져오는 Task를 비동기화 하기로 했다. -> 코루틴 -> 파이썬 asyncio 모듈을 적용했다.
Notine Ticket 링크
실제 실행 시간이 약 1/5 ~ 1/7 배로 줄어 의미있는 시간 단축을 할 수 있었다!!
이번에 진행했던 작업 Ticket
수정중
'프로그래머스 데브코스-데이터 엔지니어 > TIL(Today I Learned)' 카테고리의 다른 글
07/04 62일차 Spark (0) | 2023.07.05 |
---|---|
07/03 61일차 빅데이터 처리 시스템, Hadoop, Spark (0) | 2023.07.03 |
06/23 55일차 DBT와 데이터 카탈로그 (2) | 2023.06.23 |
06/22 54일차 Airflow 운영과 DBT (0) | 2023.06.22 |
06/21 53일차 Airflow 다양한 기능 사용해보기 (0) | 2023.06.21 |