Mysql 복사하기
: Production MySQL Table(OLTP)의 prod.nps 를 AWS Redshift의 raw_data.nps에 전달한다.(OLAP)
AWS 권한 설정
- Airflow DAG에서 S3 접근 권한 (쓰기권한)
- IAM User를 생성, acceess key와 secret key를 사용
- Redshift가 S3 접근 (읽기 권한)
- S3접근 Role을 만들고 Redshift에 지정해야한다.
MySQL Connection 설정 시 유의 사항
- MySQLdb에러를 피하기 위해서 다음과 같은 명령어를 적용해야한다.
- Airflow Scheduler에 다음 명령어들을 통해 설치 진행
- sudo apt-get install -y default-libmysqlclient-dev
- sudo apt-get install -y gcc
- sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
- Airflow Scheduler에 다음 명령어들을 통해 설치 진행
MySQL & Redshfit 테이블 리뷰
-- Mysql
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
-- Redshift
CREATE TABLE (본인의스키마).nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
MySQL_to_Redshift DAG의 Task 구성
- SqlToS3Operator
- mysql의 결과를 s3에 넣어준다.
- S3ToRedshiftOperator
- S3 → Redshfit, COPY 명령어 사용
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
Incremental Update 방식
- MySQL, PostgreSQL 테이블이면 created→modified→deleted로 수행된다.
- deleted 시에는 레코드를 삭제하지 않고, deleted를 True로 설정한다.
- ROW_NUMBER로 구현하기도함.
- A테이블의 내용을 temp_A로 복사
- A 테이블의 레코드 중 modified의 날짜가 지난 일에 해당하는 모든 레코드를 읽어다가 temp_A로 복사 (execution_date)
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date) - temp_A의 레코드들을 pk를 기준으로 파티션 후 modified 값을 기준으로 DESC정렬해서 일련번호가 1인것들만 복사
- S3ToRedshiftOperator로 구현
- query parameter : SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
- method : UPSERT → UPSERT KEYs로 지정이된 PK를 기준으로 값이 같은 레코드들은 복사되는 값으로 대체되고, 없는 값은 추가된다.
- upsert_keys로 pk를 지정, 리스트 형태
# SqlToS3Operator
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
# S3ToRedshiftOperator
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
Backfill 실행해보기
- `airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
- catchUp이 True여야함
- execution_date를 사용해서 incremental update가 구현되어야한다.
- 실행순서가 랜덤이기 때문에 DAG defaul_args 의 depends_on_past를 True로 설정해야한다.
- start_date부터 시작하지만, end_date는 포함되지 않는다.
'Data Engineering > Airflow' 카테고리의 다른 글
Airflow > Jinja & DAG Dependencies (0) | 2024.05.11 |
---|---|
Airflow > Time, Pk uniqueness, Backfill (1) | 2024.05.01 |
Airflow > airflow.cfg (0) | 2024.05.01 |
Airflow > PythonOperator & 예제 (0) | 2024.04.29 |
Airflow > Airflow 설치해보기 2.5.1 (0) | 2024.04.29 |