Jinja Template

: python에서 널리 사용되는 템플릿 엔진이다. Django 템플릿 엔진에서 영감을 받아 개발되었다고 한다. 현재는 Flask에서 많이 사용된다고 한다.

  • 변수를 이중 중괄호로 감싸 사용 <h1>name: {{name}}<h1>
  • 제어문은 퍼센트 기호를 사용한다.
# 반복문 예시
{% for item in items %}
	...
{% endfor %}

Jinja + Airflow

: 작업 이름, 파라미터 또는 SQL 쿼리와같은 작업 매개변수를 템플릿화된 문자열로 정의 할 수 있다.

  • execution_date를 코드 내에서 쉽게 사용 가능하다. : {{ ds }}
  • BashOperator에서의 사용 방식 → bash_command에서 jinja사용 가능
# params를 활용해서 사용도 가능하다.
task = BashOperator(
	...
	bash_command='echo "test,{{params.name}}"",
	params={'name':'pori'},
	dag=dag
)
  • 각 Operator에서 어떤 Parameters에 jinja가 사용가능한가?
    : docs를 확인해서 다음과 같이 Parameters 뒤에 (templated)가 적혀있는 경우에 사용가능하다.

Operators — Airflow Documentation

{{ ds }}연도-월-일
{{ ds_nodash }} 대시없이 ds 출력
{{ ts }} 연도-월-일-시-분-초
{{ dag }} dag이름
{{ task }} task에 대한 정보
{{ dag_run }}  
{{ var.value}}: {{ var.value.get(’my.var’, ‘fallback’) }} Variable 읽어오기 (value)
{{ var.json }}: {{ var.json.my_dict_var.key1 }} Variable 읽어오기 (json)
{{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password } Connection 생성

Dag를 실행하는 방법

  • 주기적 실행 방법: schedule을 지정해서 수행하기
  • 다른 Dag에 의해 트리거하기
    • Explicit Trigger: DAG A가 명시적으로 DAG B를 트리거 → TriggerDagOperator
    • Reactive Trigger: B가 A가 끝나기를 대기, A는 이를 알지 못한다. → ExternalTaskSensor

TriggerDagRunOperator

  • jinja 사용가능 params : trigger_dag_id, conf, execution_date
  • conf : 다음 DAG에 넘기고 싶은 정보. conf = { ‘name’: ‘pori’ }
    • 다음 DAG에서 jinja로 접근하기: {{ dag_run.conf[”name”] }}
    • PythonOperator에서 접근: kwargs['dag_run'].conf.get('name')
  • 참고: airflow.cfg의 dag_run_conf_overrides_params가 True로 되어야한다.
# TriggerDagRunOperator
trigger_task = TriggerDagRunOperator(
	...
	conf = {...}
	execution_date = '{{ ds }}'
	...
	dag=dag
)
# targetDAG
task1 = BashOperator(
	...
	bash_command ="""echo '{{ds}}, {{ dag_run.conf.get("name","none")' """
)

Sensor

: 특정 조건이 충족될 때까지 대기하는 Operator, 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용하게 사용된다.

  • 내장 Sensor
    • FileSensor: 지정된 위치에 파일이 생길 때까지 대기
    • HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기
    • SqlSensor: SQL DB에서 특정 조건을 충족할 때까지 대기
    • TimeSensor: 특정 시간에 도달할 때까지 워크플로를 일시중지
    • ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기
  • 주기적인 poke : mode를 사용해서 방법을 선택한다.
    • poke: 주기적으로 체크하기, 하나의 worker에서 전담해서 체크 → 체크 주기가 명확해진다.
    • reschedule: worker를 릴리스하고, 다시 잡아서 poke수행 → 상황에 따라서 worker를 잡는것이 힘들 수 있다.

ExternalTaskSensor

: 앞의 DAG의 특정 Task가 완료되었는지를 확인한다.

  • execution date, schedule interval이 동일해야한다.
  • 서로 다른 interval을 갖는 경우에는 execution_date_fn을 사용가능하다.
  • 제약 조건이 까다로워 실제 사용하는 경우는 드물다고 한다.

BranchPythonOperator

: 상황에 따라 뒤에서 실행되어야할 태스크를 동적으로 결정해주는 Operator

  • 미리 정해둔 Operator중에 선택하는 형태
  • Learn_BranchPythonOperator.py
# 조건을 걸어줄 함수를 생성
def decide_branch(**context):
	current_hour = datetime.now().hour
	if current_hour < 12:
		return 'morning_task'
	else:
		return 'afternoon_task'

# BranchPythonOperator정의, python함수를 호출한다.
branching_operator = BranchPythonOperator(
	...
	python_callable=decide_branch,
	dag=dag
)
# branch의 결과에 따라서 실행되는 operator들
morning_task = EmptyOperator(
	task_id='morning_task'
)
afternoon_task= EmptyOperator(
	task_id='afternoon_task'
)

# 실행 순서 설정
branching_operator >> morning_task
branching_operator >> afternoon_task
  • 실행되지 않는 task의 경우 skipped로 된다.

LatestOnlyOperator

Time-sensitive한 task들이 과거의 backfill시 실행되는 것을 막기 위해 사용된다.
현재 시간이 execution_date보다 미래이고, 다음execution_date보다 과거인 경우에만 실행을 이어가고 아니면 중단된다. → 현재보다 과거의 경우에는 중단!

  • 시간에 영향을 많이 받는 task들 앞에 사용하여 flow를 중단하는 역할을 수행한다.
  • catchup=True 인 경우에 유용하게 사용 가능하다.
from airflow.operators.latest_only import LatestOnlyOperator

with DAG(
	...
	t1 = EmptyOperator(task_id='task1')
	t2 = LatestOnlyOperator(task_id='latest_only')
	t3 = EmptyOperator(task_id='task3')
	t4 = EmptyOperator(task_id='task4')
)

t1 >> t2 >> [t3,t4]

Trigger Rule

: Upstream task의 상황에 따라서 뒷단의 task의 실행 여부를 결정하기위해 사용

  • trigger_rule 파라미터를 이용해서 결정 가능하다.
    • 가능한 값들 : ALL_SUCCESS (default), ALL_FAILED, ALL_DONE, ONE_FAILED, ONE_SUCCESS, NONE_FAILED, NONE_FAILED_MIN_ONE_SUCCESS
    • task의 상태는 success, fail, skip 3가지 상태가 존재하는 것에 유의해야한다.
    • airflow.utils.trigger_rule.TriggerRule을 가져와서 사용해야한다.
# 예시 task1,2가 모두 성공해야 task3가 실행된다.
from airflow.utils.trigger_rule import TriggerRule

...
t1 = BashOperator(...)
t2 = BashOperator(...)
t3 = BashOperator(
	...
	trigger_rule=TriggerRule.ALL_DONE
)
[t1,t2] >> t3

Task Grouping

: task들을 성격에 따라서 관리하는 경우에 용이하다.

  • 다수의 파일 처리하는 DAG의 경우
    • 파일 다운로드, 파일 체크, 데이터 처리 ⇒ 3개의 태스크로 구성가능하다.
  • TaskGroup 안에 TaskGroup nesting이 가능하다.
    • TaskGroup도 실행 순서 정의가 가능하다.
from airflow.utils.task_group import TaskGroup

start = EmptyOperator(task_id="start")

with TaskGroup("Download", tooltip="Tasks for downloading daga") as section_1:
	task1 = ...
	task2 = ...
	...
	task_1 >> task2

	# nesting
	with TaskGroup(...) as inner_serction_2:
		...

start >> section_1 >> 

Dynamic Dags

: 템플릿과 yaml을 기반으로 dag를 동적으로 만드려는 것. 비슷한 dag를 계속해서 매뉴얼하게 개발하는 것을 방지한다.

  • 오너가 다르거나, 태스크의 수가 많아지는 경우에는 DAG를 복제하는 것이 좋다.
  • 동작 구조: .yaml → template & generator → DAGs

예제

  • yml
# config_appl.yml

dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
  • template.jinja2
# templated_dag.jinja2

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_{{ dag_id }}",
	start_date = ...
	schedule = '{{ schedule }}",
	catchup = {{ catchup or True }} # catchup을 사용하거나 값이 없으면 True로 설정
) as dag:

