Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
llm_router,
reports_router,
alerts_router,
sentiment_router,
)
from api.routers.monitoring import record_latency
from api.routers.ws import poll_and_broadcast_transactions
Expand Down Expand Up @@ -170,6 +171,7 @@ async def _latency_middleware(request: Request, call_next):
app.include_router(llm_router)
app.include_router(reports_router)
app.include_router(alerts_router)
app.include_router(sentiment_router)


@app.get("/health", tags=["ops"])
Expand Down
2 changes: 2 additions & 0 deletions api/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from api.routers.llm import router as llm_router
from api.routers.reports import router as reports_router
from api.routers.alerts import router as alerts_router
from api.routers.sentiment import router as sentiment_router

__all__ = [
"accounts_router",
Expand Down Expand Up @@ -51,4 +52,5 @@
"llm_router",
"reports_router",
"alerts_router",
"sentiment_router",
]
19 changes: 19 additions & 0 deletions api/routers/loyalty.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from api.schemas import (
BenefitOut,
LoyaltyRecommendationRequest,
LoyaltyRecommendationResponse,
LoyaltySummaryFull,
LoyaltyTierOut,
NextTierInfo,
Expand All @@ -28,8 +30,10 @@
RedeemResponse,
ReferralOut,
)
from api.services.loyalty_recommendations import LoyaltyRecommendationService

router = APIRouter(prefix="/api/v1/loyalty", tags=["loyalty"])
recommendation_service = LoyaltyRecommendationService()

# ─── Static tier definitions ─────────────────────────────────────────────────

Expand Down Expand Up @@ -268,6 +272,21 @@ def get_referral(account_id: str, db: Optional[Session] = Depends(_get_db)):
return ReferralOut(url=f"{base_url}?code={code}", invited=invited, rewards=rewards)


@router.post("/recommendations", response_model=LoyaltyRecommendationResponse)
def generate_recommendations(payload: LoyaltyRecommendationRequest):
"""Generate fast loyalty recommendations using transaction behavior and balance signals."""
start = datetime.now(timezone.utc)
balance = int(payload.balance or 0)
result = recommendation_service.generate(
account_id=payload.account_id,
balance=balance,
transactions=payload.transactions,
)
elapsed_ms = (datetime.now(timezone.utc) - start).total_seconds() * 1000
result["response_time_ms"] = round(elapsed_ms, 2)
return result


# ─── Helper ───────────────────────────────────────────────────────────────────

from contextlib import contextmanager # noqa: E402
Expand Down
31 changes: 31 additions & 0 deletions api/routers/sentiment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import annotations

from fastapi import APIRouter

from api.schemas import (
SentimentAnalysisResponse,
SentimentIngestRequest,
SentimentSeriesResponse,
)
from api.services.sentiment_pipeline import SentimentAnalysisPipeline

router = APIRouter(prefix="/api/v1/sentiment", tags=["sentiment"])
pipeline = SentimentAnalysisPipeline()


@router.post("/ingest", response_model=dict)
def ingest_sentiment(payload: SentimentIngestRequest) -> dict:
"""Ingest social/news items for an asset and update the rolling sentiment state."""
return pipeline.ingest(payload.asset, payload.items)


@router.get("/{asset}/analysis", response_model=SentimentAnalysisResponse)
def analyze_sentiment(asset: str) -> SentimentAnalysisResponse:
"""Return the current aggregate sentiment for an asset."""
return pipeline.analyze(asset)


@router.get("/{asset}/timeseries", response_model=SentimentSeriesResponse)
def sentiment_timeseries(asset: str) -> SentimentSeriesResponse:
"""Return the recent time-series sentiment points used for visualization."""
return pipeline.get_visualization(asset)
45 changes: 45 additions & 0 deletions api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,51 @@ class ReferralOut(BaseModel):
rewards: int


class LoyaltyRecommendationRequest(BaseModel):
account_id: str
balance: int = 0
transactions: List[Dict[str, Any]] = Field(default_factory=list)


class LoyaltyRecommendationItem(BaseModel):
id: str
title: str
description: str
offer_type: str
points_cost: int
expected_acceptance_rate: float
reason: str
eligible: bool


class LoyaltyRecommendationResponse(BaseModel):
account_id: str
balance: int
analysis_summary: str
recommendations: List[LoyaltyRecommendationItem]
generated_at: str
response_time_ms: float


class SentimentIngestRequest(BaseModel):
asset: str
items: List[Dict[str, Any]] = Field(default_factory=list)


class SentimentAnalysisResponse(BaseModel):
asset: str
sentiment_score: float
label: str
points: int


class SentimentSeriesResponse(BaseModel):
asset: str
points: List[Dict[str, Any]]
trend: str
average_score: float


# ─── Mentorship ────────────────────────────────────────────────────────────

class MentorProfileIn(BaseModel):
Expand Down
120 changes: 120 additions & 0 deletions api/services/loyalty_recommendations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


class LoyaltyRecommendationService:
"""Generate fast, personalized loyalty offers using lightweight heuristics."""

def __init__(self) -> None:
self._tier_thresholds = {
"bronze": 0,
"silver": 1500,
"gold": 3000,
"platinum": 6000,
}

def generate(self, account_id: str, balance: int = 0, transactions: list[dict[str, Any]] | None = None) -> dict[str, Any]:
txs = list(transactions or [])
if txs:
total_volume = sum(float(item.get("amount", 0)) for item in txs)
average_ticket = total_volume / len(txs)
frequency = len(txs)
else:
total_volume = 0.0
average_ticket = 0.0
frequency = 0

tier = self._tier_for(balance)
recommendations: list[dict[str, Any]] = []

