PythonOperator를 사용한 DAG

pythonOperator를 활용하여 쉽게 DAG를 작성 가능하다. 두가지 방법을 소개한다.

 

1. task decorator가 없는 방식

from airflow.operators.python import PythonOperator

t1 = PythonOperator(
	dag = dag,
	task_id='task_id',
	python_callable = python_func,
	params={
		'table':'test_table',
		'schema':'raw_data'
	},
)

# python function
def python_func(**cxt):
	table = cxt["params"]["table"]
	schema = cxt["params"]["schema"]
	ex_datge = cxt["execution_date"]

 

 

2. task decorator를 사용한 방식

from airflow.decorators import task

dag = DAG(
	dag_id = 'HelloWorld',
	...
)
# python functions
@task
def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
	dag_id = "HelloWorld",
	...
)as dag:

# taskID == function_name
print_hello() >> print_goodbye()

 

참고) 중요한 DAG 파라미터들
- max_active_runs : 동시에 수행 가능한 DAG의 수
- max_active_tasks : 동시에 수행되는 task들의 수
- catchup : DAG의 start_date 이전에 밀린 작업들의 수행 여부 (default = True)

 

Connections & Variables

두 기능 모두 Airflow의 webUI를 통해 쉽게 설정 가능하다. CLI로 진행하는 방법도 있습니다.

 

1. Connections

연결에 필요한 정보들을 환경설정의 형태로 코드 밖으로 빼내는 일을 수행한다.

 

Connection 설정 후에 연결 작업 수행이 가능하다. 아래 예시는PostgresHook을 사용하여 Redshift와 연결한 예제이다.

from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_Redshift_connection(autocommit=True):
		# Connection에서 수행한 연결의 id를 적어준다.
    hook = PostgresHook(postgres_conn_id='<Connection id>')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

 

2. Variables

변수 역시 환경설정으로 뺄 수 있다.

airflow를 key-value 스토리지 형태로 사용한다. from airflow.models import Variable코드로 사용가능하다.

 

Xcom

  • 태스크들간에 데이터를 주고 받기 위한 방식
  • Operator의 리턴값을 Operator에서 읽는 형태
  • 메타데이터 DB에 저장되기 때문에 큰 데이터는 불가능하다.
    • 큰 데이터는 S3등에 로드하고 위치를 넘기는 것이 일반적이다.
  • 예제 코드 (Extract->Transform->Load 순으로 진행된다.)
def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)
    
# extract의 리턴값이 transform의 입력으로 받아진다.
def transform(**context):
    logging.info("Transform started")
		# xcom은 params가 아니다. task_instance를 통해서 가져옴
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    ...
    return records
    
# Operator 정의
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)
transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    ... 
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    ...
    dag = dag)

extract >> transform >> load

 

CLI 환경에서 DAG 확인하기

: 작업을 하다보면 CLI환경에서 DAG들을 테스트하고, 작업을 수행해야하는 경우가 발생한다.

  • airflow의 컨테이너 로그인
    1. docker ps로 로그인 하려는 컨테이너의 id를 찾는다.
    2. 쉘 실행 : docker exec -it <container_id> sh
  • 쉘에서 유용한 명령들 : Scheduler에서 사용
    • DAG 출력 : airflow dags list
    • DAG안의 동작 task들을 출력 : airflow tasks list <DAG_id>
    • 변수 리스트 출력 : airflow variables list
    • 변수의 내용을 출력 : airflow variables get <variable>
    • DAG의 task list 확인 : airflow tasks list UpdateSymbol (dag-id)
    • cli에서 dag 실행 : airflow dags test UpdateSymbol 2023-05-30

Task의 수

  • task를 너무 많이 만드는 경우 전체 DAG 실행에 있어 너무 오래걸리게 된다.
  • task를 적게만들면 모듈화가 안되고, 실패 시에 재실행 시간이 오래걸린다.
  • 적절한 수의 task로 분할해서 사용하는 것이 중요하다.

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > Airflow 설치해보기 2.5.1  (0) 2024.04.29
Airflow - Data pipeline & airflow  (0) 2024.03.20

최근에 2.9.0 버전이 나왔다는 소식을 들었다.

이 실습이 4개월 전에 진행되었는데 당시에는 2.7버전 정도였던 같았는데

엄청나게 빠른 속도로 개발되는걸 보니무시무시해진다...

기존에 정리했던 내용을 기반으로 설치 방법을 작성한다.


Airflow 설치 (Airflow 2.5.1)

직접 설치 (Docker)

  • 메타 데이터베이스:  Postgres
  • Airflow 설치 경로: /var/lib/airflow/
  • Airflow에는 3개의 어카운트가 사용된다.
    • ubuntu : 메인 어카운트
    • porstgres : root이외에 postgres 액세스를 위한 airflow 계정이 필요
    • airflow : airflow용 어카운트.

1. pip install (버전을 명시했습니다. 주의)

pip install "apache-airflow[celery]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"

 

2. docker-compose.yaml 다운

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

 