@task
def extract(symlbol):
	return symbol
@task
def process(symbol):
	return symbol

@task
def store(symbol):
		return symbol

store(process(extract("{{ symbol }}")))
  • generator
# generator.py
from jinja2 import Environment, FileSystemLoader
import yaml
import os

# 현재 실행중인 파일의 폴더의 절대 경로를 반환한다.
fire_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')

for f in os.listdir(file_dir): # 파일 디렉토리 내에서 모든 파일 읽어오기
	if f.endswith(".yml"): # yml파일 읽기
		with open(f"{file_dir}/{f}","r") as cf: #yml 파일을 읽기모드로 열기
			config = yaml.safe_load(cf)
			with open(f"dags/get_price{config['dag_id']}.py","w") as f:  # 쓰기모드로 dag 생성
				# yml로 읽은 것을 template에 render 한 후 파일에 쓰는 작업 수행
				f.write(template.render(config))

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow > Airflow 설치해보기 2.5.1  (0) 2024.04.29

Mysql 복사하기

: Production MySQL Table(OLTP)의 prod.nps 를 AWS Redshift의 raw_data.nps에 전달한다.(OLAP)

AWS 권한 설정

  • Airflow DAG에서 S3 접근 권한 (쓰기권한)
    • IAM User를 생성, acceess key와 secret key를 사용
  • Redshift가 S3 접근 (읽기 권한)
    • S3접근 Role을 만들고 Redshift에 지정해야한다.

