How to Scrape Amazon Seller Data: Feedback, Storefronts, and Competitor Analysis

Most Amazon competitive intelligence focuses on products. Who has the Buy Box? What's the price? What's the BSR? That analysis is valuable, but it only tells you what's happening right now on specific ASINs. It misses the actors behind those products — the sellers whose feedback scores, inventory breadth, fulfillment methods, and customer satisfaction trends determine who wins on Amazon over time.
Seller data tells a different story. A competitor with 500 seller feedback entries and a 97% positive rating is a stable, professional operation. A competitor with 500 entries and an 82% rating is vulnerable — customers are complaining publicly, and those complaints contain specific, actionable intelligence about what's going wrong. A competitor's storefront shows you their full catalogue, not just the ASINs you're already tracking. A sudden storefront expansion reveals market moves before any pricing signal does.
ScrapeBadger's Amazon Scraper includes three dedicated seller endpoints: profile and feedback summary, paginated buyer feedback entries, and storefront product listings. This guide builds a complete seller intelligence pipeline using all three — covering competitor discovery, feedback analysis, storefront monitoring, and change detection with alerts.
The Three Seller Endpoints
From docs.scrapebadger.com/amazon/overview:
GET /v1/amazon/sellers/{seller_id} — Profile + feedback summary (3 credits)
GET /v1/amazon/sellers/{seller_id}/feedback — Paginated feedback entries (3 credits)
GET /v1/amazon/sellers/{seller_id}/products — Storefront listings (5 credits)Credit costs are low for seller endpoints specifically — 3 credits for profile and feedback versus 10 for full product detail. Monitoring 20 competitor sellers daily costs 60 credits for profile checks. The failed request policy applies equally: zero credits charged when a request fails.
Step 1: Finding Seller IDs
Every Amazon seller has a unique seller ID — a 13-character alphanumeric string like A1X2Y3Z4W5V6U. The seller ID is stable; a seller's display name can change, but their ID doesn't. Always use seller IDs, never display names, as the key in your database.
The fastest way to discover competitor seller IDs for ASINs you're already tracking is through the offers endpoint — which returns every seller currently listing on a product:
python
# seller_discovery.py
import httpx
import asyncio
import os
from typing import Optional
API_KEY = os.environ["SCRAPEBADGER_API_KEY"]
BASE_URL = "https://api.scrapebadger.com/v1"
HEADERS = {"X-API-Key": API_KEY}
async def discover_sellers_from_asin(
client: httpx.AsyncClient,
asin: str,
marketplace: str = "com",
min_rating: float = 0.0,
exclude_amazon: bool = False,
) -> list[dict]:
"""
Extract all sellers currently offering an ASIN.
Returns seller IDs, names, ratings, and current prices.
Use this to build your competitor seller watchlist.
"""
try:
response = await client.get(
f"{BASE_URL}/amazon/products/{asin}/offers",
params={"domain": marketplace, "condition": "new"},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
sellers = []
for offer in data.get("offers", []):
seller_id = offer.get("seller_id")
is_amazon = offer.get("is_amazon", False)
if not seller_id:
continue
if exclude_amazon and is_amazon:
continue
rating = offer.get("seller_rating", 0) or 0
if rating < min_rating:
continue
sellers.append({
"seller_id": seller_id,
"seller_name": offer.get("seller_name"),
"is_amazon": is_amazon,
"is_buy_box": offer.get("is_buy_box", False),
"current_price": offer.get("price"),
"fulfillment": offer.get("fulfillment"),
"rating": rating,
"rating_count": offer.get("seller_rating_count"),
"found_on_asin": asin,
"marketplace": marketplace,
})
return sellers
except Exception as e:
print(f"Error discovering sellers on {asin}: {e}")
return []
async def discover_sellers_from_asins(
asins: list[str],
marketplace: str = "com",
exclude_amazon: bool = True,
) -> dict[str, dict]:
"""
Build a deduplicated seller dictionary from multiple ASINs.
Tracks which ASINs each seller appears on.
"""
all_sellers: dict[str, dict] = {}
semaphore = asyncio.Semaphore(5)
async with httpx.AsyncClient(headers=HEADERS) as client:
async def bounded_discover(asin: str):
async with semaphore:
return await discover_sellers_from_asin(
client, asin, marketplace, exclude_amazon=exclude_amazon
)
results = await asyncio.gather(
*[bounded_discover(asin) for asin in asins]
)
for sellers in results:
for seller in sellers:
sid = seller["seller_id"]
if sid not in all_sellers:
all_sellers[sid] = {**seller, "competing_asins": []}
all_sellers[sid]["competing_asins"].append(
seller["found_on_asin"]
)
# Track if they're winning Buy Box on any ASIN
if seller["is_buy_box"]:
all_sellers[sid]["has_buy_box"] = True
# Sort by Buy Box ownership and competition breadth
sorted_sellers = dict(sorted(
all_sellers.items(),
key=lambda x: (
x[1].get("has_buy_box", False),
len(x[1].get("competing_asins", []))
),
reverse=True
))
print(f"\nDiscovered {len(sorted_sellers)} unique sellers "
f"across {len(asins)} ASINs:")
for sid, data in list(sorted_sellers.items())[:10]:
bb = "✓ BB" if data.get("has_buy_box") else " "
print(f" {bb} {data['seller_name']:<30} "
f"({data.get('rating', 0):.0f}% | "
f"{len(data['competing_asins'])} ASINs)")
return sorted_sellersStep 2: Database Layer
python
# seller_database.py
from sqlalchemy import (
create_engine, Column, Integer, Float, String,
DateTime, Boolean, Text, Index
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from typing import Optional
Base = declarative_base()
engine = create_engine("sqlite:///seller_intelligence.db")
Session = sessionmaker(bind=engine)
class SellerProfile(Base):
"""Snapshot of seller profile and aggregate feedback metrics."""
__tablename__ = "seller_profiles"
id = Column(Integer, primary_key=True)
seller_id = Column(String, nullable=False, index=True)
marketplace = Column(String, nullable=False)
seller_name = Column(String)
# Aggregate feedback metrics
feedback_count = Column(Integer)
positive_pct = Column(Float) # 0–100
neutral_pct = Column(Float)
negative_pct = Column(Float)
# Time-window breakdowns (30-day, 90-day, lifetime)
positive_30d = Column(Integer)
neutral_30d = Column(Integer)
negative_30d = Column(Integer)
positive_90d = Column(Integer)
neutral_90d = Column(Integer)
negative_90d = Column(Integer)
# Account metadata
account_created = Column(String)
is_verified = Column(Boolean)
business_type = Column(String)
scraped_at = Column(DateTime, default=datetime.utcnow, index=True)
class SellerFeedbackEntry(Base):
"""Individual buyer feedback entries for a seller."""
__tablename__ = "seller_feedback_entries"
id = Column(Integer, primary_key=True)
seller_id = Column(String, nullable=False, index=True)
marketplace = Column(String, nullable=False)
feedback_id = Column(String, index=True)
rating = Column(Integer) # 1–5
is_positive = Column(Boolean)
is_negative = Column(Boolean)
comment = Column(Text)
asin = Column(String) # Product this feedback was for
date = Column(String)
scraped_at = Column(DateTime, default=datetime.utcnow)
class StorefrontProduct(Base):
"""Product appearing in a seller's storefront."""
__tablename__ = "storefront_products"
id = Column(Integer, primary_key=True)
seller_id = Column(String, nullable=False, index=True)
marketplace = Column(String, nullable=False)
asin = Column(String)
title = Column(String)
price = Column(Float)
rating = Column(Float)
review_count = Column(Integer)
category = Column(String)
is_prime = Column(Boolean)
scraped_at = Column(DateTime, default=datetime.utcnow, index=True)
class TrackedSeller(Base):
"""Sellers to monitor on a schedule."""
__tablename__ = "tracked_sellers"
id = Column(Integer, primary_key=True)
seller_id = Column(String, nullable=False)
marketplace = Column(String, nullable=False, default="com")
label = Column(String)
is_active = Column(Boolean, default=True)
added_at = Column(DateTime, default=datetime.utcnow)
last_checked = Column(DateTime)
last_positive_pct = Column(Float)
last_feedback_count = Column(Integer)
last_storefront_size = Column(Integer)
Base.metadata.create_all(engine)
class SellerDatabase:
def save_profile(self, seller_id: str, marketplace: str, data: dict):
with Session() as session:
profile = SellerProfile(
seller_id=seller_id,
marketplace=marketplace,
seller_name=data.get("seller_name"),
feedback_count=data.get("feedback_count"),
positive_pct=data.get("positive_pct"),
neutral_pct=data.get("neutral_pct"),
negative_pct=data.get("negative_pct"),
positive_30d=data.get("positive_30d"),
neutral_30d=data.get("neutral_30d"),
negative_30d=data.get("negative_30d"),
positive_90d=data.get("positive_90d"),
neutral_90d=data.get("neutral_90d"),
negative_90d=data.get("negative_90d"),
account_created=data.get("account_created"),
is_verified=data.get("is_verified"),
scraped_at=datetime.utcnow(),
)
session.add(profile)
session.commit()
def save_feedback_entries(self, entries: list[dict]):
with Session() as session:
for entry in entries:
# Skip if already stored
existing = session.query(SellerFeedbackEntry).filter_by(
feedback_id=entry.get("feedback_id")
).first()
if existing:
continue
record = SellerFeedbackEntry(
seller_id=entry["seller_id"],
marketplace=entry["marketplace"],
feedback_id=entry.get("feedback_id"),
rating=entry.get("rating"),
is_positive=entry.get("rating", 0) >= 4,
is_negative=entry.get("rating", 0) <= 2,
comment=entry.get("comment"),
asin=entry.get("asin"),
date=entry.get("date"),
scraped_at=datetime.utcnow(),
)
session.add(record)
session.commit()
def save_storefront(self, seller_id: str, marketplace: str, products: list[dict]):
with Session() as session:
for product in products:
record = StorefrontProduct(
seller_id=seller_id,
marketplace=marketplace,
asin=product.get("asin"),
title=product.get("title"),
price=product.get("price"),
rating=product.get("rating"),
review_count=product.get("review_count"),
category=product.get("category"),
is_prime=product.get("is_prime", False),
scraped_at=datetime.utcnow(),
)
session.add(record)
session.commit()
def get_previous_profile(
self, seller_id: str, marketplace: str
) -> Optional[SellerProfile]:
with Session() as session:
return (
session.query(SellerProfile)
.filter_by(seller_id=seller_id, marketplace=marketplace)
.order_by(SellerProfile.scraped_at.desc())
.first()
)
def get_negative_feedback(
self, seller_id: str, marketplace: str, days: int = 30
) -> list[SellerFeedbackEntry]:
from datetime import timedelta
cutoff = datetime.utcnow() - timedelta(days=days)
with Session() as session:
return (
session.query(SellerFeedbackEntry)
.filter(
SellerFeedbackEntry.seller_id == seller_id,
SellerFeedbackEntry.marketplace == marketplace,
SellerFeedbackEntry.is_negative == True,
SellerFeedbackEntry.scraped_at >= cutoff,
)
.order_by(SellerFeedbackEntry.scraped_at.desc())
.all()
)
def get_storefront_asins(
self, seller_id: str, marketplace: str, latest_only: bool = True
) -> list[str]:
with Session() as session:
if latest_only:
# Get ASINs from the most recent storefront scrape
latest = (
session.query(StorefrontProduct.scraped_at)
.filter_by(seller_id=seller_id, marketplace=marketplace)
.order_by(StorefrontProduct.scraped_at.desc())
.first()
)
if not latest:
return []
from datetime import timedelta
cutoff = latest[0] - timedelta(minutes=5)
records = (
session.query(StorefrontProduct.asin)
.filter(
StorefrontProduct.seller_id == seller_id,
StorefrontProduct.marketplace == marketplace,
StorefrontProduct.scraped_at >= cutoff,
)
.all()
)
return [r[0] for r in records if r[0]]
else:
records = (
session.query(StorefrontProduct.asin)
.filter_by(seller_id=seller_id, marketplace=marketplace)
.distinct()
.all()
)
return [r[0] for r in records if r[0]]
def get_active_sellers(self) -> list[TrackedSeller]:
with Session() as session:
return session.query(TrackedSeller).filter_by(is_active=True).all()
def add_seller(
self,
seller_id: str,
marketplace: str = "com",
label: str = None,
) -> TrackedSeller:
with Session() as session:
existing = session.query(TrackedSeller).filter_by(
seller_id=seller_id, marketplace=marketplace
).first()
if existing:
existing.is_active = True
session.commit()
return existing
seller = TrackedSeller(
seller_id=seller_id,
marketplace=marketplace,
label=label or seller_id,
)
session.add(seller)
session.commit()
return sellerStep 3: Fetching Seller Data
python
# seller_collector.py
import httpx
import asyncio
import os
from datetime import datetime
from typing import Optional
API_KEY = os.environ["SCRAPEBADGER_API_KEY"]
BASE_URL = "https://api.scrapebadger.com/v1"
HEADERS = {"X-API-Key": API_KEY}
async def fetch_seller_profile(
client: httpx.AsyncClient,
seller_id: str,
marketplace: str = "com",
) -> Optional[dict]:
"""
Fetch seller profile and aggregate feedback summary.
Returns overall rating, feedback count, and 30/90-day breakdowns.
3 credits per successful request.
"""
try:
response = await client.get(
f"{BASE_URL}/amazon/sellers/{seller_id}",
params={"domain": marketplace},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
# Normalise feedback breakdown structure
feedback = data.get("feedback", {})
by_period = feedback.get("by_period", {})
period_30 = by_period.get("30_days", {})
period_90 = by_period.get("90_days", {})
return {
"seller_id": seller_id,
"seller_name": data.get("name"),
"marketplace": marketplace,
"feedback_count": feedback.get("count") or feedback.get("total"),
"positive_pct": feedback.get("positive_percentage"),
"neutral_pct": feedback.get("neutral_percentage"),
"negative_pct": feedback.get("negative_percentage"),
"positive_30d": period_30.get("positive"),
"neutral_30d": period_30.get("neutral"),
"negative_30d": period_30.get("negative"),
"positive_90d": period_90.get("positive"),
"neutral_90d": period_90.get("neutral"),
"negative_90d": period_90.get("negative"),
"account_created": data.get("account_since"),
"is_verified": data.get("is_verified", False),
"business_type": data.get("business_type"),
}
except Exception as e:
print(f"Error fetching profile for {seller_id}: {e}")
return None
async def fetch_seller_feedback(
client: httpx.AsyncClient,
seller_id: str,
marketplace: str = "com",
max_pages: int = 5,
rating_filter: Optional[int] = None, # None = all, 1 = only 1-star, etc.
) -> list[dict]:
"""
Fetch paginated buyer feedback entries for a seller.
rating_filter: pass 1 or 2 to collect only negative feedback.
3 credits per page. Set max_pages=1 for recent feedback only.
"""
all_entries = []
page = 1
while page <= max_pages:
try:
params = {"domain": marketplace, "page": page}
if rating_filter:
params["rating"] = rating_filter
response = await client.get(
f"{BASE_URL}/amazon/sellers/{seller_id}/feedback",
params=params,
timeout=30.0,
)
response.raise_for_status()
data = response.json()
entries = data.get("feedback", [])
if not entries:
break
for entry in entries:
all_entries.append({
"seller_id": seller_id,
"marketplace": marketplace,
"feedback_id": entry.get("id"),
"rating": entry.get("rating"),
"comment": entry.get("comment"),
"asin": entry.get("asin"),
"date": entry.get("date"),
})
# Check if there are more pages
pagination = data.get("pagination", {})
if not pagination.get("has_next_page", False):
break
page += 1
except Exception as e:
print(f"Error fetching feedback p{page} for {seller_id}: {e}")
break
return all_entries
async def fetch_seller_storefront(
client: httpx.AsyncClient,
seller_id: str,
marketplace: str = "com",
max_pages: int = 10,
) -> list[dict]:
"""
Fetch all products in a seller's storefront.
5 credits per page. max_pages=10 captures ~240 products.
"""
all_products = []
page = 1
while page <= max_pages:
try:
response = await client.get(
f"{BASE_URL}/amazon/sellers/{seller_id}/products",
params={"domain": marketplace, "page": page},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
products = data.get("products", [])
if not products:
break
for product in products:
import re
price_raw = product.get("price", "")
price = None
if price_raw:
cleaned = re.sub(r"[^\d.]", "", str(price_raw).replace(",", ""))
try:
price = float(cleaned)
except ValueError:
pass
all_products.append({
"asin": product.get("asin"),
"title": product.get("title"),
"price": price,
"rating": product.get("rating"),
"review_count": product.get("review_count"),
"category": product.get("category"),
"is_prime": product.get("is_prime", False),
})
pagination = data.get("pagination", {})
if not pagination.get("has_next_page", False):
break
page += 1
except Exception as e:
print(f"Error fetching storefront p{page} for {seller_id}: {e}")
break
return all_productsStep 4: Change Detection — The Intelligence Layer
python
# seller_detector.py
import os
from dataclasses import dataclass
from typing import Optional
from collections import Counter
import re
FEEDBACK_DROP_THRESHOLD = float(os.getenv("FEEDBACK_DROP_THRESHOLD", "2.0"))
NEGATIVE_SURGE_THRESHOLD = int(os.getenv("NEGATIVE_SURGE_THRESHOLD", "5"))
@dataclass
class SellerAlert:
seller_id: str
marketplace: str
label: str
alert_type: str
current_value: Optional[float]
previous_value: Optional[float]
detail: Optional[str] = None
def format_message(self) -> str:
if self.alert_type == "rating_drop":
return (
f"⚠️ SELLER RATING DROP: {self.label}\n"
f"{self.previous_value:.1f}% → {self.current_value:.1f}% positive\n"
f"Seller ID: {self.seller_id} on amazon.{self.marketplace}"
)
elif self.alert_type == "negative_surge":
return (
f"🚨 NEGATIVE FEEDBACK SURGE: {self.label}\n"
f"{int(self.current_value)} new negative entries detected\n"
f"{self.detail or ''}\n"
f"Seller ID: {self.seller_id}"
)
elif self.alert_type == "storefront_expansion":
return (
f"📦 STOREFRONT EXPANDED: {self.label}\n"
f"{int(self.previous_value)} → {int(self.current_value)} products "
f"(+{int(self.current_value - self.previous_value)} new ASINs)\n"
f"Seller ID: {self.seller_id} on amazon.{self.marketplace}"
)
elif self.alert_type == "storefront_contraction":
return (
f"📉 STOREFRONT SHRUNK: {self.label}\n"
f"{int(self.previous_value)} → {int(self.current_value)} products "
f"(-{int(self.previous_value - self.current_value)} ASINs removed)\n"
f"Seller ID: {self.seller_id}"
)
elif self.alert_type == "new_category":
return (
f"🆕 NEW CATEGORY DETECTED: {self.label}\n"
f"Now selling in: {self.detail}\n"
f"Seller ID: {self.seller_id}"
)
return f"Seller update: {self.label}"
def analyse_negative_feedback(
entries: list[dict],
) -> dict:
"""
Extract recurring themes from negative feedback.
Returns most common complaint categories.
"""
if not entries:
return {}
comments = [
e.get("comment", "").lower()
for e in entries if e.get("comment")
]
# Simple keyword categorisation
categories = {
"shipping": ["late", "slow", "shipping", "delivery", "arrived", "damaged"],
"product_quality": ["broken", "defective", "quality", "fake", "counterfeit", "wrong"],
"seller_communication": ["no response", "ignored", "contact", "customer service"],
"returns": ["refund", "return", "won't", "refused"],
"description": ["not as described", "different", "misleading", "inaccurate"],
}
category_counts = Counter()
for comment in comments:
for category, keywords in categories.items():
if any(kw in comment for kw in keywords):
category_counts[category] += 1
total = len(comments)
return {
cat: {"count": count, "pct": round(count / total * 100, 1)}
for cat, count in category_counts.most_common()
if count > 0
}
def detect_seller_changes(
seller_id: str,
marketplace: str,
label: str,
current_profile: dict,
previous_profile,
current_storefront_size: int,
previous_storefront_size: Optional[int],
new_negative_count: int,
new_categories: list[str] = None,
) -> list[SellerAlert]:
"""
Compare current seller data against previous to generate alerts.
"""
alerts = []
curr_pos = current_profile.get("positive_pct", 100)
prev_pos = previous_profile.positive_pct if previous_profile else None
# Rating drop alert
if prev_pos and curr_pos and (prev_pos - curr_pos) >= FEEDBACK_DROP_THRESHOLD:
alerts.append(SellerAlert(
seller_id=seller_id, marketplace=marketplace, label=label,
alert_type="rating_drop",
current_value=curr_pos,
previous_value=prev_pos,
))
# Negative feedback surge
if new_negative_count >= NEGATIVE_SURGE_THRESHOLD:
alerts.append(SellerAlert(
seller_id=seller_id, marketplace=marketplace, label=label,
alert_type="negative_surge",
current_value=float(new_negative_count),
previous_value=None,
detail=f"Check recent feedback for recurring complaints",
))
# Storefront size changes
if previous_storefront_size is not None and current_storefront_size > 0:
size_change = current_storefront_size - previous_storefront_size
change_pct = abs(size_change) / max(previous_storefront_size, 1) * 100
if size_change >= 10 and change_pct >= 15:
alerts.append(SellerAlert(
seller_id=seller_id, marketplace=marketplace, label=label,
alert_type="storefront_expansion",
current_value=float(current_storefront_size),
previous_value=float(previous_storefront_size),
))
elif size_change <= -10 and change_pct >= 15:
alerts.append(SellerAlert(
seller_id=seller_id, marketplace=marketplace, label=label,
alert_type="storefront_contraction",
current_value=float(current_storefront_size),
previous_value=float(previous_storefront_size),
))
# New category expansion
if new_categories:
for category in new_categories:
alerts.append(SellerAlert(
seller_id=seller_id, marketplace=marketplace, label=label,
alert_type="new_category",
current_value=None, previous_value=None,
detail=category,
))
return alertsStep 5: The Full Seller Check Cycle
python
# seller_monitor.py
import asyncio
import httpx
import os
import random
from datetime import datetime
from database import SellerDatabase
from seller_collector import (
fetch_seller_profile,
fetch_seller_feedback,
fetch_seller_storefront,
)
from seller_detector import detect_seller_changes, analyse_negative_feedback
API_KEY = os.environ["SCRAPEBADGER_API_KEY"]
db = SellerDatabase()
async def check_seller(
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
seller_id: str,
marketplace: str,
label: str,
collect_storefront: bool = True,
collect_feedback: bool = True,
) -> dict:
"""Full check cycle for one seller."""
async with semaphore:
await asyncio.sleep(random.uniform(0.5, 1.5))
# Fetch profile
profile = await fetch_seller_profile(client, seller_id, marketplace)
if not profile:
return {"seller_id": seller_id, "status": "failed"}
# Get previous profile for comparison
previous = db.get_previous_profile(seller_id, marketplace)
# Fetch new negative feedback only (3 credits per page)
new_negative_count = 0
negative_themes = {}
if collect_feedback:
neg_entries = await fetch_seller_feedback(
client, seller_id, marketplace,
max_pages=2,
rating_filter=None,
)
# Count new negatives since last check
if previous and previous.scraped_at:
new_negatives = [
e for e in neg_entries
if e.get("rating", 5) <= 2
]
new_negative_count = len(new_negatives)
if new_negatives:
db.save_feedback_entries(new_negatives)
negative_themes = analyse_negative_feedback(new_negatives)
# Fetch storefront and detect category expansion
current_storefront_size = 0
new_categories = []
if collect_storefront:
storefront = await fetch_seller_storefront(
client, seller_id, marketplace, max_pages=5
)
current_storefront_size = len(storefront)
if storefront:
db.save_storefront(seller_id, marketplace, storefront)
# Detect new categories
current_categories = set(
p["category"] for p in storefront if p.get("category")
)
previous_asins = set(
db.get_storefront_asins(seller_id, marketplace)
)
# Simple category comparison via previously stored data
# In production, persist categories separately
# Detect changes
previous_storefront_size = (
previous.last_storefront_size if previous else None
)
alerts = detect_seller_changes(
seller_id, marketplace, label,
current_profile=profile,
previous_profile=previous,
current_storefront_size=current_storefront_size,
previous_storefront_size=previous_storefront_size,
new_negative_count=new_negative_count,
new_categories=new_categories,
)
# Save profile snapshot
db.save_profile(seller_id, marketplace, profile)
# Fire alerts
if alerts:
from alerts import dispatch
dispatch(alerts)
return {
"seller_id": seller_id,
"status": "ok",
"label": label,
"positive_pct": profile.get("positive_pct"),
"feedback_count": profile.get("feedback_count"),
"storefront_size": current_storefront_size,
"new_negatives": new_negative_count,
"negative_themes": negative_themes,
"alerts": [a.alert_type for a in alerts],
}
async def run_seller_check_cycle(max_concurrent: int = 3):
"""Check all tracked sellers."""
sellers = db.get_active_sellers()
if not sellers:
print("No tracked sellers. Add with db.add_seller()")
return
print(f"\n{'='*50}")
print(f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}] "
f"Checking {len(sellers)} sellers")
print("="*50)
semaphore = asyncio.Semaphore(max_concurrent)
headers = {"X-API-Key": API_KEY}
async with httpx.AsyncClient(headers=headers) as client:
tasks = [
check_seller(
client, semaphore,
s.seller_id, s.marketplace, s.label,
)
for s in sellers
]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r.get("status") == "ok"]
print(f"\nCycle complete: {len(successful)}/{len(sellers)} OK")
for r in successful:
pos = f"{r.get('positive_pct', 0):.0f}%" if r.get('positive_pct') else "N/A"
neg = f" ⚠️ {r['new_negatives']} new negatives" if r.get('new_negatives') else ""
sf = f"| {r.get('storefront_size', 0)} products" if r.get('storefront_size') else ""
print(f" {r['label']:<35} {pos} positive {sf}{neg}")
if r.get("negative_themes"):
print(" Complaint themes:")
for theme, data in r["negative_themes"].items():
print(f" {theme}: {data['count']} mentions ({data['pct']}%)")Step 6: Entry Point
python
# main_sellers.py
import asyncio
import sys
from database import SellerDatabase
from seller_monitor import run_seller_check_cycle
from seller_discovery import discover_sellers_from_asins
db = SellerDatabase()
async def setup():
"""Add sellers to monitor."""
# Discover competitors from ASINs you're already tracking
your_category_asins = [
"B09V3KXJPB", # Sony WH-1000XM5
"B07XJ8C8F7", # Competitor A
"B09B8YWXDF", # Competitor B
]
sellers = await discover_sellers_from_asins(
your_category_asins,
marketplace="com",
exclude_amazon=True,
)
# Add top competitors (most ASINs, highest Buy Box wins)
for seller_id, data in list(sellers.items())[:20]:
db.add_seller(
seller_id,
marketplace="com",
label=data.get("seller_name", seller_id),
)
print(f"Added {min(len(sellers), 20)} sellers to watchlist")
if __name__ == "__main__":
command = sys.argv[1] if len(sys.argv) > 1 else "check"
if command == "setup":
asyncio.run(setup())
elif command == "check":
asyncio.run(run_seller_check_cycle())
elif command == "schedule":
import time
interval = int(sys.argv[2]) if len(sys.argv) > 2 else 360 # 6 hours default
print(f"Seller monitor started — checking every {interval} minutes")
while True:
asyncio.run(run_seller_check_cycle())
print(f"Next check in {interval} minutes")
time.sleep(interval * 60)Running it:
bash
# Discover competitors and add to watchlist
python main_sellers.py setup
# Run one check cycle
python main_sellers.py check
# Continuous monitoring every 6 hours
python main_sellers.py schedule 360What the Intelligence Picture Looks Like
With a week of data across 20 competitor sellers, the patterns that emerge are commercially meaningful:
Feedback trajectory as a leading indicator. A competitor's positive rating dropping from 97% to 91% over 30 days, with complaint themes clustering around "counterfeit" and "not as described", signals a sourcing or quality control problem before it shows up in their BSR or Buy Box losses. You know before the market does.
Storefront expansion as a market entry signal. A competitor adding 40 new ASINs in a category you're not currently competing in is a market move you'd want to know about. The storefront endpoint surfaces this weeks before any pricing data changes.
New category entry as strategic intelligence. A seller who's been selling consumer electronics expanding into home improvement or personal care is pivoting. Their sourcing relationships, their operational capability, and their Amazon presence are all signals worth tracking.
Feedback complaint themes as competitive advantage. If your main competitors consistently receive negative feedback about shipping speed, and you're FBA while they're FBM, you have a differentiator that the raw rating numbers don't show. The comment text makes it explicit.
All of the above is available through ScrapeBadger's three seller endpoints at modest credit costs. Profile checks at 3 credits each, feedback at 3 credits per page, storefront at 5 credits per page. The full Amazon Scraper documentation covers every field and pagination parameter.
For teams combining seller intelligence with the price tracking pipeline from the Amazon price tracker tutorial, the complete picture is: which sellers are active in your category, what they're selling, what prices they're running, who owns the Buy Box, and how their customer satisfaction is trending — all updated on schedule, all stored for historical analysis, all alerting when something meaningful changes.

Written by
Thomas Shultz
Thomas Shultz is the Head of Data at ScrapeBadger, working on public web data, scraping infrastructure, and data reliability. He writes about real-world scraping, data pipelines, and turning unstructured web data into usable signals.
Ready to get started?
Join thousands of developers using ScrapeBadger for their data needs.