본문 바로가기

데브코스/TIL

[TIL] 10주차_Day41: 데이터 파이프라인, Airflow (1)

💡Today I Learned

  • 데이터 파이프라인과 Airflow에 대한 첫 번째 수업을 진행했습니다.
  • Airflow 설치 및 실습

1. 데이터 파이프라인(=ETL) 소개

- 데이터 웨어하우스 구성

1️⃣데이터 소스(프로덕션 DB, 트랜잭션 데이터, 유저 데이터, 콜 데이터, 세일즈 데이터, ...) → 다수의 ETL → 2️⃣Airflow+데이터 웨어하우스 (요약 테이블 만들기 = ELT = 데이터 분석) → 3️⃣대시보드

 

- ETL

: ETL = Extract(데이터 소스로부터 Data Dump ex: 파일 다운로드, API 호출) + Transform(원하는 형태로 변환, 추출) + Load(데이터 웨어하우스에 테이블 형태 등으로 적재)

: DAG = Directed Acyclic Graph (루프 x)

: 즉, 데이터 파이프라인 = ETL = 데이터 Workflow = DAG (in Airflow)

: by. 데이터 엔지니어

 

- ELT

: 데이터 웨어하우스 내부 데이터를 조작, 새로운 데이터를 만듦

: 데이터 레이크(웨어하우스보다 더 scalable한 DB) or 웨어하우스 위에서 작업

: by. 데이터 분석가 (SQL의 CTAS 문법)

: DBT (Data Build Tool)

 

- 데이터 레이크

: 구조화 + 비구조화 데이터

: 보존 기한 없는 원래 형태의 데이터를 보존하는 스토리지에 가까움

: 웨어하우스 <<< 레이크

: ex) AWS의 S3

 

- 데이터 웨어하우스

: 보존 기한이 있는 구조화 데이터 저장/처리

: BI 툴(superset, looker, tableau, ...)은 데이터 웨어하우스를 백엔드로 사용함

: ex) AWS의 Redshift, Snowflake

 

- 데이터 레이크 & ELT 과정

1. 데이터 소스

2. raw format의 데이터 (데이터 레이크)

3. 데이터 변환 (Hadoop, Spark, Athena, Hive, Presto 등.. SQL 기반 빅데이터 프로세싱 프레임워크)

4. 데이터 웨어하우스

 

→ 이 때 각 과정 사이에 위치한 여러 데이터 파이프라인스케줄링&관리 툴 필요 = Airflow !

 

- 데이터 파이프라인 정의

: 데이터 소스 → 목적지로 복사

: 데이터 소스 = 데이터 웨어하우스, 데이터 레이크, API, etc..

: 데이터 목적지 = 데이터 웨어하우스, 프로덕션 db(MySQL, Postgresql), 캐시 시스템(Redis), NoSQL, 데이터 레이크(S3), ...

 

- 데이터 파이프라인 종류

1. Raw data ETL job

: 데이터 시스템 밖의 데이터 소스[API 호출] → 데이터 포맷 변환[파이썬, Spark, ..] → 데이터 웨어하우스 로드

: by. 데이터 엔지니어

 

2. Summary/Report job

: DW(또는 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL

: raw data → report/summary 형태의 테이블을 다시 만드는 용도

: 요약 테이블 → SQL(CTAS)만으로 생성 or DBT 사용

 

3. Production Data job

: DW로부터 데이터 읽어 다른 스토리지(주로 프로덕션 환경)으로 쓰는 ETL

 

- ex 1) 인기강좌 정보 실시간 업데이트

1. 여러 raw table을 join, group by... 해서 만든 summary 정보 프로덕션 환경에서 필요

2. But 실시간 연산 수행 시마다 프로덕션 db에 부하가 생김

3. 성능을 위해 실시간성을 줄이고 (ex: 1시간에 한 번 계산, 약간의 오차범위 허용) 필요한 정보를 프로덕션 db(=데이터 시스템 바깥의 스토리지, MySQL)에 push하도록

