Today is my first day of the DataTalksClub course, which I will be doing on my own, without any intention of getting a certificate and purely to learn the fundamentals of data engineering.
DataTalksClub - Day 1
Date: 2025-08-11
#dataengineering #datatalksclub
Intro to Docker (1.2.1)
Note: CI/CD is not covered in the course, so make sure to look at this at a later date
Docker is useful because:
- Local experimentation
- Integration testing (CI/CD)
- Reproducibility
- Running pipelines on the cloud (AWS batch, kubernetes)
- Spark (defining data pipelines and their dependencies)
- Serverless (AWS Lambda, processing data one record at a time)
Docker Basics
docker run -it ubuntu bash
- docker is the base command for using Docker
- run executes an image
- -it runs it in an interactive terminal
- ubuntu is the name of the image we want to run
- bash is the command (parameter) we want to execute in this image
If we delete everything inside this container and run the image again, it comes back in its original state, because the container is isolated.
docker run -it python:3.9
- python is the name of the image
- 3.9 is the tag, otherwise known as the version
docker run -it --entrypoint=bash python:3.9
To install modules we can use pip install pandas, but to do this we need to be in a bash terminal, since we cannot run pip from the Python prompt; the --entrypoint=bash flag above drops us into bash instead of Python.
Docker in VSCode
When we are creating our own image we need to write a Dockerfile with its specifications, so we start with the base image:
FROM python:3.9
Then we can do a run command:
RUN pip install pandas
Since we want to land in bash (where we can use pip) rather than the Python prompt, we set the entrypoint:
ENTRYPOINT [ "bash" ]
To build this image from VSCode, using the Dockerfile, we run:
docker build -t test:pandas .
- build tells docker to build the image
- -t test:pandas tags the image with the name test and the tag pandas
- . means to use the current directory (where the Dockerfile is) as the build context
Then when that is done, you can run it with the following command, which will bring you into the bash terminal:
docker run -it test:pandas
Data Pipeline
We can now create a Python file, pipeline.py:
import pandas as pd
# whatever fancy stuff it will be doing
print('yay job done')
Then in our Dockerfile we add:
COPY pipeline.py pipeline.py
Which copies the file from the host into the image (source file, then destination name)
And we can specify the working directory:
WORKDIR /app
Now you run the container, and since you land in bash in the working directory, you can execute the script with python pipeline.py.
Data Pipeline - Automation
To the pipeline file we add:
import sys
print(sys.argv)
day = sys.argv[1]
print(f'job done good for day = {day}')
Then in the Dockerfile we set:
ENTRYPOINT [ "python", "pipeline.py" ]
You build the image, and then run it:
docker run -it test:pandas 2025-08-10
This is how to parameterize the data pipeline scripts; we can pass any number of extra arguments:
docker run -it test:pandas 2025-08-10 123 hello
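For reference, a sketch of what the script sees inside the container with that last command (the entrypoint runs python pipeline.py followed by the extra arguments):
import sys
print(sys.argv)    # ['pipeline.py', '2025-08-10', '123', 'hello']
day = sys.argv[1]  # '2025-08-10'; any further values are available as sys.argv[2:]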
Ingesting NY Taxi Data to Postgres (1.2.2)
Running Postgres in Docker
We need to set up:
- Environment variables: we use the -e flag for this (POSTGRES_USER / POSTGRES_PASSWORD / POSTGRES_DB)
- Volumes: a way of mapping a folder in the host machine's file system to a folder in the container (this is called mounting); we use the -v flag for this (it needs a full path on Windows machines, on Mac you can do $(pwd)/blahblahblah)
- Ports: we map a port on our host machine to a port on the container (needed to send requests to the database); we use the -p flag for this (5432:5432)
Following the tutorial, we put this command into the terminal to run Postgres:
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v "/Users/ignasprakapas/Coding Projects/data-engineering/data-engineering-zoomcamp/01-docker-terraform/2_docker_sql/ny_taxi_postgres_data":/var/lib/postgresql/data \
-p 5432:5432 \
postgres:13
Now we want to access the database using pgcli (pip install pgcli):
pgcli -h localhost -p 5432 -u root -d ny_taxi
Tip: Sometimes port 5432 is already in use by a previous container, so on Mac we run
sudo lsof -i -P | grep LISTEN | grep :$PORT
to list the ports in use, and then sudo kill -9 (PID) to stop the offending process.
Once we are connected with pgcli, we can run:
\dt
- to list the tables in the database
Working with Jupyter
We are going to use Jupyter now; to install it we can run pip install jupyter
Then we can start it with jupyter notebook
Important: the data is in .parquet and not .csv anymore → just replace read_csv with read_parquet and remove the nrows argument
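A minimal sketch of the change (assuming the parquet file has already been downloaded into the working directory):
import pandas as pd
# before (CSV): pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100)
# now (Parquet): read_parquet has no nrows argument, so read the file and slice afterwards
df = pd.read_parquet('yellow_tripdata_2021-01.parquet').head(100)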
Handy Data Commands
When looking at a .csv file you can inspect it using the less command.
head -n 100 xyz.csv > xyz_head.csv
= gets the top 100 lines and writes them into a new file
wc -l xyz.csv
= counts the number of lines in the .csv file (the -l specifies lines)
Dataset Information
The dataset:
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
Documentation:
Data Dictionary for Trip Records
Zone ID CSV file:
Taxi Zone Lookup Table
Working with Parquet Files in Jupyter
Since the file is in .parquet format and not .csv, we use pyarrow to read it in Jupyter:
!pip install pyarrow
import pandas as pd
import urllib.request
import pyarrow.parquet as pq
Download the file:
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet'
filename = 'yellow_tripdata_2021-01.parquet'
urllib.request.urlretrieve(url, filename)
print("File downloaded")
Read the parquet file:
parquet_file = pq.ParquetFile(filename)
trips_df = parquet_file.read(use_pandas_metadata=True).slice(0, 100).to_pandas()
trips_df
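To double-check what came back, a quick sketch looking at the shape and column types of this slice:
print(trips_df.shape)   # should be (100, number_of_columns)
print(trips_df.dtypes)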
Creating Database Schema
Now we want to put this data into our Postgres database, and to start off we need a schema for it. First we generate the DDL (data definition language, the part of SQL used for specifying schemas):
print(pd.io.sql.get_schema(trips_df, name="yellow_taxi_data"))
We notice in the schema that the columns that are meant to be timestamps come out as text, so we need to convert them to timestamps:
trips_df.tpep_pickup_datetime = pd.to_datetime(trips_df.tpep_pickup_datetime)
trips_df.tpep_dropoff_datetime = pd.to_datetime(trips_df.tpep_dropoff_datetime)
This conversion needs to go above the DDL generation so that the schema picks up the timestamp type.
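Putting those two steps in the right order, the cell looks roughly like this:
# convert the datetime columns first...
trips_df.tpep_pickup_datetime = pd.to_datetime(trips_df.tpep_pickup_datetime)
trips_df.tpep_dropoff_datetime = pd.to_datetime(trips_df.tpep_dropoff_datetime)
# ...then generate the DDL, which will now use TIMESTAMP for those columns
print(pd.io.sql.get_schema(trips_df, name="yellow_taxi_data"))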
Connecting to PostgreSQL
Then we need to import sqlalchemy:
from sqlalchemy import create_engine
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()
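As a quick sanity check that the connection works, we can run a trivial query (a sketch assuming SQLAlchemy 1.4+, where execute takes a text() clause):
from sqlalchemy import text
with engine.connect() as conn:
    # SELECT 1 just confirms we can reach the ny_taxi database and get a row back
    print(conn.execute(text('SELECT 1')).fetchall())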
Then we regenerate the schema DDL, passing the engine so that it is produced in the Postgres dialect:
print(pd.io.sql.get_schema(trips_df, name="yellow_taxi_data", con=engine))
Batch Processing with Iterators
Since this is a large dataset and so far we have only been working with the first 100 rows, we can do batch processing using an iterator.
We create the iterator here:
parquet_file = pq.ParquetFile('yellow_tripdata_2021-01.parquet')
df_iter = parquet_file.iter_batches(batch_size=100000)
Get the first chunk:
df = next(df_iter).to_pandas()
Check length:
len(df)
To make sure the datetime columns in the chunk are timestamps:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
Now we can use:
df.head(n=0)
This will show us just the column headers of the dataset (an empty DataFrame), which we can use to create the table.
Inserting Data into PostgreSQL
Now what we want to do is create the table and then insert the data chunk by chunk.
There is a DataFrame method called to_sql for this:
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
The if_exists='replace' option means that if the table already exists it is dropped and recreated before the new values are inserted.
This loop inserts the data into our Postgres table in Docker, chunk by chunk. It was produced by Claude AI:
%time df_iter = parquet_file.iter_batches(batch_size=100000)

for i, batch in enumerate(df_iter):
    chunk_df = batch.to_pandas()

    # Convert datetime columns
    chunk_df.tpep_pickup_datetime = pd.to_datetime(chunk_df.tpep_pickup_datetime)
    chunk_df.tpep_dropoff_datetime = pd.to_datetime(chunk_df.tpep_dropoff_datetime)

    print(f"Inserting chunk {i+1} with {len(chunk_df)} rows...")

    # Insert the chunk into Postgres
    %time chunk_df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    print(f"Finished inserting chunk {i+1}")
This concludes my learning for the day!