MySQL Connection 설정 시 유의 사항

  • MySQLdb에러를 피하기 위해서 다음과 같은 명령어를 적용해야한다.
    • Airflow Scheduler에 다음 명령어들을 통해 설치 진행
      - sudo apt-get install -y default-libmysqlclient-dev
      - sudo apt-get install -y gcc
      - sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

MySQL & Redshfit 테이블 리뷰

-- Mysql
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);

-- Redshift
CREATE TABLE (본인의스키마).nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);

MySQL_to_Redshift DAG의 Task 구성

  • SqlToS3Operator
    • mysql의 결과를 s3에 넣어준다.
  • S3ToRedshiftOperator
    • S3 → Redshfit, COPY 명령어 사용
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE',
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

Incremental Update 방식

  • MySQL, PostgreSQL 테이블이면 created→modified→deleted로 수행된다.
    • deleted 시에는 레코드를 삭제하지 않고, deleted를 True로 설정한다.
  • ROW_NUMBER로 구현하기도함.
    • A테이블의 내용을 temp_A로 복사
    • A 테이블의 레코드 중 modified의 날짜가 지난 일에 해당하는 모든 레코드를 읽어다가 temp_A로 복사 (execution_date)
      SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    • temp_A의 레코드들을 pk를 기준으로 파티션 후 modified 값을 기준으로 DESC정렬해서 일련번호가 1인것들만 복사
  • S3ToRedshiftOperator로 구현
    • query parameter : SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    • method : UPSERT → UPSERT KEYs로 지정이된 PK를 기준으로 값이 같은 레코드들은 복사되는 값으로 대체되고, 없는 값은 추가된다.
    • upsert_keys로 pk를 지정, 리스트 형태
# SqlToS3Operator
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = sql,
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)
# S3ToRedshiftOperator
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    
    method = "UPSERT",
    upsert_keys = ["id"],
    dag = dag
)