if balance < 1500:
recommendations.append(
{
"id": "balance-boost",
"title": "Balance boost offer",
"description": f"Earn 250 bonus points by adding {max(100, 1500 - balance)} more points to your balance.",
"offer_type": "balance_based",
"points_cost": 0,
"expected_acceptance_rate": 0.48,
"reason": "Your balance is still below the silver tier threshold.",
"eligible": True,
}
)

if average_ticket >= 250 or frequency >= 3:
recommendations.append(
{
"id": "high-value-reward",
"title": "High-value reward",
"description": "Unlock a premium reward on your next high-value transaction.",
"offer_type": "personalized",
"points_cost": 500,
"expected_acceptance_rate": 0.44,
"reason": "Recent activity suggests you respond well to larger purchases.",
"eligible": True,
}
)
else:
recommendations.append(
{
"id": "small-step-reward",
"title": "Small-step reward",
"description": "Earn a quick bonus on your next purchase to keep momentum going.",
"offer_type": "personalized",
"points_cost": 150,
"expected_acceptance_rate": 0.41,
"reason": "You have a steady but lower-volume pattern.",
"eligible": True,
}
)

if tier in {"silver", "gold", "platinum"}:
recommendations.append(
{
"id": "tier-upgrade",
"title": "Tier upgrade bonus",
"description": "Keep your streak alive with a bonus that accelerates your next tier upgrade.",
"offer_type": "balance_based",
"points_cost": 0,
"expected_acceptance_rate": 0.43,
"reason": "You are already close to the next tier and benefit from milestone offers.",
"eligible": True,
}
)

recommendations.append(
{
"id": "cashback-surprise",
"title": "Cashback surprise",
"description": "Claim a surprise cashback-style reward with your next eligible transaction.",
"offer_type": "seasonal",
"points_cost": 0,
"expected_acceptance_rate": 0.42,
"reason": "A lightweight, broad offer is ideal for users with mixed behavior.",
"eligible": True,
}
)

analysis_summary = (
f"Account {account_id} shows {frequency} recent transactions with an average ticket of "
f"${average_ticket:.2f}; current balance places them in {tier} tier."
)

return {
"account_id": account_id,
"balance": balance,
"analysis_summary": analysis_summary,
"recommendations": recommendations[:4],
"generated_at": datetime.now(timezone.utc).isoformat(),
}

def _tier_for(self, balance: int) -> str:
if balance >= self._tier_thresholds["platinum"]:
return "platinum"
if balance >= self._tier_thresholds["gold"]:
return "gold"
if balance >= self._tier_thresholds["silver"]:
return "silver"
return "bronze"
87 changes: 87 additions & 0 deletions api/services/sentiment_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import math
import time
from datetime import datetime, timedelta, timezone
from typing import Any


class SentimentAnalysisPipeline:
"""A lightweight sentiment pipeline with time-series tracking for Stellar assets."""

def __init__(self) -> None:
self._state: dict[str, list[dict[str, Any]]] = {}

def ingest(self, asset: str, items: list[dict[str, Any]]) -> dict[str, Any]:
scores = []
for item in items:
text = str(item.get("text", "")).strip()
if not text:
continue
score = self._score_text(text)
scores.append({"timestamp": item.get("timestamp") or datetime.now(timezone.utc).isoformat(), "score": score})

if asset not in self._state:
self._state[asset] = []
self._state[asset].extend(scores)
self._state[asset] = self._state[asset][-200:]
return {"asset": asset, "ingested": len(scores), "latest_score": self._latest_score(asset)}

def analyze(self, asset: str) -> dict[str, Any]:
points = self._state.get(asset, [])
if not points:
return {"asset": asset, "sentiment_score": 0.0, "label": "neutral", "points": 0}

avg = sum(item["score"] for item in points) / len(points)
label = "positive" if avg > 0.2 else "negative" if avg < -0.2 else "neutral"
return {"asset": asset, "sentiment_score": round(avg, 4), "label": label, "points": len(points)}

def get_timeseries(self, asset: str, hours: int = 24) -> list[dict[str, Any]]:
points = self._state.get(asset, [])
if not points:
return []

cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
series = []
for item in points:
ts = item.get("timestamp")
if not ts:
continue
try:
parsed = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
except ValueError:
continue
if parsed >= cutoff:
series.append({"timestamp": parsed.isoformat(), "score": item["score"]})
return series

def get_visualization(self, asset: str) -> dict[str, Any]:
series = self.get_timeseries(asset)
if not series:
return {"asset": asset, "points": [], "trend": "neutral"}
scores = [item["score"] for item in series]
avg = sum(scores) / len(scores)
trend = "positive" if avg > 0.1 else "negative" if avg < -0.1 else "neutral"
return {"asset": asset, "points": series[-24:], "trend": trend, "average_score": round(avg, 4)}

def _latest_score(self, asset: str) -> float:
if not self._state.get(asset):
return 0.0
return self._state[asset][-1]["score"]

def _score_text(self, text: str) -> float:
lowered = text.lower()
positive_words = ["bullish", "up", "surge", "breakout", "buy", "optimistic", "strong", "adoption", "good"]
negative_words = ["bearish", "down", "drop", "crash", "sell", "pessimistic", "weak", "risk", "bad"]
score = 0.0
for word in positive_words:
if word in lowered:
score += 0.25
for word in negative_words:
if word in lowered:
score -= 0.25
if any(token in lowered for token in ["!", "moon", "wow"]):
score += 0.1
if any(token in lowered for token in ["?", "uncertain", "unclear"]):
score -= 0.1
return max(-1.0, min(1.0, round(score, 3)))
Loading