Mage를 이용해서 간단한 파이프라인을 구축해보는 작업을 수행하려한다.

이번 실습에서는 스케줄을 만들지는 않고, 가볍게 데이터를 수집해서 postgres에 적재하는 과정까지를 구성한다.

환경 변수

시작에 앞서 왜 환경변수를 했는지?

: 이번에는 docker-compose를 이용해서 postgres와 mage를 같이 올릴거라 postgres의 연결을 위한 환경변수들을 한번에 관리하고자 .env를 이용해서 진행할것이다.

# .env
PROJECT_NAME=magic-zoomcamp
POSTGRES_DBNAME=postgres
POSTGRES_SCHEMA=magic
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=postgres
POSTGRES_PORT=5432

 

이전 실습에서 수행하면서 작성했던 postgres 세팅값들을 넣어주었다.

혹시나 궁금하시다면 https://itpori.tistory.com/33

 

Docker - Postgres

Docker를 활용해서 PostgreSQL을 사용하고 데이터를 적재해보는 실습을 수행했다. Docker 명령어 작성하기 - postgres docker run -it \ -e POSTGRES_USER="root" \ -e POSTGRES_PASSWORD="root" \ -e POSTGRES_DB="ny_taxi" \ -v $(pwd)/ny

itpori.tistory.com

 

여튼 이렇게 환경변수를 설정해두면 docker-compose.yml에서 값들을 받아서 사용 가능하다.

# docker-compose.yml

env_file:
      - .env
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      USER_CODE_PATH: /home/src/${PROJECT_NAME}
      POSTGRES_DBNAME: ${POSTGRES_DBNAME}
      POSTGRES_SCHEMA: ${POSTGRES_SCHEMA}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_HOST: ${POSTGRES_HOST}
      POSTGRES_PORT: ${POSTGRES_PORT}

 

mage의 환경변수 세팅값들이다. 참고로 postgres도 같이 올리기 때문에 postgres의 환경변수 값들도 세팅한다.

env_file:
      - .env
    environment:
      POSTGRES_DB: ${POSTGRES_DBNAME}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}

 

자 이제 실행을 해주면 정상적으로 mage가 작동한다. 그런데 하나 더 수행해야할 단계가 남아있다.

project_name > Files에 들어가면 io_config.yaml을 찾을 수 있다. 여기서 환경변수 값들을 추가해주어야하는데

후에 postgreSQL을 사용하는 단계에서 편하게 사용할 수 있기 때문이다.

# io_config.yaml

dev:
  # PostgresSQL
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var("POSTGRES_DBNAME") }}"
  POSTGRES_SCHEMA: "{{ env_var("POSTGRES_SCHEMA") }}"
  POSTGRES_USER: "{{ env_var("POSTGRES_USER") }}"
  POSTGRES_PASSWORD: "{{ env_var("POSTGRES_PASSWORD") }}"
  POSTGRES_HOST: "{{ env_var("POSTGRES_HOST") }}"
  POSTGRES_PORT: "{{ env_var("POSTGRES_PORT") }}"

 

docs를 참고하면 더 좋을 것 같다.

https://docs.mage.ai/data-integrations/configuration

 

Configure data integration pipelines - Mage

Configure data integration pipelines

docs.mage.ai


파이프라인 구축하기

우선 배치로 간단하게 파이프라인을 생성해준다.

 

 

이제 안의 내용을 채울 시간이다

데이터 로더 만들기 (수집)

Data loader > python > API를 클릭해서 로더를 생성한다. API로 수집할거라서 이렇게 만들었다. 다른 방식도 많다!

@data_loader
def load_data_from_api(*args, **kwargs):
	url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz'
    
    taxi_dtyps = {
        'VenderID': pd.Int64Dtype(),
        'passenger_count' : pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        ...
        
    return pd.read_csv(url, header = 0, sep=",",compression ="gzip", dtype=taxi_dtyps, parse_dates = parse_dates)

데코레이터를 이용해서 loader를 구축하는데 정말 간단하다. (url 탭을 잘 했는데 이상하게 보인다.. 주의)

API를 이용해서 값들을 받고, dtyps를 추가해서 csv로 읽어들였다. 이대로 return하면 뒤의 transformers나 explorter에서도 사용 가능하다.

데이터 변환

간단한 변환 작업을 수행했다. passenger_count가 0보다 큰 경우의 값들만 가져오고 싶어서 진행하였고, 파일 크기가 큰 관계로 내 mac에서 에러를 자꾸 발생시켜 전체적인 row의 수를 줄였다.

@transformer
def transform(data, *args, **kwargs):
    print(f'Preprocessing: row with zero passengers: {data["passenger_count"].isin([0]).sum()}')

    return data[data["passenger_count"]>0][:10000]

 

추가로 각 블록에서 test 데코레이터를 사용해서 테스트를 진행할 수 있다. 변환을 수행하고도 0인 값이 혹시나 남아있는지 테스트를 진행해보았다.

@test
def test_output(output, *args):
    assert output["passenger_count"].isin([0]).sum() == 0, 'There are rides with zero passengers'

 

데이터 적재

Data exporter를 이용해서 적재를 진행한다. postgreSQL을 잘 선택해서 진행하면된다.

코드가 템플릿처럼 대부분 구성되어있어 별다른 변경없이 테이블 이름과 config_profile정도만 작성해주면된다.

schema_name = 'ny_taxi'  # Specify the name of the schema to export data to
    table_name = 'yellow_cab_data'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

정상 적재!

데이터 확인해보기

마지막으로 Loader를 다시 사용해서 postgres에 잘 적재되었는지 확인해보자.

우선 loader에서 사용할 변수값들을 잘 선택해줘야한다.

이제 SQL문으로 확인을하면

SELECT * FROM ny_taxi.yellow_cab_data Limit 10

정상적으로 출력된다.

 


코드는 Github에 올려두었습니다.

https://github.com/poriz/data-engineering-zoomcamp-poriz/tree/main/02-workflow-orchestrations

'Data Engineering > Mage' 카테고리의 다른 글

Mage - 소개 및 설치  (0) 2024.03.29

Mage란?

데이터 수집, 변환 그리고 오케스트레이팅을 위한 오픈소스 파이프라인 툴이다.

ETL작업을 도와주고 오케스트레이션을 해준다고 하는데 이번에는 간단한 설치를 진행해보려한다.

 

설치(quickstart)

Docker compose를 활용해서 설치 및 실행환경을 구축하는 방법이다. (매우간단)

git clone https://github.com/mage-ai/compose-quickstart.git

 

git clone을 통해서 우선 파일들을 받아주고 빌드한다.

docker compose build

 

변수들이 없다고 나오는 경우가 있는데 dev.env 파일을 .env로 교체해주면 된다!

정상적으로 빌드 후에 실행을 하게되면 잘 구동되는 모습을 볼 수 있다.

docker compose up

 

http://localhost:6789 다음을 통해서 웹 ui를 살펴보자.

 

웹 UI

간단하게 pipelines를 확인해보려한다.

 

좌측 탭을 통해서 파이프라인을 확인하면 파이프라인의 생성이 가능하다. 예시 파이프라인을 한번 가져와봤다.

(zoomcamp에서 하나 생성해줘서 가져왔다.)

 

이런식으로 etl 파이프라인이 생성되는것을 확인 가능하다.

 

 

'Data Engineering > Mage' 카테고리의 다른 글

Mage - 간단한 Pipeline 구축해보기  (0) 2024.04.15

+ Recent posts