Superstore ETL Pipeline
A full end-to-end data pipeline — from raw CSV extraction to a clean PostgreSQL database — with Python ETL scripts and advanced SQL analytics.
ETL Pipeline Flow
Extract
Read raw CSV using Pandas with latin-1 encoding. Log shape, data types and preview rows.
Transform
Remove duplicates & nulls, rename 24 columns to snake_case, cast date columns to datetime.
Load
Push clean DataFrame into PostgreSQL via SQLAlchemy engine. Credentials stored in .env file.
Analyse
SQL queries surface KPIs, top products, monthly trends, customer rankings and regional revenue.
Scripts & Queries
import pandas as pd
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
data_path = os.path.join(BASE_DIR, "data", "raw_data.csv")
def extract_data(path=data_path):
df = pd.read_csv(path, encoding="latin-1")
print("Shape:", df.shape)
print(df.info())
print(df.head())
return df
if __name__ == "__main__":
df = extract_data()
import pandas as pd
def clean_data(df):
# remove duplicates and nulls
df = df.drop_duplicates()
df = df.dropna()
# rename columns to match DB schema
df = df.rename(columns={
"Row ID": "row_id",
"Order ID": "order_id",
"Order Date": "order_date",
"Ship Date": "ship_date",
"Ship Mode": "ship_mode",
"Customer ID": "customer_id",
"Customer Name": "customer_full_name",
"Segment": "segment",
"Country": "country",
"City": "city",
"State": "state_name",
"Postal Code": "postal_code",
"Region": "region",
"Product ID": "product_id",
"Category": "category",
"Sub-Category": "sub_category",
"Product Name": "product_name",
"Sales": "sales",
"Quantity": "quantity",
"Discount": "discount",
"Profit": "profit",
"Shipping Cost": "shipping_cost",
"Order Priority": "order_priority",
"Market": "market"
})
# convert data types
df['ship_date'] = pd.to_datetime(df['ship_date'], errors='coerce', dayfirst=True)
df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce', dayfirst=True)
return df
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv
load_dotenv()
engine = create_engine(os.getenv("DB_URL"))
def load_data(df, table_name="sales"):
df.to_sql(table_name, engine, if_exists="append", index=False)
print(f"Data loaded successfully into {table_name}")
if __name__ == "__main__":
from extract import extract_data
from transform import clean_data
df = extract_data()
df = clean_data(df)
load_data(df)
CREATE TABLE sales (
row_id int,
order_id text,
order_date date,
ship_date date,
ship_mode text,
customer_id text,
customer_full_name text,
segment text,
city text,
state_name text,
country text,
postal_code int,
market text,
region text,
product_id text,
category text,
sub_category text,
product_name text,
sales numeric,
quantity int,
discount numeric,
profit numeric,
shipping_cost numeric,
order_priority text
);
-- Verify columns and types
SELECT column_name "Columns",
data_type "Types"
FROM information_schema.columns
WHERE table_name = 'sales';
-- KPIs: total orders, revenue, profit
SELECT
COUNT(*) AS total_orders,
ROUND(SUM(sales), 2) AS total_revenue,
ROUND(SUM(profit), 2) AS total_profit,
ROUND(AVG(profit), 2) AS avg_profit_per_order
FROM sales;
-- Best 3 Selling Products
SELECT product_name,
ROUND(SUM(sales), 2) AS total_sales
FROM sales
GROUP BY product_name
ORDER BY 2 DESC
LIMIT 3;
-- Monthly Sales Trend
SELECT to_char(order_date, 'Month') AS month,
ROUND(SUM(sales), 2) AS total_sales
FROM sales
GROUP BY 1, date_part('month', order_date)
ORDER BY date_part('month', order_date);
-- Top 5 Customers by Sales
SELECT customer_full_name,
ROUND(SUM(sales), 2) AS total_sales
FROM sales
GROUP BY customer_full_name
ORDER BY total_sales DESC
LIMIT 5;
-- Sales by Region
SELECT region,
ROUND(SUM(sales), 2) AS total_revenue
FROM sales
GROUP BY region
ORDER BY 2 DESC;
Tech Stack
Python
Core scripting language for the ETL orchestration via main.py
Pandas
Data extraction, column renaming, type casting, deduplication
PostgreSQL
Target relational database. Schema designed and queried with SQL
SQLAlchemy
Python SQL toolkit used as the database engine connector
python-dotenv
Secures DB credentials by loading them from a .env file
Jupyter
Visualizations notebook for exploratory data analysis
Key Takeaways
ETL Design Patterns
Structuring a pipeline into separate Extract, Transform, and Load modules makes the code reusable, testable, and easy to extend.
Database Schema First
Designing and creating the PostgreSQL table before loading data ensures type safety and prevents silent failures on insert.
SQL Analytics Power
Writing grouped and ordered SQL queries directly against the loaded table produces business insights instantly without additional tooling.
Secrets Management
Using python-dotenv and .gitignore to protect credentials is essential even in personal projects — good habits from day one.
Folder Structure
See the full source code
All scripts, SQL files, and the Jupyter notebook are available on GitHub.
View on GitHub