💡Today I Learned
- 데이터 파이프라인과 Airflow에 대한 네 번째 수업을 진행했습니다.
- Open Weather API로 DAG 구현, 기본키 유일성 보장, Backfill 실습
1. Review
- airflow.cfg
*) vi 에디터에서 /[검색어] 로 검색해서 각 키:값 찾아보기
1) DAGs 폴더: Airflow 설치 디렉토리 아래의 'dags' 폴더, dags_folder 키로 지정됨 (dags_folder = /var/lib/airflow/dags)
2) 새로운 DAG 생성 시 스캔 주기: 5분(300초), dag_dir_list_interval 키로 지정됨 (dags_folder 키가 가지는 경로를 스캔)
*) 주의: 스캔하는 모든 파일들의 main 함수가 실행됨 → test용으로 짠 코드도 실행
3) Airflow를 API 형태로 외부에서 조작 시: [api] 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경
4) Variables의 value값 encrypted 시키기 ('*'로 표현): Variables의 키에 특정 단어 포함 (password, secret, etc..)
5) .cfg 파일 수정 시 최종 반영: (EC2 서버 사용 시)
$ sudo systemctl restart airflow-webserver
$ sudo systemctl restart airflow-scheduler
: Docker 사용하는 경우에는 해당 서비스 내렸다가 다시 올리기
6) 메타데이터DB 내용(=Airflow 정보)을 암호화하는 데 사용되는 키: fernet_key
- Airflow와 timezone
1. default_ui_timezone: 웹 서버단에 표시되는 시간임(default = UTC), 변경 시 airflow.cfg에서 or 웹 서버 UI에서 선택
2. default_timezone: start_date, end_date, schedule이 따라가는 타임존
*) 주의: execution_date은 항상 UTC를 따름
→ start_date와 execution_date간의 관계, UTC를 일관되게 사용하는 것이 권장
- CountryInfo DAG 작성
: restcountries.com API 사용
1) def extract_transform(api_endpoint)
: api endpoint를 requests.get()으로 받아옴
: response를 .json() 으로 딕셔너리 형태로 파싱
: 원하는 필드 값만 레코드로 만들어 반환
2) def load
: country, area, population을 갖는 테이블 생성
: INSERT INTO로 적재 (Full Refresh 형태)
3) DAG
: 매주 토요일 오전 6시 30분에 실행되도록 crontab 스케줄링
: extract_transform() → load() task 순서 지정
2. Open Weathermap API로 DAG 작성하기
- API Key 저장
: Open Weathermap에 등록 후 받은 API Key 정보
: Airflow의 Variables로 저장 (open_weather_api_key)
: One-Call API endpoint 사용 → json 형태로 response (.json() 사용)
- DAG 구현
: [본인 스키마]/weather_forecast 테이블로 저장
: 서울(위도, 경도 이용)의 다음 7일간의 낮/최소/최대 온도
: create_date 필드 = Incremental Update를 적용할 때 사용됨
: (일별/시간별/분별 필드 중) 'daily'라는 필드에 앞으로의 8일간 날씨 정보가 들어있음
3. Primary Key Uniqueness 보장하기
- 기본키 유일성
OLTP: 보장 o, 기본적으로 데이터 크기가 작음, 매번 체크
OLAP: 빅데이터 기반 데이터 웨어하우스는 보장 x, 데이터 크기가 큰 관계형 DB임 = 적재 시마다 기본키 체크한다면 비효율적 → 데이터 인력의 책임으로.. (DE: ETL 구현 시 보장 / DA: ELT 시 보장)
- 중복 제거
: SQL window함수인 ROW_NUMBER 사용
: partition by [그룹핑할 필드] order by [그룹 내의 순서 필드] ASC/DESC
: 레코드의 순서를 지정해서 특정 순번의 레코드만 취하겠다
- Upsert
: Insert + Update의 하이브리드 버전
: 이미 동일한 기본키를 가지는 레코드가 있다면 → 새 레코드로 수정 (Update)
: 없다면 → 새 레코드로 적재 (Insert)
4. Backfill과 Airflow
- Backfill
: Full Refresh = 다시 실행하면 끝
: Incremental Update = 과거 특점 시점부터 다시 실행
: Airflow에서는 이를 프레임워크 차원에서 쉽게 제공
- Airflow의 Backfill 접근 방식
: 웹 UI → 실패했던 날짜의 DAG를 클릭 & clear해서 재실행
: 읽어와야 할 데이터를 내가 계산하는 게 아닌 Airflow가 system variable(=execution date)로 제공해줌
: 문제가 생겼을 시 execution date 읽어와서 재실행
1. execution date: 이 DAG가 어느 날짜의 데이터인지 (Airflow가 계산해서 제공해줌 _ start_date 기반으로 계산함), 시스템 변수로 읽어와야하는 데이터의 날짜를 지정 (사용자가 가져다가 쓰는 값)
2. start date: 특정 DAG가 처음 읽어와야하는 데이터의 시작 날짜 기준 (ex: 2020/11/07) → daily DAG가 start_date을 기준으로 실행되는 날짜는 그 다음날 (ex: 2020/11/08)
3. 결국 DAG의 실제 첫 실행 날짜 = start_date + DAG 실행 주기
4. catchup: 중요... critical한 variable이라는 걸 기억!!
'데브코스 > TIL' 카테고리의 다른 글
[TIL] 10주차_Day45: 데이터 파이프라인, Airflow (5) (0) | 2023.12.26 |
---|---|
[TIL] 10주차_Day43: 데이터 파이프라인, Airflow (3) (1) | 2023.12.22 |
[TIL] 10주차_Day42: 데이터 파이프라인, Airflow (2) (0) | 2023.12.20 |
[TIL] 10주차_Day41: 데이터 파이프라인, Airflow (1) (1) | 2023.12.19 |
[TIL] 8주차_Day35: 데이터 웨어하우스 관리, 고급 SQL, BI 대시보드 (5) (1) | 2023.12.01 |