최근에 2.9.0 버전이 나왔다는 소식을 들었다.
이 실습이 4개월 전에 진행되었는데 당시에는 2.7버전 정도였던 같았는데
엄청나게 빠른 속도로 개발되는걸 보니무시무시해진다...
기존에 정리했던 내용을 기반으로 설치 방법을 작성한다.
Airflow 설치 (Airflow 2.5.1)
직접 설치 (Docker)
- 메타 데이터베이스: Postgres
- Airflow 설치 경로: /var/lib/airflow/
- Airflow에는 3개의 어카운트가 사용된다.
- ubuntu : 메인 어카운트
- porstgres : root이외에 postgres 액세스를 위한 airflow 계정이 필요
- airflow : airflow용 어카운트.
1. pip install (버전을 명시했습니다. 주의)
pip install "apache-airflow[celery]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"
2. docker-compose.yaml 다운
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
3. 환경 변수와 dags 폴더 생성
- mkdir -p ./dags ./logs ./plugins
- echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
- 만약 수동으로 할 경우에 AIRFLOW_UID=50000 를 .env에 붙여넣는다.
4. 초기화 후 실행
- docker compose up airflow-init
- docker compose up
Running Airflow in Docker — Airflow Documentation
Running Airflow in Docker — Airflow Documentation
airflow.apache.org
EC2를 활용하여 설치 (Mac 이용)
EC2 & SSH
- Key pair 설정 → 서버에 설치된다.
- airflow-dev.pem 다운로드
- 보안 그룹 설정 (24, 8080포트 오픈이 필요)
- SSH traffic 설정
- Instance를 누르고 private IPv4 address를 복사.
- 터미널 오픈 후 SSH 커맨드를 사용
- ssh - i airflow-dev.pem ubuntu@<복사한 IPv4 address>
- 키 권한에러 발생 시 chmod 600 airflow-dev.pem ⇒ 권한을 줄여야한다.
접속 후 기본 세팅
- sudo apt-get update 를 통해 apt-get 업데이트 진행
- python3 세팅을 해주어야 한다.
- wget https://bootstarp.pypa.io/get-pip.py
- sudo python3 get-pip.py
- sudo apt-get install -y python3-pip
- openssl 버전 업그레이드
- sudo pip3 install pyopenssl --upgrade
Airflow 모듈 설치
- mySQL 모듈 업그레이드
- sudo apt-get install -y libmysqlclient-dev
- Airflow 모듈 설치 (airflow: 2.5.1, modules : celery,amazon,mysql,postgres)
- pip install "apache-airflow[celery,amazon,mysql,postgres]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt" 위 코드를 활용해서 버전과 모듈을 설치한다.
- ubuntu에서 airflow를 사용하기 위한 유저 생성
- sudo groupadd airflow
- sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m
Postgresql
- sudo apt-get install -y postgresql postgresql-contrib
- 설치 완료가 되면 postgres라는 계정이 생성된다.
- 계정 변경 : sudo su postgres
- postgresql에서 사용할 airflow 계정을 만들어야한다. psql 로 접속
- CREATE USER airflow PASSWORD 'airflow';
- 전용 db 생성 : CREATE DATABASE airflow;
- postrgresql 재시작하기 : sudo service postgresql restart
Airflow 계정 변환 후 설치
- sudo su airflow
- Home directory로 이동 : cd ~/
- mkdir dags 로 dag들이 만들어질 폴더 생성
- DB 초기화 : AIRFLOW_HOME = /var/lib/airflow airflow db init
- airflow.cfg 파일설정
- dags_folder 위치를 맞추기
- executor를 LocalExecutor로 변경해준다. → Postgresql용 최적의 executor
- Postgresql 설정
- [database]의 sql_alchemy_conn을 변경해야한다.
- sqlite를 postgresql+psycopg2: //airflow:airflow@localhost:5432/airflow 로 변경한다.
- cfg 재설정 완료 후 DB 초기화를 다시 실행한다. AIRFLOW_HOME = /var/lib/airflow airflow db init
Airflow가 백그라운드에서 돌아가게 하기.
- 웹서버를 서비스로 등록: sudo vi /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver
After=network.target
[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver -p 8080
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
- 스케쥴러 서비스 등록
[Unit]
Description=Airflow scheduler
After=network.target
[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
서비스 활성화
sudo systemctl daemon-reload
sudo systemctl enable airflow-webserver
sudo systemctl enable airflow-scheduler
# 서비스 시작
sudo systemctl start airflow-webserver
sudo systemctl start airflow-scheduler
- Airflow webserver에 로그인 어카운트 생성하기
- sudo su airflow
- AIRFLOW_HOME=/var/lib/airflow airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password adminpassword
- ubuntu에서 실행한 경우에 admin 계정을 지워야한다.
- AIRFLOW_HOME=/var/lib/airflow airflow users delete --username admin
- 웹서버 포트를 열어야한다.
- EC2 > 보안그룹 > 인바운드 규칙에서 8080포트를 열어준다.
Airflow 기본 구조
- DAG를 대표하는 객체를 먼저 만든다.
- DAG를 구성하는 태스크들을 생성
- 태스크들간의 실행 순서를 결정한다.
DAG 설정 예제
- default_args 설정 : 이곳에 지정되는 인자들은 모든 태스크들에 공통으로 적용된다.
- owner
- retries
- retry_delay
from datetime import datetime,timedelta
default_args = {
'owner' : 'poriz',
'retries': 1,
...
}
- DAG 정의
- catchup : 실행날짜 이전에 계획된 것이 있는 경우 True로 변경하면 실행된다.
- schedule : cron 표현식을 사용한다.
dag = DAG(
"dag_v1" # DAG name
start_date= datetime(2020,1,1,hour=0,minute=00),
schedule = "0 * * * *"
catchup = False
)
Bash Operator 예제
- 3개의 태스크
- t1은 현재시간을 출력
- t2는 5초간 대기 후 종료
- t3는 서버의 /tmp 디렉토리의 내용 출력
- t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# default_args 설정
default_args{
'owner' : 'poriz',
'start_date' : datetime(2023,1,1,hour=0,minute=00),
...
}
# dag 정의
test_dag = DAG(
"dag_v1" # DAG name
start_date= datetime(2020,1,1,hour=0,minute=00),
schedule = "0 * * * *"
catchup = False
)
# task 생성
t1 = BashOperator(
task_id = 'print_date',
# bash 명령어 사용
bash_command='date',
dag = test_dag)
t2 = BashOperator(
task_id = 'sleep',
bash_command='sleep 5',
dag = test_dag)
t3 = BashOperator(
task_id = 'ls',
bash_command='ls /tmp',
dag = test_dag)
t1 >> [t2,t3]
'Data Engineering > Airflow' 카테고리의 다른 글
Airflow > MySQL, Backfill (1) | 2024.05.02 |
---|---|
Airflow > Time, Pk uniqueness, Backfill (1) | 2024.05.01 |
Airflow > airflow.cfg (0) | 2024.05.01 |
Airflow > PythonOperator & 예제 (0) | 2024.04.29 |
Airflow - Data pipeline & airflow (0) | 2024.03.20 |