Amazon adjusts prices millions of times per day. Competitors reprice dynamically. Buy Box ownership changes by the hour based on pricing, fulfillment type, and seller metrics. A product that was ยฃ29.99 on amazon.co.uk this morning might be ยฃ24.99 this afternoon โ and your repricing strategy has no idea.
A price tracker that runs on a schedule, stores every observation, detects meaningful changes, and fires an alert when something worth acting on happens is the difference between reactive and proactive competitive intelligence. This guide builds one from scratch using ScrapeBadger's Amazon Scraper API โ covering single-ASIN monitoring, Buy Box tracking, full offer landscape collection, and multi-marketplace comparison.
The ScrapeBadger e-commerce scraping guide covers the general multi-platform architecture. This guide is Amazon-specific and takes full advantage of ScrapeBadger's dedicated Amazon endpoints โ structured JSON that includes Buy Box ownership, all seller offers, BSR rankings, and variant pricing without any HTML parsing on your side.
Architecture Overview
[ASIN List + Marketplaces]
โ
ScrapeBadger Amazon API
/v1/amazon/products/{asin} โ price, BSR, variants
/v1/amazon/products/{asin}/offers โ Buy Box + all sellers
โ
[Price Record] โ SQLite / PostgreSQL
โ
[Change Detector] โ price drop, Buy Box change, stock alert
โ
[Alert Dispatcher] โ Slack / EmailTwo modes run separately:
Collection mode โ queries ScrapeBadger for current prices and stores observations. Runs on a schedule.
Alert mode โ compares latest observations against previous ones, detects meaningful changes, fires notifications.
Setup
bash
pip install httpx sqlalchemy pydantic python-dotenv aiofilesenv
# .env
SCRAPEBADGER_API_KEY=your_key_here
SMTP_USER=alerts@yourcompany.com
SMTP_PASSWORD=your_app_password
ALERT_EMAIL=team@yourcompany.com
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK
PRICE_DROP_THRESHOLD=0.03 # Alert on 3%+ price drops
BSR_CHANGE_THRESHOLD=1000 # Alert if BSR moves by 1000+ positionsStep 1: Data Models
Strong typing catches the silent failures that corrupt price history. An Amazon price field returning "ยฃ29.99" when your database expects a float corrupts every downstream calculation silently.
python
# models.py
from pydantic import BaseModel, validator
from typing import Optional
from datetime import datetime
import re
class ProductSnapshot(BaseModel):
"""Complete price snapshot for a single ASIN at a point in time."""
asin: str
marketplace: str # "com", "co.uk", "de", etc.
title: Optional[str]
# Buy Box price โ what most customers pay
buybox_price: Optional[float]
buybox_seller: Optional[str]
buybox_is_amazon: Optional[bool]
buybox_fulfillment: Optional[str] # "FBA", "FBM", "Amazon"
# List price and discount
list_price: Optional[float]
currency: str = "USD"
discount_pct: Optional[float]
# Availability
availability: Optional[str]
stock_indicator: Optional[str] # "In Stock", "Only 3 left", etc.
# Rankings
bsr_rank: Optional[int]
bsr_category: Optional[str]
# Offer landscape
total_offer_count: Optional[int]
lowest_new_price: Optional[float]
lowest_used_price: Optional[float]
# Metadata
scraped_at: datetime = None
@validator("buybox_price", "list_price",
"lowest_new_price", "lowest_used_price", pre=True)
def parse_price(cls, v):
if v is None:
return None
if isinstance(v, (int, float)):
return float(v)
if isinstance(v, str):
cleaned = re.sub(r"[^\d.]", "", v.replace(",", ""))
try:
return float(cleaned)
except ValueError:
return None
return None
def discount_percentage(self) -> Optional[float]:
if self.list_price and self.buybox_price and self.list_price > 0:
return round(
(self.list_price - self.buybox_price) / self.list_price * 100, 1
)
return None
class OfferRecord(BaseModel):
"""Individual seller offer for a product."""
asin: str
marketplace: str
seller_id: Optional[str]
seller_name: Optional[str]
price: Optional[float]
condition: Optional[str]
fulfillment: Optional[str] # "FBA", "FBM"
is_buy_box_winner: bool = False
is_amazon: bool = False
rating: Optional[float]
rating_count: Optional[int]
ships_from: Optional[str]
scraped_at: datetime = None
@validator("price", pre=True)
def parse_price(cls, v):
if v is None:
return None
if isinstance(v, (int, float)):
return float(v)
if isinstance(v, str):
cleaned = re.sub(r"[^\d.]", "", v.replace(",", ""))
try:
return float(cleaned)
except ValueError:
return None
return NoneStep 2: Database Layer
Every observation is stored. Price history is the asset โ you're building a dataset over time, not just checking current prices.
python
# database.py
from sqlalchemy import (
create_engine, Column, Integer, Float, String,
DateTime, Boolean, Index
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
Base = declarative_base()
engine = create_engine("sqlite:///amazon_prices.db")
Session = sessionmaker(bind=engine)
class PriceHistory(Base):
__tablename__ = "price_history"
id = Column(Integer, primary_key=True)
asin = Column(String, nullable=False, index=True)
marketplace = Column(String, nullable=False)
title = Column(String)
buybox_price = Column(Float)
buybox_seller = Column(String)
buybox_is_amazon = Column(Boolean)
list_price = Column(Float)
currency = Column(String, default="USD")
discount_pct = Column(Float)
availability = Column(String)
bsr_rank = Column(Integer)
bsr_category = Column(String)
total_offer_count = Column(Integer)
lowest_new_price = Column(Float)
scraped_at = Column(DateTime, default=datetime.utcnow, index=True)
class OfferHistory(Base):
__tablename__ = "offer_history"
id = Column(Integer, primary_key=True)
asin = Column(String, nullable=False, index=True)
marketplace = Column(String, nullable=False)
seller_id = Column(String)
seller_name = Column(String)
price = Column(Float)
condition = Column(String)
fulfillment = Column(String)
is_buy_box_winner = Column(Boolean, default=False)
is_amazon = Column(Boolean, default=False)
rating = Column(Float)
scraped_at = Column(DateTime, default=datetime.utcnow, index=True)
class TrackedASIN(Base):
__tablename__ = "tracked_asins"
id = Column(Integer, primary_key=True)
asin = Column(String, nullable=False)
marketplace = Column(String, nullable=False, default="com")
label = Column(String) # Human-readable name for alerts
target_price = Column(Float) # Alert when below this
is_active = Column(Boolean, default=True)
added_at = Column(DateTime, default=datetime.utcnow)
last_checked = Column(DateTime)
last_price = Column(Float)
last_bsr = Column(Integer)
# Create all tables
Base.metadata.create_all(engine)
class PriceDatabase:
def save_snapshot(self, snapshot: "ProductSnapshot"):
with Session() as session:
record = PriceHistory(
asin=snapshot.asin,
marketplace=snapshot.marketplace,
title=snapshot.title,
buybox_price=snapshot.buybox_price,
buybox_seller=snapshot.buybox_seller,
buybox_is_amazon=snapshot.buybox_is_amazon,
list_price=snapshot.list_price,
currency=snapshot.currency,
availability=snapshot.availability,
bsr_rank=snapshot.bsr_rank,
bsr_category=snapshot.bsr_category,
total_offer_count=snapshot.total_offer_count,
lowest_new_price=snapshot.lowest_new_price,
scraped_at=snapshot.scraped_at or datetime.utcnow(),
)
session.add(record)
session.commit()
def save_offers(self, offers: list["OfferRecord"]):
with Session() as session:
for offer in offers:
record = OfferHistory(
asin=offer.asin,
marketplace=offer.marketplace,
seller_id=offer.seller_id,
seller_name=offer.seller_name,
price=offer.price,
condition=offer.condition,
fulfillment=offer.fulfillment,
is_buy_box_winner=offer.is_buy_box_winner,
is_amazon=offer.is_amazon,
rating=offer.rating,
scraped_at=datetime.utcnow(),
)
session.add(record)
session.commit()
def get_previous_snapshot(
self, asin: str, marketplace: str
) -> Optional[PriceHistory]:
with Session() as session:
return (
session.query(PriceHistory)
.filter_by(asin=asin, marketplace=marketplace)
.order_by(PriceHistory.scraped_at.desc())
.first()
)
def get_price_history(
self, asin: str, marketplace: str, days: int = 30
) -> list[PriceHistory]:
from datetime import timedelta
cutoff = datetime.utcnow() - timedelta(days=days)
with Session() as session:
return (
session.query(PriceHistory)
.filter(
PriceHistory.asin == asin,
PriceHistory.marketplace == marketplace,
PriceHistory.scraped_at >= cutoff,
)
.order_by(PriceHistory.scraped_at.asc())
.all()
)
def get_active_asins(self) -> list[TrackedASIN]:
with Session() as session:
return (
session.query(TrackedASIN)
.filter_by(is_active=True)
.all()
)
def add_asin(
self,
asin: str,
marketplace: str = "com",
label: str = None,
target_price: float = None,
) -> TrackedASIN:
with Session() as session:
existing = session.query(TrackedASIN).filter_by(
asin=asin, marketplace=marketplace
).first()
if existing:
existing.is_active = True
session.commit()
return existing
tracked = TrackedASIN(
asin=asin,
marketplace=marketplace,
label=label or asin,
target_price=target_price,
)
session.add(tracked)
session.commit()
return tracked
def get_price_stats(self, asin: str, marketplace: str, days: int = 30) -> dict:
history = self.get_price_history(asin, marketplace, days)
prices = [r.buybox_price for r in history if r.buybox_price]
if not prices:
return {}
return {
"min": min(prices),
"max": max(prices),
"avg": round(sum(prices) / len(prices), 2),
"current": prices[-1],
"observations": len(prices),
}Step 3: The ScrapeBadger Collection Layer
Two endpoints do the main work: /v1/amazon/products/{asin} for full product detail and BSR, and /v1/amazon/products/{asin}/offers for the complete competitive offer landscape including Buy Box ownership.
python
# collector.py
import httpx
import os
import asyncio
import time
from typing import Optional
from datetime import datetime
from models import ProductSnapshot, OfferRecord
API_KEY = os.environ["SCRAPEBADGER_API_KEY"]
BASE_URL = "https://api.scrapebadger.com/v1"
async def fetch_product(
client: httpx.AsyncClient,
asin: str,
marketplace: str = "com",
) -> Optional[ProductSnapshot]:
"""
Fetch full product detail for an ASIN.
Returns price, BSR, variants, availability, and Buy Box info.
10 credits per successful request. 0 credits if request fails.
"""
try:
response = await client.get(
f"{BASE_URL}/amazon/products/{asin}",
params={"domain": marketplace},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
# Extract Buy Box pricing
buybox = data.get("buybox", {})
pricing = data.get("pricing", {})
bsr_data = data.get("best_sellers_rank", [{}])
bsr = bsr_data[0] if bsr_data else {}
return ProductSnapshot(
asin=asin,
marketplace=marketplace,
title=data.get("title"),
buybox_price=buybox.get("price") or pricing.get("current"),
buybox_seller=buybox.get("seller_name"),
buybox_is_amazon=buybox.get("is_amazon", False),
buybox_fulfillment=buybox.get("fulfillment"),
list_price=pricing.get("list_price"),
currency=pricing.get("currency", "USD"),
availability=data.get("availability"),
stock_indicator=data.get("stock_message"),
bsr_rank=bsr.get("rank"),
bsr_category=bsr.get("category"),
total_offer_count=data.get("offer_count"),
scraped_at=datetime.utcnow(),
)
except httpx.HTTPStatusError as e:
print(f"HTTP error fetching {asin} on {marketplace}: {e.response.status_code}")
return None
except Exception as e:
print(f"Error fetching {asin}: {e}")
return None
async def fetch_offers(
client: httpx.AsyncClient,
asin: str,
marketplace: str = "com",
condition: str = "new", # "new", "used", "all"
) -> list[OfferRecord]:
"""
Fetch all seller offers for an ASIN.
Returns Buy Box winner + every competing seller with price and fulfillment.
8 credits per successful request. 0 credits if request fails.
"""
try:
response = await client.get(
f"{BASE_URL}/amazon/products/{asin}/offers",
params={"domain": marketplace, "condition": condition},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
offers = []
for offer in data.get("offers", []):
offers.append(OfferRecord(
asin=asin,
marketplace=marketplace,
seller_id=offer.get("seller_id"),
seller_name=offer.get("seller_name"),
price=offer.get("price"),
condition=offer.get("condition"),
fulfillment=offer.get("fulfillment"),
is_buy_box_winner=offer.get("is_buy_box", False),
is_amazon=offer.get("is_amazon", False),
rating=offer.get("seller_rating"),
rating_count=offer.get("seller_rating_count"),
ships_from=offer.get("ships_from"),
scraped_at=datetime.utcnow(),
))
return offers
except Exception as e:
print(f"Error fetching offers for {asin}: {e}")
return []Step 4: Change Detection
The intelligence layer. Detecting that a price changed is trivial. Detecting which changes are worth acting on โ and which are noise โ is where most trackers fail.
python
# detector.py
import os
from dataclasses import dataclass
from typing import Optional
from models import ProductSnapshot
from database import PriceDatabase, PriceHistory
PRICE_DROP_THRESHOLD = float(os.getenv("PRICE_DROP_THRESHOLD", "0.03"))
BSR_CHANGE_THRESHOLD = int(os.getenv("BSR_CHANGE_THRESHOLD", "1000"))
db = PriceDatabase()
@dataclass
class PriceAlert:
asin: str
marketplace: str
label: str
alert_type: str
current_value: Optional[float]
previous_value: Optional[float]
change_pct: Optional[float] = None
extra_context: Optional[str] = None
def format_message(self) -> str:
currency_map = {
"com": "$", "co.uk": "ยฃ", "de": "โฌ",
"fr": "โฌ", "it": "โฌ", "es": "โฌ",
"ca": "CA$", "co.jp": "ยฅ", "in": "โน",
}
symbol = currency_map.get(self.marketplace, "$")
if self.alert_type == "target_hit":
return (
f"๐ฏ TARGET PRICE HIT: {self.label}\n"
f"Price: {symbol}{self.current_value:.2f} "
f"(your target: {symbol}{self.previous_value:.2f})\n"
f"amazon.{self.marketplace}/dp/{self.asin}"
)
elif self.alert_type == "price_drop":
return (
f"๐ PRICE DROP: {self.label}\n"
f"{symbol}{self.previous_value:.2f} โ "
f"{symbol}{self.current_value:.2f} "
f"({abs(self.change_pct):.1f}% drop)\n"
f"amazon.{self.marketplace}/dp/{self.asin}"
)
elif self.alert_type == "price_rise":
return (
f"๐ PRICE RISE: {self.label}\n"
f"{symbol}{self.previous_value:.2f} โ "
f"{symbol}{self.current_value:.2f} "
f"({self.change_pct:.1f}% rise)\n"
f"amazon.{self.marketplace}/dp/{self.asin}"
)
elif self.alert_type == "out_of_stock":
return (
f"โ OUT OF STOCK: {self.label}\n"
f"Last price: {symbol}{self.current_value:.2f}\n"
f"amazon.{self.marketplace}/dp/{self.asin}"
)
elif self.alert_type == "back_in_stock":
return (
f"โ
BACK IN STOCK: {self.label}\n"
f"Current price: {symbol}{self.current_value:.2f}\n"
f"amazon.{self.marketplace}/dp/{self.asin}"
)
elif self.alert_type == "buybox_change":
return (
f"๐ BUY BOX CHANGED: {self.label}\n"
f"{self.previous_value} โ {self.current_value}\n"
f"{self.extra_context or ''}\n"
f"amazon.{self.marketplace}/dp/{self.asin}"
)
elif self.alert_type == "bsr_change":
direction = "improved" if self.change_pct < 0 else "declined"
return (
f"๐ BSR {direction.upper()}: {self.label}\n"
f"Rank: #{int(self.previous_value):,} โ "
f"#{int(self.current_value):,} "
f"({abs(int(self.change_pct)):,} positions)\n"
f"amazon.{self.marketplace}/dp/{self.asin}"
)
return f"Price update for {self.label}"
def detect_changes(
asin: str,
marketplace: str,
label: str,
current: ProductSnapshot,
previous: Optional[PriceHistory],
target_price: float = None,
) -> list[PriceAlert]:
"""
Compare current snapshot against previous record.
Returns list of actionable alerts.
"""
alerts = []
curr_price = current.buybox_price
prev_price = previous.buybox_price if previous else None
# --- Target price alert ---
if target_price and curr_price and curr_price <= target_price:
if not prev_price or prev_price > target_price:
alerts.append(PriceAlert(
asin=asin, marketplace=marketplace, label=label,
alert_type="target_hit",
current_value=curr_price,
previous_value=target_price,
))
# --- Price change alerts ---
if curr_price and prev_price and curr_price != prev_price:
change_pct = (curr_price - prev_price) / prev_price * 100
if change_pct <= -PRICE_DROP_THRESHOLD * 100:
alerts.append(PriceAlert(
asin=asin, marketplace=marketplace, label=label,
alert_type="price_drop",
current_value=curr_price,
previous_value=prev_price,
change_pct=change_pct,
))
elif change_pct >= PRICE_DROP_THRESHOLD * 100 * 2:
# Only alert on rises that are larger than drops threshold
alerts.append(PriceAlert(
asin=asin, marketplace=marketplace, label=label,
alert_type="price_rise",
current_value=curr_price,
previous_value=prev_price,
change_pct=change_pct,
))
# --- Availability changes ---
curr_avail = current.availability or ""
prev_avail = previous.availability if previous else ""
if "out of stock" in curr_avail.lower() and (
not prev_avail or "out of stock" not in prev_avail.lower()
):
alerts.append(PriceAlert(
asin=asin, marketplace=marketplace, label=label,
alert_type="out_of_stock",
current_value=curr_price,
previous_value=prev_price,
))
elif "in stock" in curr_avail.lower() and prev_avail and (
"out of stock" in prev_avail.lower()
):
alerts.append(PriceAlert(
asin=asin, marketplace=marketplace, label=label,
alert_type="back_in_stock",
current_value=curr_price,
previous_value=prev_price,
))
# --- Buy Box change ---
curr_seller = current.buybox_seller or ""
prev_seller = previous.buybox_seller if previous else ""
if curr_seller and prev_seller and curr_seller != prev_seller:
alerts.append(PriceAlert(
asin=asin, marketplace=marketplace, label=label,
alert_type="buybox_change",
current_value=curr_seller,
previous_value=prev_seller,
extra_context=(
f"New Buy Box price: {current.buybox_price} | "
f"Fulfillment: {current.buybox_fulfillment}"
),
))
# --- BSR change ---
curr_bsr = current.bsr_rank
prev_bsr = previous.bsr_rank if previous else None
if curr_bsr and prev_bsr:
bsr_delta = curr_bsr - prev_bsr
if abs(bsr_delta) >= BSR_CHANGE_THRESHOLD:
alerts.append(PriceAlert(
asin=asin, marketplace=marketplace, label=label,
alert_type="bsr_change",
current_value=float(curr_bsr),
previous_value=float(prev_bsr),
change_pct=float(bsr_delta),
))
return alertsStep 5: Alert Delivery
python
# alerts.py
import os, smtplib, requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from detector import PriceAlert
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")
def send_slack(alert: PriceAlert) -> bool:
if not SLACK_WEBHOOK:
return False
color_map = {
"price_drop": "#00aa00", "target_hit": "#00ff00",
"back_in_stock": "#0088ff", "out_of_stock": "#ff4444",
"price_rise": "#ff8800", "buybox_change": "#8800ff",
"bsr_change": "#0088ff",
}
try:
requests.post(SLACK_WEBHOOK, json={
"attachments": [{
"color": color_map.get(alert.alert_type, "#888888"),
"title": f"Amazon Price Alert: {alert.label}",
"text": alert.format_message(),
"footer": "ScrapeBadger Amazon Tracker",
}]
}, timeout=10)
return True
except Exception as e:
print(f"Slack failed: {e}")
return False
def send_email(alert: PriceAlert) -> bool:
if not all([SMTP_USER, SMTP_PASSWORD, ALERT_EMAIL]):
return False
subject_map = {
"price_drop": "๐ Amazon Price Drop",
"target_hit": "๐ฏ Amazon Target Price Hit",
"back_in_stock": "โ
Amazon Back In Stock",
"out_of_stock": "โ Amazon Out of Stock",
"buybox_change": "๐ Amazon Buy Box Changed",
"bsr_change": "๐ Amazon BSR Change",
}
subject = subject_map.get(alert.alert_type, "Amazon Alert")
if alert.label:
subject += f": {alert.label[:40]}"
msg = MIMEMultipart()
msg["From"] = SMTP_USER
msg["To"] = ALERT_EMAIL
msg["Subject"] = subject
msg.attach(MIMEText(alert.format_message(), "plain"))
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.send_message(msg)
return True
except Exception as e:
print(f"Email failed: {e}")
return False
def dispatch(alerts: list[PriceAlert]):
for alert in alerts:
print(f"\n๐ {alert.format_message()}\n")
send_slack(alert)
send_email(alert)Step 6: The Main Check Cycle
python
# tracker.py
import asyncio
import httpx
import os
import time
import random
from datetime import datetime
from database import PriceDatabase
from collector import fetch_product, fetch_offers
from detector import detect_changes
from alerts import dispatch
db = PriceDatabase()
API_KEY = os.environ["SCRAPEBADGER_API_KEY"]
BASE_URL = "https://api.scrapebadger.com/v1"
async def check_asin(
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
asin: str,
marketplace: str,
label: str,
target_price: float = None,
collect_offers: bool = True,
) -> dict:
"""Run a full check cycle for one ASIN on one marketplace."""
async with semaphore:
await asyncio.sleep(random.uniform(0.5, 1.5)) # Polite pacing
# Fetch product snapshot
snapshot = await fetch_product(client, asin, marketplace)
if not snapshot:
return {"asin": asin, "marketplace": marketplace, "status": "failed"}
# Get previous record for comparison
previous = db.get_previous_snapshot(asin, marketplace)
# Detect changes and fire alerts
alerts = detect_changes(
asin, marketplace, label, snapshot, previous, target_price
)
if alerts:
dispatch(alerts)
# Store this observation
db.save_snapshot(snapshot)
# Optionally collect full offer landscape
if collect_offers:
offers = await fetch_offers(client, asin, marketplace)
if offers:
db.save_offers(offers)
return {
"asin": asin,
"marketplace": marketplace,
"status": "ok",
"price": snapshot.buybox_price,
"bsr": snapshot.bsr_rank,
"buybox_seller": snapshot.buybox_seller,
"alerts": [a.alert_type for a in alerts],
}
async def run_check_cycle(max_concurrent: int = 5):
"""Run one full check cycle across all tracked ASINs."""
tracked = db.get_active_asins()
if not tracked:
print("No tracked ASINs โ add some with db.add_asin()")
return
print(f"\n{'='*50}")
print(f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] "
f"Checking {len(tracked)} ASINs")
print("="*50)
semaphore = asyncio.Semaphore(max_concurrent)
headers = {"X-API-Key": API_KEY}
async with httpx.AsyncClient(headers=headers) as client:
tasks = [
check_asin(
client, semaphore,
t.asin, t.marketplace, t.label, t.target_price,
)
for t in tracked
]
results = await asyncio.gather(*tasks)
successful = sum(1 for r in results if r.get("status") == "ok")
total_alerts = sum(len(r.get("alerts", [])) for r in results)
print(f"\nCycle complete: {successful}/{len(tracked)} OK | {total_alerts} alerts")
for r in results:
if r.get("status") == "ok":
price_str = f"${r['price']:.2f}" if r.get("price") else "N/A"
bsr_str = f"BSR #{r['bsr']:,}" if r.get("bsr") else ""
bb_str = f"BB: {r.get('buybox_seller', 'Unknown')}"
print(f" {r['asin']} ({r['marketplace']}): "
f"{price_str} | {bsr_str} | {bb_str}")Step 7: Multi-Marketplace Comparison
One of ScrapeBadger's strongest features for Amazon tracking: country-matched residential proxies on every marketplace. The same ASIN on amazon.de returns German prices seen by German customers, not the prices Amazon shows international visitors.
python
async def compare_across_marketplaces(
asin: str,
marketplaces: list[str] = None,
) -> dict:
"""
Compare price for one ASIN across multiple Amazon marketplaces.
Uses country-matched proxies for each โ accurate local pricing.
"""
if marketplaces is None:
marketplaces = ["com", "co.uk", "de", "fr", "it", "es", "ca", "co.jp"]
currency_map = {
"com": "USD", "co.uk": "GBP", "de": "EUR", "fr": "EUR",
"it": "EUR", "es": "EUR", "ca": "CAD", "co.jp": "JPY",
"in": "INR", "com.au": "AUD",
}
semaphore = asyncio.Semaphore(5)
headers = {"X-API-Key": API_KEY}
results = {}
async with httpx.AsyncClient(headers=headers) as client:
tasks = [
fetch_product(client, asin, mkt)
for mkt in marketplaces
]
snapshots = await asyncio.gather(*tasks)
for marketplace, snapshot in zip(marketplaces, snapshots):
if snapshot and snapshot.buybox_price:
results[marketplace] = {
"price": snapshot.buybox_price,
"currency": currency_map.get(marketplace, "USD"),
"availability": snapshot.availability,
"bsr_rank": snapshot.bsr_rank,
"buybox_seller": snapshot.buybox_seller,
"total_offers": snapshot.total_offer_count,
}
# Find cheapest marketplace (in local currency โ add FX conversion for true comparison)
print(f"\nPrice comparison for ASIN {asin}:")
print(f"{'Marketplace':<15} {'Price':<15} {'Currency':<10} {'BSR':<12} {'Availability'}")
print("-" * 75)
for mkt, data in sorted(results.items(), key=lambda x: x[1]["price"]):
bsr = f"#{data['bsr_rank']:,}" if data.get("bsr_rank") else "N/A"
print(
f"amazon.{mkt:<10} "
f"{data['price']:<15.2f} "
f"{data['currency']:<10} "
f"{bsr:<12} "
f"{data['availability'] or 'Unknown'}"
)
return resultsEntry Point: Running the Tracker
python
# main.py
import asyncio
import sys
from database import PriceDatabase
from tracker import run_check_cycle
from collector import compare_across_marketplaces
db = PriceDatabase()
def add_products():
"""Add products to track."""
# Your own products โ watch for price erosion and stock levels
db.add_asin("B09V3KXJPB", "com", "Sony WH-1000XM5 - US", target_price=279.99)
db.add_asin("B09V3KXJPB", "co.uk", "Sony WH-1000XM5 - UK", target_price=250.00)
# Competitor products โ watch for price moves you should respond to
db.add_asin("B07XJ8C8F7", "com", "Competitor Headphones A")
db.add_asin("B09B8YWXDF", "com", "Competitor Headphones B")
print("Products added. Run with 'check' to start collecting.")
if __name__ == "__main__":
command = sys.argv[1] if len(sys.argv) > 1 else "check"
if command == "add":
add_products()
elif command == "check":
asyncio.run(run_check_cycle(max_concurrent=5))
elif command == "compare":
asin = sys.argv[2] if len(sys.argv) > 2 else "B09V3KXJPB"
asyncio.run(compare_across_marketplaces(asin))
elif command == "schedule":
# Run every 60 minutes
import time
interval_minutes = int(sys.argv[2]) if len(sys.argv) > 2 else 60
print(f"Scheduler started โ checking every {interval_minutes} minutes")
while True:
asyncio.run(run_check_cycle(max_concurrent=5))
print(f"Next check in {interval_minutes} minutes")
time.sleep(interval_minutes * 60)
elif command == "stats":
asin = sys.argv[2] if len(sys.argv) > 2 else None
if asin:
stats = db.get_price_stats(asin, "com")
print(f"\nPrice stats for {asin} (last 30 days):")
for k, v in stats.items():
print(f" {k}: {v}")Running it:
bash
# Add your products
python main.py add
# Run one check immediately
python main.py check
# Compare one ASIN across all marketplaces
python main.py compare B09V3KXJPB
# Start continuous monitoring (every 60 minutes)
python main.py schedule 60The Zero-Credit-on-Failure Guarantee
One detail worth calling out explicitly in a price tracker context: ScrapeBadger charges zero credits for failed requests. As covered in the data quality article, most scraping APIs charge per request regardless of outcome โ a 15% failure rate means 15% of your monthly budget buys nothing.
On Amazon specifically, where AWS WAF blocks can return plausible-looking responses that aren't product data, this matters more than on most targets. ScrapeBadger validates content before billing โ you pay only when you receive actual product data. A scraping API that charges for challenge pages and empty responses has a meaningfully different effective cost than the headline per-request price suggests.
What to Build Next
The tracker above covers the core loop. Three extensions that add the most analytical value:
Price history visualisation โ the SQLite database accumulates a time series. A Streamlit dashboard plotting price over time, marking Buy Box changes, and overlaying BSR movement turns raw observations into the kind of chart that makes pricing decisions obvious. Build one in about 50 lines on top of the data already being collected.
Competitor pattern analysis โ use the offer history to analyse competitor reprice behaviour. What time of day do they usually adjust prices? How quickly do they respond to your price changes? How often do they win the Buy Box and what price point wins it? The data is already being collected โ it needs analysis logic.
Alert threshold tuning โ start conservative (5% threshold) and adjust based on the alert volume you actually want. For high-velocity categories, a 3% threshold generates significant noise; for stable categories, a 10% threshold misses meaningful moves. The PRICE_DROP_THRESHOLD and BSR_CHANGE_THRESHOLD environment variables let you tune without code changes.
Full Amazon API documentation at docs.scrapebadger.com/amazon/overview. Free trial at scrapebadger.com/amazon-scraper โ 1,000 credits, no credit card.

Written by
Thomas Shultz
Thomas Shultz is the Head of Data at ScrapeBadger, working on public web data, scraping infrastructure, and data reliability. He writes about real-world scraping, data pipelines, and turning unstructured web data into usable signals.
Ready to get started?
Join thousands of developers using ScrapeBadger for their data needs.
