본문 바로가기

데브코스/TIL

[TIL] 10주차_Day45: 데이터 파이프라인, Airflow (5)

💡Today I Learned

  • 데이터 파이프라인과 Airflow에 대한 다섯 번째 수업을 진행했습니다.
  • Production DB to Data Warehouse DAG, Backfill 실습

1. Review

- start_date, execution_date, DAG의 첫 실행 날짜

1. start_date: DAG가 읽어와야 할 데이터의 시점 (ex: 매일 1번 실행되는 DAG의 start_date = 2021/02/05이라면 DAG의 첫 실행 날짜 = 2021/02/06) → 5일의 데이터 읽고싶음 = 5일에 쌓인 모든 데이터를 6일에 실행해서 읽어와야 함

2. execution_date: DAG가 처음 읽어와야하는 데이터의 날짜, 따라서 DAG 실행동안 변경되지 x, 각 task가 실행될 때 사용되는 날짜 (start_date을 기반으로 계산됨)

3. 첫 실행 날짜: DAG가 처음 실행되어야하는 날짜 (ex: 2021/02/06)

 

- Incremental Update 방식의 중복 제거

1. DISTINCT: primary uniqueness의 완벽한 보장은 불가.. (데이터 소스에 따라 다름)

2. ROW_NUMBER + created_date(레코드 생성 날짜 필드 추가!) 사용해서 중복 처리

created_date timestamp DEFAULT GETDATE()

 

*) 중복 레코드 발생 이유: 해당 일자 중간에 API가 호출된 경우 → 추후 종가, volumn이 달라질 수 있음

 

2. MySQL > S3 > Redshift 개요

- Production DB(OLTP: MySQL) → Warehouse(OLAP: Redshift)

: prod 스키마 밑의 nps 파일 → raw_data 스키마 밑의 nps 파일

: INSERT INTO (데이터 크면 오버헤드 커짐) → COPY로 벌크 업데이트

: 데이터 소스(DB 서버_MySQL)에서 파일 다운 → S3에 적재 → Redshift로 복사

 

- 보안/권한 설정

1. Airflow DAG에서 - S3 접근 (읽기/쓰기 권한)

2. Redshift - S3 접근 (읽기 권한)

 

*) 특정 S3 bucket에만 해당 권한들을 연결하기 위해 IAM user 생성 > Custom Policy를 따로 작성하기

 

- Airflow 연결 설정

1. Airflow - MySQL

2. Airflow - S3

 

3. MySQL 테이블 복사 실습

- Airflow에서 제공되는 Native Operator

*) 각 Operator를 기반으로 만든 task에 필요한 param

 

1. SqlToS3Operator

: {s3_bucket}/{s3_key} 이라는 위치(=table path)에 적재됨

: aws_conn_id = S3 connection id

: replace = 덮어쓰기 True/False 지정

: pd_kwargs = index(레코드 일련번호), header 포함 여부 결정 (True/False)

        *) 내부적으로 pandas를 이용해서 읽어옴, pandas 관련 arg임

 

2. S3ToRedshiftOperator

: {s3_bucket}/{s3_key} 이라는 위치(=table path)에서 가져옴

: redshift_conn_id = Redshift connection id

: {schema}.{table}이 복사해서 붙여넣을 타겟 테이블

: copy_options = 옵션 지정 (파일 확장자 .csv)

: method = {'REPLACE', 'APPEND', 'UPSERT'} 중 Replace(Full Refresh), UPSERT(기본키 기준, Update+Insert 하이브리드 형태)

 

- Incremental Update 방식(1~3)

1. DISTINCT (위험, 예외사항 o)

2. ROW_NUMBER (기본키 기준 partition, modified 날짜 기준 정렬 후 중복 제거)

3. SqlToS3Operator의 query, S3ToRedshiftOperator의 method, upsert_keys 파라미터 지정

# 날짜 기준, 생성된 날짜가 execution_date와 동일한 레코드만 새로 적재하기
	# {{ }} 안에 Airflow 시스템 변수
    # -> Airflow의 시스템 변수 넘겨받아 파이썬 코드 내에서 사용
    
query = "SELECT * FROM t WHERE DATE(created_at) = DATE('{{ execution_date }}')"

method = "UPSERT"

upsert_keys = [테이블의 기본키 필드 이름]

 

- IAM User

: Airflow와 S3 연결할 때 필요한 사용자 권한

: IAM > User 생성 > Attach policies directly(내가 직접 policy 지정하겠다) > Create Policy로 json 사용해 특정 S3 버킷에만 권한 지정하기

: 만든 User > 보안 및 권한 > 액세스 키 생성 > AWS 외부 application(=Airflow니까) > 생성 → 액세스 키 & 비밀 키 보관해놓고 사용!

 

- IAM Role

: Redshift가 S3 접근

: IAM > Role 생성 > AWS 서비스 > Redshift > Redshif-customize > Create Policy로 json 사용해 특정 S3 버킷에만 권한 지정하기

: Redshift 콘솔 > cluster > 작업 > IAM 역할 관리 > 위에서 만든 IAM role 연결 > 저장

 

- Docker MySQL config

# MySQL 세팅을 위해 루트 유저로 접속하기
$ docker exec --user root -it [airflow-scheduler 컨테이너 ID] sh

# 우분투 계열에서 사용하는 프로그램 매니저(apt-get) 업데이트
$ sudo apt-get update

# mysql client 라이브러리 설치
$ sudo apt-get install -y default-libmysqlclient-dev

# mysql 관련 모듈에 필요한 cpp 기반 컴파일러(=gcc) 설치
$ sudo apt-get install -y gcc

# airflow의 mysql 모듈 재설치
$ sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

# airflow 유저로 다시 로그인
$ docker exec -it [airflow-scheduler 컨테이너 ID] sh

# DAG v1(FullRefresh) 실행
	# test [DAG id] 주기
$ airflow dags test MySQL_to_Redshift

# DAG v2(Incremental Update) 실행
	# execution_date에 해당하는 날짜 함께 지정하기 !!
$ airflow dags test MySQL_to_Redshift_v2 2023-01-27

 

 

4. Backfill

- Backfill의 방식 (ex: 한 달_31일의 DAG를 실행)

1. 하루씩 31번 실행

2. 한 번에 여러 날짜의 DAG를 동시에 실행

 

- 2번 방식의 Issue

: production DB의 slow-down 문제 발생 가능

: 이 때 production DB의 데이터 팀 전용 slave 데이터를 읽어가도록 세팅

*) slave: production DB에서 master의 복사본 (일반적으로 Read 시)

*) master (일반적으로 Write 시)

 

: 구현 방식에 따라 서로 다른 날짜의 DAG를 동시에 실행 시 충돌 가능 (순서 이상, 덮어쓰기, etc..)

: 한 번에 한 날짜의 DAG만 실행하는 것이 안전.. (max_active_runs = 1)

 

- CLI에서 Backfill 실행

: [-s 날짜, -e 날짜)  (end 날짜 전까지)

 

- Backfill 구현에 필요한 요소

: 데이터 소스가 Backfill 방식 지원해야 함 = 주어진 날짜에 변경/생성된 레코드들만 읽어올 수 있어야 함

반응형