💡Today I Learned
- 데이터 파이프라인과 Airflow에 대한 다섯 번째 수업을 진행했습니다.
- Production DB to Data Warehouse DAG, Backfill 실습
1. Review
- start_date, execution_date, DAG의 첫 실행 날짜
1. start_date: DAG가 읽어와야 할 데이터의 시점 (ex: 매일 1번 실행되는 DAG의 start_date = 2021/02/05이라면 DAG의 첫 실행 날짜 = 2021/02/06) → 5일의 데이터 읽고싶음 = 5일에 쌓인 모든 데이터를 6일에 실행해서 읽어와야 함
2. execution_date: DAG가 처음 읽어와야하는 데이터의 날짜, 따라서 DAG 실행동안 변경되지 x, 각 task가 실행될 때 사용되는 날짜 (start_date을 기반으로 계산됨)
3. 첫 실행 날짜: DAG가 처음 실행되어야하는 날짜 (ex: 2021/02/06)
- Incremental Update 방식의 중복 제거
1. DISTINCT: primary uniqueness의 완벽한 보장은 불가.. (데이터 소스에 따라 다름)
2. ROW_NUMBER + created_date(레코드 생성 날짜 필드 추가!) 사용해서 중복 처리
created_date timestamp DEFAULT GETDATE()
*) 중복 레코드 발생 이유: 해당 일자 중간에 API가 호출된 경우 → 추후 종가, volumn이 달라질 수 있음
2. MySQL > S3 > Redshift 개요
- Production DB(OLTP: MySQL) → Warehouse(OLAP: Redshift)
: prod 스키마 밑의 nps 파일 → raw_data 스키마 밑의 nps 파일
: INSERT INTO (데이터 크면 오버헤드 커짐) → COPY로 벌크 업데이트
: 데이터 소스(DB 서버_MySQL)에서 파일 다운 → S3에 적재 → Redshift로 복사
- 보안/권한 설정
1. Airflow DAG에서 - S3 접근 (읽기/쓰기 권한)
2. Redshift - S3 접근 (읽기 권한)
*) 특정 S3 bucket에만 해당 권한들을 연결하기 위해 IAM user 생성 > Custom Policy를 따로 작성하기
- Airflow 연결 설정
1. Airflow - MySQL
2. Airflow - S3
3. MySQL 테이블 복사 실습
- Airflow에서 제공되는 Native Operator
*) 각 Operator를 기반으로 만든 task에 필요한 param
1. SqlToS3Operator
: {s3_bucket}/{s3_key} 이라는 위치(=table path)에 적재됨
: aws_conn_id = S3 connection id
: replace = 덮어쓰기 True/False 지정
: pd_kwargs = index(레코드 일련번호), header 포함 여부 결정 (True/False)
*) 내부적으로 pandas를 이용해서 읽어옴, pandas 관련 arg임
2. S3ToRedshiftOperator
: {s3_bucket}/{s3_key} 이라는 위치(=table path)에서 가져옴
: redshift_conn_id = Redshift connection id
: {schema}.{table}이 복사해서 붙여넣을 타겟 테이블
: copy_options = 옵션 지정 (파일 확장자 .csv)
: method = {'REPLACE', 'APPEND', 'UPSERT'} 중 Replace(Full Refresh), UPSERT(기본키 기준, Update+Insert 하이브리드 형태)
- Incremental Update 방식(1~3)
1. DISTINCT (위험, 예외사항 o)
2. ROW_NUMBER (기본키 기준 partition, modified 날짜 기준 정렬 후 중복 제거)
3. SqlToS3Operator의 query, S3ToRedshiftOperator의 method, upsert_keys 파라미터 지정
# 날짜 기준, 생성된 날짜가 execution_date와 동일한 레코드만 새로 적재하기
# {{ }} 안에 Airflow 시스템 변수
# -> Airflow의 시스템 변수 넘겨받아 파이썬 코드 내에서 사용
query = "SELECT * FROM t WHERE DATE(created_at) = DATE('{{ execution_date }}')"
method = "UPSERT"
upsert_keys = [테이블의 기본키 필드 이름]
- IAM User
: Airflow와 S3 연결할 때 필요한 사용자 권한
: IAM > User 생성 > Attach policies directly(내가 직접 policy 지정하겠다) > Create Policy로 json 사용해 특정 S3 버킷에만 권한 지정하기
: 만든 User > 보안 및 권한 > 액세스 키 생성 > AWS 외부 application(=Airflow니까) > 생성 → 액세스 키 & 비밀 키 보관해놓고 사용!
- IAM Role
: Redshift가 S3 접근
: IAM > Role 생성 > AWS 서비스 > Redshift > Redshif-customize > Create Policy로 json 사용해 특정 S3 버킷에만 권한 지정하기
: Redshift 콘솔 > cluster > 작업 > IAM 역할 관리 > 위에서 만든 IAM role 연결 > 저장
- Docker MySQL config
# MySQL 세팅을 위해 루트 유저로 접속하기
$ docker exec --user root -it [airflow-scheduler 컨테이너 ID] sh
# 우분투 계열에서 사용하는 프로그램 매니저(apt-get) 업데이트
$ sudo apt-get update
# mysql client 라이브러리 설치
$ sudo apt-get install -y default-libmysqlclient-dev
# mysql 관련 모듈에 필요한 cpp 기반 컴파일러(=gcc) 설치
$ sudo apt-get install -y gcc
# airflow의 mysql 모듈 재설치
$ sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
# airflow 유저로 다시 로그인
$ docker exec -it [airflow-scheduler 컨테이너 ID] sh
# DAG v1(FullRefresh) 실행
# test [DAG id] 주기
$ airflow dags test MySQL_to_Redshift
# DAG v2(Incremental Update) 실행
# execution_date에 해당하는 날짜 함께 지정하기 !!
$ airflow dags test MySQL_to_Redshift_v2 2023-01-27
4. Backfill
- Backfill의 방식 (ex: 한 달_31일의 DAG를 실행)
1. 하루씩 31번 실행
2. 한 번에 여러 날짜의 DAG를 동시에 실행
- 2번 방식의 Issue
: production DB의 slow-down 문제 발생 가능
: 이 때 production DB의 데이터 팀 전용 slave 데이터를 읽어가도록 세팅
*) slave: production DB에서 master의 복사본 (일반적으로 Read 시)
*) master (일반적으로 Write 시)
: 구현 방식에 따라 서로 다른 날짜의 DAG를 동시에 실행 시 충돌 가능 (순서 이상, 덮어쓰기, etc..)
: 한 번에 한 날짜의 DAG만 실행하는 것이 안전.. (max_active_runs = 1)
- CLI에서 Backfill 실행
: [-s 날짜, -e 날짜) (end 날짜 전까지)
- Backfill 구현에 필요한 요소
: 데이터 소스가 Backfill 방식 지원해야 함 = 주어진 날짜에 변경/생성된 레코드들만 읽어올 수 있어야 함
'데브코스 > TIL' 카테고리의 다른 글
[TIL] 10주차_Day44: 데이터 파이프라인, Airflow (4) (1) | 2023.12.23 |
---|---|
[TIL] 10주차_Day43: 데이터 파이프라인, Airflow (3) (1) | 2023.12.22 |
[TIL] 10주차_Day42: 데이터 파이프라인, Airflow (2) (0) | 2023.12.20 |
[TIL] 10주차_Day41: 데이터 파이프라인, Airflow (1) (1) | 2023.12.19 |
[TIL] 8주차_Day35: 데이터 웨어하우스 관리, 고급 SQL, BI 대시보드 (5) (1) | 2023.12.01 |