Airflow & Timezone
- airflow.cfg에는 두 종류의 타임존 키가 존재한다.
- default_timezone
- default_ui_timezone
- start_date, end_date, schedule : default_timezone을 따른다.
- execution_date, log_time : 항상 UTC를 따른다.
execution_date
dag가 실행되어야하는 기대값, 주문번호에 가깝다는 이야기가 많다.
DAG가 실행되어야하는 시간 값을 변수로 사용하고자 할 때 많이 사용한 것 같다.
특히 과거의 task를 재실행하는 경우에 유용했다.
예를 들어본다면 2020-01-01에 실행되는 DAG가 있다. 이 DAG는 두 task로 이루어지는데
하나는 오후 6시에 데이터를 수집하고 다른 하나는 자정이 넘어가 다음날이 되어서야 실행된다. 그런데 자정이 넘어가서 실행되는 task에는 파일 저장 로직이 담겨있다. 그리고 파일명에는 데이터 수집일자가 적혀있어야한다.
이 때 execution_date를 사용하면 DAG가 실행되는 날짜가 담기기 때문에 수집일자인 2020-01-01 18:00:00을 담을 수 있다.
https://dydwnsekd.tistory.com/108
Airflow execution_date 이해하기
이번 글에서는 Airflow의 execution_date에 대해서 이해해보도록 하자. execution_date에 대한 설명을 하기에 앞서 Airflow는 batch schedulering을 위한 tool이라는 것을 기억하자. execution_date란 ? 먼저 execution_date
dydwnsekd.tistory.com
Primary Key Uniqueness 보장하기
Primary Key Uniqueness란?
- 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
- 다수의 필드 사용도 가능하다.
- CRATE TABLE 사용시에 지정한다.
-- 방식 1
CREATE TABLE pktest(
id INT PRIMARY KEY,
...
);
-- 방식 2
CREATE TABLE pktest(
id INT,
...
PRIMARY KEY(id),
);
빅데이터 기반 데이터 웨어하우스들은 PK를 지켜주지 않는다.
- 데이터 인력이 이를 보장해야한다.
- 유일성을 보장하지 않는 이유 : 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 되기 때문이다.
- 보장되지 않는 예시
CREATE TABLE schema.table_name(
date date primary key,
value bigint
);
- 아래 작업의 수행에 있어 문제가 없다.
INSERT INTO schema.table_name VALUES ('2023-12-12',100);
INSERT INTO schema.table_name VALUES ('2023-12-12',150);
PK 보장방법
- create_date 생성 → Date가 같아도 create_date는 다르기 때문에 최근 정보 선택이 가능하다.
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
- ROW_NUMBER를 사용
- 2개의 컬럼을 사용해서 일련번호를 하고싶다.
- A컬럼별로 레코드를 모으고, 그 안에서 B 컬럼의 역순으로 정렬 후 1번부터 번호를 부여하고 싶은 경우 : ROW_NUMBER() OVER (partition by A order by B DESC) seq
- 임시 테이블을 사용
- 임시테이블을 이용하여 중복을 체크하고, 제거 후 원본 테이블로 복사
- 임시테이블에 일련번호를 추가해서 사용
- 임시 테이블 추가 : CREATE TEMP TABLE t AS SELECT * FROM schema.table;
- 임시 테이블에 레코드 추가(DAG)
- 기존 테이블 제거 : DELETE FROM schema.table;
- 중복 없는 테이블 생성
INSERT INTO schema1.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
Upsert
- PK를 기준으로 존재하는 레코드면 새 정보로 수정
- 존재하지 않는 레코드면 새 레코드로 적재
- 데이터 웨어하우스에서 UPSERT를 효율적으로 해주는 문법을 지원한다.
Backfill
Incremental Update
- Incremental Update를 하게되면 효율성은 좋을 수 있지만, 운영/유지보수의 난이도가 올라간다.
- Incremental Update pipeline에서 실패한 부분을 재실행하는 경우 난이도가 높다. → backfill수행
Backfill
- 정의 : 실패한 데이터 파이프라인 재실행 혹은 데이터의 문제로 다시 읽어와야하는 경우를 의미한다.
- Backfill 해결은 Incremental Update에서 복잡해진다.
- Airflow에서는 Backfill을 잘 지원한다.
- DAG 작성 시에는 Backfill이 쉽게 진행될 수 있도록 염두하고 코딩해야한다.
- 시스템적으로 쉽게 해주는 방법을 구현한다.
- 실패한 날짜에 해당 날짜를 입력해둔다.
- Airflow의 접근 방식 : execution_date이 모든 DAG실행에 지정되어 있다. → 이를 바탕으로 데이터를 갱신하도록 코드를 구성해야한다.
Daily Incremental Update & Backfill
- 2020년 11월 7일의 데이터부터 매일 하루의 데이터를 읽어온다고 가정
- ETL의 동작은 11월 8일부터이다.
- start_date : 처음 읽어와야 할 데이터의 날짜 → 위의 경우 start_date = 2020-11-07
- 그러나 다른 날이라면? → 11월9일에 실행해도 start_date 는 동일하다.
- 읽어와야하는 데이터의 날짜는 하루전이다. → 어떻게 가져오는가?
- execution_date : 읽어야하는 데이터의 날짜를 지정할 수 있다.
- BigQuery나Snowflake 같이 고정되지 않은 요금을 사용한다면 큰 금액이 청구 될 수 있다.
Backfill과 관련된 Airflow 변수들
- Start_date : DAG가 처음 실행되는 날짜가 아니라 DAG가 처음 읽어와야하는 데이터의 날짜/시간. 실제 첫 실행날짜는 start_date + DAG의 실행주기이다.
- execution_date : DAG가 읽어와야하는 데이터의 날짜와 시간
- catchup : DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행이 안된 것들에 대해 처리 여부를 결정하는 파라미터. True인 경우 실행 안된 것들을 모두 실행하게된다. (default : True)
- end_date : Backfill을 날짜 범위에 대해 하는 경우에 필요하다.
'Data Engineering > Airflow' 카테고리의 다른 글
Airflow > Jinja & DAG Dependencies (0) | 2024.05.11 |
---|---|
Airflow > MySQL, Backfill (1) | 2024.05.02 |
Airflow > airflow.cfg (0) | 2024.05.01 |
Airflow > PythonOperator & 예제 (0) | 2024.04.29 |
Airflow > Airflow 설치해보기 2.5.1 (0) | 2024.04.29 |