Backfill 실행해보기

  • `airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
    • catchUp이 True여야함
    • execution_date를 사용해서 incremental update가 구현되어야한다.
    • 실행순서가 랜덤이기 때문에 DAG defaul_args 의 depends_on_past를 True로 설정해야한다.
    • start_date부터 시작하지만, end_date는 포함되지 않는다.

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > Jinja & DAG Dependencies  (0) 2024.05.11
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow > Airflow 설치해보기 2.5.1  (0) 2024.04.29

Airflow & Timezone

  • airflow.cfg에는 두 종류의 타임존 키가 존재한다.
    1. default_timezone
    2. default_ui_timezone
  • start_date, end_date, schedule : default_timezone을 따른다.
  • execution_date, log_time : 항상 UTC를 따른다.

execution_date

dag가 실행되어야하는 기대값, 주문번호에 가깝다는 이야기가 많다.

DAG가 실행되어야하는 시간 값을 변수로 사용하고자 할 때 많이 사용한 것 같다.

특히 과거의 task를 재실행하는 경우에 유용했다.

예를 들어본다면 2020-01-01에 실행되는 DAG가 있다. 이 DAG는 두 task로 이루어지는데

하나는 오후 6시에 데이터를 수집하고 다른 하나는 자정이 넘어가 다음날이 되어서야 실행된다. 그런데 자정이 넘어가서 실행되는 task에는 파일 저장 로직이 담겨있다. 그리고 파일명에는 데이터 수집일자가 적혀있어야한다.

이 때 execution_date를 사용하면 DAG가 실행되는 날짜가 담기기 때문에 수집일자인 2020-01-01 18:00:00을 담을 수 있다.

https://dydwnsekd.tistory.com/108

 

Airflow execution_date 이해하기

이번 글에서는 Airflow의 execution_date에 대해서 이해해보도록 하자. execution_date에 대한 설명을 하기에 앞서 Airflow는 batch schedulering을 위한 tool이라는 것을 기억하자. execution_date란 ? 먼저 execution_date

dydwnsekd.tistory.com


Primary Key Uniqueness 보장하기

Primary Key Uniqueness란?

  • 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
    • 다수의 필드 사용도 가능하다.
    • CRATE TABLE 사용시에 지정한다.
-- 방식 1
CREATE TABLE pktest(
	id INT PRIMARY KEY,
...
);
-- 방식 2
CREATE TABLE pktest(
	id INT,
	...
	PRIMARY KEY(id),
);

빅데이터 기반 데이터 웨어하우스들은 PK를 지켜주지 않는다.

  • 데이터 인력이 이를 보장해야한다.
  • 유일성을 보장하지 않는 이유 : 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 되기 때문이다.
  • 보장되지 않는 예시
    CREATE TABLE schema.table_name(
    	date date primary key,
    	value bigint
    );
  • 아래 작업의 수행에 있어 문제가 없다.
    INSERT INTO schema.table_name VALUES ('2023-12-12',100);
    INSERT INTO schema.table_name VALUES ('2023-12-12',150);

PK 보장방법

  • create_date 생성 → Date가 같아도 create_date는 다르기 때문에 최근 정보 선택이 가능하다.
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
  • ROW_NUMBER를 사용
    • 2개의 컬럼을 사용해서 일련번호를 하고싶다.
    • A컬럼별로 레코드를 모으고, 그 안에서 B 컬럼의 역순으로 정렬 후 1번부터 번호를 부여하고 싶은 경우 : ROW_NUMBER() OVER (partition by A order by B DESC) seq
  • 임시 테이블을 사용
    • 임시테이블을 이용하여 중복을 체크하고, 제거 후 원본 테이블로 복사
  • 임시테이블에 일련번호를 추가해서 사용
    1. 임시 테이블 추가 : CREATE TEMP TABLE t AS SELECT * FROM schema.table;
    2. 임시 테이블에 레코드 추가(DAG)
    3. 기존 테이블 제거 : DELETE FROM schema.table;
    4. 중복 없는 테이블 생성
    INSERT INTO schema1.weather_forecast
    SELECT date, temp, min_temp, max_temp, created_date
    FROM (
    	SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
    	FROM t
    )
    WHERE seq = 1;

Upsert

  • PK를 기준으로 존재하는 레코드면 새 정보로 수정
  • 존재하지 않는 레코드면 새 레코드로 적재
  • 데이터 웨어하우스에서 UPSERT를 효율적으로 해주는 문법을 지원한다.

Backfill

Incremental Update

  • Incremental Update를 하게되면 효율성은 좋을 수 있지만, 운영/유지보수의 난이도가 올라간다.
  • Incremental Update pipeline에서 실패한 부분을 재실행하는 경우 난이도가 높다. → backfill수행

Backfill

  • 정의 : 실패한 데이터 파이프라인 재실행 혹은 데이터의 문제로 다시 읽어와야하는 경우를 의미한다.
  • Backfill 해결은 Incremental Update에서 복잡해진다.
  • Airflow에서는 Backfill을 잘 지원한다.
  • DAG 작성 시에는 Backfill이 쉽게 진행될 수 있도록 염두하고 코딩해야한다.
    • 시스템적으로 쉽게 해주는 방법을 구현한다.
    • 실패한 날짜에 해당 날짜를 입력해둔다.
    • Airflow의 접근 방식 : execution_date이 모든 DAG실행에 지정되어 있다. → 이를 바탕으로 데이터를 갱신하도록 코드를 구성해야한다.

Daily Incremental Update & Backfill

  • 2020년 11월 7일의 데이터부터 매일 하루의 데이터를 읽어온다고 가정
    • ETL의 동작은 11월 8일부터이다.
    • start_date : 처음 읽어와야 할 데이터의 날짜 → 위의 경우 start_date = 2020-11-07
    • 그러나 다른 날이라면? → 11월9일에 실행해도 start_date 는 동일하다.
  • 읽어와야하는 데이터의 날짜는 하루전이다. → 어떻게 가져오는가?
    • execution_date : 읽어야하는 데이터의 날짜를 지정할 수 있다.
  • BigQuery나Snowflake 같이 고정되지 않은 요금을 사용한다면 큰 금액이 청구 될 수 있다.

Backfill과 관련된 Airflow 변수들

  • Start_date : DAG가 처음 실행되는 날짜가 아니라 DAG가 처음 읽어와야하는 데이터의 날짜/시간. 실제 첫 실행날짜는 start_date + DAG의 실행주기이다.
  • execution_date : DAG가 읽어와야하는 데이터의 날짜와 시간
  • catchup : DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행이 안된 것들에 대해 처리 여부를 결정하는 파라미터. True인 경우 실행 안된 것들을 모두 실행하게된다. (default : True)
  • end_date : Backfill을 날짜 범위에 대해 하는 경우에 필요하다.
 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > Jinja & DAG Dependencies  (0) 2024.05.11
Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > airflow.cfg  (0) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow > Airflow 설치해보기 2.5.1  (0) 2024.04.29

airflow.cfg

  • 질문에 앞서 airflow.cfg의 위치 및 찾는 방법
    • CLI : airflow-webserver에 위치하기 때문에 shell로 접근해야한다.
      • docker ps 후 docker exec -it -u root <container_id> sh
    • GUI
      • docker-desktop으로 접근
      • airflow-webserver 컨테이너를 클릭 후 Files 탭을 클릭한다.
      • opt>airflow>airflow.cfg를 클릭해 수정한다.
  1. DAGS 폴더의 저장 위치
    - airflow.cfg에 들어가 확인하면 dags_folder = /opt/airflow/dags 를 확인 가능하다.
  2. 새로운 DAG 생성시 Airflow 시스템에서 스캔하는 주기 및 키의 이름
    • 기본 값 : 300초(5분)
    • 키 : dag_dir_list_interval
  3. airflow.cfg에서 API 형태로 외부에서 조작하고 싶은 경우 변경해야하는 부분 : api섹션
    • [api]
      • enable_experimental_api : 실험적 API의 활성변수이다. 2.0 이후에는 Stable REST API로 대체되었다고 한다.
      • auth_backends : API 사용자 인증에 사용할 백엔드를 지정한다.
        airflow.api.auth.backend.basic_auth로 변경하면 ID/PW로 인증하는 형태로 변경되어 API형태로 외부에서 조작이 가능하다. (기본은 session)
      • maximum_page_limit : API 요청에 대한 최대 페이지 제한을 설정한다. (기본 100)
      • access_control_allow_headers, methods, origins : HTTP 헤더를 알려준다.
        해당 변수의 헤더만 사용하고 싶게 하면 “Context-Type,Authorization”으로 설정한다고 한다.
  4. Variable에서 변수의 값을 encrypted로 만들기위해 변수 이름에 들어가야할 단어들
    • value가 *로 암호화되려면 secret, password, passwd, authorization, api_key, apikey, access_token이라는 키워드가 들어가면 된다.
  5. 환경 설정 파일이 수정되고, 실제 반영을 위해서 해야할 일은?
    • restart : sudo systemctl restart airflow-webserver & sudo systemctl restart airflow-scheduler
    • Docker 컨테이너 재실행
  6. Metadata DB의 내용을 암호화하는데 사용되는 키
    • fernet_key를 사용하여 암호화 가능하다.
    • 해당 키는 암호화와 복호화에 같이 쓰이는 대칭키이다.

참고 블로그 글)

[Docker, Airflow] Windows Docker 기반 Airflow에서 airflow.cfg 접근+수정

[Airflow] Airflow.cfg 일부 살펴보기

 

[Airflow] Airflow.cfg 일부 살펴보기

airflow를 설치할 때 Airflow.cfg도 함께 제공된다. 이 파일로 airflow의 동작, 설정, 옵션을 수정할 수 있는데 파일 안에 뭐가 있는지 알아야 맛보고 즐길 수 있을 것이다. airflow.cfg 내부의 일부분을 살

hongcana.tistory.com

 

[Docker, Airflow] Windows Docker 기반 Airflow에서 airflow.cfg 접근+수정

내가 시도한 방법은 다음과 같다. ( 환경 : Windows Docker : Airflow ) (1) GUI 기반 방법 1. docker GUI로 들어간다. 2. airflow-webserver 컨테이너 이름을 클릭하고, Files 탭을 클릭 3. opt > airflow > airflow.cfg 더블 클

hongcana.tistory.com

 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow > MySQL, Backfill  (1) 2024.05.02
Airflow > Time, Pk uniqueness, Backfill  (1) 2024.05.01
Airflow > PythonOperator & 예제  (0) 2024.04.29
Airflow > Airflow 설치해보기 2.5.1  (0) 2024.04.29
Airflow - Data pipeline & airflow  (0) 2024.03.20

+ Recent posts