4. BE/FE 개발자는 MySQL에서 직접 계산 x, DW나 Production DB에 push된 summary 테이블을 읽어다가 보여줌

 

- ex 2) 머신러닝 모델의 input으로 필요한 feature들을 미리 계산하는 경우

 

- 타겟 스토리지(프로덕션 환경)

: NoSQL (DynamoDB, Cassandra, ..)

: OLTP (MySQL 같은 관계형 db)

: 캐시 (Redis, Memcache, ..)

: 검색엔진 (ElasticSearch, ..)

 

 

2. 데이터 파이프라인 작성의 best practices

- Best Practices (1): Full Refresh

: 데이터가 작을 경우, 가능하면 매번 통째로 복사해서 테이블 만들기

: 데이터 양이 크다면, Incremental update (운영이 좀 더 까다롭) 대상 데이터 소스(프로덕션 db / API)의 몇가지 조건이 필요

 

- Best Practices (2): Idempotency(멱등성)

: 동일한 입력 데이터로 데이터 파이프라인을 여러 번 수행해도 최종 테이블의 내용이 달라지지 않아야 함 = 멱등성을 보장 !

: SQL의 transaction이 꼭 필요

 

- Best Practices (3): Backfill

: 실패한 데이터 파이프라인의 재실행 = Backfill (과거 데이터를 다시 채우는 과정)

: Backfill이 쉬워야 함

 

- Best Practices (4): 문서화

: 데이터 파이프라인의 입/출력을 명확히, 문서화

: 데이터 카탈로그 내에 해당 문서 포함  데이터 디스커버리에 사용 가능

: 데이터 리니지(=데이터의 흐름을 가시화)의 중요성

 

- Best Practices (5): 주기적 관리

: 주기적으로 쓸모없는 데이터들을 삭제

 

- Best Practices (6): 사고 리포트

: 데이터 파이프라인 사고시마다 사고 리포트(post-mortem) 작성

 

- Best Practices (7): 유닛 테스트

: 중요 데이터 파이프라인의 입/출력 체크하기

: 데이터를 대상으로 하는 유닛 테스트

 

 

3. Airflow

- Airflow란

: 데이터 파이프라인(ETL) 프레임임워크

: Airbnb에서 시작된 아파치 오픈소스 프로젝트

: ETL 스케줄링(=정해진 시간에 특정 ETL 실행 or 여러 개 순서대로)

: 웹 UI 제공

 

: (in Airflow,) 데이터 파이프라인 = DAG(Directed Acyclic Graph)

: 하나의 DAG = 하나 이상의 task로 구성됨 (task 단위로 실행)

 

- Airflow의 5개의 컴포넌트

1. 웹 서버: 파이썬 Flask로 구현됨

2. 스케줄러: DAG 내의 task를 실행 순서대로 / DAG들을 워커들에 배정

3. 워커: 실제 DAG 내의 task 코드를 실행하는 모듈 / 서버의 CPU 수 = 워커 개수

4. 메타 데이터 DB: 스케줄러, DAG 실행결과 저장 / Sqlite가 디폴트, but MySQL나 Postgres 설치해서 쓰는 게 일반적

5. 큐: 다중서버 구성의 경우에만 사용, 큐에 task 적재, 놀고있는 워커가 task를 실행

 

- Airflow의 스케일링 방법

1. 스케일 업 (서버 사양 높이기)

2. 스케일 아웃 (서버 개수 늘리기): 서버 한 대는 웹서버+스케줄러, 나머지는 전부 워커용으로 세팅 (but 비용+운영 어렵.. 클라우드 옵션 선택)

- Airflow가 다수의 서버로 구성될 때

:  스케줄러의 Executor → (큐를 통해 통신함) 다수 서버에 존재하는 워커에 task를 할당

 

- DAG

: 여러 개의 task로 구성됨 ex) Extract - Transform - Load 3개의 태스크 (각 태스크 = 코드)

: Airflow의 오퍼레이터로 만들어짐 (빌트인, 사용자 정의 operator..) ex) S3 Read/Write, Redshift writing, ...

 

반응형