3. 환경 변수와 dags 폴더 생성

  1. mkdir -p ./dags ./logs ./plugins
  2. echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  3. 만약 수동으로 할 경우에 AIRFLOW_UID=50000 를 .env에 붙여넣는다.

4. 초기화 후 실행

  1. docker compose up airflow-init
  2. docker compose up

Running Airflow in Docker — Airflow Documentation

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

EC2를 활용하여 설치 (Mac 이용)

EC2 & SSH

  • Key pair 설정 → 서버에 설치된다.
    • airflow-dev.pem 다운로드
  • 보안 그룹 설정 (24, 8080포트 오픈이 필요)
    • SSH traffic 설정
  • Instance를 누르고 private IPv4 address를 복사.
  • 터미널 오픈 후 SSH 커맨드를 사용
    • ssh - i airflow-dev.pem ubuntu@<복사한 IPv4 address>
    • 키 권한에러 발생 시 chmod 600 airflow-dev.pem ⇒ 권한을 줄여야한다.

접속 후 기본 세팅

  • sudo apt-get update 를 통해 apt-get 업데이트 진행
  • python3 세팅을 해주어야 한다.
  • openssl 버전 업그레이드
    • sudo pip3 install pyopenssl --upgrade

Airflow 모듈 설치

  • ubuntu에서 airflow를 사용하기 위한 유저 생성
    • sudo groupadd airflow
    • sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m

Postgresql

  • sudo apt-get install -y postgresql postgresql-contrib
  • 설치 완료가 되면 postgres라는 계정이 생성된다.
  • 계정 변경 : sudo su postgres
  • postgresql에서 사용할 airflow 계정을 만들어야한다. psql 로 접속
    • CREATE USER airflow PASSWORD 'airflow';
    • 전용 db 생성 : CREATE DATABASE airflow;
  • postrgresql 재시작하기 : sudo service postgresql restart

Airflow 계정 변환 후 설치

  • sudo su airflow
  • Home directory로 이동 : cd ~/
  • mkdir dags 로 dag들이 만들어질 폴더 생성
  • DB 초기화 : AIRFLOW_HOME = /var/lib/airflow airflow db init
  • airflow.cfg 파일설정
    • dags_folder 위치를 맞추기
    • executor를 LocalExecutor로 변경해준다. → Postgresql용 최적의 executor
    • Postgresql 설정
      • [database]의 sql_alchemy_conn을 변경해야한다.
      • sqlite를 postgresql+psycopg2: //airflow:airflow@localhost:5432/airflow 로 변경한다.
  • cfg 재설정 완료 후 DB 초기화를 다시 실행한다. AIRFLOW_HOME = /var/lib/airflow airflow db init

Airflow가 백그라운드에서 돌아가게 하기.

  • 웹서버를 서비스로 등록: sudo vi /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver -p 8080
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
  • 스케쥴러 서비스 등록
[Unit]
Description=Airflow scheduler
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target

 

서비스 활성화

sudo systemctl daemon-reload
sudo systemctl enable airflow-webserver
sudo systemctl enable airflow-scheduler

# 서비스 시작
sudo systemctl start airflow-webserver
sudo systemctl start airflow-scheduler
  • Airflow webserver에 로그인 어카운트 생성하기
    • sudo su airflow
    • AIRFLOW_HOME=/var/lib/airflow airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password adminpassword
    • ubuntu에서 실행한 경우에 admin 계정을 지워야한다.
      • AIRFLOW_HOME=/var/lib/airflow airflow users delete --username admin
  • 웹서버 포트를 열어야한다.
    • EC2 > 보안그룹 > 인바운드 규칙에서 8080포트를 열어준다.

Airflow 기본 구조

  1. DAG를 대표하는 객체를 먼저 만든다.
  2. DAG를 구성하는 태스크들을 생성
  3. 태스크들간의 실행 순서를 결정한다.

DAG 설정 예제

  • default_args 설정 : 이곳에 지정되는 인자들은 모든 태스크들에 공통으로 적용된다.
    • owner
    • email
    • retries
    • retry_delay
from datetime import datetime,timedelta

default_args = {
	'owner' : 'poriz',
	'retries': 1,
	...
}

 

  • DAG 정의
    • catchup : 실행날짜 이전에 계획된 것이 있는 경우 True로 변경하면 실행된다.
    • schedule : cron 표현식을 사용한다.
dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

Bash Operator 예제

  • 3개의 태스크
  • t1은 현재시간을 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# default_args 설정
default_args{
	'owner' : 'poriz',
	'start_date' : datetime(2023,1,1,hour=0,minute=00),
...
}

# dag 정의
test_dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

# task 생성
t1 = BashOperator(
	task_id = 'print_date',
	# bash 명령어 사용
	bash_command='date',
	dag = test_dag)

t2 = BashOperator(
	task_id = 'sleep',
	bash_command='sleep 5',
	dag = test_dag)

t3 = BashOperator(
	task_id = 'ls',
	bash_command='ls /tmp',
	dag = test_dag)

t1 >> [t2,t3]
 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow - Data pipeline & airflow  (0) 2024.03.20

+ Recent posts