기록 블로그

프로그래머스 데브코스-데이터 엔지니어/TIL(Today I Learned)

06/20 52일차 ELT 작성과 구글시트_슬랙 연동 (후반부)

usiohc 2023. 6. 20. 17:44

Airflow


주요 메모 사항


구글 시트 연동하기 (1): 시트 => Redshift 테이블

개요

회사가 GCP를 쓴다면 Spread sheet를 사용하는 경우가 있

 

 

구현 절차들

1. 시트 API 활성화하고 구글 서비스 어카운트 생성하고 그 내용을 JSON 파일로 다운로드

2. 어카운트에서 생성해준 이메일을 조작하고 싶은 시트에 공유

3. Airflow DAG쪽에서 해당 JSON 파일로 인증하고 앞서 시트를 조작

 

 

Colab에서 Table 생성

%%sql

CREATE TABLE
asltn99.spreadsheet_copy_testing (
  col1 int,
  col2 int,
  col3 int,
  col4 int
);

 

구글 서비스 어카운트 생성

구글 클라우드 로그인

구글 스프레드시트 API 활성화 필요 

https://console.cloud.google.com/apis/library/sheets.googleapis.com

 

다음으로 구글 서비스 어카운트 생성, 아래 두 문서 중 하나를 참고

https://robocorp.com/docs/development-guide/google-sheets/interacting-with-google-sheets

https://denisluiz.medium.com/python-with-google-sheets-service-account-step-by-step-8f74c26ed28e

 

이 JSON 파일의 내용을 google_sheet_access_token이란 이름의 Variable로 등록

 

이 JSON 파일을 보면 이메일 주소가 하나 존재

ㄴ 이를 읽고 싶은 구글스프레드시트 파일에 공유. 이 이메일은 iam.gserviceaccount.com로 끝남

 

 

1. Credentials or 사용자 인증 정보에서 Service account or 서비스 계정으로 사용자 인증 정보를 만들자

 

 

 

 

 

 

해당 서비스 계정에 들어가서 새로운 키를 만들어야 한다 -> JSON로 선택하면 해당 Key가 다운로드

해당 키를  Airflow Variable에 등록할 것임!

위에서 만든 gsheet용 서비스 계정의 이메일을 공유하고자 하는 gsheet에서 공유 이메일로 추가해야 한다!

 

그리고 Json 값을 google_sheet_access_token의 이름으로 Key 추가

 

 

 

구글 시트를 테이블로 복사하는 예제

ㄴ 실제 스프레드시트와 연동하는 방법은 아래 코드 두 개를 참고

    ㄴ Gsheet_to_Redshift.py

    ㄴ plugins/gsheet.py

 


결과 확인

테이블 생성과 spreadsheet 링크를 제대로 못해서 실패를 했었 

 

 

테이블 확인

select * from asltn99.spreadsheet_copy_testing limit 5;

 

 


구글 시트 연동하기 (2): Redshift 테이블 => 시트

개요

 

예제

SQL_to_Sheet.py

plugins/gsheet.py의 update_sheet

 

 

Issue

만약, Sheet가 Google Drive에 위치하고 있다 Google Drive API도 활성화 해야한다!!

 

 

 

 

앞서 데모에서 사용했던 동일한 시트에 새로운 탭을 하나 만듬

ㄴ 이미 필요한 이메일 주소가 해당 시트에 편집자로 공유가 되어 있기에 별도 작업이 필요 없음

 

거기에 “SELECT * FROM analytics.nps_summary”의 내용을 복사

ㄴ 이 과정을 PythonOperator로 구현

ㄴ 해당 기능은 gsheet 모듈내에 있는 update_sheet라는 함수로 구현했음

 

scheduler 터미널에서 실행

 

SQL이 포힘된 이름을 가진 dag 리스트 출력

 

 

우리가 진행할 SQL_to_Sheet를 실행

 

성공적으로 진행되었다면, google sheet에서 확인

 

 


 

컬럼의 이름이 round로 되어있는 것을 nps로, date를 기준으로 정렬해서 집어넣자 -> SQL문 수정

        params = {
            "sql": "SELECT date, round AS nps FROM analytics.nps_summary Order By date",
            "sheetfilename": "spreadsheet-copy-testing",
            "sheetgid": "RedshiftToSheet"
        }

 

scheduler 터미널에서 실행

 

 

 

 

 


API & Airflow 모니터링

Airflow가 제공해주는 API에 대해서 알아보고 이를 이용해 모니터링 방법에 대해 알아보자

 

 

이번 섹션에서 해보고자 하는 일들

Airflow의 건강 여부 체크 (health check)을 어떻게 할지 학습

Airflow API로 외부에서 Airflow를 조작해보는 방법에 대해 학습

 

 

Airflow API 활성화

airflow.cfg의 api 섹션에서 auth_backend의 값을 변경

ㄴ [api] auth_backend = airflow.api.auth.backend.basic_auth

ㄴ 터미널에 들어가지 않고 docker desktop에서 수정이 가능하다!!

ㄴ 다만 이미 Docker Compose yaml에서 override했기 때문에 따로 건드릴 필요가 없음

 

docker-compose.yaml에는 이미 설정이 되어 있음 (environments)

ㄴ 위의 cfg를 yaml에서 수정

ㄴ 언더바 2개 __ 를 사용하는게 무슨 의미? : Airflow Section Key 라고 한다, -> airflow.cfg를 Override하는 것

    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'

 

아래 명령으로 확인해보기

ㄴ 여기서 신기한게 위에서 변수명이 auth_backends로 되어있는데, auth_backend로 써도 값 출력이 가능하다? 물론, 노란색 메세지로 s를 붙이라고 한다

ㄴ 이 부분을 설명해주셨는데 auth_backend는 airflow config에서 사용, s가 븥은 복수형은 yaml에서 사용한다? 라고 이해했

 

 

Airflow Web UI에서 새로운 사용자 추가 (API 사용자)

ㄴ Security -> List Users -> +

ㄴ 이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1) 로 만들어줬다.

 

 

 

 

Health API 호출

ㄴ airflow 상태를 /health API로 호출해서 return 받아보자 (윈도우에서는 PS로 안된다 CMD에서 실행)

ㄴ 그냥 web server run중인지 확인 용

 

API 사용 Reference

ㄴ 프로젝트 할 때 써봐야 겠다.

 

API 레퍼런스 살펴보기

특정 DAG를 API로 Trigger하기

모든 DAG 리스트하기

모든 Variable 리스트하기

모든 Config 리스트하기

 

 

API 사용예 - 특정 DAG를 API로 Trigger하기

ㄴ 윈도우에서만 발생하는 이슈인지 모르겠다 -> 조금 찾아봤는데 Content-Type를 application/json으로 전달해서 airflow api server 쪽에서는 x-ww-form-urlencoded 로 직렬화? 받아서 응답하는 것 같은데, 이 문제는 api에서 역직렬화 하는 코드를 추가해야 한다고 한다? 

 

 

이 외의 API들은 모두 401 error를 리턴해주는데, 윈도우의 문제인지는 모르겠다.

stackoverflow를 확인해봐도 명확한 해결법이 없다

 

 

 


공부하며 어려웠던 내용

alirflow api 트러블 슈팅 중