PythonOperator를 사용한 DAG

pythonOperator를 활용하여 쉽게 DAG를 작성 가능하다. 두가지 방법을 소개한다.

 

1. task decorator가 없는 방식

from airflow.operators.python import PythonOperator

t1 = PythonOperator(
	dag = dag,
	task_id='task_id',
	python_callable = python_func,
	params={
		'table':'test_table',
		'schema':'raw_data'
	},
)

# python function
def python_func(**cxt):
	table = cxt["params"]["table"]
	schema = cxt["params"]["schema"]
	ex_datge = cxt["execution_date"]

 

 

2. task decorator를 사용한 방식

from airflow.decorators import task

dag = DAG(
	dag_id = 'HelloWorld',
	...
)
# python functions
@task
def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
	dag_id = "HelloWorld",
	...
)as dag:

# taskID == function_name
print_hello() >> print_goodbye()

 

참고) 중요한 DAG 파라미터들
- max_active_runs : 동시에 수행 가능한 DAG의 수
- max_active_tasks : 동시에 수행되는 task들의 수
- catchup : DAG의 start_date 이전에 밀린 작업들의 수행 여부 (default = True)

 

Connections & Variables

두 기능 모두 Airflow의 webUI를 통해 쉽게 설정 가능하다. CLI로 진행하는 방법도 있습니다.

 

1. Connections

연결에 필요한 정보들을 환경설정의 형태로 코드 밖으로 빼내는 일을 수행한다.

 

Connection 설정 후에 연결 작업 수행이 가능하다. 아래 예시는PostgresHook을 사용하여 Redshift와 연결한 예제이다.

from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_Redshift_connection(autocommit=True):
		# Connection에서 수행한 연결의 id를 적어준다.
    hook = PostgresHook(postgres_conn_id='<Connection id>')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

 

2. Variables

변수 역시 환경설정으로 뺄 수 있다.

airflow를 key-value 스토리지 형태로 사용한다. from airflow.models import Variable코드로 사용가능하다.

 

Xcom

  • 태스크들간에 데이터를 주고 받기 위한 방식
  • Operator의 리턴값을 Operator에서 읽는 형태
  • 메타데이터 DB에 저장되기 때문에 큰 데이터는 불가능하다.
    • 큰 데이터는 S3등에 로드하고 위치를 넘기는 것이 일반적이다.
  • 예제 코드 (Extract->Transform->Load 순으로 진행된다.)
def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)
    
# extract의 리턴값이 transform의 입력으로 받아진다.
def transform(**context):
    logging.info("Transform started")
		# xcom은 params가 아니다. task_instance를 통해서 가져옴
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    ...
    return records
    
# Operator 정의
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)
transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    ... 
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    ...
    dag = dag)

extract >> transform >> load

 

CLI 환경에서 DAG 확인하기

: 작업을 하다보면 CLI환경에서 DAG들을 테스트하고, 작업을 수행해야하는 경우가 발생한다.

  • airflow의 컨테이너 로그인
    1. docker ps로 로그인 하려는 컨테이너의 id를 찾는다.
    2. 쉘 실행 : docker exec -it <container_id> sh
  • 쉘에서 유용한 명령들 : Scheduler에서 사용
    • DAG 출력 : airflow dags list
    • DAG안의 동작 task들을 출력 : airflow tasks list <DAG_id>
    • 변수 리스트 출력 : airflow variables list
    • 변수의 내용을 출력 : airflow variables get <variable>
    • DAG의 task list 확인 : airflow tasks list UpdateSymbol (dag-id)
    • cli에서 dag 실행 : airflow dags test UpdateSymbol 2023-05-30

Task의 수

  • task를 너무 많이 만드는 경우 전체 DAG 실행에 있어 너무 오래걸리게 된다.
  • task를 적게만들면 모듈화가 안되고, 실패 시에 재실행 시간이 오래걸린다.
  • 적절한 수의 task로 분할해서 사용하는 것이 중요하다.

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > Airflow 설치해보기 2.5.1  (0) 2024.04.29
Airflow - Data pipeline & airflow  (0) 2024.03.20

최근에 2.9.0 버전이 나왔다는 소식을 들었다.

이 실습이 4개월 전에 진행되었는데 당시에는 2.7버전 정도였던 같았는데

엄청나게 빠른 속도로 개발되는걸 보니무시무시해진다...

기존에 정리했던 내용을 기반으로 설치 방법을 작성한다.


Airflow 설치 (Airflow 2.5.1)

직접 설치 (Docker)

  • 메타 데이터베이스:  Postgres
  • Airflow 설치 경로: /var/lib/airflow/
  • Airflow에는 3개의 어카운트가 사용된다.
    • ubuntu : 메인 어카운트
    • porstgres : root이외에 postgres 액세스를 위한 airflow 계정이 필요
    • airflow : airflow용 어카운트.

