How to Build a Price Tracking Bot for E-Commerce Websites (Python, 2026)

Prices move constantly. Amazon adjusts prices millions of times per day. Shopify merchants reprice based on demand signals. Marketplaces run flash sales that last hours. For any business making purchasing or pricing decisions, and for any developer building competitive intelligence tools, checking prices manually is not a strategy.
A price tracking bot changes this. Run it on a schedule, store every data point, alert when something meaningful changes, and you have an intelligence feed that runs 24/7 without anyone opening a browser.
This tutorial builds one from the ground up: a production-ready Python price tracker that monitors products across multiple e-commerce platforms, stores historical price data, detects changes, and sends alerts via email or Slack. Every section produces working code. By the end, you'll have a complete system you can deploy and extend.
What We're Building
Before touching code, be specific about the system architecture. Vague "price tracker" tutorials produce fragile scripts. Production-grade trackers have five clear components:
1. Data collection layer: fetches current price, availability, and product details for a URL list. This is where the scraping happens. We'll use ScrapeBadger's API to handle anti-bot bypass, JavaScript rendering, and proxy rotation automatically.
2. Storage layer: persists every price observation with a timestamp, enabling historical analysis. SQLite for development, PostgreSQL-compatible for production.
3. Change detection layer: compares new observations against previous ones and flags meaningful changes (price drops, out-of-stock transitions, back-in-stock events).
4. Alert layer: delivers notifications when changes occur. Email via SMTP, Slack webhooks, or both.
5. Scheduling layer: runs the collection cycle on a configurable interval without manual intervention.
The complete system diagram:

```
[URL List] → [Scraper] → [Parser] → [Database]
                                        ↓
                               [Change Detector]
                                        ↓
                    [Alert Dispatcher] → [Email / Slack]
```

Setup and Dependencies
```bash
pip install requests beautifulsoup4 sqlalchemy pydantic apscheduler python-dotenv
```

Project structure:
```
price_tracker/
├── .env
├── config.py
├── models.py
├── scraper.py
├── database.py
├── detector.py
├── alerts.py
├── scheduler.py
└── main.py
```

Your .env file:
```env
SCRAPEBADGER_API_KEY=your_scrapebadger_key
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your@gmail.com
SMTP_PASSWORD=your_app_password
ALERT_EMAIL=alerts@yourcompany.com
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
CHECK_INTERVAL_MINUTES=60
PRICE_DROP_THRESHOLD=0.05  # Alert on 5%+ drops
```

Step 1: Define Your Data Models
Strong typing prevents the silent data quality failures that plague most scrapers. A price that comes back as "£29.99" when you expect 29.99 corrupts your change detection logic if you don't catch it at the boundary.
```python
# models.py
from pydantic import BaseModel, validator
from typing import Optional
from datetime import datetime
import re


class ProductPrice(BaseModel):
    url: str
    name: Optional[str]
    current_price: Optional[float]
    original_price: Optional[float]
    currency: str = "USD"
    availability: Optional[str]
    platform: Optional[str]
    scraped_at: Optional[datetime] = None

    @validator("current_price", "original_price", pre=True)
    def parse_price(cls, v):
        """
        Normalise price strings to float.
        Handles: "£29.99", "$1,299.00", "29.99", None
        """
        if v is None:
            return None
        if isinstance(v, (int, float)):
            return float(v)
        if isinstance(v, str):
            # Strip currency symbols, commas, whitespace
            cleaned = re.sub(r"[^\d.]", "", v.replace(",", ""))
            try:
                return float(cleaned)
            except ValueError:
                return None
        return None

    @validator("availability", pre=True)
    def normalise_availability(cls, v):
        """Normalise availability to standard values."""
        if v is None:
            return "unknown"
        v_lower = str(v).lower()
        # Check negative terms first: "unavailable" contains "available"
        if any(term in v_lower for term in ["out of stock", "unavailable", "sold out", "false"]):
            return "out_of_stock"
        if any(term in v_lower for term in ["in stock", "available", "instock", "true"]):
            return "in_stock"
        if "limited" in v_lower or "low stock" in v_lower:
            return "low_stock"
        return v

    def discount_percentage(self) -> Optional[float]:
        """Calculate discount percentage if both prices available."""
        if self.original_price and self.current_price and self.original_price > 0:
            return round((self.original_price - self.current_price) / self.original_price * 100, 1)
        return None

    class Config:
        json_encoders = {datetime: lambda v: v.isoformat()}
```
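Before wiring up the scraper, it's worth a quick REPL check that the validators behave as promised. A minimal smoke test with made-up values:

```python
# Hypothetical smoke test for the validators (not one of the tutorial's files)
from models import ProductPrice

p = ProductPrice(
    url="https://example.com/widget",
    name="Widget Pro",
    current_price="£1,299.00",   # messy string in
    original_price=1499.00,
    availability="In Stock",
    platform="generic",
)
print(p.current_price)           # 1299.0  (clean float out)
print(p.availability)            # in_stock
print(p.discount_percentage())   # 13.3
```

Step 2: The Scraping Layer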
This is where most tutorials break. They show you BeautifulSoup scraping a simple HTML page, which works for about 10 minutes before Amazon returns a CAPTCHA, Shopify serves Cloudflare, or ASOS loads everything via JavaScript.
We use ScrapeBadger for the infrastructure: proxy rotation, TLS fingerprinting, JavaScript rendering, and anti-bot bypass are handled transparently. You pass a URL; you get back usable data. As detailed in the ScrapeBadger guide to scraping without getting blocked, the difference between a scraper that works on your machine and one that works reliably at production scale is almost always infrastructure, not code logic.
```python
# scraper.py
import requests
import os
import json
import re
from bs4 import BeautifulSoup
from typing import Optional
from datetime import datetime
from models import ProductPrice

API_KEY = os.getenv("SCRAPEBADGER_API_KEY")
BASE_URL = "https://api.scrapebadger.com/v1/scrape"


def detect_platform(url: str) -> str:
    """Identify the e-commerce platform from the URL."""
    url_lower = url.lower()
    if "amazon" in url_lower:
        return "amazon"
    if "shopify" in url_lower or ".myshopify.com" in url_lower:
        return "shopify"
    if "ebay" in url_lower:
        return "ebay"
    if "etsy" in url_lower:
        return "etsy"
    if "walmart" in url_lower:
        return "walmart"
    if "asos" in url_lower:
        return "asos"
    return "generic"


def fetch_page(url: str, render_js: bool = True) -> Optional[str]:
    """
    Fetch a page via ScrapeBadger API.
    Handles anti-bot bypass, proxy rotation, and JS rendering automatically.
    """
    try:
        response = requests.get(
            BASE_URL,
            headers={"X-API-Key": API_KEY},
            params={
                "url": url,
                "render_js": render_js,
                "wait_for": "networkidle",
            },
            timeout=30,
        )
        response.raise_for_status()
        return response.text
    except requests.exceptions.Timeout:
        print(f"Timeout fetching {url}")
        return None
    except requests.exceptions.HTTPError as e:
        print(f"HTTP {e.response.status_code} for {url}")
        return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None


def extract_schema_org_data(soup: BeautifulSoup) -> dict:
    """
    Extract product data from schema.org JSON-LD markup.
    This is the most reliable extraction method: consistent across
    platforms and themes, and used by Google's own crawlers.
    Works on WooCommerce, Shopify, and most modern e-commerce sites.
    """
    for script in soup.find_all("script", {"type": "application/ld+json"}):
        try:
            data = json.loads(script.string or "")
            if isinstance(data, list):
                data = next((d for d in data if d.get("@type") in ["Product", "ItemPage"]), {})
            if data.get("@type") == "Product":
                offers = data.get("offers", {})
                if isinstance(offers, list):
                    offers = offers[0] if offers else {}
                return {
                    "name": data.get("name"),
                    "price": offers.get("price"),
                    "currency": offers.get("priceCurrency", "USD"),
                    "availability": offers.get("availability", ""),
                }
        except (json.JSONDecodeError, AttributeError):
            continue
    return {}


def parse_amazon(soup: BeautifulSoup, url: str) -> ProductPrice:
    """Amazon-specific parsing logic."""
    data = extract_schema_org_data(soup)

    # Amazon fallbacks for fields schema.org sometimes misses
    if not data.get("price"):
        price_el = (
            soup.select_one(".a-price .a-offscreen")
            or soup.select_one("#priceblock_ourprice")
            or soup.select_one("#priceblock_dealprice")
        )
        if price_el:
            data["price"] = price_el.get_text(strip=True)
    if not data.get("name"):
        title_el = soup.select_one("#productTitle")
        if title_el:
            data["name"] = title_el.get_text(strip=True)

    original_price = None
    original_el = soup.select_one(".a-text-price .a-offscreen")
    if original_el:
        original_price = original_el.get_text(strip=True)

    availability = "unknown"
    avail_el = soup.select_one("#availability")
    if avail_el:
        availability = avail_el.get_text(strip=True)

    return ProductPrice(
        url=url,
        name=data.get("name"),
        current_price=data.get("price"),
        original_price=original_price,
        currency=data.get("currency", "USD"),
        availability=availability,
        platform="amazon",
        scraped_at=datetime.utcnow(),
    )


def parse_shopify(soup: BeautifulSoup, url: str) -> ProductPrice:
    """
    Shopify-specific parsing.
    Most Shopify stores embed full product data in a JSON blob within the page.
    This is more reliable than CSS selectors, which vary by theme.
    """
    # Method 1: Shopify product JSON blob (most reliable)
    for script in soup.find_all("script"):
        if script.string and "var meta = " in script.string:
            try:
                match = re.search(r"var meta = ({.*?});", script.string, re.DOTALL)
                if match:
                    meta = json.loads(match.group(1))
                    product = meta.get("product", {})
                    variant = product.get("variants", [{}])[0]
                    compare_at = variant.get("compare_at_price") or 0  # may be None
                    return ProductPrice(
                        url=url,
                        name=product.get("title"),
                        current_price=(variant.get("price") or 0) / 100,  # Shopify stores prices in cents
                        original_price=compare_at / 100 or None,
                        currency=meta.get("currency", "USD"),
                        availability="in_stock" if variant.get("available") else "out_of_stock",
                        platform="shopify",
                        scraped_at=datetime.utcnow(),
                    )
            except (json.JSONDecodeError, KeyError, TypeError):
                pass

    # Method 2: Schema.org fallback
    data = extract_schema_org_data(soup)
    return ProductPrice(
        url=url,
        name=data.get("name"),
        current_price=data.get("price"),
        currency=data.get("currency", "USD"),
        availability=data.get("availability", "unknown"),
        platform="shopify",
        scraped_at=datetime.utcnow(),
    )


def parse_generic(soup: BeautifulSoup, url: str, platform: str) -> ProductPrice:
    """
    Generic parser using schema.org data.
    Works on WooCommerce, Magento, and most modern e-commerce platforms.
    Falls back to common CSS patterns if schema.org is absent.
    """
    data = extract_schema_org_data(soup)
    if data.get("price"):
        return ProductPrice(
            url=url,
            name=data.get("name"),
            current_price=data.get("price"),
            currency=data.get("currency", "USD"),
            availability=data.get("availability", "unknown"),
            platform=platform,
            scraped_at=datetime.utcnow(),
        )

    # CSS selector fallbacks for common e-commerce patterns
    price_selectors = [
        ".price", ".product-price", ".woocommerce-Price-amount",
        "[data-price]", ".offer-price", ".sale-price", ".current-price",
        "span.price", ".ProductMeta__Price", ".price__current",
    ]
    price_text = None
    for selector in price_selectors:
        el = soup.select_one(selector)
        if el:
            price_text = el.get_text(strip=True)
            break

    name_selectors = [
        "h1.product-title", "h1.product_title", ".product-name h1",
        "h1[itemprop='name']", ".ProductMeta__Title", "h1",
    ]
    name = None
    for selector in name_selectors:
        el = soup.select_one(selector)
        if el:
            name = el.get_text(strip=True)[:200]
            break

    return ProductPrice(
        url=url,
        name=name,
        current_price=price_text,
        currency="USD",
        availability="unknown",
        platform=platform,
        scraped_at=datetime.utcnow(),
    )


def scrape_product(url: str) -> Optional[ProductPrice]:
    """
    Main scraping entrypoint. Detects platform and routes to
    the appropriate parser.
    """
    platform = detect_platform(url)
    html = fetch_page(url)
    if not html:
        print(f"Failed to fetch {url}")
        return None
    soup = BeautifulSoup(html, "html.parser")
    if platform == "amazon":
        return parse_amazon(soup, url)
    elif platform == "shopify":
        return parse_shopify(soup, url)
    else:
        return parse_generic(soup, url, platform)
```

The schema.org approach deserves emphasis. As detailed in the ScrapeBadger e-commerce scraping guide, schema.org structured data is the most stable extraction target on any e-commerce site: it's what Google's crawlers consume, so it gets maintained even when the rest of the HTML changes. CSS class selectors break when a developer updates a theme; schema.org markup breaks only when a store explicitly removes it.
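To make that concrete, here's a minimal, made-up JSON-LD fragment of the kind product pages embed, run through extract_schema_org_data:

```python
from bs4 import BeautifulSoup
from scraper import extract_schema_org_data

# Hypothetical product-page fragment; real pages embed the same structure
html = """
<script type="application/ld+json">
{"@context": "https://schema.org", "@type": "Product", "name": "Widget Pro",
 "offers": {"@type": "Offer", "price": "49.99", "priceCurrency": "USD",
            "availability": "https://schema.org/InStock"}}
</script>
"""
soup = BeautifulSoup(html, "html.parser")
print(extract_schema_org_data(soup))
# {'name': 'Widget Pro', 'price': '49.99', 'currency': 'USD',
#  'availability': 'https://schema.org/InStock'}
```

The ProductPrice validators then coerce the string price and the schema.org availability URL into 49.99 and "in_stock" respectively.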
Step 3: The Database Layer
Every price observation gets stored. This is what separates a "check the current price" script from an intelligence tool: historical data is what lets you spot trends, calculate how long a sale has been running, and build the change detection logic.
```python
# database.py
import os
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, Boolean
from sqlalchemy.orm import declarative_base, sessionmaker

DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///price_tracker.db")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()


class PriceRecord(Base):
    __tablename__ = "price_records"

    id = Column(Integer, primary_key=True)
    url = Column(String, index=True, nullable=False)
    product_name = Column(String)
    current_price = Column(Float)
    original_price = Column(Float)
    currency = Column(String, default="USD")
    availability = Column(String)
    platform = Column(String)
    discount_pct = Column(Float)
    scraped_at = Column(DateTime, default=datetime.utcnow, index=True)


class TrackedProduct(Base):
    __tablename__ = "tracked_products"

    id = Column(Integer, primary_key=True)
    url = Column(String, unique=True, nullable=False)
    name = Column(String)
    target_price = Column(Float, nullable=True)  # Alert when price falls below this
    alert_on_any_drop = Column(Boolean, default=True)
    alert_on_stock_change = Column(Boolean, default=True)
    is_active = Column(Boolean, default=True)
    added_at = Column(DateTime, default=datetime.utcnow)
    last_checked = Column(DateTime)
    last_price = Column(Float)
    last_availability = Column(String)


Base.metadata.create_all(engine)


class PriceDatabase:
    def save_price(self, product_price) -> PriceRecord:
        """Store a price observation."""
        with SessionLocal() as session:
            record = PriceRecord(
                url=product_price.url,
                product_name=product_price.name,
                current_price=product_price.current_price,
                original_price=product_price.original_price,
                currency=product_price.currency,
                availability=product_price.availability,
                platform=product_price.platform,
                discount_pct=product_price.discount_percentage(),
                scraped_at=product_price.scraped_at or datetime.utcnow(),
            )
            session.add(record)
            session.commit()
            session.refresh(record)
            return record

    def get_previous_record(self, url: str) -> PriceRecord | None:
        """Get the most recent price record for a URL."""
        with SessionLocal() as session:
            return (
                session.query(PriceRecord)
                .filter(PriceRecord.url == url)
                .order_by(PriceRecord.scraped_at.desc())
                .first()
            )

    def get_price_history(self, url: str, days: int = 30) -> list[PriceRecord]:
        """Get price history for a URL over the last N days."""
        cutoff = datetime.utcnow() - timedelta(days=days)
        with SessionLocal() as session:
            return (
                session.query(PriceRecord)
                .filter(PriceRecord.url == url, PriceRecord.scraped_at >= cutoff)
                .order_by(PriceRecord.scraped_at.asc())
                .all()
            )

    def get_active_products(self) -> list[TrackedProduct]:
        """Get all active products for monitoring."""
        with SessionLocal() as session:
            return (
                session.query(TrackedProduct)
                .filter(TrackedProduct.is_active == True)  # noqa: E712
                .all()
            )

    def update_product_last_seen(self, url: str, price: float, availability: str):
        """Update the tracked product's last known state."""
        with SessionLocal() as session:
            product = session.query(TrackedProduct).filter_by(url=url).first()
            if product:
                product.last_checked = datetime.utcnow()
                product.last_price = price
                product.last_availability = availability
                session.commit()

    def add_product(self, url: str, target_price: float = None) -> TrackedProduct:
        """Add a new URL to the tracking list."""
        with SessionLocal() as session:
            existing = session.query(TrackedProduct).filter_by(url=url).first()
            if existing:
                existing.is_active = True
                session.commit()
                return existing
            product = TrackedProduct(url=url, target_price=target_price)
            session.add(product)
            session.commit()
            session.refresh(product)
            return product

    def get_price_stats(self, url: str, days: int = 30) -> dict:
        """Calculate price statistics over a period."""
        history = self.get_price_history(url, days)
        prices = [r.current_price for r in history if r.current_price]
        if not prices:
            return {}
        return {
            "min_price": min(prices),
            "max_price": max(prices),
            "avg_price": sum(prices) / len(prices),
            "current_price": prices[-1],
            "observations": len(prices),
            "period_days": days,
        }
```
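You can exercise the layer from a REPL before moving on (values here are hypothetical):

```python
from database import PriceDatabase

db = PriceDatabase()
db.add_product("https://example.com/widget", target_price=45.00)

# Empty until the first check cycle stores an observation
print(db.get_price_stats("https://example.com/widget", days=30))  # {}
```

Step 4: Change Detection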
This is where the data becomes intelligence. Price change detection has three tiers: target price hit (the most actionable alert), percentage drop (flexible threshold), and availability change (often more valuable than price for certain use cases).
```python
# detector.py
import os
from dataclasses import dataclass
from typing import Optional
from models import ProductPrice
from database import PriceRecord

PRICE_DROP_THRESHOLD = float(os.getenv("PRICE_DROP_THRESHOLD", "0.05"))


@dataclass
class PriceAlert:
    url: str
    product_name: Optional[str]
    alert_type: str  # "target_hit", "price_drop", "price_rise", "back_in_stock", "out_of_stock"
    current_price: Optional[float]
    previous_price: Optional[float]
    target_price: Optional[float]
    current_availability: Optional[str]
    previous_availability: Optional[str]
    change_pct: Optional[float] = None
    currency: str = "USD"

    def format_message(self) -> str:
        name = self.product_name or self.url
        currency_symbol = {"USD": "$", "GBP": "£", "EUR": "€"}.get(self.currency, "$")
        if self.alert_type == "target_hit":
            return (
                f"🎯 TARGET PRICE HIT: {name}\n"
                f"Current: {currency_symbol}{self.current_price:.2f} "
                f"(target: {currency_symbol}{self.target_price:.2f})\n"
                f"URL: {self.url}"
            )
        elif self.alert_type == "price_drop":
            return (
                f"📉 PRICE DROP: {name}\n"
                f"{currency_symbol}{self.previous_price:.2f} → "
                f"{currency_symbol}{self.current_price:.2f} "
                f"({abs(self.change_pct):.1f}% drop)\n"
                f"URL: {self.url}"
            )
        elif self.alert_type == "back_in_stock":
            return (
                f"✅ BACK IN STOCK: {name}\n"
                f"Now available at {currency_symbol}{self.current_price:.2f}\n"
                f"URL: {self.url}"
            )
        elif self.alert_type == "out_of_stock":
            return (
                f"❌ OUT OF STOCK: {name}\n"
                f"Last price: {currency_symbol}{self.current_price:.2f}\n"
                f"URL: {self.url}"
            )
        else:
            return f"Price update for {name}: {currency_symbol}{self.current_price:.2f}"


def detect_changes(
    current: ProductPrice,
    previous: PriceRecord,
    target_price: Optional[float] = None,
    alert_on_any_drop: bool = True,
    alert_on_stock_change: bool = True,
) -> list[PriceAlert]:
    """
    Compare current observation against previous record.
    Returns the list of alerts triggered by the change.
    """
    alerts = []
    current_price = current.current_price
    prev_price = previous.current_price if previous else None
    current_avail = current.availability
    prev_avail = previous.availability if previous else None

    # Target price alert: most important, check first
    if target_price and current_price and current_price <= target_price:
        # Only alert if we weren't already below target (avoids repeat alerts)
        if not prev_price or prev_price > target_price:
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="target_hit",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                currency=current.currency,
            ))

    # Price change alerts
    if current_price and prev_price and current_price != prev_price:
        change_pct = (current_price - prev_price) / prev_price
        if change_pct < -PRICE_DROP_THRESHOLD and alert_on_any_drop:
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="price_drop",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                change_pct=change_pct * 100,
                currency=current.currency,
            ))

    # Availability change alerts
    if alert_on_stock_change and prev_avail and current_avail != prev_avail:
        if current_avail == "in_stock" and prev_avail == "out_of_stock":
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="back_in_stock",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                currency=current.currency,
            ))
        elif current_avail == "out_of_stock" and prev_avail == "in_stock":
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="out_of_stock",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                currency=current.currency,
            ))

    return alerts
```
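A quick check of the three tiers with hand-built records (hypothetical values; the real pipeline constructs these from scraped data):

```python
from datetime import datetime
from models import ProductPrice
from database import PriceRecord
from detector import detect_changes

current = ProductPrice(
    url="https://example.com/widget",
    name="Widget Pro",
    current_price=44.99,
    availability="in_stock",
    platform="generic",
    scraped_at=datetime.utcnow(),
)
previous = PriceRecord(url="https://example.com/widget",
                       current_price=59.99, availability="out_of_stock")

for alert in detect_changes(current, previous, target_price=45.00):
    print(alert.alert_type)
# target_hit      (44.99 <= 45.00 and the previous price was above target)
# price_drop      (59.99 → 44.99 is a 25% drop, well past the 5% threshold)
# back_in_stock   (out_of_stock → in_stock)
```

Step 5: Alert Delivery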
Alerts are only useful if they reach someone who can act on them. We support both email (for personal use and formal reporting) and Slack (for team workflows).
```python
# alerts.py
import os
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from detector import PriceAlert

SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")


def send_email_alert(alert: PriceAlert) -> bool:
    """Send price alert via email."""
    if not all([SMTP_USER, SMTP_PASSWORD, ALERT_EMAIL]):
        print("Email not configured, skipping")
        return False
    subject_map = {
        "target_hit": "🎯 Target Price Hit",
        "price_drop": "📉 Price Drop Alert",
        "back_in_stock": "✅ Back In Stock",
        "out_of_stock": "❌ Out of Stock",
    }
    subject = subject_map.get(alert.alert_type, "Price Alert")
    if alert.product_name:
        subject += f": {alert.product_name[:50]}"
    body = alert.format_message()
    if alert.url:
        body += f"\n\nView product: {alert.url}"

    msg = MIMEMultipart()
    msg["From"] = SMTP_USER
    msg["To"] = ALERT_EMAIL
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"Email sent: {subject}")
        return True
    except Exception as e:
        print(f"Email failed: {e}")
        return False


def send_slack_alert(alert: PriceAlert) -> bool:
    """Send price alert to Slack webhook."""
    if not SLACK_WEBHOOK:
        return False
    emoji_map = {
        "target_hit": "🎯",
        "price_drop": "📉",
        "back_in_stock": "✅",
        "out_of_stock": "❌",
        "price_rise": "📈",
    }
    color_map = {
        "target_hit": "#00ff00",
        "price_drop": "#00aa00",
        "back_in_stock": "#0088ff",
        "out_of_stock": "#ff4444",
    }
    payload = {
        "attachments": [{
            "color": color_map.get(alert.alert_type, "#888888"),
            "title": f"{emoji_map.get(alert.alert_type, '')} {alert.product_name or 'Price Alert'}",
            "title_link": alert.url,
            "text": alert.format_message(),
            "footer": "ScrapeBadger Price Tracker",
        }]
    }
    try:
        response = requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
        response.raise_for_status()
        print(f"Slack alert sent: {alert.alert_type}")
        return True
    except Exception as e:
        print(f"Slack alert failed: {e}")
        return False


def dispatch_alerts(alerts: list[PriceAlert], channels: list[str] = None):
    """Send alerts to all configured channels."""
    if not alerts:
        return
    channels = channels or ["email", "slack"]
    for alert in alerts:
        print(f"\n🔔 {alert.format_message()}\n")
        if "email" in channels:
            send_email_alert(alert)
        if "slack" in channels:
            send_slack_alert(alert)
```
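Before trusting the scheduler, it's worth pushing a fabricated alert through both channels. A hypothetical one-off script (run it once, then delete it):

```python
# Load .env first: alerts.py reads SMTP/Slack settings at import time
from dotenv import load_dotenv
load_dotenv()

from detector import PriceAlert
from alerts import dispatch_alerts

test_alert = PriceAlert(
    url="https://example.com/widget",
    product_name="Widget Pro (test)",
    alert_type="price_drop",
    current_price=44.99,
    previous_price=59.99,
    target_price=None,
    current_availability="in_stock",
    previous_availability="in_stock",
    change_pct=-25.0,
)
dispatch_alerts([test_alert])  # should reach email and Slack if both are configured
```

Step 6: The Main Check Cycle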
Everything above comes together in the core check function: the loop that runs on schedule, processes every tracked product, detects changes, and dispatches alerts.
```python
# scheduler.py
import time
import os
from datetime import datetime
from scraper import scrape_product
from database import PriceDatabase
from detector import detect_changes
from alerts import dispatch_alerts

db = PriceDatabase()
CHECK_INTERVAL_MINUTES = int(os.getenv("CHECK_INTERVAL_MINUTES", "60"))


def check_product(url: str, target_price: float = None) -> dict:
    """
    Run a full check cycle for a single product.
    Returns a summary of what happened.
    """
    print(f"Checking: {url}")

    # Scrape the current price
    current = scrape_product(url)
    if not current:
        return {"url": url, "status": "failed", "alerts": []}
    if not current.current_price:
        print(f"No price extracted for {url}")
        return {"url": url, "status": "no_price", "alerts": []}

    # Get the previous record for comparison
    previous = db.get_previous_record(url)

    # Detect any meaningful changes
    alerts = detect_changes(
        current=current,
        previous=previous,
        target_price=target_price,
    )

    # Store this observation
    db.save_price(current)
    db.update_product_last_seen(url, current.current_price, current.availability)

    # Dispatch alerts
    if alerts:
        dispatch_alerts(alerts)

    return {
        "url": url,
        "status": "ok",
        "product_name": current.name,
        "current_price": current.current_price,
        "previous_price": previous.current_price if previous else None,
        "availability": current.availability,
        "alerts": [a.alert_type for a in alerts],
    }


def run_check_cycle():
    """Run one complete cycle across all tracked products."""
    products = db.get_active_products()
    print(f"\n{'=' * 50}")
    print(f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] Checking {len(products)} products")
    print("=" * 50)
    results = []
    for product in products:
        result = check_product(product.url, product.target_price)
        results.append(result)
        time.sleep(2)  # Polite pacing between requests

    # Summary
    successful = sum(1 for r in results if r["status"] == "ok")
    total_alerts = sum(len(r.get("alerts", [])) for r in results)
    print(f"\nCycle complete: {successful}/{len(products)} successful, {total_alerts} alerts sent")
    return results


def start_scheduler():
    """Start the continuous price tracking scheduler."""
    from apscheduler.schedulers.blocking import BlockingScheduler

    scheduler = BlockingScheduler()
    scheduler.add_job(
        run_check_cycle,
        "interval",
        minutes=CHECK_INTERVAL_MINUTES,
        next_run_time=datetime.now(),  # Run immediately on start
    )
    print(f"Price tracker started. Checking every {CHECK_INTERVAL_MINUTES} minutes.")
    print("Press Ctrl+C to stop.\n")
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        print("\nPrice tracker stopped.")
```
```python
# main.py
import sys
from dotenv import load_dotenv

load_dotenv()  # Load .env before importing modules that read os.getenv at import time

from database import PriceDatabase
from scheduler import run_check_cycle, start_scheduler

db = PriceDatabase()


def add_products(urls: list[str], target_prices: list[float] = None):
    """Add products to the tracking list."""
    for i, url in enumerate(urls):
        target = target_prices[i] if target_prices and i < len(target_prices) else None
        db.add_product(url, target)
        print(f"Added: {url}" + (f" (target: ${target})" if target else ""))


def show_history(url: str, days: int = 30):
    """Display price history for a product."""
    stats = db.get_price_stats(url, days)
    if not stats:
        print(f"No data for {url}")
        return
    print(f"\nPrice history for {url} (last {days} days):")
    print(f"  Current: ${stats['current_price']:.2f}")
    print(f"  Low: ${stats['min_price']:.2f}")
    print(f"  High: ${stats['max_price']:.2f}")
    print(f"  Average: ${stats['avg_price']:.2f}")
    print(f"  Data points: {stats['observations']}")


if __name__ == "__main__":
    command = sys.argv[1] if len(sys.argv) > 1 else "run"
    if command == "add":
        # python main.py add https://example.com/product 29.99
        url = sys.argv[2]
        target = float(sys.argv[3]) if len(sys.argv) > 3 else None
        add_products([url], [target])
    elif command == "check":
        # python main.py check  (run one cycle now)
        run_check_cycle()
    elif command == "history":
        # python main.py history https://example.com/product
        url = sys.argv[2]
        show_history(url)
    elif command == "run":
        # python main.py  (start the continuous scheduler)
        start_scheduler()
    else:
        print("Commands: add <url> [target_price] | check | history <url> | run")
```

Using It
```bash
# Add products to track
python main.py add "https://www.amazon.com/dp/B09V3KXJPB" 199.99
python main.py add "https://competitor-store.myshopify.com/products/widget"
python main.py add "https://www.ebay.com/itm/12345678"

# Run one check right now
python main.py check

# View price history
python main.py history "https://www.amazon.com/dp/B09V3KXJPB"

# Start continuous monitoring (runs every 60 minutes)
python main.py run
```

Console output during a check cycle:
```
==================================================
[2026-05-06 14:30:00] Checking 3 products
==================================================
Checking: https://www.amazon.com/dp/B09V3KXJPB

🔔 🎯 TARGET PRICE HIT: Sony WH-1000XM5 Wireless Headphones
Current: $189.99 (target: $199.99)
URL: https://www.amazon.com/dp/B09V3KXJPB

Email sent: 🎯 Target Price Hit: Sony WH-1000XM5...
Slack alert sent: target_hit

Cycle complete: 3/3 successful, 1 alerts sent
```

Taking It Further
The system above handles the core loop reliably. Three extensions that add the most value for production use:
Multi-currency normalisation. If you're tracking across markets (UK prices in GBP, EU prices in EUR, US prices in USD), add a currency normalisation layer that converts all prices to a base currency using a live exchange rate API before comparison. Open Exchange Rates has a free tier.
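A minimal sketch of that layer, assuming you fetch and cache a rates table elsewhere (the numbers below are illustrative, not live rates):

```python
# Cached conversion rates to the base currency; refresh from your
# exchange rate API on a schedule (hourly is plenty for price tracking)
RATES_TO_USD = {"USD": 1.0, "GBP": 1.27, "EUR": 1.08}  # illustrative snapshot

def to_usd(price: float, currency: str) -> float:
    """Convert a price to USD before storing or comparing it."""
    return round(price * RATES_TO_USD.get(currency, 1.0), 2)

print(to_usd(29.99, "GBP"))  # 38.09
```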
Price history visualisation. SQLite stores the history; building a simple dashboard on top of it with Streamlit takes about 50 lines. Plot price over time, mark the points when alerts fired, and you have a proper price intelligence dashboard.
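A sketch of that dashboard, assuming streamlit and pandas are installed (dashboard.py is a new, hypothetical file; launch it with `streamlit run dashboard.py`):

```python
# dashboard.py
import pandas as pd
import streamlit as st
from database import PriceDatabase

db = PriceDatabase()
st.title("Price Tracker")
url = st.text_input("Product URL")
if url:
    history = db.get_price_history(url, days=90)
    if history:
        # Build a time-indexed series of observed prices
        df = pd.DataFrame(
            {"scraped_at": [r.scraped_at for r in history],
             "price": [r.current_price for r in history]}
        ).set_index("scraped_at")
        st.line_chart(df["price"])
    else:
        st.write("No observations for this URL yet.")
```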
Bulk URL management. For competitive price monitoring at scale (tracking hundreds of SKUs across multiple competitor sites), the ScrapeBadger blog post on e-commerce scraping covers the Shopify JSON API approach that cuts request volume dramatically: instead of scraping individual product pages, a single /products.json?limit=250 call returns all products and variants for a Shopify store in one request.
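As a sketch of that approach, assuming the target store exposes the public endpoint (the store URL here is hypothetical):

```python
import requests

store = "https://competitor-store.myshopify.com"  # hypothetical store
resp = requests.get(f"{store}/products.json", params={"limit": 250}, timeout=30)
resp.raise_for_status()

# One request returns every product and variant in the catalogue page
for product in resp.json().get("products", []):
    for variant in product.get("variants", []):
        print(product["title"], variant["title"], variant["price"])
```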
For teams building serious price intelligence products (monitoring tens of thousands of SKUs, running multi-region checks, or feeding price data into repricers or BI tools), the ScrapeBadger documentation covers batch processing, async request patterns, and the CLI tooling for scheduled pipeline management without custom scheduler code.
If you want to connect price data to an AI agent that makes repricing recommendations, the ScrapeBadger MCP server exposes the full scraping API to any MCP-compatible agent: your AI gets live price data from any URL as a tool call. Setup is covered in the MCP documentation.
The complete code for this tutorial is runnable as described. Start with a handful of products you actually care about: a competitor's key SKUs, products you're considering purchasing, or items you're monitoring for your own store. The value compounds as the history builds.

Written by
Thomas Shultz
Thomas Shultz is the Head of Data at ScrapeBadger, working on public web data, scraping infrastructure, and data reliability. He writes about real-world scraping, data pipelines, and turning unstructured web data into usable signals.