
How to Build a Price Tracking Bot for E-Commerce Websites (Python, 2026)

Thomas Shultz

Price moves constantly. Amazon adjusts prices millions of times per day. Shopify merchants reprice based on demand signals. Marketplaces run flash sales that last hours. For any business making purchasing or pricing decisions, or any developer building competitive intelligence tools, checking prices manually is not a strategy.

A price tracking bot changes this. Run it on a schedule, store every data point, alert when something meaningful changes, and you have an intelligence feed that runs 24/7 without anyone opening a browser.

This tutorial builds one from the ground up: a production-ready Python price tracker that monitors products across multiple e-commerce platforms, stores historical price data, detects changes, and sends alerts via email or Slack. Every section produces working code. By the end, you'll have a complete system you can deploy and extend.

What We're Building

Before touching code, be specific about the system architecture. Vague "price tracker" tutorials produce fragile scripts. Production-grade trackers have five clear components:

1. Data collection layer - fetches current price, availability, and product details for a URL list. This is where the scraping happens. We'll use ScrapeBadger's API to handle anti-bot bypass, JavaScript rendering, and proxy rotation automatically.

2. Storage layer - persists every price observation with a timestamp, enabling historical analysis. SQLite for development, PostgreSQL-compatible for production.

3. Change detection layer - compares new observations against previous ones and flags meaningful changes (price drops, out-of-stock transitions, back-in-stock alerts).

4. Alert layer - delivers notifications when changes occur. Email via SMTP, Slack webhooks, or both.

5. Scheduling layer - runs the collection cycle on a configurable interval without any manual intervention.

The complete system diagram:

[URL List] → [Scraper] → [Parser] → [Database]
                                         ↓
                               [Change Detector]
                                         ↓
                               [Alert Dispatcher] → [Email / Slack]

Setup and Dependencies

bash

pip install requests beautifulsoup4 sqlalchemy pydantic apscheduler python-dotenv

Project structure:

price_tracker/
├── .env
├── config.py
├── models.py
├── scraper.py
├── database.py
├── detector.py
├── alerts.py
├── scheduler.py
└── main.py

Your .env file:

env

SCRAPEBADGER_API_KEY=your_scrapebadger_key
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your@gmail.com
SMTP_PASSWORD=your_app_password
ALERT_EMAIL=alerts@yourcompany.com
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
CHECK_INTERVAL_MINUTES=60
PRICE_DROP_THRESHOLD=0.05  # Alert on 5%+ drops

Step 1: Define Your Data Models

Strong typing prevents the silent data quality failures that plague most scrapers. A price that comes back as "£29.99" when you expect 29.99 corrupts your change detection logic if you don't catch it at the boundary.

python

# models.py
from pydantic import BaseModel, validator
from typing import Optional
from datetime import datetime
import re

class ProductPrice(BaseModel):
    url: str
    name: Optional[str]
    current_price: Optional[float]
    original_price: Optional[float]
    currency: str = "USD"
    availability: Optional[str]
    platform: Optional[str]
    scraped_at: Optional[datetime] = None

    @validator("current_price", "original_price", pre=True)
    def parse_price(cls, v):
        """
        Normalise price strings to float.
        Handles: "£29.99", "$1,299.00", "29.99", None
        """
        if v is None:
            return None
        if isinstance(v, (int, float)):
            return float(v)
        if isinstance(v, str):
            # Strip currency symbols, commas, whitespace
            cleaned = re.sub(r"[^\d.]", "", v)
            try:
                return float(cleaned)
            except ValueError:
                return None
        return None

    @validator("availability", pre=True)
    def normalise_availability(cls, v):
        """Normalise availability to standard values."""
        if v is None:
            return "unknown"
        v_lower = str(v).lower()
        if any(term in v_lower for term in ["in stock", "available", "instock", "true"]):
            return "in_stock"
        if any(term in v_lower for term in ["out of stock", "unavailable", "sold out", "false"]):
            return "out_of_stock"
        if "limited" in v_lower or "low stock" in v_lower:
            return "low_stock"
        return v

    def discount_percentage(self) -> Optional[float]:
        """Calculate discount percentage if both prices available."""
        if self.original_price and self.current_price and self.original_price > 0:
            return round((self.original_price - self.current_price) / self.original_price * 100, 1)
        return None

    class Config:
        json_encoders = {datetime: lambda v: v.isoformat()}
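The price validator is worth sanity-checking in isolation. The sketch below mirrors its cleaning step as a standalone function (a hypothetical helper, not part of the module) so you can verify the edge cases without constructing the model:

```python
import re

def parse_price(v):
    """Mirror of the model's price validator: normalise a price value to float."""
    if v is None:
        return None
    if isinstance(v, (int, float)):
        return float(v)
    if isinstance(v, str):
        # Strip currency symbols, commas, whitespace
        cleaned = re.sub(r"[^\d.]", "", v)
        try:
            return float(cleaned)
        except ValueError:
            return None
    return None

assert parse_price("£29.99") == 29.99
assert parse_price("$1,299.00") == 1299.0
assert parse_price("Sold out") is None   # no digits left after cleaning
assert parse_price(15) == 15.0
```

Note the one known limitation: European formats like "1.299,00" are not handled; if you track EU stores, extend the cleaning step accordingly.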

Step 2: The Scraping Layer

This is where most tutorials break. They show you BeautifulSoup scraping a simple HTML page, which works for about 10 minutes before Amazon returns a CAPTCHA, Shopify serves Cloudflare, or ASOS loads everything via JavaScript.

We use ScrapeBadger to handle the infrastructure: proxy rotation, TLS fingerprinting, JavaScript rendering, and anti-bot bypass are handled transparently. You pass a URL; you get back usable data. As detailed in the ScrapeBadger guide to scraping without getting blocked, the difference between a scraper that works in a quick test and one that works reliably at production scale is almost always infrastructure, not code logic.

python

# scraper.py
import requests
import os
from bs4 import BeautifulSoup
from typing import Optional
from models import ProductPrice
from datetime import datetime
import json
import re

API_KEY = os.getenv("SCRAPEBADGER_API_KEY")
BASE_URL = "https://api.scrapebadger.com/v1/scrape"

def detect_platform(url: str) -> str:
    """Identify the e-commerce platform from the URL."""
    url_lower = url.lower()
    if "amazon" in url_lower:
        return "amazon"
    if "shopify" in url_lower or ".myshopify.com" in url_lower:
        return "shopify"
    if "ebay" in url_lower:
        return "ebay"
    if "etsy" in url_lower:
        return "etsy"
    if "walmart" in url_lower:
        return "walmart"
    if "asos" in url_lower:
        return "asos"
    return "generic"


def fetch_page(url: str, render_js: bool = True) -> Optional[str]:
    """
    Fetch a page via ScrapeBadger API.
    Handles anti-bot bypass, proxy rotation, and JS rendering automatically.
    """
    try:
        response = requests.get(
            BASE_URL,
            headers={"X-API-Key": API_KEY},
            params={
                "url": url,
                "render_js": render_js,
                "wait_for": "networkidle",
            },
            timeout=30
        )
        response.raise_for_status()
        return response.text
    except requests.exceptions.Timeout:
        print(f"Timeout fetching {url}")
        return None
    except requests.exceptions.HTTPError as e:
        print(f"HTTP {e.response.status_code} for {url}")
        return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None


def extract_schema_org_data(soup: BeautifulSoup) -> dict:
    """
    Extract product data from schema.org JSON-LD markup.
    This is the most reliable extraction method: consistent across
    platforms and themes, used by Google's own crawlers.
    Works on WooCommerce, Shopify, and most modern e-commerce sites.
    """
    for script in soup.find_all("script", {"type": "application/ld+json"}):
        try:
            data = json.loads(script.string or "")
            if isinstance(data, list):
                data = next((d for d in data if d.get("@type") in ["Product", "ItemPage"]), {})
            if data.get("@type") == "Product":
                offers = data.get("offers", {})
                if isinstance(offers, list):
                    offers = offers[0] if offers else {}
                return {
                    "name": data.get("name"),
                    "price": offers.get("price"),
                    "currency": offers.get("priceCurrency", "USD"),
                    "availability": offers.get("availability", ""),
                }
        except (json.JSONDecodeError, AttributeError, StopIteration):
            continue
    return {}


def parse_amazon(soup: BeautifulSoup, url: str) -> ProductPrice:
    """Amazon-specific parsing logic."""
    data = extract_schema_org_data(soup)

    # Amazon fallbacks for fields schema.org sometimes misses
    if not data.get("price"):
        price_el = (
            soup.select_one(".a-price .a-offscreen") or
            soup.select_one("#priceblock_ourprice") or
            soup.select_one("#priceblock_dealprice")
        )
        if price_el:
            data["price"] = price_el.get_text(strip=True)

    if not data.get("name"):
        title_el = soup.select_one("#productTitle")
        if title_el:
            data["name"] = title_el.get_text(strip=True)

    original_price = None
    original_el = soup.select_one(".a-text-price .a-offscreen")
    if original_el:
        original_price = original_el.get_text(strip=True)

    availability = "unknown"
    avail_el = soup.select_one("#availability")
    if avail_el:
        availability = avail_el.get_text(strip=True)

    return ProductPrice(
        url=url,
        name=data.get("name"),
        current_price=data.get("price"),
        original_price=original_price,
        currency=data.get("currency", "USD"),
        availability=availability,
        platform="amazon",
        scraped_at=datetime.utcnow(),
    )


def parse_shopify(soup: BeautifulSoup, url: str) -> ProductPrice:
    """
    Shopify-specific parsing.
    Most Shopify stores embed full product data in a JSON blob within the page.
    This is more reliable than CSS selectors, which vary by theme.
    """
    # Method 1: Shopify product JSON blob (most reliable)
    for script in soup.find_all("script"):
        if script.string and "var meta = " in script.string:
            try:
                match = re.search(r'var meta = ({.*?});', script.string, re.DOTALL)
                if match:
                    meta = json.loads(match.group(1))
                    product = meta.get("product", {})
                    variant = product.get("variants", [{}])[0]
                    return ProductPrice(
                        url=url,
                        name=product.get("title"),
                        current_price=(variant.get("price") or 0) / 100,  # Shopify stores prices in cents
                        original_price=(variant.get("compare_at_price") or 0) / 100 or None,
                        currency=meta.get("currency", "USD"),
                        availability="in_stock" if variant.get("available") else "out_of_stock",
                        platform="shopify",
                        scraped_at=datetime.utcnow(),
                    )
            except (json.JSONDecodeError, KeyError, TypeError):
                pass

    # Method 2: Schema.org fallback
    data = extract_schema_org_data(soup)
    return ProductPrice(
        url=url,
        name=data.get("name"),
        current_price=data.get("price"),
        currency=data.get("currency", "USD"),
        availability=data.get("availability", "unknown"),
        platform="shopify",
        scraped_at=datetime.utcnow(),
    )


def parse_generic(soup: BeautifulSoup, url: str, platform: str) -> ProductPrice:
    """
    Generic parser using schema.org data.
    Works on WooCommerce, Magento, and most modern e-commerce platforms.
    Falls back to common CSS patterns if schema.org is absent.
    """
    data = extract_schema_org_data(soup)

    if data.get("price"):
        return ProductPrice(
            url=url,
            name=data.get("name"),
            current_price=data.get("price"),
            currency=data.get("currency", "USD"),
            availability=data.get("availability", "unknown"),
            platform=platform,
            scraped_at=datetime.utcnow(),
        )

    # CSS selector fallbacks for common e-commerce patterns
    price_selectors = [
        ".price", ".product-price", ".woocommerce-Price-amount",
        "[data-price]", ".offer-price", ".sale-price", ".current-price",
        "span.price", ".ProductMeta__Price", ".price__current",
    ]

    price_text = None
    for selector in price_selectors:
        el = soup.select_one(selector)
        if el:
            price_text = el.get_text(strip=True)
            break

    name_selectors = [
        "h1.product-title", "h1.product_title", ".product-name h1",
        "h1[itemprop='name']", ".ProductMeta__Title", "h1",
    ]
    name = None
    for selector in name_selectors:
        el = soup.select_one(selector)
        if el:
            name = el.get_text(strip=True)[:200]
            break

    return ProductPrice(
        url=url,
        name=name,
        current_price=price_text,
        currency="USD",
        availability="unknown",
        platform=platform,
        scraped_at=datetime.utcnow(),
    )


def scrape_product(url: str) -> Optional[ProductPrice]:
    """
    Main scraping entrypoint. Detects platform and routes to
    the appropriate parser.
    """
    platform = detect_platform(url)
    html = fetch_page(url)

    if not html:
        print(f"Failed to fetch {url}")
        return None

    soup = BeautifulSoup(html, "html.parser")

    if platform == "amazon":
        return parse_amazon(soup, url)
    elif platform == "shopify":
        return parse_shopify(soup, url)
    else:
        return parse_generic(soup, url, platform)

The schema.org approach deserves emphasis. As detailed in the ScrapeBadger e-commerce scraping guide, schema.org structured data is the most stable extraction target on any e-commerce site: it's what Google's crawlers use, so it gets maintained when other HTML changes. CSS class selectors break when a developer updates a theme; schema.org markup breaks only when a store explicitly removes it.
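To make this concrete, here is what typical Product JSON-LD looks like and how little code it takes to read once you've located the `application/ld+json` script tag. The payload below is illustrative, not from a real store:

```python
import json

# Illustrative JSON-LD, as it might appear inside a product page's
# <script type="application/ld+json"> tag (all values are made up).
raw = """
{
    "@context": "https://schema.org",
    "@type": "Product",
    "name": "Example Widget",
    "offers": {
        "@type": "Offer",
        "price": "29.99",
        "priceCurrency": "GBP",
        "availability": "https://schema.org/InStock"
    }
}
"""

data = json.loads(raw)
offers = data.get("offers", {})
print(data["name"])                # Example Widget
print(offers.get("price"))         # 29.99
print(offers.get("availability"))  # https://schema.org/InStock
```

Note that `price` arrives as a string here, which is exactly why the pydantic validator from Step 1 normalises price values at the boundary.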

Step 3: The Database Layer

Every price observation gets stored. This is what separates a "check the current price" script from an intelligence tool: historical data is what lets you spot trends, calculate how long a sale has been running, and build the change detection logic.

python

# database.py
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, Boolean
from sqlalchemy.orm import sessionmaker, declarative_base
from datetime import datetime
import os

DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///price_tracker.db")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()


class PriceRecord(Base):
    __tablename__ = "price_records"

    id = Column(Integer, primary_key=True)
    url = Column(String, index=True, nullable=False)
    product_name = Column(String)
    current_price = Column(Float)
    original_price = Column(Float)
    currency = Column(String, default="USD")
    availability = Column(String)
    platform = Column(String)
    discount_pct = Column(Float)
    scraped_at = Column(DateTime, default=datetime.utcnow, index=True)


class TrackedProduct(Base):
    __tablename__ = "tracked_products"

    id = Column(Integer, primary_key=True)
    url = Column(String, unique=True, nullable=False)
    name = Column(String)
    target_price = Column(Float, nullable=True)  # Alert when below this
    alert_on_any_drop = Column(Boolean, default=True)
    alert_on_stock_change = Column(Boolean, default=True)
    is_active = Column(Boolean, default=True)
    added_at = Column(DateTime, default=datetime.utcnow)
    last_checked = Column(DateTime)
    last_price = Column(Float)
    last_availability = Column(String)


Base.metadata.create_all(engine)


class PriceDatabase:

    def save_price(self, product_price) -> PriceRecord:
        """Store a price observation."""
        with SessionLocal() as session:
            record = PriceRecord(
                url=product_price.url,
                product_name=product_price.name,
                current_price=product_price.current_price,
                original_price=product_price.original_price,
                currency=product_price.currency,
                availability=product_price.availability,
                platform=product_price.platform,
                discount_pct=product_price.discount_percentage(),
                scraped_at=product_price.scraped_at or datetime.utcnow(),
            )
            session.add(record)
            session.commit()
            session.refresh(record)
            return record

    def get_previous_record(self, url: str) -> PriceRecord | None:
        """Get the most recent price record for a URL."""
        with SessionLocal() as session:
            return (
                session.query(PriceRecord)
                .filter(PriceRecord.url == url)
                .order_by(PriceRecord.scraped_at.desc())
                .first()
            )

    def get_price_history(self, url: str, days: int = 30) -> list[PriceRecord]:
        """Get price history for a URL over the last N days."""
        from datetime import timedelta
        cutoff = datetime.utcnow() - timedelta(days=days)
        with SessionLocal() as session:
            return (
                session.query(PriceRecord)
                .filter(PriceRecord.url == url, PriceRecord.scraped_at >= cutoff)
                .order_by(PriceRecord.scraped_at.asc())
                .all()
            )

    def get_active_products(self) -> list[TrackedProduct]:
        """Get all active products for monitoring."""
        with SessionLocal() as session:
            return (
                session.query(TrackedProduct)
                .filter(TrackedProduct.is_active == True)
                .all()
            )

    def update_product_last_seen(self, url: str, price: float, availability: str):
        """Update the tracked product's last known state."""
        with SessionLocal() as session:
            product = session.query(TrackedProduct).filter_by(url=url).first()
            if product:
                product.last_checked = datetime.utcnow()
                product.last_price = price
                product.last_availability = availability
                session.commit()

    def add_product(self, url: str, target_price: float = None) -> TrackedProduct:
        """Add a new URL to the tracking list."""
        with SessionLocal() as session:
            existing = session.query(TrackedProduct).filter_by(url=url).first()
            if existing:
                existing.is_active = True
                session.commit()
                return existing
            product = TrackedProduct(url=url, target_price=target_price)
            session.add(product)
            session.commit()
            session.refresh(product)
            return product

    def get_price_stats(self, url: str, days: int = 30) -> dict:
        """Calculate price statistics over a period."""
        history = self.get_price_history(url, days)
        prices = [r.current_price for r in history if r.current_price]
        if not prices:
            return {}
        return {
            "min_price": min(prices),
            "max_price": max(prices),
            "avg_price": sum(prices) / len(prices),
            "current_price": prices[-1] if prices else None,
            "observations": len(prices),
            "period_days": days,
        }
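One concrete payoff of the history table: you can estimate how long the current price has been in effect. A standalone sketch over (timestamp, price) pairs, assuming observations are sorted oldest-first as `get_price_history` returns them (`time_at_current_price` is a hypothetical helper, not part of the module):

```python
from datetime import datetime, timedelta

def time_at_current_price(history):
    """history: (scraped_at, price) pairs in ascending time order.
    Returns how long the latest price has been observed unchanged."""
    if not history:
        return None
    latest_ts, latest_price = history[-1]
    start = latest_ts
    # Walk backwards until the price differs from the latest one
    for ts, price in reversed(history):
        if price != latest_price:
            break
        start = ts
    return latest_ts - start

now = datetime(2026, 1, 10)
observations = [
    (now - timedelta(days=5), 49.99),
    (now - timedelta(days=3), 39.99),   # sale price first observed here
    (now - timedelta(days=1), 39.99),
    (now, 39.99),
]
print(time_at_current_price(observations))  # 3 days, 0:00:00
```

The resolution is bounded by your check interval: with hourly checks you know the sale start to within an hour.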

Step 4: Change Detection

This is where the data becomes intelligence. Price change detection has three tiers: target price hit (the most actionable alert), percentage drop (flexible threshold), and availability change (often more valuable than price for certain use cases).

python

# detector.py
import os
from dataclasses import dataclass
from typing import Optional
from models import ProductPrice
from database import PriceDatabase, PriceRecord

PRICE_DROP_THRESHOLD = float(os.getenv("PRICE_DROP_THRESHOLD", "0.05"))
db = PriceDatabase()


@dataclass
class PriceAlert:
    url: str
    product_name: Optional[str]
    alert_type: str  # "target_hit", "price_drop", "price_rise", "back_in_stock", "out_of_stock"
    current_price: Optional[float]
    previous_price: Optional[float]
    target_price: Optional[float]
    current_availability: Optional[str]
    previous_availability: Optional[str]
    change_pct: Optional[float] = None
    currency: str = "USD"

    def format_message(self) -> str:
        name = self.product_name or self.url
        currency_symbol = {"USD": "$", "GBP": "£", "EUR": "€"}.get(self.currency, "$")

        if self.alert_type == "target_hit":
            return (
                f"🎯 TARGET PRICE HIT: {name}\n"
                f"Current: {currency_symbol}{self.current_price:.2f} "
                f"(target: {currency_symbol}{self.target_price:.2f})\n"
                f"URL: {self.url}"
            )
        elif self.alert_type == "price_drop":
            return (
                f"📉 PRICE DROP: {name}\n"
                f"{currency_symbol}{self.previous_price:.2f} โ†’ "
                f"{currency_symbol}{self.current_price:.2f} "
                f"({abs(self.change_pct):.1f}% drop)\n"
                f"URL: {self.url}"
            )
        elif self.alert_type == "back_in_stock":
            return (
                f"✅ BACK IN STOCK: {name}\n"
                f"Now available at {currency_symbol}{self.current_price:.2f}\n"
                f"URL: {self.url}"
            )
        elif self.alert_type == "out_of_stock":
            return (
                f"โŒ OUT OF STOCK: {name}\n"
                f"Last price: {currency_symbol}{self.current_price:.2f}\n"
                f"URL: {self.url}"
            )
        else:
            return f"Price update for {name}: {currency_symbol}{self.current_price:.2f}"


def detect_changes(
    current: ProductPrice,
    previous: PriceRecord,
    target_price: float = None,
    alert_on_any_drop: bool = True,
    alert_on_stock_change: bool = True,
) -> list[PriceAlert]:
    """
    Compare current observation against previous record.
    Returns list of alerts triggered by the change.
    """
    alerts = []

    current_price = current.current_price
    prev_price = previous.current_price if previous else None
    current_avail = current.availability
    prev_avail = previous.availability if previous else None

    # Target price alert - most important, check first
    if target_price and current_price and current_price <= target_price:
        # Only alert if we haven't already hit target (avoid repeat alerts)
        if not prev_price or prev_price > target_price:
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="target_hit",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                currency=current.currency,
            ))

    # Price change alerts
    if current_price and prev_price and current_price != prev_price:
        change_pct = (current_price - prev_price) / prev_price

        if change_pct < -PRICE_DROP_THRESHOLD and alert_on_any_drop:
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="price_drop",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                change_pct=change_pct * 100,
                currency=current.currency,
            ))

    # Availability change alerts
    if alert_on_stock_change and current_avail != prev_avail and prev_avail:
        if current_avail == "in_stock" and prev_avail == "out_of_stock":
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="back_in_stock",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                currency=current.currency,
            ))
        elif current_avail == "out_of_stock" and prev_avail == "in_stock":
            alerts.append(PriceAlert(
                url=current.url,
                product_name=current.name,
                alert_type="out_of_stock",
                current_price=current_price,
                previous_price=prev_price,
                target_price=target_price,
                current_availability=current_avail,
                previous_availability=prev_avail,
                currency=current.currency,
            ))

    return alerts
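The threshold arithmetic is worth checking by hand. With the default `PRICE_DROP_THRESHOLD` of 0.05, a move from $99.99 to $94.99 is just over a 5% drop and fires, while $99.99 to $97.99 (roughly 2%) does not. A standalone sketch of the comparison, simplified from the `price_drop` branch above rather than importing the modules:

```python
PRICE_DROP_THRESHOLD = 0.05

def drop_change_pct(current_price, prev_price, threshold=PRICE_DROP_THRESHOLD):
    """Return the fractional price change if it is a drop beyond the
    threshold, otherwise None (mirrors the price_drop branch)."""
    if not current_price or not prev_price or current_price == prev_price:
        return None
    change = (current_price - prev_price) / prev_price
    return change if change < -threshold else None

assert drop_change_pct(94.99, 99.99) is not None   # ~5.0% drop fires
assert drop_change_pct(97.99, 99.99) is None       # ~2% drop is below threshold
assert drop_change_pct(None, 99.99) is None        # missing data never alerts
```

Note that the change is computed relative to the previous price, so the same dollar drop fires more readily on cheap products than on expensive ones; that is usually what you want for a percentage threshold.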

Step 5: Alert Delivery

Alerts are only useful if they reach someone who can act on them. We support both email (for personal use and formal reporting) and Slack (for team workflows).

python

# alerts.py
import os
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional
from detector import PriceAlert

SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")


def send_email_alert(alert: PriceAlert) -> bool:
    """Send price alert via email."""
    if not all([SMTP_USER, SMTP_PASSWORD, ALERT_EMAIL]):
        print("Email not configured - skipping")
        return False

    subject_map = {
        "target_hit": "🎯 Target Price Hit",
        "price_drop": "📉 Price Drop Alert",
        "back_in_stock": "✅ Back In Stock",
        "out_of_stock": "❌ Out of Stock",
    }

    subject = subject_map.get(alert.alert_type, "Price Alert")
    if alert.product_name:
        subject += f": {alert.product_name[:50]}"

    body = alert.format_message()
    if alert.url:
        body += f"\n\nView product: {alert.url}"

    msg = MIMEMultipart()
    msg["From"] = SMTP_USER
    msg["To"] = ALERT_EMAIL
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"Email sent: {subject}")
        return True
    except Exception as e:
        print(f"Email failed: {e}")
        return False


def send_slack_alert(alert: PriceAlert) -> bool:
    """Send price alert to Slack webhook."""
    if not SLACK_WEBHOOK:
        return False

    emoji_map = {
        "target_hit": "🎯",
        "price_drop": "📉",
        "back_in_stock": "✅",
        "out_of_stock": "❌",
        "price_rise": "📈",
    }

    color_map = {
        "target_hit": "#00ff00",
        "price_drop": "#00aa00",
        "back_in_stock": "#0088ff",
        "out_of_stock": "#ff4444",
    }

    payload = {
        "attachments": [{
            "color": color_map.get(alert.alert_type, "#888888"),
            "title": f"{emoji_map.get(alert.alert_type, '')} {alert.product_name or 'Price Alert'}",
            "title_link": alert.url,
            "text": alert.format_message(),
            "footer": "ScrapeBadger Price Tracker",
        }]
    }

    try:
        response = requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
        response.raise_for_status()
        print(f"Slack alert sent: {alert.alert_type}")
        return True
    except Exception as e:
        print(f"Slack alert failed: {e}")
        return False


def dispatch_alerts(alerts: list[PriceAlert], channels: list[str] = None):
    """Send alerts to all configured channels."""
    if not alerts:
        return

    channels = channels or ["email", "slack"]

    for alert in alerts:
        print(f"\n🔔 {alert.format_message()}\n")

        if "email" in channels:
            send_email_alert(alert)

        if "slack" in channels:
            send_slack_alert(alert)

Step 6: The Main Check Cycle

Everything above comes together in the core check function โ€” the loop that runs on schedule, processes every tracked product, detects changes, and dispatches alerts.

python

# scheduler.py
import time
import os
from datetime import datetime
from scraper import scrape_product
from database import PriceDatabase
from detector import detect_changes
from alerts import dispatch_alerts

db = PriceDatabase()
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL_MINUTES", "60")) * 60


def check_product(url: str, target_price: float = None) -> dict:
    """
    Run a full check cycle for a single product.
    Returns a summary of what happened.
    """
    print(f"Checking: {url}")

    # Scrape current price
    current = scrape_product(url)
    if not current:
        return {"url": url, "status": "failed", "alerts": []}

    if not current.current_price:
        print(f"No price extracted for {url}")
        return {"url": url, "status": "no_price", "alerts": []}

    # Get previous record for comparison
    previous = db.get_previous_record(url)

    # Detect any meaningful changes
    alerts = detect_changes(
        current=current,
        previous=previous,
        target_price=target_price,
    )

    # Store this observation
    db.save_price(current)
    db.update_product_last_seen(url, current.current_price, current.availability)

    # Dispatch alerts
    if alerts:
        dispatch_alerts(alerts)

    return {
        "url": url,
        "status": "ok",
        "product_name": current.name,
        "current_price": current.current_price,
        "previous_price": previous.current_price if previous else None,
        "availability": current.availability,
        "alerts": [a.alert_type for a in alerts],
    }


def run_check_cycle():
    """Run one complete cycle across all tracked products."""
    products = db.get_active_products()
    print(f"\n{'='*50}")
    print(f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] Checking {len(products)} products")
    print('='*50)

    results = []
    for product in products:
        result = check_product(product.url, product.target_price)
        results.append(result)
        time.sleep(2)  # Polite pacing between requests

    # Summary
    successful = sum(1 for r in results if r["status"] == "ok")
    total_alerts = sum(len(r.get("alerts", [])) for r in results)
    print(f"\nCycle complete: {successful}/{len(products)} successful, {total_alerts} alerts sent")
    return results


def start_scheduler():
    """Start the continuous price tracking scheduler."""
    from apscheduler.schedulers.blocking import BlockingScheduler

    scheduler = BlockingScheduler()
    scheduler.add_job(
        run_check_cycle,
        "interval",
        minutes=int(os.getenv("CHECK_INTERVAL_MINUTES", "60")),
        next_run_time=datetime.now()  # Run immediately on start
    )

    print(f"Price tracker started. Checking every {os.getenv('CHECK_INTERVAL_MINUTES', '60')} minutes.")
    print("Press Ctrl+C to stop.\n")

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        print("\nPrice tracker stopped.")

Step 7: The Entry Point

python

# main.py
import sys
from dotenv import load_dotenv

load_dotenv()

from database import PriceDatabase
from scheduler import run_check_cycle, start_scheduler

db = PriceDatabase()


def add_products(urls: list[str], target_prices: list[float] | None = None):
    """Add products to the tracking list."""
    for i, url in enumerate(urls):
        target = target_prices[i] if target_prices and i < len(target_prices) else None
        db.add_product(url, target)
        print(f"Added: {url}" + (f" (target: ${target})" if target else ""))


def show_history(url: str, days: int = 30):
    """Display price history for a product."""
    stats = db.get_price_stats(url, days)
    if not stats:
        print(f"No data for {url}")
        return

    print(f"\nPrice history for {url} (last {days} days):")
    print(f"  Current: ${stats['current_price']:.2f}")
    print(f"  Low: ${stats['min_price']:.2f}")
    print(f"  High: ${stats['max_price']:.2f}")
    print(f"  Average: ${stats['avg_price']:.2f}")
    print(f"  Data points: {stats['observations']}")


if __name__ == "__main__":
    command = sys.argv[1] if len(sys.argv) > 1 else "run"

    if command == "add":
        # python main.py add https://example.com/product 29.99
        url = sys.argv[2]
        target = float(sys.argv[3]) if len(sys.argv) > 3 else None
        add_products([url], [target])

    elif command == "check":
        # python main.py check  โ€” run one cycle now
        run_check_cycle()

    elif command == "history":
        # python main.py history https://example.com/product
        url = sys.argv[2]
        show_history(url)

    elif command == "run":
        # python main.py  โ€” start continuous scheduler
        start_scheduler()

    else:
        print("Commands: add <url> [target_price] | check | history <url> | run")

Using It

bash

# Add products to track
python main.py add "https://www.amazon.com/dp/B09V3KXJPB" 199.99
python main.py add "https://competitor-store.myshopify.com/products/widget" 
python main.py add "https://www.ebay.com/itm/12345678"

# Run one check right now
python main.py check

# View price history
python main.py history "https://www.amazon.com/dp/B09V3KXJPB"

# Start continuous monitoring (runs every 60 minutes)
python main.py run

Console output during a check cycle:

==================================================
[2026-05-06 14:30:00] Checking 3 products
==================================================
Checking: https://www.amazon.com/dp/B09V3KXJPB

๐Ÿ”” ๐ŸŽฏ TARGET PRICE HIT: Sony WH-1000XM5 Wireless Headphones
Current: $189.99 (target: $199.99)
URL: https://www.amazon.com/dp/B09V3KXJPB

Email sent: ๐ŸŽฏ Target Price Hit: Sony WH-1000XM5...
Slack alert sent: target_hit

Cycle complete: 3/3 successful, 1 alerts sent

Taking It Further

The system above handles the core loop reliably. Three extensions that add the most value for production use:

Multi-currency normalisation. If you're tracking across markets โ€” UK prices in GBP, EU prices in EUR, US prices in USD โ€” add a currency normalisation layer that converts all prices to a base currency using a live exchange rate API before comparison. Open Exchange Rates has a free tier.
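The conversion step itself is a small pure function. A minimal sketch, assuming a `rates` mapping shaped like Open Exchange Rates' `/latest.json` response (currency code to units per one base-currency unit) — the `normalise_prices` name and observation fields are illustrative, not part of the tracker above:

```python
def normalise_prices(observations: list[dict], rates: dict[str, float],
                     base: str = "USD") -> list[dict]:
    """Convert each observation's price into the base currency.

    `rates` maps currency code -> units per 1 base unit, e.g.
    {"USD": 1.0, "GBP": 0.79, "EUR": 0.92} with base=USD.
    """
    out = []
    for obs in observations:
        rate = rates[obs["currency"]]
        out.append({
            **obs,
            "price_base": round(obs["price"] / rate, 2),
            "base_currency": base,
        })
    return out
```

Run the conversion once per cycle, before change detection, so price comparisons and target-price checks always happen in a single currency.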

Price history visualisation. SQLite stores the history; building a simple dashboard on top of it with Streamlit takes about 50 lines. Plot price over time, mark the points when alerts fired, and you have a proper price intelligence dashboard.

Bulk URL management. For competitive price monitoring at scale โ€” tracking hundreds of SKUs across multiple competitor sites โ€” the ScrapeBadger blog post on e-commerce scraping covers the Shopify JSON API approach that cuts request volume dramatically: instead of scraping individual product pages, a single /products.json?limit=250 call returns all products and variants for a Shopify store in one request.
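As a sketch of that approach — the endpoint is Shopify's public catalogue feed, but the helper names are ours, stores larger than 250 products need pagination, and the `available` field varies by store:

```python
import requests


def fetch_shopify_prices(store_url: str) -> list[dict]:
    """Pull up to 250 products (all variants) from a Shopify store's
    public /products.json endpoint in a single request."""
    resp = requests.get(f"{store_url}/products.json",
                        params={"limit": 250}, timeout=30)
    resp.raise_for_status()
    return flatten_products(resp.json().get("products", []))


def flatten_products(products: list[dict]) -> list[dict]:
    """Flatten the catalogue to one row per variant."""
    return [
        {
            "product": p["title"],
            "variant": v["title"],
            "price": float(v["price"]),
        }
        for p in products
        for v in p.get("variants", [])
    ]
```

One call like `fetch_shopify_prices("https://competitor-store.myshopify.com")` replaces hundreds of individual product-page scrapes for that store.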

For teams building serious price intelligence products โ€” monitoring tens of thousands of SKUs, running multi-region checks, or feeding price data into repricers or BI tools โ€” the ScrapeBadger documentation covers batch processing, async request patterns, and the CLI tooling for scheduled pipeline management without custom scheduler code.
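At that scale, the sequential loop in `run_check_cycle` becomes the bottleneck. A minimal sketch of the async fan-out pattern, assuming some coroutine `fetch` that wraps your scraping call — the semaphore caps concurrency so you stay within API rate limits:

```python
import asyncio


async def check_many(urls: list[str], fetch, max_concurrency: int = 10) -> list[dict]:
    """Check many product URLs concurrently, capped by a semaphore.

    `fetch` is any coroutine taking a URL and returning a result dict.
    """
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(url: str) -> dict:
        async with sem:
            try:
                return await fetch(url)
            except Exception as exc:  # one failure shouldn't kill the batch
                return {"url": url, "status": "error", "error": str(exc)}

    return await asyncio.gather(*(bounded(u) for u in urls))
```

With ten concurrent workers, a 1,000-SKU cycle that took half an hour sequentially finishes in a few minutes, and per-URL failures are captured as error rows rather than exceptions.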

If you want to connect price data to an AI agent that makes repricing recommendations, the ScrapeBadger MCP server exposes the full scraping API to any MCP-compatible agent โ€” your AI gets live price data from any URL as a tool call. Setup is covered in the MCP documentation.

The complete code for this tutorial is runnable as described. Start with a handful of products you actually care about โ€” a competitor's key SKUs, products you're considering purchasing, or items you're monitoring for your own store. The value compounds as the history builds.

Written by Thomas Shultz

Thomas Shultz is the Head of Data at ScrapeBadger, working on public web data, scraping infrastructure, and data reliability. He writes about real-world scraping, data pipelines, and turning unstructured web data into usable signals.
