Mage를 이용해서 간단한 파이프라인을 구축해보는 작업을 수행하려한다.
이번 실습에서는 스케줄을 만들지는 않고, 가볍게 데이터를 수집해서 postgres에 적재하는 과정까지를 구성한다.
환경 변수
시작에 앞서 왜 환경변수를 했는지?
: 이번에는 docker-compose를 이용해서 postgres와 mage를 같이 올릴거라 postgres의 연결을 위한 환경변수들을 한번에 관리하고자 .env를 이용해서 진행할것이다.
# .env
PROJECT_NAME=magic-zoomcamp
POSTGRES_DBNAME=postgres
POSTGRES_SCHEMA=magic
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
이전 실습에서 수행하면서 작성했던 postgres 세팅값들을 넣어주었다.
혹시나 궁금하시다면 https://itpori.tistory.com/33
Docker - Postgres
Docker를 활용해서 PostgreSQL을 사용하고 데이터를 적재해보는 실습을 수행했다. Docker 명령어 작성하기 - postgres docker run -it \ -e POSTGRES_USER="root" \ -e POSTGRES_PASSWORD="root" \ -e POSTGRES_DB="ny_taxi" \ -v $(pwd)/ny
itpori.tistory.com
여튼 이렇게 환경변수를 설정해두면 docker-compose.yml에서 값들을 받아서 사용 가능하다.
# docker-compose.yml
env_file:
- .env
build:
context: .
dockerfile: Dockerfile
environment:
USER_CODE_PATH: /home/src/${PROJECT_NAME}
POSTGRES_DBNAME: ${POSTGRES_DBNAME}
POSTGRES_SCHEMA: ${POSTGRES_SCHEMA}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_HOST: ${POSTGRES_HOST}
POSTGRES_PORT: ${POSTGRES_PORT}
mage의 환경변수 세팅값들이다. 참고로 postgres도 같이 올리기 때문에 postgres의 환경변수 값들도 세팅한다.
env_file:
- .env
environment:
POSTGRES_DB: ${POSTGRES_DBNAME}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
자 이제 실행을 해주면 정상적으로 mage가 작동한다. 그런데 하나 더 수행해야할 단계가 남아있다.
project_name > Files에 들어가면 io_config.yaml을 찾을 수 있다. 여기서 환경변수 값들을 추가해주어야하는데
후에 postgreSQL을 사용하는 단계에서 편하게 사용할 수 있기 때문이다.
# io_config.yaml
dev:
# PostgresSQL
POSTGRES_CONNECT_TIMEOUT: 10
POSTGRES_DBNAME: "{{ env_var("POSTGRES_DBNAME") }}"
POSTGRES_SCHEMA: "{{ env_var("POSTGRES_SCHEMA") }}"
POSTGRES_USER: "{{ env_var("POSTGRES_USER") }}"
POSTGRES_PASSWORD: "{{ env_var("POSTGRES_PASSWORD") }}"
POSTGRES_HOST: "{{ env_var("POSTGRES_HOST") }}"
POSTGRES_PORT: "{{ env_var("POSTGRES_PORT") }}"
docs를 참고하면 더 좋을 것 같다.
https://docs.mage.ai/data-integrations/configuration
Configure data integration pipelines - Mage
Configure data integration pipelines
docs.mage.ai
파이프라인 구축하기
우선 배치로 간단하게 파이프라인을 생성해준다.
이제 안의 내용을 채울 시간이다
데이터 로더 만들기 (수집)
Data loader > python > API를 클릭해서 로더를 생성한다. API로 수집할거라서 이렇게 만들었다. 다른 방식도 많다!
@data_loader
def load_data_from_api(*args, **kwargs):
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz'
taxi_dtyps = {
'VenderID': pd.Int64Dtype(),
'passenger_count' : pd.Int64Dtype(),
'trip_distance': float,
'RatecodeID': pd.Int64Dtype(),
...
return pd.read_csv(url, header = 0, sep=",",compression ="gzip", dtype=taxi_dtyps, parse_dates = parse_dates)
데코레이터를 이용해서 loader를 구축하는데 정말 간단하다. (url 탭을 잘 했는데 이상하게 보인다.. 주의)
API를 이용해서 값들을 받고, dtyps를 추가해서 csv로 읽어들였다. 이대로 return하면 뒤의 transformers나 explorter에서도 사용 가능하다.
데이터 변환
간단한 변환 작업을 수행했다. passenger_count가 0보다 큰 경우의 값들만 가져오고 싶어서 진행하였고, 파일 크기가 큰 관계로 내 mac에서 에러를 자꾸 발생시켜 전체적인 row의 수를 줄였다.
@transformer
def transform(data, *args, **kwargs):
print(f'Preprocessing: row with zero passengers: {data["passenger_count"].isin([0]).sum()}')
return data[data["passenger_count"]>0][:10000]
추가로 각 블록에서 test 데코레이터를 사용해서 테스트를 진행할 수 있다. 변환을 수행하고도 0인 값이 혹시나 남아있는지 테스트를 진행해보았다.
@test
def test_output(output, *args):
assert output["passenger_count"].isin([0]).sum() == 0, 'There are rides with zero passengers'
데이터 적재
Data exporter를 이용해서 적재를 진행한다. postgreSQL을 잘 선택해서 진행하면된다.
코드가 템플릿처럼 대부분 구성되어있어 별다른 변경없이 테이블 이름과 config_profile정도만 작성해주면된다.
schema_name = 'ny_taxi' # Specify the name of the schema to export data to
table_name = 'yellow_cab_data' # Specify the name of the table to export data to
config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'dev'
데이터 확인해보기
마지막으로 Loader를 다시 사용해서 postgres에 잘 적재되었는지 확인해보자.
우선 loader에서 사용할 변수값들을 잘 선택해줘야한다.
이제 SQL문으로 확인을하면
SELECT * FROM ny_taxi.yellow_cab_data Limit 10
정상적으로 출력된다.
코드는 Github에 올려두었습니다.
https://github.com/poriz/data-engineering-zoomcamp-poriz/tree/main/02-workflow-orchestrations
'Data Engineering > Mage' 카테고리의 다른 글
Mage - 소개 및 설치 (0) | 2024.03.29 |
---|