Docker - Postgres
Docker를 활용해서 PostgreSQL을 사용하고 데이터를 적재해보는 실습을 수행했다.
Docker 명령어 작성하기 - postgres
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
postgres:13
코드를 뜯어보자.
- -e POSTGRES_USER ... : POSTGRES에서 사용할 환경 변수를 입력해주었다.
- -v &(pwd)/ny_taxi_... : 호스트의 특정 위치를 볼륨으로 지정해준다. postgresql/data를 현재 디렉토리 하위의 nv_taxi_postgres_data와 연결해주었다.
- -p 5432:5432 : PC의 포트를 도커 내부의 포트로 연결한다. 5432포트로 접속이 가능해진다. (앞이 현재pc에서 사용하는 포트)
- 더 자세한 커멘드들은 다음 블로그를 참고하기를 바란다. https://www.daleseo.com/docker-run/
Postgres에 접속하기
다음 명령을 이용해서 연결을 진행하였다.
pgcli -h localhost -p 5432 -u root -d ny_taxi
뒤에 미리 앞에서 설정해둔 password를 입력하면 다음과 같은 화면을 볼 수 있다.
NewYork Taxi data 받아와서 처리하기
Data Source
연습용 데이터로 Taxi Data를 받아와 사용하였다.
TLC Trip Record Data - TLC
TLC Trip Record Data Yellow and green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data use
www.nyc.gov
- Yellow Taxi Trip Records를 받기위해 다음 명령어를 수행했다.
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
Data 처리 및 SQL에 저장하기
jupyter notebook을 이용해서 데이터를 가져오고 postgreSQL과 연결해서 데이터를 업로드 하는 과정이다.
강의는 csv로 진행되었는데 ny_taxi_data가 parquet형식이여서 당황했지만 다행히 github에 parquet으로 진행하는 방법을 알려주어 참고해서 진행하였다.
1. 데이터 불러오기 - pyarrow 사용
import pyarrow.parquet as pq
# read metadata
pq.read_metadata('yellow_tripdata_2023-01.parquet')
file= pq.ParquetFile('yellow_tripdata_2023-01.parquet')
table = file.read()
table.schema
이렇게 읽어온 파일을 판다스 데이터 프레임으로 다시 변환하고 싶다면 다음 작업을 수행하면된다.
# convert to pandas
df = table.to_pandas()
2. postgresql 연결하기
sqlalchemy를 사용해서 postgresql과 연결한다. (sqlalchemy는 신인가...?)
# connect PostgreSQl with sqlalchemy
from sqlalchemy import create_engine
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()
- pandas에는 테이블을 쉽게 만들기 위해서 데이터 프레임을 테이블 생성코드로 변환해주는 기능이 있다!
(그러나 후에 to_sql을 하는 경우에는 테이블이 없어도 생성되어 크게 쓸일은 없었다..)
# Generate CREATE SQL
print(pd.io.sql.get_schema(df, name='yellow_taxi_data',con=engine))
3. batch size를 설정해서 insert하기
배치 사이즈를 10,000으로 설정해서 반복해서 insert를 수행한다.
# Insert values into the table
t_start = time()
count = 0
for batch in file.iter_batches(batch_size=100000):
count+=1
batch_df = batch.to_pandas()
print(f'inserting batch {count} ...')
b_start = time()
batch_df.to_sql(name='ny_taxi_data', con=engine, if_exists='append')
b_end = time()
print(f'inserted, time taken {b_end-b_start:10.3f} seconds.\n')
t_end = time()
print(f'Completed. Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')
이제 postgresql에 접속해서 데이터를 확인하면?
정상적으로 잘 동작한 것을 확인 가능하다.
전체 코드는 Github에 업로드 하였습니다.