💡Today I Learned
- 데이터 파이프라인과 Airflow에 대한 세 번째 수업을 진행했습니다.
- DAG 작성 실습
1. Airflow 예제 프로그램 살펴보기
- PythonOperator
: 태스크 실행 (내부에 dag와 task 지정)
: python_callable = [파이썬으로 짠 task_함수명 지정] → PythonOperator의 entry 함수
: params = {딕셔너리 형태} → task 함수(즉, entry 함수)에 공통으로 적용되는 파라미터, task함수로 전달하고 싶은 인자들
- Airflow Decorators 사용하기
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
: annotation
: @task 아래의 함수 = PythonOpertor 임을 알려줌
: 이 PythonOperator의 entry 함수가 @task로 decorate한 함수 자체
: 함수 정의 + 그 자체가 operator
: task_id를 따로 지정하지 않음 = 함수명 자체가 task_id (순서 지정할 때 함수이름 자체를 사용)
- DAG 파라미터 (*task param이 아님!)
1. max_active_runs: 한 번에 동시에 실행될 수 있는 DAG 인스턴스의 수
ex) Daily incremental update를 하는 DAG의 Backfill시, (어떤 이슈로) 지난 1년간의 데이터를 다시 읽어와야할 때 → 한 번에 1개의 DAG만 실행된다면 365번 실행 → max_active_runs=30이면 약 12번 실행
2. max_active_tasks: 한 번에 동시에 실행될 수 있는 DAG에 속한 task의 수
3. catchup: DAG의 start_date이 지금 시점보다 과거일 때, DAG를 활성화 시켰을 때 그만큼 따라잡을지(default = True) 말지(False) *) Full Refresh는 의미 x / Incremental Update를 하는 DAG에서만 적용 o
: (1. 2.에 아무리 큰 값을 지정한다 해도) upper bound는 Airflow Worker 노드에 할당된 CPU 총합!!
2. Name Gender 예제 프로그램 Airflow로 포팅
- 예제 프로그램 개선 버전
1. v2: params를 통해 entry 함수로 입력 파라미터 전달하기, context 아래에 Airflow가 관리하는 시스템 변수 사용하기
# link = url 하드코딩 -> 'context' 딕셔너리 key 사용
def etl(**context):
link = context["params"]["url"]
# task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
# https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
# context 아래 시스템 변수(key) 사용
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
data = extract(link)
lines = transform(data)
load(lines)
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
params = {
'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
},
dag = dag)
2. v3: Variable을 이용해 csv(=S3 url) param 넘김 + Xcom 사용해서 1개의 태스크(Operator) → 3개의 태스크(Operator)로
def load(**context):
logging.info("load started")
schema = context["params"]["schema"]
table = context["params"]["table"]
# Xcom 사용해서 다른 Operator의 return 읽어오기(pull)
# context의 'task_instance'에서 함수 호출
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
# extract, transform, load 세 개의 Operator
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
# S3에 저장된 csv_url을 Variable의 key:value pair로 구성, .get()으로 받아오기
params = {
'url': Variable.get("csv_url")
},
dag = dag)
3. v4: PostgresHook 모듈로 Redshift connection 정보 받아오기 (+ task 실패 시 호출될 콜백함수 지정)
from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
dag = DAG(
dag_id = 'name_gender_v4',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
# task 실패 시 호출될 콜백함수 지정
# 'on_failure_callback': slack.on_failure_callback,
}
)
4. v5: @task 데코레이터 사용해서 Operator 별도 지정 x → task 호출 시 함수 호출하듯
# 각각의 task 함수(e, t, l)에 @task annotation
# task_id는 task 함수의 이름
# 함수(**context) 대신 함수(입력 param) 으로 더 직관적
@task
def extract(url):
logging.info(datetime.utcnow())
f = requests.get(url)
return f.text
with DAG(
# DAG 정의는 동일
) as dag:
url = Variable.get("csv_url")
schema = [본인 ID]
table = 'name_gender'
# task 호출 시 함수 호출하듯
lines = transform(extract(url))
load(schema, table, lines)
- Airflow Web UI
1. Connections: 데이터 웨어하우스(Redshift)와 connect시 연결 정보(hostname, 포트넘버, etc..)를 configuration 환경설정 형태로 코드 바깥으로 빼내줌, 하드코딩 방지
2. Variables: csv 파일 링크를 params 인자로 pass(v2_하드코딩됨) → config로 빼낼 수 있음 / Airflow 내에서 Key:Value 스토리지처럼 사용 → Key에 해당하는 Value를 읽어서 코드에서 사용
- Xcom
: 태스크들(즉, Operator) 간 데이터를 주고받을 때 제공되는 Airflow 기능
: ex) Operator의 리턴 → 이어서 다른 Operator의 입력(읽어감)으로 사용
: 함수 return value, 해당 함수의 task_id → 메타데이터DB에 저장 → 다음 task(Operator)에서 Pull
: 이 때 리턴값 클 경우 DB 오버헤드 → 작은 데이터에만 사용 or S3같은 큰 스토리지에 저장 + 해당 S3 링크를 return&Pull
3. 데모 (using CLI, Web UI)
- 깃허브 dags의 모든 파일 → airflow-setup의 dags 폴더 아래에 복사해오기
$ ls -tl -- 리스트 확인
$ git clone https://github.com/learndataeng/learn-airflow
# -r: recursive하게 상위 dir까지 복사 [src 경로] [dest 경로]
$ cp -r learn-ariflow/dags/* dags/
- Web UI (Airflow webserver 컨테이너에서 or localhost:8080으로 접속)
: Admin > Variables > csv_url(key):실제 S3 url(value) 생성
: Admin > Connections > id(redshift_dev_db), type(AWS Redshift or Postgres), host, user+pwd(할당받은) 설정
- CLI로 진행
# airflow-scheduler 컨테이너 ID 확인
$ docker ps
$ docker exec -it [컨테이너 ID] sh
# airflow-setup/dags와 sync됨
$ ls -tl dags
# DAG 리스트 확인 (Web UI의 리스트와 동일)
$ airflow dags list
$ airflow tasks list [DAG ID]
# variables 관련 명령어 확인
$ airflow variables
$ airflow variables get [등록해둔 key]
- PostgresHook autocommit 파라미터
: Redshift connection 정보를 Airflow Connections obj로 만듦 (환경설정으로 바꿔 하드코딩 방지) → id를 사용 (in v4, v5)
: for Redshift 접근
: autocommit의 default = False (자동으로 커밋하지 않음) → 모든 쓰기 작업을 명시적으로 COMMIT/END 해줘야 실제 물리적인 테이블에 씌여짐
- DAG 밑에 task를 몇 개를 놓을지
: 장단점이 각각 존재, 따라서 밸런스있게 조절
: (적게) 실패시 처음부터 다시 실행, 스케줄러에 부하 o
: (많이) readability 증가, 실패시 실패 지점부터 다시 실행 / 많은 수의 task들이 개별적으로 스케줄러에 의해 스케줄링, 스케줄러에 부하 o
: 재실행 이슈 발생 시 시간을 최대한 단축시킬 수 있도록 task의 수를 조절 !
- Airflow의 Variable 사용
: 내부에서 실행해야하는 SQL이 자주 수정돼야하는 경우 → Variable로 설정하기도 함
: 추적 용이성을 위해 코드로 기록 → 디버깅, 로깅으로 확인 가능
- airflow.cfg
: Airflow 자체의 환경이 저장되는 파일
4. API로 DAG 작성하기
- Yahoo Finance API
$ docker exec -it [airflow-scheduler 컨테이너 ID]
$ pip3 install yfinance
# DAG ID(UpdateSymbol)로 DAG 안의 task들 확인하기
$ airflow tasks list UpdateSymbol
# DAG 실행
$ airflow dags test [DAG ID] [날짜]
- Docker container에 루트 유저로 로그인
: 원래 airflow-scheduler 실행 시 (airflow) 유저
$ docker exec --user root -it [컨테이너 ID] sh
- Incremental Update 시 _create_table 함수 호출
1. drop first if exists: 이미 존재하는 테이블의 경우 삭제
2. create temp table: 임시 테이블 생성 (임시라서 에러 나도 별상관 x..)
3. create table if not exists: 만드려는 테이블이 존재하지 않을 경우 생성
: Incremental Update 순서
→ DAG가 최초 1회 실행 시에는 drop_first = False, 원본 테이블 최초 생성
→ 임시 테이블 생성 (CREATE TEMP TABLE)
→ 이후 Incremental Update 시 drop_first = Ture, 원본 테이블 삭제
→ (원본 테이블 삭제됐으니 _create_table 함수 내에서 create table if not exists 가 실행됨) 비어있는 원본 테이블이 생성되고 임시 테이블 내용 복사
'데브코스 > TIL' 카테고리의 다른 글
[TIL] 10주차_Day45: 데이터 파이프라인, Airflow (5) (0) | 2023.12.26 |
---|---|
[TIL] 10주차_Day44: 데이터 파이프라인, Airflow (4) (1) | 2023.12.23 |
[TIL] 10주차_Day42: 데이터 파이프라인, Airflow (2) (0) | 2023.12.20 |
[TIL] 10주차_Day41: 데이터 파이프라인, Airflow (1) (1) | 2023.12.19 |
[TIL] 8주차_Day35: 데이터 웨어하우스 관리, 고급 SQL, BI 대시보드 (5) (1) | 2023.12.01 |