본문 바로가기

데브코스/TIL

[TIL] 10주차_Day43: 데이터 파이프라인, Airflow (3)

💡Today I Learned

  • 데이터 파이프라인과 Airflow에 대한 세 번째 수업을 진행했습니다.
  • DAG 작성 실습

1. Airflow 예제 프로그램 살펴보기

- PythonOperator

: 태스크 실행 (내부에 dag와 task 지정)

: python_callable = [파이썬으로 짠 task_함수명 지정] → PythonOperator의 entry 함수

: params = {딕셔너리 형태} → task 함수(즉, entry 함수)에 공통으로 적용되는 파라미터, task함수로 전달하고 싶은 인자들

 

- Airflow Decorators 사용하기

from airflow.decorators import task

@task
def print_hello():
	print("hello!")
    return "hello!"

: annotation

: @task 아래의 함수 = PythonOpertor 임을 알려줌

: 이 PythonOperator의 entry 함수가 @task로 decorate한 함수 자체

: 함수 정의 + 그 자체가 operator

: task_id를 따로 지정하지 않음 = 함수명 자체가 task_id (순서 지정할 때 함수이름 자체를 사용)

 

- DAG 파라미터 (*task param이 아님!)

1. max_active_runs: 한 번에 동시에 실행될 수 있는 DAG 인스턴스의 수

    ex) Daily incremental update를 하는 DAG의 Backfill시, (어떤 이슈로) 지난 1년간의 데이터를 다시 읽어와야할 때 → 한 번에 1개의 DAG만 실행된다면 365번 실행 → max_active_runs=30이면 약 12번 실행

2. max_active_tasks: 한 번에 동시에 실행될 수 있는 DAG에 속한 task의 수

3. catchup: DAG의 start_date이 지금 시점보다 과거일 때, DAG를 활성화 시켰을 때 그만큼 따라잡을지(default = True) 말지(False) *) Full Refresh는 의미 x / Incremental Update를 하는 DAG에서만 적용 o

 

: (1. 2.에 아무리 큰 값을 지정한다 해도) upper bound는 Airflow Worker 노드에 할당된 CPU 총합!!

 

 

2. Name Gender 예제 프로그램 Airflow로 포팅

- 예제 프로그램 개선 버전

1. v2: params를 통해 entry 함수로 입력 파라미터 전달하기, context 아래에 Airflow가 관리하는 시스템 변수 사용하기

# link = url 하드코딩 -> 'context' 딕셔너리 key 사용
def etl(**context):
    link = context["params"]["url"]
    
    # task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
    # https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
    # context 아래 시스템 변수(key) 사용
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)

    data = extract(link)
    lines = transform(data)
    load(lines)
    
    
task = PythonOperator(
    task_id = 'perform_etl',
    python_callable = etl,
    params = {
        'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    },
    dag = dag)

 

 

2. v3: Variable을 이용해 csv(=S3 url) param 넘김 + Xcom 사용해서 1개의 태스크(Operator) → 3개의 태스크(Operator)로

def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]
	
    # Xcom 사용해서 다른 Operator의 return 읽어오기(pull)
    	# context의 'task_instance'에서 함수 호출
    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    

# extract, transform, load 세 개의 Operator
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    
    # S3에 저장된 csv_url을 Variable의 key:value pair로 구성, .get()으로 받아오기
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

 

 

3. v4: PostgresHook 모듈로 Redshift connection 정보 받아오기 (+ task 실패 시 호출될 콜백함수 지정)

from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()
    

dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        
        # task 실패 시 호출될 콜백함수 지정
        # 'on_failure_callback': slack.on_failure_callback,
    }
)

 

 

4. v5: @task 데코레이터 사용해서 Operator 별도 지정 x → task 호출 시 함수 호출하듯

# 각각의 task 함수(e, t, l)에 @task annotation
# task_id는 task 함수의 이름
# 함수(**context) 대신 함수(입력 param) 으로 더 직관적

@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text
    
    
with DAG(
    # DAG 정의는 동일
) as dag:

    url = Variable.get("csv_url")
    schema = [본인 ID]
    table = 'name_gender'

	# task 호출 시 함수 호출하듯
    lines = transform(extract(url))
    load(schema, table, lines)

 

 

 

- Airflow Web UI

1. Connections: 데이터 웨어하우스(Redshift)와 connect시 연결 정보(hostname, 포트넘버, etc..)를 configuration 환경설정 형태로 코드 바깥으로 빼내줌, 하드코딩 방지