1. pip install (버전을 명시했습니다. 주의)

pip install "apache-airflow[celery]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"

 

2. docker-compose.yaml 다운

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

 

3. 환경 변수와 dags 폴더 생성

  1. mkdir -p ./dags ./logs ./plugins
  2. echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  3. 만약 수동으로 할 경우에 AIRFLOW_UID=50000 를 .env에 붙여넣는다.

4. 초기화 후 실행

  1. docker compose up airflow-init
  2. docker compose up

Running Airflow in Docker — Airflow Documentation

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

EC2를 활용하여 설치 (Mac 이용)

EC2 & SSH

  • Key pair 설정 → 서버에 설치된다.
    • airflow-dev.pem 다운로드
  • 보안 그룹 설정 (24, 8080포트 오픈이 필요)
    • SSH traffic 설정
  • Instance를 누르고 private IPv4 address를 복사.
  • 터미널 오픈 후 SSH 커맨드를 사용
    • ssh - i airflow-dev.pem ubuntu@<복사한 IPv4 address>
    • 키 권한에러 발생 시 chmod 600 airflow-dev.pem ⇒ 권한을 줄여야한다.

접속 후 기본 세팅

  • sudo apt-get update 를 통해 apt-get 업데이트 진행
  • python3 세팅을 해주어야 한다.
  • openssl 버전 업그레이드
    • sudo pip3 install pyopenssl --upgrade

Airflow 모듈 설치

  • ubuntu에서 airflow를 사용하기 위한 유저 생성
    • sudo groupadd airflow
    • sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m

Postgresql

  • sudo apt-get install -y postgresql postgresql-contrib
  • 설치 완료가 되면 postgres라는 계정이 생성된다.
  • 계정 변경 : sudo su postgres
  • postgresql에서 사용할 airflow 계정을 만들어야한다. psql 로 접속
    • CREATE USER airflow PASSWORD 'airflow';
    • 전용 db 생성 : CREATE DATABASE airflow;
  • postrgresql 재시작하기 : sudo service postgresql restart

Airflow 계정 변환 후 설치

  • sudo su airflow
  • Home directory로 이동 : cd ~/
  • mkdir dags 로 dag들이 만들어질 폴더 생성
  • DB 초기화 : AIRFLOW_HOME = /var/lib/airflow airflow db init
  • airflow.cfg 파일설정
    • dags_folder 위치를 맞추기
    • executor를 LocalExecutor로 변경해준다. → Postgresql용 최적의 executor
    • Postgresql 설정
      • [database]의 sql_alchemy_conn을 변경해야한다.
      • sqlite를 postgresql+psycopg2: //airflow:airflow@localhost:5432/airflow 로 변경한다.
  • cfg 재설정 완료 후 DB 초기화를 다시 실행한다. AIRFLOW_HOME = /var/lib/airflow airflow db init

Airflow가 백그라운드에서 돌아가게 하기.

  • 웹서버를 서비스로 등록: sudo vi /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver -p 8080
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
  • 스케쥴러 서비스 등록
[Unit]
Description=Airflow scheduler
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target

 

서비스 활성화

sudo systemctl daemon-reload
sudo systemctl enable airflow-webserver
sudo systemctl enable airflow-scheduler

# 서비스 시작
sudo systemctl start airflow-webserver
sudo systemctl start airflow-scheduler
  • Airflow webserver에 로그인 어카운트 생성하기
    • sudo su airflow
    • AIRFLOW_HOME=/var/lib/airflow airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password adminpassword
    • ubuntu에서 실행한 경우에 admin 계정을 지워야한다.
      • AIRFLOW_HOME=/var/lib/airflow airflow users delete --username admin
  • 웹서버 포트를 열어야한다.
    • EC2 > 보안그룹 > 인바운드 규칙에서 8080포트를 열어준다.

Airflow 기본 구조

  1. DAG를 대표하는 객체를 먼저 만든다.
  2. DAG를 구성하는 태스크들을 생성
  3. 태스크들간의 실행 순서를 결정한다.

DAG 설정 예제

  • default_args 설정 : 이곳에 지정되는 인자들은 모든 태스크들에 공통으로 적용된다.
    • owner
    • email
    • retries
    • retry_delay
from datetime import datetime,timedelta

default_args = {
	'owner' : 'poriz',
	'retries': 1,
	...
}

 

  • DAG 정의
    • catchup : 실행날짜 이전에 계획된 것이 있는 경우 True로 변경하면 실행된다.
    • schedule : cron 표현식을 사용한다.
dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

Bash Operator 예제

  • 3개의 태스크
  • t1은 현재시간을 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# default_args 설정
