ETL Pipeline SQL Python End-to-End

Superstore ETL Pipeline

An end-to-end data pipeline that takes a raw CSV export through Python ETL scripts into a clean PostgreSQL table, then analyses it with SQL queries.

Stack Python · PostgreSQL · Pandas · SQLAlchemy
Type End-to-End Data Project
Dataset Global Superstore Sales
Architecture

ETL Pipeline Flow

Extract

Read raw CSV using Pandas with latin-1 encoding. Log shape, data types and preview rows.

extract.py

Transform

Remove duplicates & nulls, rename 24 columns to snake_case, cast date columns to datetime.

transform.py

Load

Push clean DataFrame into PostgreSQL via SQLAlchemy engine. Credentials stored in .env file.

load.py

Analyse

SQL queries surface KPIs, top products, monthly trends, customer rankings and regional revenue.

data_analysis.sql
Code Deep Dive

Scripts & Queries

scripts/extract.py
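import os

import pandas as pd

# Resolve the project root from this file's location, independent of the working directory
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_path = os.path.join(BASE_DIR, "data", "raw_data.csv")

def extract_data(path=data_path):
    # Read the raw CSV with the latin-1 codec
    df = pd.read_csv(path, encoding="latin-1")

    print("Shape:", df.shape)
    df.info()  # info() prints its own report and returns None
    print(df.head())

    return df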


if __name__ == "__main__":
    df = extract_data()
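
The explicit `encoding="latin-1"` matters because `pd.read_csv` decodes files as UTF-8 unless told otherwise. The sketch below uses a made-up string, not data from the project, and assumes the CSV contains Latin-1 encoded accented characters; it shows how such bytes fail under UTF-8 but decode cleanly as Latin-1.

```python
# Hypothetical sample: "Café" encoded as Latin-1 (é becomes the single byte 0xE9).
raw = "Café".encode("latin-1")

# Decoding with the matching codec recovers the original text.
decoded = raw.decode("latin-1")

# The same bytes are not valid UTF-8, so a UTF-8 decode raises an error.
try:
    raw.decode("utf-8")
    utf8_ok = True
except UnicodeDecodeError:
    utf8_ok = False

print(decoded, utf8_ok)
```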
scripts/transform.py
import pandas as pd

def clean_data(df):

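    # remove duplicate rows, then rows with a null in any column
    # (pass subset=[...] to dropna if some columns may legitimately be empty)
    df = df.drop_duplicates()
    df = df.dropna()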

    # rename columns to match DB schema
    df = df.rename(columns={
        "Row ID":          "row_id",
        "Order ID":        "order_id",
        "Order Date":      "order_date",
        "Ship Date":       "ship_date",
        "Ship Mode":       "ship_mode",
        "Customer ID":     "customer_id",
        "Customer Name":   "customer_full_name",
        "Segment":         "segment",
        "Country":         "country",
        "City":            "city",
        "State":           "state_name",
        "Postal Code":     "postal_code",
        "Region":          "region",
        "Product ID":      "product_id",
        "Category":        "category",
        "Sub-Category":    "sub_category",
        "Product Name":    "product_name",
        "Sales":           "sales",
        "Quantity":        "quantity",
        "Discount":        "discount",
        "Profit":          "profit",
        "Shipping Cost":   "shipping_cost",
        "Order Priority":  "order_priority",
        "Market":          "market"
    })

    # parse day-first date strings; values that cannot be parsed become NaT
    df['ship_date']  = pd.to_datetime(df['ship_date'],  errors='coerce', dayfirst=True)
    df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce', dayfirst=True)

    return df
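
Two behaviours of the cleaning step are worth checking on real data: `dropna()` without arguments drops a row if any column is null, and `errors='coerce'` silently turns unparseable dates into `NaT`. The following is a minimal sketch on a hypothetical three-row sample (the values are invented for illustration) that measures both effects.

```python
import pandas as pd

# Hypothetical sample using the post-rename column names.
df = pd.DataFrame({
    "order_id": ["A-1", "A-2", "A-3"],
    "order_date": ["31-07-2012", "05-02-2013", "not a date"],
    "postal_code": [10024.0, None, None],
})

# dropna() with no arguments removes every row that has a null anywhere.
kept_all = len(df.dropna())

# subset= only requires the listed columns to be non-null.
kept_subset = len(df.dropna(subset=["order_id", "order_date"]))

# Count how many values errors="coerce" replaced with NaT.
parsed = pd.to_datetime(df["order_date"], errors="coerce", dayfirst=True)
bad_dates = int(parsed.isna().sum())

print(kept_all, kept_subset, bad_dates)
```

Logging these counts before and after cleaning makes it visible when a sparse column or an unexpected date format removes more rows than intended.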
scripts/load.py
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv

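# Read DB_URL from the .env file into the environment
load_dotenv()
engine = create_engine(os.getenv("DB_URL"))

def load_data(df, table_name="sales"):
    # Append into the existing table so the schema from table_creation.sql is kept
    df.to_sql(table_name, engine, if_exists="append", index=False)
    print(f"Data loaded successfully into {table_name}")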


if __name__ == "__main__":
    from extract import extract_data
    from transform import clean_data

    df = extract_data()
    df = clean_data(df)
    load_data(df)
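
Because `load_data` uses `if_exists="append"`, running the script a second time inserts every row again. One possible way to make reruns safe is to empty the table before appending. The sketch below assumes an in-memory SQLite database as a stand-in for PostgreSQL so it runs without a server, and `reload_table` is a hypothetical helper, not part of the project.

```python
import sqlite3

import pandas as pd


def reload_table(df, con, table_name="sales"):
    """Hypothetical helper: empty the existing table, then append the rows."""
    con.execute(f"DELETE FROM {table_name}")
    df.to_sql(table_name, con, if_exists="append", index=False)


# SQLite stands in for PostgreSQL here; the table is created up front.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE sales (row_id INTEGER, sales REAL)")
df = pd.DataFrame({"row_id": [1, 2], "sales": [10.0, 20.5]})

# Plain append twice: every row ends up stored twice.
df.to_sql("sales", con, if_exists="append", index=False)
df.to_sql("sales", con, if_exists="append", index=False)
rows_after_append = con.execute("SELECT COUNT(*) FROM sales").fetchone()[0]

# Clearing first keeps repeated runs at a single copy of the data.
reload_table(df, con)
reload_table(df, con)
rows_after_reload = con.execute("SELECT COUNT(*) FROM sales").fetchone()[0]

print(rows_after_append, rows_after_reload)
```

Clearing the rows rather than using `if_exists="replace"` keeps the column types defined in the schema, since `replace` drops the table and lets pandas recreate it.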
sql/table_creation.sql
CREATE TABLE sales (
    row_id              int,
    order_id            text,
    order_date          date,
    ship_date           date,
    ship_mode           text,
    customer_id         text,
    customer_full_name  text,
    segment             text,
    city                text,
    state_name          text,
    country             text,
    postal_code         int,
    market              text,
    region              text,
    product_id          text,
    category            text,
    sub_category        text,
    product_name        text,
    sales               numeric,
    quantity            int,
    discount            numeric,
    profit              numeric,
    shipping_cost       numeric,
    order_priority      text
);

-- Verify columns and types
SELECT column_name  "Columns",
       data_type    "Types"
FROM   information_schema.columns
WHERE  table_name = 'sales';
sql/data_analysis.sql
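-- KPIs: total orders, revenue, profit
-- The table holds one row per order line, so orders are counted by distinct order_id
SELECT
    COUNT(DISTINCT order_id) AS total_orders,
    ROUND(SUM(sales),  2) AS total_revenue,
    ROUND(SUM(profit), 2) AS total_profit,
    ROUND(SUM(profit) / NULLIF(COUNT(DISTINCT order_id), 0), 2) AS avg_profit_per_order
FROM sales;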



-- Top 3 Products by Sales
SELECT product_name,
       ROUND(SUM(sales), 2) AS total_sales
FROM   sales
GROUP  BY product_name
ORDER  BY 2 DESC
LIMIT  3;



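-- Monthly Sales Trend (calendar months, summed across all years)
-- FMMonth removes the blank padding that 'Month' adds to the month name
SELECT to_char(order_date, 'FMMonth') AS month,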
       ROUND(SUM(sales), 2)          AS total_sales
FROM   sales
GROUP  BY 1, date_part('month', order_date)
ORDER  BY date_part('month', order_date);



-- Top 5 Customers by Sales
SELECT customer_full_name,
       ROUND(SUM(sales), 2) AS total_sales
FROM   sales
GROUP  BY customer_full_name
ORDER  BY total_sales DESC
LIMIT  5;



-- Sales by Region
SELECT region,
       ROUND(SUM(sales), 2) AS total_revenue
FROM   sales
GROUP  BY region
ORDER  BY 2 DESC;
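
When a table stores one row per order line, as the `row_id`, `order_id` and `product_id` columns here suggest, `COUNT(*)` counts line items while `COUNT(DISTINCT order_id)` counts orders. The sketch below illustrates the difference on three invented rows, using an in-memory SQLite database as a stand-in for PostgreSQL.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE sales (order_id TEXT, product_name TEXT, profit REAL)")

# Hypothetical data: order ORD-1 has two line items, ORD-2 has one.
con.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [
        ("ORD-1", "Stapler", 4.0),
        ("ORD-1", "Binder", 6.0),
        ("ORD-2", "Desk", 20.0),
    ],
)

line_items, orders, profit_per_order = con.execute(
    """
    SELECT COUNT(*),
           COUNT(DISTINCT order_id),
           SUM(profit) / COUNT(DISTINCT order_id)
    FROM sales
    """
).fetchone()

print(line_items, orders, profit_per_order)
```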
Technologies

Tech Stack

🐍

Python

Core scripting language for the ETL orchestration via main.py

🐼

Pandas

Data extraction, column renaming, type casting, deduplication

🗄️

PostgreSQL

Target relational database. Schema designed and queried with SQL

⚙️

SQLAlchemy

Python SQL toolkit used as the database engine connector

🔐

python-dotenv

Keeps DB credentials out of the source code by loading them from a .env file

📊

Jupyter

Visualizations notebook for exploratory data analysis

Reflection

Key Takeaways

01

ETL Design Patterns

Structuring a pipeline into separate Extract, Transform, and Load modules makes the code reusable, testable, and easy to extend.
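
As an illustration of that testability, the three stages can be wired together by a small function that receives them as arguments. This is only a sketch: `run_pipeline` and the stand-in stages are hypothetical, and the project's own `main.py` is not shown here and may be organised differently.

```python
def run_pipeline(extract, transform, load):
    """Run the three stages in order and return the data that was loaded."""
    raw = extract()
    clean = transform(raw)
    load(clean)
    return clean


# Stand-in stages so the wiring can be exercised without a CSV or a database.
loaded = []


def fake_extract():
    return [{"Sales": "10.5"}, {"Sales": None}]


def fake_transform(rows):
    # Drop rows with a missing value and rename the key to snake_case.
    return [{"sales": float(r["Sales"])} for r in rows if r["Sales"] is not None]


result = run_pipeline(fake_extract, fake_transform, loaded.extend)
print(result, loaded)
```

With the real modules, the same function could be called as `run_pipeline(extract_data, clean_data, load_data)`.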

02

Database Schema First

Creating the PostgreSQL table before loading fixes each column's type up front, so a value that does not fit fails at insert time instead of being stored under a type inferred from the DataFrame.
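
A schema-first approach can also be enforced on the Python side before the load runs. The sketch below assumes a hypothetical `check_columns` helper that compares a DataFrame's column names against the 24 columns of the `sales` table; it takes any iterable of names, so it runs without pandas.

```python
# Column names taken from the sales table definition.
EXPECTED_COLUMNS = {
    "row_id", "order_id", "order_date", "ship_date", "ship_mode",
    "customer_id", "customer_full_name", "segment", "city", "state_name",
    "country", "postal_code", "market", "region", "product_id",
    "category", "sub_category", "product_name", "sales", "quantity",
    "discount", "profit", "shipping_cost", "order_priority",
}


def check_columns(columns, expected=EXPECTED_COLUMNS):
    """Hypothetical helper: raise if the column names do not match the table."""
    columns = set(columns)
    missing = sorted(expected - columns)
    unexpected = sorted(columns - expected)
    if missing or unexpected:
        raise ValueError(f"missing={missing}, unexpected={unexpected}")


# A matching set of names passes silently.
check_columns(EXPECTED_COLUMNS)

# A column that was not renamed is reported together with the name it should have had.
try:
    check_columns((EXPECTED_COLUMNS - {"state_name"}) | {"State"})
    mismatch = None
except ValueError as exc:
    mismatch = str(exc)

print(mismatch)
```

Calling `check_columns(df.columns)` just before `load_data(df)` would surface a renaming mistake with a readable message rather than a database error.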

03

SQL Analytics Power

Grouped and ordered SQL queries run directly against the loaded table answer business questions such as top products, top customers and regional revenue without any additional tooling.

04

Secrets Management

Using python-dotenv and .gitignore to protect credentials is essential even in personal projects: good habits from day one.
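
One detail to watch with environment-based secrets is that `os.getenv` returns `None` when the variable is missing, so the problem typically surfaces later as a less obvious error from the database layer. A minimal sketch of a fail-fast lookup follows; `get_db_url` is a hypothetical helper and the URL in the example is a placeholder.

```python
import os


def get_db_url(env=os.environ):
    """Hypothetical helper: return DB_URL or fail with a clear message."""
    url = env.get("DB_URL")
    if not url:
        raise RuntimeError("DB_URL is not set; add it to .env or the environment")
    return url


# Example with an explicit mapping instead of the real environment.
example_env = {"DB_URL": "postgresql://user:password@localhost:5432/superstore"}
print(get_db_url(example_env))
```

In `load.py` this could be used as `create_engine(get_db_url())` after `load_dotenv()` has run.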

Project Layout

Folder Structure

📁 Superstore/
├── 📁 data/
│  └── 📄 raw_data.csv Global Superstore dataset
├── 📁 scripts/
│  ├── 🐍 extract.py Read CSV with Pandas
│  ├── 🐍 transform.py Clean & rename columns
│  ├── 🐍 load.py Push to PostgreSQL
│  └── 🐍 main.py Pipeline orchestrator
├── 📁 sql/
│  ├── 🗄️ table_creation.sql DB schema definition
│  └── 🗄️ data_analysis.sql KPIs & analytics queries
├── 📁 outputs/
│  └── 📓 Visualizations.ipynb EDA notebook
├── 📄 requirements.txt
└── 🔐 .env DB credentials (gitignored)

See the full source code

All scripts, SQL files, and the Jupyter notebook are available on GitHub.