2. Variables: csv 파일 링크를 params 인자로 pass(v2_하드코딩됨→ config로 빼낼 수 있음 / Airflow 내에서 Key:Value 스토리지처럼 사용 → Key에 해당하는 Value를 읽어서 코드에서 사용

 

- Xcom

: 태스크들(즉, Operator) 간 데이터를 주고받을 때 제공되는 Airflow 기능

: ex) Operator의 리턴 → 이어서 다른 Operator의 입력(읽어감)으로 사용

: 함수 return value, 해당 함수의 task_id 메타데이터DB에 저장 → 다음 task(Operator)에서 Pull
: 이 때 리턴값 클 경우 DB 오버헤드 작은 데이터에만 사용 or S3같은 큰 스토리지에 저장 + 해당 S3 링크를 return&Pull

 

 

3. 데모 (using CLI, Web UI)

- 깃허브 dags의 모든 파일 → airflow-setup의 dags 폴더 아래에 복사해오기

$ ls -tl -- 리스트 확인
$ git clone https://github.com/learndataeng/learn-airflow

# -r: recursive하게 상위 dir까지 복사 [src 경로] [dest 경로]
$ cp -r learn-ariflow/dags/* dags/

 

- Web UI (Airflow webserver 컨테이너에서 or localhost:8080으로 접속)

: Admin > Variables > csv_url(key):실제 S3 url(value) 생성

: Admin > Connections > id(redshift_dev_db), type(AWS Redshift or Postgres), host, user+pwd(할당받은) 설정

 

- CLI로 진행

# airflow-scheduler 컨테이너 ID 확인
$ docker ps

$ docker exec -it [컨테이너 ID] sh

# airflow-setup/dags와 sync됨
$ ls -tl dags 

# DAG 리스트 확인 (Web UI의 리스트와 동일)
$ airflow dags list

$ airflow tasks list [DAG ID]

# variables 관련 명령어 확인
$ airflow variables
$ airflow variables get [등록해둔 key]

 

- PostgresHook autocommit 파라미터

: Redshift connection 정보를 Airflow Connections obj로 만듦 (환경설정으로 바꿔 하드코딩 방지) →  id를 사용 (in v4, v5)

: for Redshift 접근

: autocommit의 default = False (자동으로 커밋하지 않음) → 모든 쓰기 작업을 명시적으로 COMMIT/END 해줘야 실제 물리적인 테이블에 씌여짐

 

- DAG 밑에 task를 몇 개를 놓을지

: 장단점이 각각 존재, 따라서 밸런스있게 조절

: (적게) 실패시 처음부터 다시 실행, 스케줄러에 부하 o

: (많이) readability 증가, 실패시 실패 지점부터 다시 실행 / 많은 수의 task들이 개별적으로 스케줄러에 의해 스케줄링, 스케줄러에 부하 o

: 재실행 이슈 발생 시 시간을 최대한 단축시킬 수 있도록 task의 수를 조절 !

 

- Airflow의 Variable 사용

: 내부에서 실행해야하는 SQL이 자주 수정돼야하는 경우 → Variable로 설정하기도 함

: 추적 용이성을 위해 코드로 기록 → 디버깅, 로깅으로 확인 가능

 

- airflow.cfg

: Airflow 자체의 환경이 저장되는 파일

 

 

4. API로 DAG 작성하기

- Yahoo Finance API

$ docker exec -it [airflow-scheduler 컨테이너 ID]
$ pip3 install yfinance

# DAG ID(UpdateSymbol)로 DAG 안의 task들 확인하기
$ airflow tasks list UpdateSymbol

# DAG 실행
$ airflow dags test [DAG ID] [날짜]

 

- Docker container에 루트 유저로 로그인

: 원래 airflow-scheduler 실행 시 (airflow) 유저

$ docker exec --user root -it [컨테이너 ID] sh

 

- Incremental Update 시 _create_table 함수 호출

1. drop first if exists: 이미 존재하는 테이블의 경우 삭제

2. create temp table: 임시 테이블 생성 (임시라서 에러 나도 별상관 x..)

3. create table if not exists: 만드려는 테이블이 존재하지 않을 경우 생성

 

: Incremental Update 순서

→ DAG가 최초 1회 실행 시에는 drop_first = False, 원본 테이블 최초 생성

 임시 테이블 생성 (CREATE TEMP TABLE)

→ 이후 Incremental Update 시 drop_first = Ture, 원본 테이블 삭제

(원본 테이블 삭제됐으니 _create_table 함수 내에서 create table if not exists 가 실행됨) 비어있는 원본 테이블이 생성되고 임시 테이블 내용 복사

반응형