default_args{
	'owner' : 'poriz',
	'start_date' : datetime(2023,1,1,hour=0,minute=00),
...
}

# dag 정의
test_dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

# task 생성
t1 = BashOperator(
	task_id = 'print_date',
	# bash 명령어 사용
	bash_command='date',
	dag = test_dag)

t2 = BashOperator(
	task_id = 'sleep',
	bash_command='sleep 5',
	dag = test_dag)

t3 = BashOperator(
	task_id = 'ls',
	bash_command='ls /tmp',
	dag = test_dag)

t1 >> [t2,t3]
 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow - Data pipeline & airflow  (0) 2024.03.20

데이터 파이프라인과 Airflow에 대해서 간단하게 소개한다.

데이터 파이프라인이란

ETL

: Extract, Transform, Load ⇒ Data Pipeline, ETL, Data Workflow, DAG이라 부르기도 한다.

  • 데이터를 웨어하우스 외부에서 내부로 가져오는 프로세스
  • DAG : (Directed Acyclic Graph) : 루프가 존재하지 않는 비순환 방향 그래프.

ELT

: 데이터 웨어하우스 내부의 데이터를 조작해서 새로운 데이터를 생성하는 프로세스

  • 데이터 레이크 위에서 이루어지기도 한다.
  • dbt : Data Build Tool, ELT 전용 기술들이 존재.
  • 데이터 레이크 : 스토리지
    • 모든 데이터를 원래의 형태로 보존하는 스토리지에 가깝다.
  • 데이터 웨어하우스
    • 보존 기한이 있는 구조화된 데이터를 저장하고 처리하는 스토리지
    • BI 툴들의 백엔드로 많이 사용된다.

데이터 파이프라인

  • 데이터를 소스로부터 목적지로 복사하는 작업
    • ETL, ELT 모두 가능하다.
    • 데이터의 목적지가 웨어하우스가 대부분이나, 다른 시스템인 경우도 있다.
  • Summary/Report Jobs
    • 리포트나, 요약본 형태로 테이블을 다시 만들기도 한다.
  • Production Data Jobs
    • 프로덕션 환경에서 필요에 의한 경우에 역으로 쓰기도 한다.
    • 예) 실시간 정보 표출을 위한 경우에 사용되기도 한다.

고려할 점

  • 파이프라인의 수가 늘어나면 유지보수 비용이 늘어남.
    • 데이터 소스간의 의존도가 생기는 경우
    • 각 파이프라인간의 이해도 부족
  • 좋은 방법들
    • Full Refresh : 매번 통으로 복사해서 테이블을 만들기
      • 불가능한 경우에는 업데이트 방식으로 레코드들을 읽도록 수행 (Incremental update)
    • 멱등성이 보장되어야 한다. : 동일한 입력 데이터로 다수의 파이프라인을 실행해도 최종 테이블의 내용이 변하지 않아야한다.
    • 실패한 파이프라인의 재실행이 쉬워야한다.
    • 파이프라인의 입출력을 명확하게 하고, 문서화 해야한다.
      • 데이터 요청을 명시하기.
    • 주기적으로 쓸모없는 데이터들을 삭제해야한다.
    • 사고가 발생한 경우에 리포트를 쓰기 → 원인을 파악해야한다.
    • 중요한 파이프라인의 입출력을 체크해야한다.
      • 레코드의 수를 체크
      • PK uniqueness 보장
      • 중복 체크

Airflow

소개

  • Python으로 작성된 데이터 파이프라인 프레임워크
  • 데이터 파이프라인 스케쥴링을 지원한다.
  • 데이터 파이프라인을 DAG라 부른다.

구성

  • 컴포넌트
    1. 웹 서버
    2. 스케쥴러 : DAG들을 워커들에게 배정하는 역할을 수행
    3. 워커 : 실제로 DAG를 실행하는 역할을 수행
    4. 메타 데이터 데이터베이스
      1. Sqlite가 기본이다.
      2. 일반적으로는 mysql, Postgres같은 다른 데이터베이스를 사용한다. (성능상의 이유)
    5. 큐 (다수 서버 구성에서 사용된다.)
      1. Executor가 달라진다.
  • 스케일링 방법
    • 스케일 업 : 더 좋은 사양의 서버 사용
    • 스케일 아웃 : 서버 추가
  • Airflow 개발의 장단점
    • 장점
      • 데이터 파이프라인의 세밀한 제어가 가능하다.
      • 백필이 쉽다.
      • 다양한 데이터 소스와 웨어하우스를 지원한다.
    • 단점
      • 개발 환경을 구성하기 쉽지 않다.
      • 직접 운영이 쉽지않아, 클라우드 버전 사용이 선호된다.

 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow > Airflow 설치해보기 2.5.1  (0) 2024.04.29

+ Recent posts