Your brand is being discussed on Reddit right now. Not necessarily about you specifically โ though it might be โ but about problems your product solves, comparisons to your competitors, questions your potential customers are asking, and opinions your existing customers are sharing in communities where real names don't appear.
The challenge is coverage. Reddit has 100,000+ active subreddits. A complaint in r/personalfinance, a comparison thread in r/homelab, a recommendation request in r/Entrepreneur โ none of these are in the communities you're already monitoring. The mentions that matter most are often the ones you never thought to look for.
A Reddit brand monitor covers both dimensions: subreddit-level monitoring for known communities where your brand appears, and cross-Reddit keyword search for discovering mentions wherever they surface. This guide builds both using ScrapeBadger's Reddit Scraper โ collecting posts and comment threads, scoring sentiment, detecting volume spikes and sentiment shifts, and delivering alerts through Slack and email.
Architecture
[Keywords + Subreddits]
โ
ScrapeBadger Reddit API
/v1/reddit/search โ cross-Reddit keyword search
/v1/reddit/subreddit/posts โ subreddit feed monitoring
/v1/reddit/post/comments โ comment thread collection
โ
[Mention Record] โ SQLite
โ
[Sentiment Scorer] โ positive / neutral / negative
โ
[Change Detector] โ volume spike, sentiment shift
โ
[Alert Dispatcher] โ Slack / EmailSetup
bash
pip install httpx sqlalchemy textblob python-dotenv aiofiles
python -m textblob.download_corporaenv
SCRAPEBADGER_API_KEY=your_key
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK
ALERT_EMAIL=team@yourcompany.com
SMTP_USER=alerts@yourcompany.com
SMTP_PASSWORD=your_app_password
VOLUME_SPIKE_THRESHOLD=3.0 # Alert if 3x normal daily volume
SENTIMENT_DROP_THRESHOLD=15.0 # Alert if negative % rises by 15 pointsStep 1: Data Models
python
# models.py
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
@dataclass
class MentionRecord:
"""A single Reddit mention โ post or comment containing a tracked keyword."""
mention_id: str # Reddit post/comment ID
mention_type: str # "post" or "comment"
keyword: str # Which tracked keyword triggered this
subreddit: str
title: Optional[str] # Post title (None for comments)
body: str # Post selftext or comment body
author: str
score: int
upvote_ratio: Optional[float]
num_comments: Optional[int]
url: str
permalink: str
created_utc: str
sentiment: Optional[str] = None # "positive", "neutral", "negative"
sentiment_score: Optional[float] = None # -1.0 to 1.0
scraped_at: Optional[datetime] = None
@dataclass
class MonitorConfig:
"""Configuration for a brand monitoring campaign."""
brand_keywords: list[str] # Primary brand/product names
competitor_keywords: list[str] # Competitor names to track
topic_keywords: list[str] # Industry topics to monitor
subreddits_to_watch: list[str] # Communities to poll regularly
alert_on_negative: bool = True
alert_on_spike: bool = True
min_score_threshold: int = 1 # Ignore posts with score below thisStep 2: Database Layer
python
# monitor_database.py
from sqlalchemy import (
create_engine, Column, Integer, Float,
String, DateTime, Boolean, Text, Index
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timedelta
from typing import Optional
Base = declarative_base()
engine = create_engine("sqlite:///brand_monitor.db")
Session = sessionmaker(bind=engine)
class MentionHistory(Base):
__tablename__ = "mentions"
id = Column(Integer, primary_key=True)
mention_id = Column(String, unique=True, index=True)
mention_type = Column(String)
keyword = Column(String, index=True)
subreddit = Column(String, index=True)
title = Column(Text)
body = Column(Text)
author = Column(String)
score = Column(Integer, default=0)
upvote_ratio = Column(Float)
num_comments = Column(Integer)
url = Column(String)
permalink = Column(String)
created_utc = Column(String)
sentiment = Column(String)
sentiment_score = Column(Float)
scraped_at = Column(DateTime, default=datetime.utcnow, index=True)
class DailySummary(Base):
"""Aggregated daily stats per keyword โ used for spike detection."""
__tablename__ = "daily_summaries"
id = Column(Integer, primary_key=True)
date = Column(String, index=True)
keyword = Column(String, index=True)
mention_count = Column(Integer, default=0)
positive_count = Column(Integer, default=0)
neutral_count = Column(Integer, default=0)
negative_count = Column(Integer, default=0)
avg_score = Column(Float)
top_subreddits = Column(String)
Base.metadata.create_all(engine)
class MonitorDatabase:
def save_mention(self, mention: MentionRecord) -> bool:
"""Save a mention. Returns False if already exists (deduplication)."""
with Session() as session:
existing = session.query(MentionHistory).filter_by(
mention_id=mention.mention_id
).first()
if existing:
return False
record = MentionHistory(
mention_id=mention.mention_id,
mention_type=mention.mention_type,
keyword=mention.keyword,
subreddit=mention.subreddit,
title=mention.title,
body=mention.body[:2000] if mention.body else "",
author=mention.author,
score=mention.score,
upvote_ratio=mention.upvote_ratio,
num_comments=mention.num_comments,
url=mention.url,
permalink=mention.permalink,
created_utc=mention.created_utc,
sentiment=mention.sentiment,
sentiment_score=mention.sentiment_score,
scraped_at=mention.scraped_at or datetime.utcnow(),
)
session.add(record)
session.commit()
return True
def get_mention_count(
self,
keyword: str,
days: int = 7,
) -> int:
"""Count mentions for a keyword over the last N days."""
cutoff = datetime.utcnow() - timedelta(days=days)
with Session() as session:
return (
session.query(MentionHistory)
.filter(
MentionHistory.keyword == keyword,
MentionHistory.scraped_at >= cutoff,
)
.count()
)
def get_sentiment_breakdown(
self,
keyword: str,
days: int = 7,
) -> dict:
"""Get positive/neutral/negative breakdown for a keyword."""
cutoff = datetime.utcnow() - timedelta(days=days)
with Session() as session:
records = (
session.query(MentionHistory.sentiment)
.filter(
MentionHistory.keyword == keyword,
MentionHistory.scraped_at >= cutoff,
MentionHistory.sentiment.isnot(None),
)
.all()
)
counts = {"positive": 0, "neutral": 0, "negative": 0}
for (sentiment,) in records:
if sentiment in counts:
counts[sentiment] += 1
total = sum(counts.values())
if total == 0:
return counts
return {
k: {"count": v, "pct": round(v / total * 100, 1)}
for k, v in counts.items()
}
def get_recent_negative(
self,
keyword: str,
hours: int = 24,
) -> list[MentionHistory]:
"""Get recent negative mentions for a keyword."""
cutoff = datetime.utcnow() - timedelta(hours=hours)
with Session() as session:
return (
session.query(MentionHistory)
.filter(
MentionHistory.keyword == keyword,
MentionHistory.sentiment == "negative",
MentionHistory.scraped_at >= cutoff,
)
.order_by(MentionHistory.score.desc())
.limit(10)
.all()
)
def get_top_subreddits(
self,
keyword: str,
days: int = 30,
limit: int = 10,
) -> list[tuple]:
"""Get subreddits where keyword is most mentioned."""
from sqlalchemy import func
cutoff = datetime.utcnow() - timedelta(days=days)
with Session() as session:
return (
session.query(
MentionHistory.subreddit,
func.count(MentionHistory.id).label("count"),
)
.filter(
MentionHistory.keyword == keyword,
MentionHistory.scraped_at >= cutoff,
)
.group_by(MentionHistory.subreddit)
.order_by(func.count(MentionHistory.id).desc())
.limit(limit)
.all()
)Step 3: Sentiment Scoring
python
# sentiment.py
from textblob import TextBlob
import re
from typing import tuple
# Reddit-specific sentiment adjustments
# Words that carry stronger signal in tech/product discussions
POSITIVE_BOOSTERS = {
"love", "excellent", "perfect", "amazing", "fantastic",
"highly recommend", "great product", "works great",
"game changer", "worth it", "best purchase",
}
NEGATIVE_BOOSTERS = {
"garbage", "terrible", "avoid", "scam", "broken",
"waste of money", "returned it", "don't buy",
"worst", "regret", "defective", "fraud",
}
NEUTRAL_OVERRIDES = {
"question", "asking", "curious", "wondering",
"anyone know", "can someone", "help with",
}
def score_sentiment(text: str) -> tuple[str, float]:
"""
Score Reddit text sentiment.
Returns (label, score) where score is -1.0 to 1.0.
Adjusts TextBlob base score with Reddit-specific signals.
"""
if not text or len(text.strip()) < 10:
return "neutral", 0.0
text_lower = text.lower()
# Check for neutral question patterns first
if any(phrase in text_lower for phrase in NEUTRAL_OVERRIDES):
return "neutral", 0.0
# TextBlob base score
blob = TextBlob(text)
base_score = blob.sentiment.polarity
# Apply Reddit-specific boosters
boost = 0.0
for phrase in POSITIVE_BOOSTERS:
if phrase in text_lower:
boost += 0.3
for phrase in NEGATIVE_BOOSTERS:
if phrase in text_lower:
boost -= 0.3
# Clamp to valid range
final_score = max(-1.0, min(1.0, base_score + boost))
# Label based on thresholds
if final_score > 0.1:
label = "positive"
elif final_score < -0.1:
label = "negative"
else:
label = "neutral"
return label, round(final_score, 3)
def score_mention(text: str, title: str = None) -> tuple[str, float]:
"""Score a post or comment, combining title and body if available."""
combined = ""
if title:
combined += title + " "
combined += (text or "")
return score_sentiment(combined.strip())Step 4: The ScrapeBadger Collection Layer
Two collection strategies working in parallel: subreddit feed polling for known communities, and cross-Reddit search for brand discovery.
python
# collector.py
import httpx
import asyncio
import os
import re
from datetime import datetime
from typing import Optional
from models import MentionRecord
from sentiment import score_mention
API_KEY = os.environ["SCRAPEBADGER_API_KEY"]
BASE_URL = "https://api.scrapebadger.com/v1"
HEADERS = {"X-API-Key": API_KEY}
def extract_mentions(
text: str,
keywords: list[str],
) -> list[str]:
"""Find which keywords appear in text. Case-insensitive."""
text_lower = text.lower()
return [kw for kw in keywords if kw.lower() in text_lower]
async def search_reddit(
client: httpx.AsyncClient,
query: str,
keyword_label: str,
sort: str = "new",
time_filter: str = "day",
limit: int = 100,
min_score: int = 1,
) -> list[MentionRecord]:
"""
Cross-Reddit keyword search.
Finds mentions of a query across all public subreddits.
Best for brand name discovery โ catches mentions in communities
you don't already know about.
"""
try:
response = await client.get(
f"{BASE_URL}/reddit/search",
params={
"query": query,
"sort": sort,
"time_filter": time_filter,
"limit": limit,
"type": "posts",
},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
mentions = []
for post in data.get("posts", []):
score = post.get("score", 0)
if score < min_score:
continue
body = post.get("selftext", "") or ""
title = post.get("title", "") or ""
combined_text = f"{title} {body}"
sentiment_label, sentiment_score = score_mention(body, title)
mentions.append(MentionRecord(
mention_id=post["id"],
mention_type="post",
keyword=keyword_label,
subreddit=post.get("subreddit", ""),
title=title,
body=body,
author=post.get("author", "[deleted]"),
score=score,
upvote_ratio=post.get("upvote_ratio"),
num_comments=post.get("num_comments", 0),
url=post.get("url", ""),
permalink=post.get("permalink", ""),
created_utc=post.get("created_utc", ""),
sentiment=sentiment_label,
sentiment_score=sentiment_score,
scraped_at=datetime.utcnow(),
))
return mentions
except Exception as e:
print(f"Error searching Reddit for '{query}': {e}")
return []
async def poll_subreddit(
client: httpx.AsyncClient,
subreddit: str,
keywords: list[str],
sort: str = "new",
limit: int = 100,
min_score: int = 0,
) -> list[MentionRecord]:
"""
Poll a specific subreddit for posts containing tracked keywords.
Best for communities where you know your brand appears regularly.
"""
try:
response = await client.get(
f"{BASE_URL}/reddit/subreddit/{subreddit}/posts",
params={"sort": sort, "limit": limit},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
mentions = []
for post in data.get("posts", []):
score = post.get("score", 0)
if score < min_score:
continue
title = post.get("title", "") or ""
body = post.get("selftext", "") or ""
combined = f"{title} {body}"
# Only include posts that mention at least one keyword
matched_keywords = extract_mentions(combined, keywords)
if not matched_keywords:
continue
sentiment_label, sentiment_score = score_mention(body, title)
# One record per matched keyword
for keyword in matched_keywords:
mentions.append(MentionRecord(
mention_id=f"{post['id']}_{keyword}",
mention_type="post",
keyword=keyword,
subreddit=subreddit,
title=title,
body=body,
author=post.get("author", "[deleted]"),
score=score,
upvote_ratio=post.get("upvote_ratio"),
num_comments=post.get("num_comments", 0),
url=post.get("url", ""),
permalink=post.get("permalink", ""),
created_utc=post.get("created_utc", ""),
sentiment=sentiment_label,
sentiment_score=sentiment_score,
scraped_at=datetime.utcnow(),
))
return mentions
except Exception as e:
print(f"Error polling r/{subreddit}: {e}")
return []
async def collect_post_comments(
client: httpx.AsyncClient,
post_id: str,
subreddit: str,
keywords: list[str],
min_comment_score: int = 2,
) -> list[MentionRecord]:
"""
Collect comments from a high-engagement post.
Use this to monitor comment threads on viral mentions.
"""
try:
response = await client.get(
f"{BASE_URL}/reddit/post/{post_id}/comments",
timeout=30.0,
)
response.raise_for_status()
data = response.json()
mentions = []
for comment in data.get("comments", []):
body = comment.get("body", "") or ""
if not body or comment.get("score", 0) < min_comment_score:
continue
matched = extract_mentions(body, keywords)
if not matched:
continue
sentiment_label, sentiment_score = score_mention(body)
for keyword in matched:
mentions.append(MentionRecord(
mention_id=f"comment_{comment['id']}_{keyword}",
mention_type="comment",
keyword=keyword,
subreddit=subreddit,
title=None,
body=body,
author=comment.get("author", "[deleted]"),
score=comment.get("score", 0),
upvote_ratio=None,
num_comments=None,
url=f"https://reddit.com{comment.get('permalink', '')}",
permalink=comment.get("permalink", ""),
created_utc=comment.get("created_utc", ""),
sentiment=sentiment_label,
sentiment_score=sentiment_score,
scraped_at=datetime.utcnow(),
))
return mentions
except Exception as e:
print(f"Error fetching comments for {post_id}: {e}")
return []Step 5: Change Detection
python
# monitor_detector.py
import os
from dataclasses import dataclass
from typing import Optional
from monitor_database import MonitorDatabase
VOLUME_SPIKE_THRESHOLD = float(os.getenv("VOLUME_SPIKE_THRESHOLD", "3.0"))
SENTIMENT_DROP_THRESHOLD = float(os.getenv("SENTIMENT_DROP_THRESHOLD", "15.0"))
db = MonitorDatabase()
@dataclass
class MonitorAlert:
keyword: str
alert_type: str
current_value: float
baseline_value: Optional[float]
detail: Optional[str] = None
top_mentions: Optional[list] = None
def format_message(self) -> str:
if self.alert_type == "volume_spike":
return (
f"๐ฅ MENTION SPIKE: '{self.keyword}'\n"
f"Today: {int(self.current_value)} mentions "
f"(baseline: {self.baseline_value:.0f}/day, "
f"{self.current_value / max(self.baseline_value, 1):.1f}x normal)\n"
f"{self.detail or ''}"
)
elif self.alert_type == "negative_surge":
return (
f"โ ๏ธ NEGATIVE SENTIMENT SURGE: '{self.keyword}'\n"
f"Negative mentions: {self.current_value:.0f}% "
f"(was {self.baseline_value:.0f}%)\n"
f"{self.detail or ''}"
)
elif self.alert_type == "high_score_mention":
return (
f"๐ HIGH-ENGAGEMENT MENTION: '{self.keyword}'\n"
f"Score: {int(self.current_value)} points\n"
f"{self.detail or ''}"
)
elif self.alert_type == "new_subreddit":
return (
f"๐ NEW COMMUNITY DETECTED: '{self.keyword}'\n"
f"First mention in r/{self.detail}\n"
f"Score: {int(self.current_value)}"
)
return f"Brand monitor alert for '{self.keyword}'"
def detect_volume_spike(
keyword: str,
new_mentions_today: int,
lookback_days: int = 14,
) -> Optional[MonitorAlert]:
"""Alert when today's mention volume is significantly above normal."""
# Calculate baseline from previous period
total_past = db.get_mention_count(keyword, days=lookback_days)
daily_baseline = total_past / lookback_days if lookback_days > 0 else 0
if daily_baseline < 2:
# Too little history for meaningful spike detection
return None
spike_ratio = new_mentions_today / daily_baseline
if spike_ratio >= VOLUME_SPIKE_THRESHOLD:
return MonitorAlert(
keyword=keyword,
alert_type="volume_spike",
current_value=float(new_mentions_today),
baseline_value=daily_baseline,
detail=f"Investigate what's driving the spike",
)
return None
def detect_sentiment_shift(
keyword: str,
current_negative_pct: float,
) -> Optional[MonitorAlert]:
"""Alert when negative sentiment percentage rises sharply."""
# Compare against 30-day baseline
baseline = db.get_sentiment_breakdown(keyword, days=30)
baseline_neg = baseline.get("negative", {}).get("pct", 0)
shift = current_negative_pct - baseline_neg
if shift >= SENTIMENT_DROP_THRESHOLD:
return MonitorAlert(
keyword=keyword,
alert_type="negative_surge",
current_value=current_negative_pct,
baseline_value=baseline_neg,
detail=f"Negative % rose {shift:.0f} points above 30-day baseline",
)
return None
def check_high_score_mention(
mention,
score_threshold: int = 100,
) -> Optional[MonitorAlert]:
"""Alert on individual mentions with unusually high engagement."""
if mention.score >= score_threshold:
return MonitorAlert(
keyword=mention.keyword,
alert_type="high_score_mention",
current_value=float(mention.score),
baseline_value=None,
detail=(
f"r/{mention.subreddit}: "
f"{(mention.title or mention.body[:80]).strip()}\n"
f"Sentiment: {mention.sentiment} | "
f"URL: https://reddit.com{mention.permalink}"
),
)
return NoneStep 6: Alert Delivery
python
# monitor_alerts.py
import os
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from monitor_detector import MonitorAlert
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
def send_slack(alert: MonitorAlert) -> bool:
if not SLACK_WEBHOOK:
return False
color_map = {
"volume_spike": "#ff8800",
"negative_surge": "#ff4444",
"high_score_mention": "#00aaff",
"new_subreddit": "#00aa00",
}
try:
requests.post(SLACK_WEBHOOK, json={
"attachments": [{
"color": color_map.get(alert.alert_type, "#888888"),
"title": f"Reddit Brand Monitor: {alert.keyword}",
"text": alert.format_message(),
"footer": "ScrapeBadger Brand Monitor",
}]
}, timeout=10)
return True
except Exception as e:
print(f"Slack failed: {e}")
return False
def send_email(alert: MonitorAlert) -> bool:
if not all([SMTP_USER, SMTP_PASSWORD, ALERT_EMAIL]):
return False
subject_map = {
"volume_spike": "๐ฅ Reddit Mention Spike",
"negative_surge": "โ ๏ธ Reddit Negative Sentiment Alert",
"high_score_mention": "๐ High-Engagement Reddit Mention",
"new_subreddit": "๐ New Reddit Community Mention",
}
subject = subject_map.get(alert.alert_type, "Reddit Brand Alert")
subject += f": {alert.keyword}"
msg = MIMEMultipart()
msg["From"] = SMTP_USER
msg["To"] = ALERT_EMAIL
msg["Subject"] = subject
msg.attach(MIMEText(alert.format_message(), "plain"))
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.send_message(msg)
return True
except Exception as e:
print(f"Email failed: {e}")
return False
def dispatch(alerts: list[MonitorAlert]):
for alert in alerts:
print(f"\n๐ {alert.format_message()}\n")
send_slack(alert)
send_email(alert)Step 7: The Main Monitor Cycle
python
# brand_monitor.py
import asyncio
import httpx
import os
import random
from datetime import datetime
from models import MonitorConfig, MentionRecord
from monitor_database import MonitorDatabase
from collector import search_reddit, poll_subreddit, collect_post_comments
from monitor_detector import (
detect_volume_spike,
detect_sentiment_shift,
check_high_score_mention,
)
from monitor_alerts import dispatch
API_KEY = os.environ["SCRAPEBADGER_API_KEY"]
db = MonitorDatabase()
async def run_monitor_cycle(
config: MonitorConfig,
max_concurrent: int = 5,
) -> dict:
"""
Full monitoring cycle:
1. Search cross-Reddit for all keywords
2. Poll known subreddits for brand mentions
3. Score sentiment on all new mentions
4. Detect volume spikes and sentiment shifts
5. Fire alerts
"""
semaphore = asyncio.Semaphore(max_concurrent)
headers = {"X-API-Key": API_KEY}
all_alerts = []
total_new = 0
print(f"\n{'='*55}")
print(f"[{datetime.utcnow().strftime('%Y-%m-%d %H:%M')}] "
f"Reddit Brand Monitor running")
print("="*55)
async with httpx.AsyncClient(headers=headers) as client:
# --- PHASE 1: Cross-Reddit keyword search ---
print(f"\nSearching Reddit for "
f"{len(config.brand_keywords)} brand keywords...")
all_keywords = (
config.brand_keywords +
config.competitor_keywords +
config.topic_keywords
)
async def bounded_search(keyword: str) -> list[MentionRecord]:
async with semaphore:
await asyncio.sleep(random.uniform(0.3, 1.0))
return await search_reddit(
client, keyword, keyword,
sort="new", time_filter="day", limit=100,
)
search_results = await asyncio.gather(
*[bounded_search(kw) for kw in all_keywords]
)
for mentions in search_results:
for mention in mentions:
is_new = db.save_mention(mention)
if is_new:
total_new += 1
# Check for high-engagement individual mentions
alert = check_high_score_mention(mention, score_threshold=50)
if alert:
all_alerts.append(alert)
# --- PHASE 2: Subreddit polling ---
if config.subreddits_to_watch:
print(f"\nPolling {len(config.subreddits_to_watch)} subreddits...")
async def bounded_poll(subreddit: str) -> list[MentionRecord]:
async with semaphore:
await asyncio.sleep(random.uniform(0.3, 1.0))
return await poll_subreddit(
client, subreddit, all_keywords,
sort="new", limit=100,
)
poll_results = await asyncio.gather(
*[bounded_poll(sr) for sr in config.subreddits_to_watch]
)
for mentions in poll_results:
for mention in mentions:
is_new = db.save_mention(mention)
if is_new:
total_new += 1
# --- PHASE 3: Per-keyword analytics and spike detection ---
print(f"\nAnalysing {total_new} new mentions...")
for keyword in config.brand_keywords:
# Count today's mentions
today_count = db.get_mention_count(keyword, days=1)
# Volume spike detection
spike_alert = detect_volume_spike(keyword, today_count)
if spike_alert:
all_alerts.append(spike_alert)
# Sentiment shift detection
sentiment = db.get_sentiment_breakdown(keyword, days=1)
neg_pct = sentiment.get("negative", {}).get("pct", 0)
if neg_pct > 0:
shift_alert = detect_sentiment_shift(keyword, neg_pct)
if shift_alert:
all_alerts.append(shift_alert)
# Print per-keyword summary
pos = sentiment.get("positive", {}).get("count", 0)
neu = sentiment.get("neutral", {}).get("count", 0)
neg = sentiment.get("negative", {}).get("count", 0)
print(f" '{keyword}': {today_count} mentions today | "
f"โ
{pos} ๐{neu} โ{neg}")
# Top communities
top_subreddits = db.get_top_subreddits(keyword, days=7, limit=3)
if top_subreddits:
communities = ", ".join(
f"r/{sr} ({count})" for sr, count in top_subreddits
)
print(f" Top: {communities}")
# --- PHASE 4: Fire alerts ---
if all_alerts:
print(f"\nFiring {len(all_alerts)} alerts...")
dispatch(all_alerts)
else:
print("\nNo alerts triggered this cycle.")
return {
"total_new_mentions": total_new,
"alerts_fired": len(all_alerts),
"cycle_completed_at": datetime.utcnow().isoformat(),
}Step 8: Entry Point
python
# main_monitor.py
import asyncio
import sys
import time
from models import MonitorConfig
from brand_monitor import run_monitor_cycle
from monitor_database import MonitorDatabase
db = MonitorDatabase()
def build_config() -> MonitorConfig:
"""Define what to monitor. Customise for your brand."""
return MonitorConfig(
# Your brand and product names โ exact match and variations
brand_keywords=[
"ScrapeBadger",
"scrapebadger.com",
"scrapebadger api",
],
# Competitors to track
competitor_keywords=[
"bright data scraping",
"scrapingbee review",
"oxylabs api",
],
# Industry topics โ surface relevant conversations
topic_keywords=[
"web scraping api 2026",
"cloudflare bypass python",
"amazon scraper api",
],
# Communities to poll regularly regardless of keyword match
subreddits_to_watch=[
"webdev",
"datascience",
"Python",
"MachineLearning",
"entrepreneur",
"SaaS",
],
alert_on_negative=True,
alert_on_spike=True,
min_score_threshold=1,
)
if __name__ == "__main__":
command = sys.argv[1] if len(sys.argv) > 1 else "run"
config = build_config()
if command == "run":
# Single cycle
result = asyncio.run(run_monitor_cycle(config))
print(f"\nโ Cycle complete: {result}")
elif command == "schedule":
# Continuous monitoring
interval = int(sys.argv[2]) if len(sys.argv) > 2 else 60
print(f"Brand monitor started โ checking every {interval} minutes")
while True:
asyncio.run(run_monitor_cycle(config))
print(f"Next check in {interval} minutes\n")
time.sleep(interval * 60)
elif command == "report":
# Print 7-day summary report
print("\n=== 7-DAY BRAND INTELLIGENCE REPORT ===\n")
for keyword in config.brand_keywords:
count = db.get_mention_count(keyword, days=7)
sentiment = db.get_sentiment_breakdown(keyword, days=7)
subreddits = db.get_top_subreddits(keyword, days=7, limit=5)
print(f"'{keyword}': {count} total mentions")
pos = sentiment.get("positive", {})
neu = sentiment.get("neutral", {})
neg = sentiment.get("negative", {})
print(f" Sentiment: "
f"{pos.get('pct', 0):.0f}% positive | "
f"{neu.get('pct', 0):.0f}% neutral | "
f"{neg.get('pct', 0):.0f}% negative")
if subreddits:
print(" Top communities:")
for sr, count in subreddits:
print(f" r/{sr}: {count} mentions")
print()Running it:
bash
# Single monitoring cycle
python main_monitor.py run
# Continuous monitoring every 60 minutes
python main_monitor.py schedule 60
# Print 7-day report
python main_monitor.py reportOutput from a live run:
=======================================================
[2026-06-01 09:15] Reddit Brand Monitor running
Searching Reddit for 3 brand keywords...
Polling 6 subreddits...
Analysing 47 new mentions...
'ScrapeBadger': 12 mentions today | โ
9 ๐2 โ1
Top: r/Python (4), r/webdev (3), r/datascience (2)
'scrapebadger.com': 3 mentions today | โ
3 ๐0 โ0
Top: r/webdev (2), r/SaaS (1)
๐ HIGH-ENGAGEMENT MENTION: 'ScrapeBadger'
Score: 128 points
r/Python: "Just switched from Bright Data to ScrapeBadger..."
Sentiment: positive | URL: https://reddit.com/r/Python/...
โ Cycle complete: {'total_new_mentions': 47, 'alerts_fired': 1}Extending the Pipeline
Three additions that significantly increase the intelligence value of what's been built:
Comment-level monitoring on viral posts. The collect_post_comments() function in the collector is wired but not called in the main cycle. Enable it for posts above a score threshold โ a post with 200 upvotes discussing your brand likely has comment threads worth monitoring. The nested comment structure ScrapeBadger returns lets you follow the conversation thread, not just the top-level post.
Combining with Google Trends for spike context. When a volume spike alert fires, the natural question is why. Pairing the spike timestamp with a ScrapeBadger Google Trends query for the same keyword tells you whether the Reddit spike correlates with a broader search interest increase โ confirming whether the event is Reddit-internal or market-wide.
Competitor sentiment comparison. The config already tracks competitor keywords. Adding a weekly side-by-side sentiment comparison โ your brand's positive percentage versus each competitor's โ produces the kind of chart that makes brand health performance concrete and shareable. The data is already being collected in the same database.
Full Reddit API documentation at docs.scrapebadger.com. Free trial at scrapebadger.com/reddit-scraper โ 1,000 credits, no credit card.
Written by
Domas Sakavickas
Domas Sakavickas is the Co-founder of ScrapeBadger, building web scraping infrastructure for developers and data teams. He writes about the web data market, tool comparisons, business use cases for scraping, and what it takes to turn public web data into a competitive advantage.
Ready to get started?
Join thousands of developers using ScrapeBadger for their data needs.
