
shweta-dataverse/realtime-mobility-streaming-kafka


🚀 Real‑Time Ride Data Pipeline & Mobility Analytics Platform

A full‑stack, real‑time data engineering system covering streaming, warehousing, modeling, and analytics.


🧰 Tech Stack

Layer                  Technology
Event Ingestion        FastAPI
Streaming Backbone     Kafka • Zookeeper
Real‑Time Processing   Python • aiokafka • asyncio
Warehouse              PostgreSQL
Modeling               dbt
Dashboards             Streamlit
Infrastructure         Docker • docker‑compose

🧠 Overview

This project implements a real‑time mobility analytics platform that ingests streaming event data, processes it in near real time, models it in a warehouse, and exposes it through interactive dashboards.

It is designed as an end‑to‑end data engineering system, showcasing:

  • Event ingestion via an HTTP API
  • Streaming transport using Kafka
  • Real‑time ETL with Python and aiokafka
  • Analytical storage in PostgreSQL
  • Transformation and modeling with dbt
  • Visualization and storytelling with Streamlit
  • Fully containerized infrastructure with Docker

🔄 Architecture / Data Flow

Architecture Diagram

1. Data Generator

Continuously emits synthetic mobility events covering the ride lifecycle (request → accept → complete) along with surge, fare, and timestamp fields.
This simulates a high‑velocity event stream for testing real‑time pipelines.
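A minimal sketch of what one generated ride could look like, assuming the field names `event_id`, `ride_id`, `event_type`, `surge_multiplier`, `fare`, and `timestamp` (hypothetical; `data_generator/main.py` may use different names and distributions). Some rides stop after request or accept so the downstream funnel has drop-off to measure, and the seeded `random.Random` makes runs reproducible.

```python
import json
import random
import uuid
from datetime import datetime, timedelta, timezone

# Hypothetical lifecycle stages; the real generator's event names may differ.
LIFECYCLE = ("request", "accept", "complete")


def _uuid(rng: random.Random) -> str:
    """Deterministic UUID-shaped id drawn from the seeded generator."""
    return str(uuid.UUID(int=rng.getrandbits(128)))


def generate_ride_events(rng: random.Random, start: datetime) -> list:
    """Emit one synthetic ride as an ordered list of lifecycle events."""
    ride_id = _uuid(rng)
    surge = rng.choice([1.0, 1.0, 1.0, 1.2, 1.5, 2.0])  # weighted toward no surge
    base_fare = round(rng.uniform(5.0, 40.0), 2)
    # Most rides finish; some are abandoned after request or accept.
    reached = rng.choice([1, 2, 3, 3, 3, 3])
    ts = start
    events = []
    for stage in LIFECYCLE[:reached]:
        events.append({
            "event_id": _uuid(rng),
            "ride_id": ride_id,
            "event_type": stage,
            "surge_multiplier": surge,
            # Only the completion event carries a fare in this sketch.
            "fare": round(base_fare * surge, 2) if stage == "complete" else None,
            "timestamp": ts.isoformat(),
        })
        ts += timedelta(seconds=rng.randint(5, 600))
    return events


if __name__ == "__main__":
    rng = random.Random(42)
    for event in generate_ride_events(rng, datetime.now(timezone.utc)):
        print(json.dumps(event))
```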


2. FastAPI Ingestion Layer

Receives incoming events over HTTP and publishes them to Kafka.
Pydantic schemas enforce strict validation and schema consistency before data enters the streaming system.
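For illustration, the validation contract can be sketched with a stdlib dataclass standing in for the Pydantic model in `schemas.py`. The field names, the allowed event types, and the rules (surge at least 1.0, non-negative fare) are assumptions, not the project's actual schema. When a FastAPI endpoint declares a Pydantic model as its request body, FastAPI performs equivalent checks and rejects invalid payloads with HTTP 422.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Hypothetical set of accepted event types; the real schemas.py may differ.
VALID_EVENT_TYPES = {"request", "accept", "complete"}


@dataclass(frozen=True)
class RideEvent:
    """Stdlib stand-in for a Pydantic model: fields are checked on construction."""
    event_id: str
    ride_id: str
    event_type: str
    surge_multiplier: float
    timestamp: datetime
    fare: Optional[float] = None

    def __post_init__(self) -> None:
        if not self.event_id or not self.ride_id:
            raise ValueError("event_id and ride_id must be non-empty")
        if self.event_type not in VALID_EVENT_TYPES:
            raise ValueError(f"unknown event_type: {self.event_type!r}")
        if self.surge_multiplier < 1.0:
            raise ValueError("surge_multiplier must be >= 1.0")
        if self.fare is not None and self.fare < 0:
            raise ValueError("fare must be non-negative")


def parse_event(payload: dict) -> RideEvent:
    """Coerce a JSON payload into a RideEvent before it would be published.

    A missing required key raises KeyError here; FastAPI would report it as a 422.
    """
    return RideEvent(
        event_id=str(payload["event_id"]),
        ride_id=str(payload["ride_id"]),
        event_type=str(payload["event_type"]),
        surge_multiplier=float(payload.get("surge_multiplier", 1.0)),
        timestamp=datetime.fromisoformat(payload["timestamp"]),
        fare=None if payload.get("fare") is None else float(payload["fare"]),
    )
```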


3. Kafka + Zookeeper

Kafka acts as the durable, high‑throughput event log.
Events are written to the raw-rides topic, enabling scalable, decoupled processing.
Zookeeper coordinates Kafka brokers and manages metadata.
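One common design choice, assumed here rather than confirmed by the source, is to key each message by `ride_id`. Kafka only guarantees ordering within a partition, and keyed messages with equal keys go to the same partition, so keying by ride keeps each ride's request → accept → complete sequence in order. The sketch below models that routing; it uses CRC32 instead of Kafka's default murmur2 hash purely to stay dependency-free, and the partition count is hypothetical.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # hypothetical partition count for raw-rides


def choose_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition deterministically.

    Kafka's default partitioner uses murmur2; CRC32 is a stand-in here.
    The property that matters is the same: equal keys map to one partition.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events: list) -> dict:
    """Group events into per-partition logs, preserving arrival order."""
    partitions = defaultdict(list)
    for event in events:
        partitions[choose_partition(event["ride_id"])].append(event)
    return dict(partitions)
```

Spreading rides across partitions lets several processor instances in one consumer group share the load while each ride's events are still handled in order by a single consumer.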


4. Stream Processor (Python + aiokafka)

Consumes events from Kafka in real time, applies operational cleaning and enrichment, and loads them into PostgreSQL.
This forms the real‑time ETL layer of the system.
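The consume → transform → load loop can be sketched with `asyncio` alone. Here an `asyncio.Queue` stands in for aiokafka's consumer and a plain list stands in for the Postgres table; the cleaning rules (drop events without `ride_id`, normalise `event_type`, derive `event_date` and `is_surge`) are hypothetical examples of what `transformations.py` might do, not its actual contents.

```python
import asyncio
from datetime import datetime
from typing import Optional

SENTINEL = None  # marks end of stream in this sketch only; a Kafka consumer runs indefinitely


def transform(raw: dict) -> Optional[dict]:
    """Clean and enrich one raw event; return None to drop it (hypothetical rules)."""
    if not raw.get("ride_id"):
        return None
    ts = datetime.fromisoformat(raw["timestamp"])
    surge = float(raw.get("surge_multiplier") or 1.0)
    return {
        "ride_id": raw["ride_id"],
        "event_type": str(raw["event_type"]).strip().lower(),
        "event_ts": ts,
        "event_date": ts.date().isoformat(),
        "surge_multiplier": surge,
        "is_surge": surge > 1.0,
    }


async def run_processor(source: asyncio.Queue, sink: list, batch_size: int = 100) -> None:
    """Consume -> transform -> load, flushing to the sink in batches.

    `source` stands in for an AIOKafkaConsumer and `sink` for a Postgres table;
    a real flush would typically be an INSERT followed by an offset commit.
    """
    batch = []
    while True:
        raw = await source.get()
        if raw is SENTINEL:
            break
        row = transform(raw)
        if row is not None:
            batch.append(row)
        if len(batch) >= batch_size:
            sink.extend(batch)  # load step
            batch.clear()
    if batch:  # flush whatever is left when the stream ends
        sink.extend(batch)
```

Batching amortises round-trips to Postgres. Committing Kafka offsets only after a batch is written gives at-least-once delivery, so the warehouse load should tolerate duplicate events after a restart.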


5. PostgreSQL Warehouse

Serves as the analytical storage layer.
Stores:

  • fact_rides: raw, high‑granularity event data
  • dim_users, dim_locations: lookup dimensions
  • analytics.*: dbt‑generated models

Postgres provides ACID transactions, indexing, and direct integration with both dbt and the Streamlit dashboards.
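Because a consumer that commits Kafka offsets after writing can see events again after a crash, loads into `fact_rides` are safest when they are idempotent. A minimal sketch, assuming `fact_rides` has a unique `event_id` column (a hypothetical, simplified schema; `db/init.sql` holds the real DDL). `INSERT ... ON CONFLICT (event_id) DO NOTHING` is valid PostgreSQL, and SQLite 3.24+ accepts the same clause, so an in-memory `sqlite3` database stands in for Postgres here. With a Postgres driver such as psycopg2, the placeholders would be `%s` instead of `?`.

```python
import sqlite3

# Hypothetical, simplified fact_rides columns; see db/init.sql for the real DDL.
DDL = """
CREATE TABLE IF NOT EXISTS fact_rides (
    event_id   TEXT PRIMARY KEY,
    ride_id    TEXT NOT NULL,
    event_type TEXT NOT NULL,
    event_ts   TEXT NOT NULL,
    fare       REAL
)
"""

# Same statement shape works in PostgreSQL and in SQLite >= 3.24.
INSERT = """
INSERT INTO fact_rides (event_id, ride_id, event_type, event_ts, fare)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (event_id) DO NOTHING
"""


def load_batch(conn: sqlite3.Connection, rows: list) -> None:
    """Insert a batch in one transaction; re-delivered events are skipped, not duplicated."""
    with conn:  # commits on success, rolls back if any statement fails
        conn.executemany(INSERT, rows)
```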


6. dbt (Data Build Tool)

Transforms raw warehouse tables into analytics‑ready models.

  • Staging models (stg_ride_events) standardize and clean raw data
  • Mart models (fct_ride_conversion) compute business‑level metrics
  • dbt tests and documentation check data quality and record lineage
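The exact definition of `fct_ride_conversion` is not shown here, so the following is a hypothetical Python rendering of one plausible funnel metric. It counts distinct rides reaching each stage, which is what a `COUNT(DISTINCT ride_id)` per `event_type` would produce in the dbt SQL, and returns 0.0 instead of dividing by zero.

```python
from collections import defaultdict

STAGES = ("request", "accept", "complete")  # hypothetical stage names


def ride_conversion(events: list) -> dict:
    """Compute a request -> accept -> complete funnel over distinct rides."""
    rides_by_stage = defaultdict(set)
    for e in events:
        if e["event_type"] in STAGES:
            rides_by_stage[e["event_type"]].add(e["ride_id"])

    counts = {stage: len(rides_by_stage[stage]) for stage in STAGES}

    def rate(num: int, den: int) -> float:
        # Guard against empty stages (e.g. no requests yet in the window).
        return num / den if den else 0.0

    return {
        **counts,
        "accept_rate": rate(counts["accept"], counts["request"]),
        "completion_rate": rate(counts["complete"], counts["accept"]),
        "overall_conversion": rate(counts["complete"], counts["request"]),
    }
```

For example, four requested rides of which three are accepted and two completed give an accept rate of 0.75, a completion rate of 2/3, and an overall conversion of 0.5.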

7. Streamlit Dashboards

Interactive dashboards built directly on top of the warehouse and dbt models.

Includes:

  • Conversion funnel
  • Marketplace KPIs
  • Route intelligence
  • Surge pricing insights
  • Data quality monitoring
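As an illustration of the kind of checks a data quality panel could surface (the dashboard's actual checks and `db/queries/0_data_quality.sql` are not reproduced here), this sketch reports null counts for assumed required columns, duplicate event IDs, and freshness, measured as seconds since the newest event. The column names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Optional

# Hypothetical columns every fact row should populate.
REQUIRED_COLUMNS = ("event_id", "ride_id", "event_type", "event_ts")


def data_quality_report(rows: list, required=REQUIRED_COLUMNS, now: Optional[datetime] = None) -> dict:
    """Summarise nulls, duplicate event ids, and freshness for a batch of rows."""
    now = now or datetime.now(timezone.utc)
    # A value counts as missing if it is None or an empty string.
    null_counts = {col: sum(1 for r in rows if r.get(col) in (None, "")) for col in required}
    # Every occurrence of an event_id beyond the first is a duplicate.
    id_counts = Counter(r["event_id"] for r in rows if r.get("event_id"))
    duplicates = sum(n - 1 for n in id_counts.values() if n > 1)
    # Freshness: how far the newest event lags behind `now` (timezone-aware datetimes).
    stamps = [r["event_ts"] for r in rows if isinstance(r.get("event_ts"), datetime)]
    freshness = (now - max(stamps)).total_seconds() if stamps else None
    return {
        "row_count": len(rows),
        "null_counts": null_counts,
        "duplicate_event_ids": duplicates,
        "freshness_seconds": freshness,
    }
```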

📦 Project Structure


realtime-mobility-streaming-kafka/
├── data_generator/                  # Real-time ride event generator
│   └── main.py
│
├── db/                              # Database bootstrap & reference queries
│   ├── init.sql                     # DDL + seed + initial setup for Postgres
│   └── queries/                     # Reference queries
│       ├── 0_data_quality.sql
│       ├── 1_basic_kpis.sql
│       └── 2_funnel_analysis.sql
│
├── dbt_project/                     # dbt analytics project
│   ├── dbt_project.yml              # Project config
│   └── models/
│       ├── marts/                   # Business logic models
│       │   └── fct_ride_conversion.sql
│       └── staging/                 # Cleaning of raw tables
│           └── stg_ride_events.sql
│
├── ingestion_api/                   # FastAPI ingestion service
│   ├── kafka_producer.py            # Kafka producer logic
│   ├── main.py                      # API endpoints (ingest → Kafka)
│   └── schemas.py                   # Pydantic validation models
│
├── pages/                           # Streamlit dashboard pages
│   ├── conversion_funnel.py
│   ├── marketplace_kpis.py
│   ├── route_intelligence.py
│   └── surge_pricing.py
│
├── stream_processor/                # Kafka consumer → Postgres writer
│   ├── processor.py                 # Real-time ETL (consume, transform, load)
│   └── transformations.py           # Cleaning & enrichment logic
│
├── utils/                           # Shared utilities
│   └── db.py                        # PostgreSQL connection for dashboards
│
├── dashboard.py                     # Main Streamlit entrypoint
├── docker-compose.yml               # Kafka + Zookeeper + Postgres infra
└── requirements.txt                 # Project dependencies


How to Run the Project

Run these commands from the repository root. Steps 2 to 4 start long-running processes, so give each one its own terminal.

  1. Start Kafka, Zookeeper, PostgreSQL, and Kafka UI
     docker-compose up -d
  2. Start the FastAPI ingestion service
     uvicorn ingestion_api.main:app --host 0.0.0.0 --port 8000 --reload
  3. Start the stream processor
     python stream_processor/processor.py
  4. Start the real-time data generator
     python data_generator/main.py
  5. Build analytics models with dbt
     cd dbt_project/mobility_analytics
     dbt run
     cd ../..
  6. Launch the Streamlit dashboard
     streamlit run dashboard.py
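Once the stack is running, a quick smoke test is to post one event by hand. The sketch below uses only the standard library; the `/events` path and the payload fields are hypothetical, so check the Swagger UI at http://localhost:8000/docs for the real route and schema before using it.

```python
import json
import urllib.request

# Hypothetical endpoint path; the real route is listed in the Swagger UI.
INGEST_URL = "http://localhost:8000/events"


def build_ingest_request(event: dict, url: str = INGEST_URL) -> urllib.request.Request:
    """Build a JSON POST request for the ingestion API."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_event(event: dict, url: str = INGEST_URL, timeout: float = 5.0) -> int:
    """POST one event and return the HTTP status code (requires the API to be running).

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    """
    with urllib.request.urlopen(build_ingest_request(event, url), timeout=timeout) as resp:
        return resp.status
```

With the API up, `send_event(...)` should return a 2xx status; a payload that fails validation surfaces as an `HTTPError`, typically with code 422.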

Services Access

📘 FastAPI Swagger UI


Access the interactive API docs here: http://localhost:8000/docs


🧭 Kafka UI (Optional)


Inspect topics, partitions, and consumer lag here: http://localhost:8080
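Consumer lag, as shown in Kafka UI, is tracked per partition: the log-end offset minus the consumer group's committed offset (the committed offset is the position of the next message the group will read). A small sketch of that arithmetic with hypothetical offsets; partitions with no committed offset are reported as `None` rather than guessed, because where the group would start depends on `auto.offset.reset`.

```python
from typing import Optional


def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Per-partition lag = log-end offset - committed offset (None if never committed)."""
    lag = {}
    for partition, end in end_offsets.items():
        done: Optional[int] = committed.get(partition)
        lag[partition] = None if done is None else max(end - done, 0)
    return lag


def total_lag(lag: dict) -> int:
    """Sum the lag over partitions whose committed offset is known."""
    return sum(v for v in lag.values() if v is not None)
```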


📊 Streamlit Dashboard


Open the analytics dashboard here: http://localhost:8501


🚀 Project Enhancements (Future Work)

  • Real‑time ML models (fraud detection, ETA prediction, anomaly detection)
  • Monitoring & observability with Prometheus + Grafana
  • CI/CD pipelines for API, processor, dbt, and dashboards
  • Expanded dbt models for deeper marketplace, supply‑demand, and surge analytics

👩‍💻 Author

Shweta, Full‑Stack Data Engineer
Real‑Time Streaming • Warehousing • Analytics Engineering • System Design


📄 License

This project is licensed under CC BY‑NC‑ND 4.0.
You may copy and redistribute it with attribution, but not for commercial purposes, and you may not distribute modified versions.
