본문 바로가기

데브코스/TIL

[TIL] 10주차_Day44: 데이터 파이프라인, Airflow (4)

💡Today I Learned

  • 데이터 파이프라인과 Airflow에 대한 네 번째 수업을 진행했습니다.
  • Open Weather API로 DAG 구현, 기본키 유일성 보장, Backfill 실습

1. Review

- airflow.cfg

*) vi 에디터에서 /[검색어] 로 검색해서 각 키:값 찾아보기

1) DAGs 폴더: Airflow 설치 디렉토리 아래의 'dags' 폴더, dags_folder 키로 지정됨 (dags_folder = /var/lib/airflow/dags)

2) 새로운 DAG 생성 시 스캔 주기: 5분(300초), dag_dir_list_interval 키로 지정됨 (dags_folder 키가 가지는 경로를 스캔)

        *) 주의: 스캔하는 모든 파일들의 main 함수가 실행됨 → test용으로 짠 코드도 실행

3) Airflow를 API 형태로 외부에서 조작 시: [api] 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경

4) Variables의 value값 encrypted 시키기 ('*'로 표현): Variables의 키에 특정 단어 포함 (password, secret, etc..)

5) .cfg 파일 수정 시 최종 반영: (EC2 서버 사용 시)

$ sudo systemctl restart airflow-webserver
$ sudo systemctl restart airflow-scheduler

 

: Docker 사용하는 경우에는 해당 서비스 내렸다가 다시 올리기

 

6) 메타데이터DB 내용(=Airflow 정보)을 암호화하는 데 사용되는 키: fernet_key

 

- Airflow와 timezone

1. default_ui_timezone: 웹 서버단에 표시되는 시간임(default = UTC), 변경 시 airflow.cfg에서 or 웹 서버 UI에서 선택

2. default_timezone: start_date, end_date, schedule이 따라가는 타임존

*) 주의: execution_date은 항상 UTC를 따름

→ start_date와 execution_date간의 관계, UTC를 일관되게 사용하는 것이 권장

 

- CountryInfo DAG 작성

: restcountries.com API 사용

 

1) def extract_transform(api_endpoint)

: api endpoint를 requests.get()으로 받아옴

: response를 .json() 으로 딕셔너리 형태로 파싱

: 원하는 필드 값만 레코드로 만들어 반환

 

2) def load

: country, area, population을 갖는 테이블 생성

: INSERT INTO로 적재 (Full Refresh 형태)

 

3) DAG

: 매주 토요일 오전 6시 30분에 실행되도록 crontab 스케줄링

: extract_transform() → load() task 순서 지정

 

 

2. Open Weathermap API로 DAG 작성하기

- API Key 저장

: Open Weathermap에 등록 후 받은 API Key 정보

: Airflow의 Variables로 저장 (open_weather_api_key)

: One-Call API endpoint 사용 → json 형태로 response (.json() 사용)

 

- DAG 구현

: [본인 스키마]/weather_forecast 테이블로 저장

: 서울(위도, 경도 이용)의 다음 7일간의 낮/최소/최대 온도

: create_date 필드 = Incremental Update를 적용할 때 사용됨

: (일별/시간별/분별 필드 중) 'daily'라는 필드에 앞으로의 8일간 날씨 정보가 들어있음

 

 

3. Primary Key Uniqueness 보장하기

- 기본키 유일성

OLTP: 보장 o, 기본적으로 데이터 크기가 작음, 매번 체크

OLAP: 빅데이터 기반 데이터 웨어하우스는 보장 x, 데이터 크기가 큰 관계형 DB임 = 적재 시마다 기본키 체크한다면 비효율적 → 데이터 인력의 책임으로.. (DE: ETL 구현 시 보장 / DA: ELT 시 보장) 

 

- 중복 제거

: SQL window함수인 ROW_NUMBER 사용

: partition by [그룹핑할 필드] order by [그룹 내의 순서 필드] ASC/DESC

: 레코드의 순서를 지정해서 특정 순번의 레코드만 취하겠다

 

- Upsert

: Insert + Update의 하이브리드 버전

: 이미 동일한 기본키를 가지는 레코드가 있다면 → 새 레코드로 수정 (Update)

: 없다면 → 새 레코드로 적재 (Insert)

 

 

4. Backfill과 Airflow

- Backfill

: Full Refresh = 다시 실행하면 끝

: Incremental Update = 과거 특점 시점부터 다시 실행

: Airflow에서는 이를 프레임워크 차원에서 쉽게 제공

 

- Airflow의 Backfill 접근 방식

: 웹 UI → 실패했던 날짜의 DAG를 클릭 & clear해서 재실행

: 읽어와야 할 데이터를 내가 계산하는 게 아닌 Airflow가 system variable(=execution date)로 제공해줌

: 문제가 생겼을 시 execution date 읽어와서 재실행

 

1. execution date: 이 DAG가 어느 날짜의 데이터인지 (Airflow가 계산해서 제공해줌 _ start_date 기반으로 계산함), 시스템 변수로 읽어와야하는 데이터의 날짜를 지정 (사용자가 가져다가 쓰는 값)

2. start date: 특정 DAG가 처음 읽어와야하는 데이터의 시작 날짜 기준 (ex: 2020/11/07) → daily DAG가 start_date을 기준으로 실행되는 날짜는 그 다음날 (ex: 2020/11/08)

3. 결국 DAG의 실제 첫 실행 날짜 = start_date + DAG 실행 주기

4. catchup: 중요... critical한 variable이라는 걸 기억!!

 

반응형