최근에 2.9.0 버전이 나왔다는 소식을 들었다.

이 실습이 4개월 전에 진행되었는데 당시에는 2.7버전 정도였던 같았는데

엄청나게 빠른 속도로 개발되는걸 보니무시무시해진다...

기존에 정리했던 내용을 기반으로 설치 방법을 작성한다.


Airflow 설치 (Airflow 2.5.1)

직접 설치 (Docker)

  • 메타 데이터베이스:  Postgres
  • Airflow 설치 경로: /var/lib/airflow/
  • Airflow에는 3개의 어카운트가 사용된다.
    • ubuntu : 메인 어카운트
    • porstgres : root이외에 postgres 액세스를 위한 airflow 계정이 필요
    • airflow : airflow용 어카운트.

1. pip install (버전을 명시했습니다. 주의)

pip install "apache-airflow[celery]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"

 

2. docker-compose.yaml 다운

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

 

3. 환경 변수와 dags 폴더 생성

  1. mkdir -p ./dags ./logs ./plugins
  2. echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  3. 만약 수동으로 할 경우에 AIRFLOW_UID=50000 를 .env에 붙여넣는다.

4. 초기화 후 실행

  1. docker compose up airflow-init
  2. docker compose up

Running Airflow in Docker — Airflow Documentation

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

EC2를 활용하여 설치 (Mac 이용)

EC2 & SSH

  • Key pair 설정 → 서버에 설치된다.
    • airflow-dev.pem 다운로드
  • 보안 그룹 설정 (24, 8080포트 오픈이 필요)
    • SSH traffic 설정
  • Instance를 누르고 private IPv4 address를 복사.
  • 터미널 오픈 후 SSH 커맨드를 사용
    • ssh - i airflow-dev.pem ubuntu@<복사한 IPv4 address>
    • 키 권한에러 발생 시 chmod 600 airflow-dev.pem ⇒ 권한을 줄여야한다.

접속 후 기본 세팅

  • sudo apt-get update 를 통해 apt-get 업데이트 진행
  • python3 세팅을 해주어야 한다.
  • openssl 버전 업그레이드
    • sudo pip3 install pyopenssl --upgrade

Airflow 모듈 설치

  • ubuntu에서 airflow를 사용하기 위한 유저 생성
    • sudo groupadd airflow
    • sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m

Postgresql

  • sudo apt-get install -y postgresql postgresql-contrib
  • 설치 완료가 되면 postgres라는 계정이 생성된다.
  • 계정 변경 : sudo su postgres
  • postgresql에서 사용할 airflow 계정을 만들어야한다. psql 로 접속
    • CREATE USER airflow PASSWORD 'airflow';
    • 전용 db 생성 : CREATE DATABASE airflow;
  • postrgresql 재시작하기 : sudo service postgresql restart

Airflow 계정 변환 후 설치

  • sudo su airflow
  • Home directory로 이동 : cd ~/
  • mkdir dags 로 dag들이 만들어질 폴더 생성
  • DB 초기화 : AIRFLOW_HOME = /var/lib/airflow airflow db init
  • airflow.cfg 파일설정
    • dags_folder 위치를 맞추기
    • executor를 LocalExecutor로 변경해준다. → Postgresql용 최적의 executor
    • Postgresql 설정
      • [database]의 sql_alchemy_conn을 변경해야한다.
      • sqlite를 postgresql+psycopg2: //airflow:airflow@localhost:5432/airflow 로 변경한다.
  • cfg 재설정 완료 후 DB 초기화를 다시 실행한다. AIRFLOW_HOME = /var/lib/airflow airflow db init

Airflow가 백그라운드에서 돌아가게 하기.

  • 웹서버를 서비스로 등록: sudo vi /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver -p 8080
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
  • 스케쥴러 서비스 등록
[Unit]
Description=Airflow scheduler
After=network.target

[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target

 

서비스 활성화

sudo systemctl daemon-reload
sudo systemctl enable airflow-webserver
sudo systemctl enable airflow-scheduler

# 서비스 시작
sudo systemctl start airflow-webserver
sudo systemctl start airflow-scheduler
  • Airflow webserver에 로그인 어카운트 생성하기
    • sudo su airflow
    • AIRFLOW_HOME=/var/lib/airflow airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password adminpassword
    • ubuntu에서 실행한 경우에 admin 계정을 지워야한다.
      • AIRFLOW_HOME=/var/lib/airflow airflow users delete --username admin
  • 웹서버 포트를 열어야한다.
    • EC2 > 보안그룹 > 인바운드 규칙에서 8080포트를 열어준다.

Airflow 기본 구조

  1. DAG를 대표하는 객체를 먼저 만든다.
  2. DAG를 구성하는 태스크들을 생성
  3. 태스크들간의 실행 순서를 결정한다.

DAG 설정 예제

  • default_args 설정 : 이곳에 지정되는 인자들은 모든 태스크들에 공통으로 적용된다.
    • owner
    • email
    • retries
    • retry_delay
from datetime import datetime,timedelta

default_args = {
	'owner' : 'poriz',
	'retries': 1,
	...
}

 

  • DAG 정의
    • catchup : 실행날짜 이전에 계획된 것이 있는 경우 True로 변경하면 실행된다.
    • schedule : cron 표현식을 사용한다.
dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

Bash Operator 예제

  • 3개의 태스크
  • t1은 현재시간을 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# default_args 설정
default_args{
	'owner' : 'poriz',
	'start_date' : datetime(2023,1,1,hour=0,minute=00),
...
}

# dag 정의
test_dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

# task 생성
t1 = BashOperator(
	task_id = 'print_date',
	# bash 명령어 사용
	bash_command='date',
	dag = test_dag)

t2 = BashOperator(
	task_id = 'sleep',
	bash_command='sleep 5',
	dag = test_dag)

t3 = BashOperator(
	task_id = 'ls',
	bash_command='ls /tmp',
	dag = test_dag)

t1 >> [t2,t3]
 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow - Data pipeline & airflow  (0) 2024.03.20

+ Recent posts