Enterprise Sentiment Analysis System
BERT + Spark NLP | Scalable Real-Time Analysis
Project Overview
Problem Statement
Enterprises receive millions of customer interactions daily across channels, and manual sentiment analysis cannot keep up at that scale. This system combines fine-tuned BERT models with Spark NLP for distributed processing, analyzing sentiment across emails, reviews, social media, and support tickets in real time.
Objectives
- Fine-tune BERT for multi-class sentiment (positive, negative, neutral, mixed)
- Build a scalable processing pipeline with Spark NLP
- Implement aspect-based sentiment analysis
- Deploy with real-time and batch inference modes
- Create dashboards for sentiment trend monitoring
Technology Stack
| Component | Technology |
|---|---|
| Sentiment Model | BERT / RoBERTa (fine-tuned) |
| Scalable Processing | Spark NLP |
| API Framework | FastAPI + Triton Inference Server |
| Stream Processing | Apache Kafka + Flink |
| Data Storage | Elasticsearch + PostgreSQL |
| Dashboard | Grafana + Kibana |
| Monitoring | Prometheus + ELK Stack |
Architecture Diagram
```text
+--------------------------------------------------------------+
|          Enterprise Sentiment Analysis Architecture          |
+--------------------------------------------------------------+
| +--------------+    +--------------+    +------------------+ |
| | Data Sources |--->|    Kafka     |--->|    Spark NLP     | |
| | (Multi-chan) |    |    Stream    |    |  Pre-processing  | |
| +--------------+    +--------------+    +--------+---------+ |
|                                                  |           |
|                                                  v           |
| +--------------+    +--------------+    +------------------+ |
| |  Sentiment   |<---|    Triton    |<---|    BERT Model    | |
| |   Results    |    |  Inference   |    |   (Fine-tuned)   | |
| +------+-------+    +------+-------+    +------------------+ |
|        |                   |                                 |
|        v                   v                                 |
| +--------------+    +--------------+    +------------------+ |
| |  Real-time   |    |    Aspect    |    |      Trend       | |
| |  Dashboard   |    |   Analysis   |    |    Monitoring    | |
| +--------------+    +--------------+    +------------------+ |
+--------------------------------------------------------------+
```
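The diagram does not define what travels between stages. As a hedged sketch, assume each Kafka message is a UTF-8 JSON object. The class `InteractionEvent` and its fields `doc_id`, `channel`, `text`, `timestamp`, `sentiment`, and `scores` are hypothetical and not part of the original design. A small dataclass like this keeps producers and consumers agreed on one schema:

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, Optional


# Hypothetical message schema for the Kafka topic feeding Spark NLP.
@dataclass
class InteractionEvent:
    doc_id: str
    channel: str                      # e.g. "email", "review", "social", "ticket"
    text: str
    timestamp: float                  # Unix epoch seconds
    sentiment: Optional[str] = None   # filled in after inference
    scores: Dict[str, float] = field(default_factory=dict)

    def to_json(self) -> bytes:
        # Kafka message values are raw bytes; encode the event as UTF-8 JSON.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_json(cls, payload: bytes) -> "InteractionEvent":
        return cls(**json.loads(payload.decode("utf-8")))


event = InteractionEvent("t-1", "ticket", "The agent resolved my issue quickly.", 1700000000.0)
restored = InteractionEvent.from_json(event.to_json())
print(restored)
```

The bytes returned by `to_json()` can be passed directly as the message value to kafka-python's `KafkaProducer.send`, and consumers rebuild the event with `from_json`.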
Step-by-Step Implementation
Step 1: Environment Setup
```bash
mkdir sentiment-system && cd sentiment-system
pip install transformers datasets torch spark-nlp
pip install fastapi uvicorn kafka-python elasticsearch
pip install scikit-learn mlflow prometheus-client
pip install pyspark
```
Step 2: BERT Fine-Tuning for Sentiment
Fine-tune BERT for multi-class sentiment classification on enterprise data.
```python
# src/models/train_sentiment.py
from typing import Dict, List

import mlflow
import mlflow.pytorch
import numpy as np
import torch
from sklearn.metrics import classification_report, f1_score
from torch.optim import AdamW  # transformers.AdamW is deprecated and removed in recent releases
from torch.utils.data import DataLoader, Dataset
from transformers import (
    BertForSequenceClassification,
    BertTokenizer,
    get_linear_schedule_with_warmup,
)


class SentimentDataset(Dataset):
    LABEL_MAP = {"negative": 0, "neutral": 1, "positive": 2, "mixed": 3}

    def __init__(self, texts: List[str], labels: List[str], tokenizer, max_len: int = 128):
        self.texts = texts
        self.labels = [self.LABEL_MAP[l] for l in labels]
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        # squeeze(0) removes the batch dimension added by return_tensors="pt"
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "labels": torch.tensor(self.labels[idx], dtype=torch.long),
        }


class SentimentTrainer:
    def __init__(self, model_name: str = "bert-base-uncased", num_labels: int = 4):
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertForSequenceClassification.from_pretrained(
            model_name, num_labels=num_labels
        )
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    def train(
        self, train_texts, train_labels, eval_texts, eval_labels,
        epochs: int = 5, batch_size: int = 32, learning_rate: float = 2e-5,
        output_dir: str = "./sentiment_model",
    ):
        train_dataset = SentimentDataset(train_texts, train_labels, self.tokenizer)
        eval_dataset = SentimentDataset(eval_texts, eval_labels, self.tokenizer)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        eval_loader = DataLoader(eval_dataset, batch_size=batch_size)

        optimizer = AdamW(self.model.parameters(), lr=learning_rate)
        total_steps = len(train_loader) * epochs
        # Linear warmup over the first 10% of steps, then linear decay to 0
        scheduler = get_linear_schedule_with_warmup(
            optimizer, num_warmup_steps=total_steps // 10, num_training_steps=total_steps
        )

        mlflow.set_experiment("sentiment-analysis")
        with mlflow.start_run():
            mlflow.log_params({"epochs": epochs, "lr": learning_rate, "batch_size": batch_size})
            for epoch in range(epochs):
                self.model.train()
                total_loss = 0.0
                for batch in train_loader:
                    optimizer.zero_grad()
                    outputs = self.model(
                        input_ids=batch["input_ids"].to(self.device),
                        attention_mask=batch["attention_mask"].to(self.device),
                        labels=batch["labels"].to(self.device),
                    )
                    loss = outputs.loss  # cross-entropy over the 4 classes
                    loss.backward()
                    torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                    optimizer.step()
                    scheduler.step()
                    total_loss += loss.item()

                avg_loss = total_loss / len(train_loader)
                eval_metrics = self.evaluate(eval_loader)
                mlflow.log_metrics({
                    "train_loss": avg_loss,
                    "eval_accuracy": eval_metrics["accuracy"],
                    "eval_f1_weighted": eval_metrics["f1_weighted"],
                }, step=epoch)
                print(f"Epoch {epoch + 1}: loss={avg_loss:.4f}, acc={eval_metrics['accuracy']:.4f}")

            mlflow.pytorch.log_model(self.model, "sentiment-model")

        # Save in Hugging Face format so the API in Step 5 can load it with from_pretrained
        self.model.save_pretrained(output_dir)
        self.tokenizer.save_pretrained(output_dir)

    def evaluate(self, eval_loader) -> Dict:
        self.model.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for batch in eval_loader:
                outputs = self.model(
                    input_ids=batch["input_ids"].to(self.device),
                    attention_mask=batch["attention_mask"].to(self.device),
                )
                preds = torch.argmax(outputs.logits, dim=-1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch["labels"].numpy())
        accuracy = float(np.mean(np.array(all_preds) == np.array(all_labels)))
        return {
            "accuracy": accuracy,
            "f1_weighted": float(f1_score(all_labels, all_preds, average="weighted")),
            "report": classification_report(all_labels, all_preds),
        }
```
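`train()` calls `get_linear_schedule_with_warmup` with 10% of the steps as warmup. The pure-Python sketch below reproduces the shape of that schedule so you can sanity-check learning rates without a GPU: a linear ramp from 0 up to the base rate, then a linear decay back to 0. It mirrors the Transformers schedule rather than importing it. The function name `linear_warmup_multiplier` is hypothetical.

```python
def linear_warmup_multiplier(step: int, warmup_steps: int, total_steps: int) -> float:
    """Learning-rate multiplier: linear warmup to 1.0, then linear decay to 0.0."""
    if step < warmup_steps:
        return step / max(1, warmup_steps)
    return max(0.0, (total_steps - step) / max(1, total_steps - warmup_steps))


# Example: 1,000 optimizer steps with 10% warmup and the default lr of 2e-5.
total_steps = 1000
warmup_steps = total_steps // 10
base_lr = 2e-5
for step in (0, 50, 100, 550, 1000):
    lr = base_lr * linear_warmup_multiplier(step, warmup_steps, total_steps)
    print(f"step {step:>4}: lr = {lr:.2e}")
```

The warmup matters for BERT fine-tuning because large early updates to a pretrained network can destabilize training. A short ramp lets the optimizer's moment estimates settle before the peak learning rate is applied.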
Step 3: Spark NLP Preprocessing Pipeline
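Build scalable text preprocessing with Spark NLP for batch analysis of millions of documents. The cleaned, lemmatized output suits keyword-based aspect matching, trend analysis, and classical features. Feed BERT the original text instead: transformer models use their own subword tokenizer and depend on function words, including negations such as "not", that stop-word removal can discard.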
```python
# src/processing/spark_pipeline.py
import sparknlp
from pyspark.ml import Pipeline
from sparknlp.annotator import (
    LemmatizerModel,
    Normalizer,
    SentenceDetectorDLModel,
    StopWordsCleaner,
    Tokenizer,
)
from sparknlp.base import DocumentAssembler, Finisher


class SparkSentimentPipeline:
    def __init__(self):
        # Starts (or reuses) a SparkSession with the Spark NLP jars loaded
        self.spark = sparknlp.start()
        self.pipeline = self._build_pipeline()

    def _build_pipeline(self):
        # Wrap raw text in Spark NLP's document annotation
        document_assembler = DocumentAssembler() \
            .setInputCol("text") \
            .setOutputCol("document")
        # Pretrained deep-learning sentence splitter
        sentence_detector = SentenceDetectorDLModel.pretrained() \
            .setInputCols(["document"]) \
            .setOutputCol("sentences")
        tokenizer = Tokenizer() \
            .setInputCols(["sentences"]) \
            .setOutputCol("tokens")
        # Lowercase and strip unwanted characters from each token
        normalizer = Normalizer() \
            .setInputCols(["tokens"]) \
            .setOutputCol("normalized") \
            .setLowercase(True)
        stopwords_cleaner = StopWordsCleaner() \
            .setInputCols(["normalized"]) \
            .setOutputCol("clean_tokens")
        lemmatizer = LemmatizerModel.pretrained("lemma_antbnc") \
            .setInputCols(["clean_tokens"]) \
            .setOutputCol("lemmatized")
        # Convert annotations back to a plain array<string> column
        finisher = Finisher() \
            .setInputCols(["lemmatized"]) \
            .setOutputCols(["processed_text"]) \
            .setCleanAnnotations(True)
        return Pipeline(stages=[
            document_assembler, sentence_detector, tokenizer,
            normalizer, stopwords_cleaner, lemmatizer, finisher,
        ])

    def process_batch(self, df):
        # No stage is trained on labels here; fit() just materializes a PipelineModel
        model = self.pipeline.fit(df)
        return model.transform(df)

    def analyze_large_dataset(self, input_path: str, output_path: str):
        # Spark splits the DataFrame into partitions and processes them in
        # parallel, so no manual chunking is needed.
        df = self.spark.read.parquet(input_path)
        total = df.count()  # note: triggers an extra full pass over the input
        print(f"Processing {total} documents...")
        processed = self.process_batch(df)
        processed.write.mode("overwrite").parquet(output_path)
        print(f"Results saved to {output_path}")
```
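To see roughly what ends up in the `processed_text` column, here is a stdlib-only approximation of the pipeline's stages for a single document. It simplifies in three ways. The stop-word list is a tiny hypothetical sample, while Spark NLP's `StopWordsCleaner` uses a much larger list. Sentence splitting uses a regex instead of the deep-learning `SentenceDetectorDLModel`. Lemmatization is omitted. The function name `preprocess` is hypothetical.

```python
import re
from typing import List

# Tiny illustrative stop-word list (hypothetical; real lists are much larger).
STOPWORDS = {"the", "a", "an", "is", "was", "and", "to", "of", "it", "my"}


def preprocess(text: str) -> List[str]:
    tokens: List[str] = []
    # 1. Sentence detection (regex stand-in for SentenceDetectorDLModel)
    for sentence in re.split(r"(?<=[.!?])\s+", text.strip()):
        # 2. Tokenization on whitespace
        for token in sentence.split():
            # 3. Normalization: lowercase and keep ASCII letters only
            normalized = re.sub(r"[^a-z]", "", token.lower())
            # 4. Stop-word removal; tokens emptied by normalization are dropped too
            if normalized and normalized not in STOPWORDS:
                tokens.append(normalized)
    return tokens


print(preprocess("The battery was great! Shipping took 3 weeks."))
```

The output is a bag of content words, such as "battery", "great", and "shipping". That suits aspect keyword matching, but it is not a suitable input for BERT.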
Step 4: Aspect-Based Sentiment Analysis
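Extract sentiment toward specific aspects (e.g., "battery life", "customer service"). The analyzer below detects aspects by keyword matching, collects the sentences that mention each aspect, and scores them with a pretrained RoBERTa sentiment model.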
```python
# src/models/aspect_sentiment.py
import re
from typing import Dict, List

from transformers import pipeline as hf_pipeline


class AspectSentimentAnalyzer:
    # Keyword lists per aspect; matching is a case-insensitive substring check
    ASPECT_KEYWORDS = {
        "product_quality": ["quality", "build", "material", "durable", "cheap"],
        "customer_service": ["support", "service", "help", "response", "staff"],
        "price": ["price", "cost", "expensive", "cheap", "value", "worth"],
        "delivery": ["shipping", "delivery", "arrived", "package", "fast"],
        "usability": ["easy", "difficult", "intuitive", "confusing", "user-friendly"],
    }

    def __init__(self, sentiment_model: str = "cardiffnlp/twitter-roberta-base-sentiment-latest"):
        self.sentiment_pipeline = hf_pipeline("sentiment-analysis", model=sentiment_model)

    def extract_aspects(self, text: str) -> List[str]:
        text_lower = text.lower()
        found_aspects = []
        for aspect, keywords in self.ASPECT_KEYWORDS.items():
            if any(kw in text_lower for kw in keywords):
                found_aspects.append(aspect)
        return found_aspects if found_aspects else ["general"]

    def analyze(self, text: str) -> Dict:
        aspects = self.extract_aspects(text)
        sentences = re.split(r"[.!?]+", text)
        # Score the whole text once; reused for "overall" and as the fallback.
        # [:512] caps characters (not tokens) as a rough guard on input length.
        overall = self.sentiment_pipeline(text[:512])[0]
        aspect_results = {}
        for aspect in aspects:
            relevant_sentences = [
                s.strip() for s in sentences
                if any(kw in s.lower() for kw in self.ASPECT_KEYWORDS.get(aspect, []))
            ]
            if relevant_sentences:
                combined = " ".join(relevant_sentences)
                result = self.sentiment_pipeline(combined[:512])[0]
                aspect_results[aspect] = {
                    "sentiment": result["label"],
                    "confidence": result["score"],
                    "text_snippet": combined[:200],
                }
            else:
                # No sentence mentions this aspect (e.g. "general"): fall back to
                # the overall label with a discounted confidence
                aspect_results[aspect] = {
                    "sentiment": overall["label"],
                    "confidence": overall["score"] * 0.7,
                    "text_snippet": text[:200],
                }
        return {"aspects": aspect_results, "overall": overall}
```
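`extract_aspects` relies on plain substring checks, so "fast" also matches "breakfast" and "staff" matches "staffing". One possible refinement is a word-boundary regex per aspect, sketched below. Only two of the aspects from `ASPECT_KEYWORDS` are copied here so the block is self-contained, and the module-level names are hypothetical.

```python
import re
from typing import Dict, List, Pattern

ASPECT_KEYWORDS = {
    "price": ["price", "cost", "expensive", "cheap", "value", "worth"],
    "delivery": ["shipping", "delivery", "arrived", "package", "fast"],
}

# One alternation regex per aspect; \b anchors stop "fast" matching inside "breakfast".
ASPECT_PATTERNS: Dict[str, Pattern[str]] = {
    aspect: re.compile(r"\b(?:" + "|".join(map(re.escape, kws)) + r")\b", re.IGNORECASE)
    for aspect, kws in ASPECT_KEYWORDS.items()
}


def extract_aspects(text: str) -> List[str]:
    found = [aspect for aspect, pattern in ASPECT_PATTERNS.items() if pattern.search(text)]
    return found or ["general"]


print(extract_aspects("Great breakfast included."))
print(extract_aspects("It arrived late but the price was fair."))
```

The trade-off is that whole-word matching also stops plural forms such as "prices" from matching. You either list inflections explicitly or match against the lemmatized tokens produced by the Spark NLP pipeline.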
Step 5: Real-Time Inference API
```python
# src/api/main.py
import time
from contextlib import asynccontextmanager
from typing import Dict, List, Optional

import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from transformers import BertForSequenceClassification, BertTokenizer

LABEL_MAP = {0: "negative", 1: "neutral", 2: "positive", 3: "mixed"}
MAX_BATCH_SIZE = 100


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load a fine-tuned model saved in Hugging Face format (e.g. via save_pretrained)
    app.state.tokenizer = BertTokenizer.from_pretrained("./sentiment_model")
    app.state.model = BertForSequenceClassification.from_pretrained("./sentiment_model")
    app.state.model.eval()
    app.state.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    app.state.model.to(app.state.device)
    yield


app = FastAPI(title="Sentiment API", lifespan=lifespan)


class SentimentRequest(BaseModel):
    text: str = Field(..., min_length=1, max_length=5000)
    return_aspects: bool = False  # accepted but not yet wired to AspectSentimentAnalyzer


class SentimentResponse(BaseModel):
    sentiment: str
    confidence: float
    all_scores: Dict[str, float]
    latency_ms: float
    aspects: Optional[Dict] = None


def _predict(texts: List[str]) -> torch.Tensor:
    """One forward pass over a list of texts; returns class probabilities on CPU."""
    encoding = app.state.tokenizer(
        texts, max_length=128, padding=True, truncation=True, return_tensors="pt"
    )
    with torch.no_grad():
        outputs = app.state.model(
            input_ids=encoding["input_ids"].to(app.state.device),
            attention_mask=encoding["attention_mask"].to(app.state.device),
        )
    return torch.softmax(outputs.logits, dim=-1).cpu()


def _to_response(probs: torch.Tensor, latency_ms: float) -> SentimentResponse:
    pred_idx = int(torch.argmax(probs))
    return SentimentResponse(
        sentiment=LABEL_MAP[pred_idx],
        confidence=float(probs[pred_idx]),
        all_scores={LABEL_MAP[i]: p for i, p in enumerate(probs.tolist())},
        latency_ms=latency_ms,
    )


# Plain `def` rather than `async def`: FastAPI runs it in a thread pool, so the
# blocking model call does not stall the event loop.
@app.post("/api/v1/sentiment", response_model=SentimentResponse)
def analyze_sentiment(request: SentimentRequest):
    start = time.perf_counter()
    probs = _predict([request.text])[0]
    return _to_response(probs, (time.perf_counter() - start) * 1000)


@app.post("/api/v1/sentiment/batch")
def batch_sentiment(texts: List[str]):
    if not texts or len(texts) > MAX_BATCH_SIZE:
        raise HTTPException(status_code=422, detail=f"Send 1-{MAX_BATCH_SIZE} texts per request")
    start = time.perf_counter()
    probs = _predict(texts)  # a single batched forward pass
    latency = (time.perf_counter() - start) * 1000  # whole-batch latency
    results = [_to_response(p, latency) for p in probs]
    return {"results": results, "total": len(results)}
```
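The batch endpoint handles at most 100 texts per call, so a client with a larger backlog has to split its input. The minimal sketch below assumes the client holds the texts in memory. The helper name `chunked` is hypothetical.

```python
from typing import Iterator, List, Sequence


def chunked(items: Sequence[str], size: int = 100) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items, preserving order."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


texts = [f"review {i}" for i in range(250)]
batches = list(chunked(texts))
print([len(b) for b in batches])
```

Each batch is then sent as the JSON body of a POST to `/api/v1/sentiment/batch` with any HTTP client. Because order is preserved, results can be concatenated back in input order.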
ℹ️
Use domain-specific fine-tuning data. Generic sentiment models often underperform in specialized domains such as healthcare or finance, so collect and annotate training data from the target domain.
💡
Implement a fallback chain: if the fine-tuned model has low confidence, fall back to a general-purpose sentiment model. This improves coverage for edge cases.
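Below is a minimal sketch of such a chain. It assumes each model is wrapped as a callable returning `(label, confidence)`. The class `FallbackChain`, the 0.6 threshold, and the stub models are hypothetical placeholders. In practice they might stand for the fine-tuned BERT model and the RoBERTa pipeline from Step 4.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple, Union

# A predictor maps text -> (label, confidence).
Predictor = Callable[[str], Tuple[str, float]]


@dataclass
class FallbackChain:
    predictors: List[Tuple[str, Predictor]]  # ordered: most specialized model first
    min_confidence: float = 0.6              # hypothetical threshold; tune on validation data

    def predict(self, text: str) -> Dict[str, Union[str, float, bool]]:
        if not self.predictors:
            raise ValueError("FallbackChain needs at least one predictor")
        candidates = []
        for name, predict in self.predictors:
            label, confidence = predict(text)
            result = {"label": label, "confidence": confidence,
                      "model": name, "low_confidence": False}
            if confidence >= self.min_confidence:
                return result  # first sufficiently confident model wins
            candidates.append(result)
        # No model cleared the threshold: return the most confident answer, flagged.
        best = max(candidates, key=lambda r: r["confidence"])
        best["low_confidence"] = True
        return best


# Stub models standing in for the fine-tuned BERT and a general-purpose model.
def fine_tuned_stub(text: str) -> Tuple[str, float]:
    return ("neutral", 0.41)


def general_stub(text: str) -> Tuple[str, float]:
    return ("positive", 0.88)


chain = FallbackChain([("fine_tuned", fine_tuned_stub), ("general", general_stub)])
print(chain.predict("Loved the new dashboard"))
```

Recording which model answered, along with the `low_confidence` flag, makes it easy to monitor how often the fallback fires. Those low-confidence texts are also good candidates for the next annotation round.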
Performance Metrics
| Metric | Target | Description |
|---|---|---|
| Accuracy | > 92% | 4-class sentiment |
| F1 Score | > 0.90 | Weighted F1 |
| Inference Latency | < 30ms | Single text (GPU) |
| Batch Throughput | > 1000 texts/s | Spark NLP |
| Aspect Precision | > 0.85 | Aspect extraction |
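Weighted F1 is the per-class F1 averaged with weights proportional to each class's support, which is what scikit-learn's `f1_score(..., average="weighted")` computes. The dependency-free sketch below is useful for spot-checking the reported number. The function name `weighted_f1` is hypothetical.

```python
from collections import Counter
from typing import Sequence


def weighted_f1(y_true: Sequence[str], y_pred: Sequence[str]) -> float:
    """Per-class F1, averaged with weights equal to each class's share of y_true."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for label, count in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != label and p == label)
        fn = count - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * count / total
    return score


y_true = ["positive", "positive", "negative", "neutral"]
y_pred = ["positive", "negative", "negative", "neutral"]
print(round(weighted_f1(y_true, y_pred), 4))
```

Classes that appear only in the predictions have zero support, so they carry zero weight, matching scikit-learn's weighted average.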
Interview Talking Points
- BERT Fine-Tuning: Adding a classification head to BERT and fine-tuning it on labeled domain data typically beats zero-shot baselines. Gains of 5-10% are often quoted, but measure the improvement on a held-out set from your own domain.
- Aspect-Based Analysis: Decomposing sentiment by aspect provides actionable insights that overall sentiment misses.
- Spark NLP: Distributes NLP pipelines across a Spark cluster, with optional GPU acceleration, making it practical to process millions of documents.
- Model Selection: RoBERTa often outperforms BERT for sentiment. Consider domain-specific models like FinBERT for finance.
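- Multi-Label: Some texts express mixed sentiments. This system models "mixed" as a fourth, mutually exclusive class. A multi-label formulation, with an independent sigmoid per label, can instead flag positive and negative at the same time and captures this nuance more directly.
- Deployment: Exporting the model to ONNX and serving it with Triton Inference Server can substantially increase serving throughput, because Triton's dynamic batching groups concurrent requests.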
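The difference between the two formulations lies in how logits become labels. In the sketch below, the three-label set and the logit values are hypothetical, chosen to illustrate a text like "Great screen, awful battery." Softmax forces a single winner, while per-label sigmoids let several labels pass a threshold. In Transformers, a model trained this way is typically configured with `problem_type="multi_label_classification"`, which switches the loss to binary cross-entropy.

```python
import math
from typing import List

LABELS = ["negative", "neutral", "positive"]


def softmax(logits: List[float]) -> List[float]:
    m = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sigmoid(x: float) -> float:
    # Numerically stable for large positive or negative x
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def multiclass(logits: List[float]) -> str:
    # Exactly one label: the argmax of the softmax distribution
    probs = softmax(logits)
    return LABELS[probs.index(max(probs))]


def multilabel(logits: List[float], threshold: float = 0.5) -> List[str]:
    # Independent yes/no decision per label
    return [label for label, x in zip(LABELS, logits) if sigmoid(x) >= threshold]


logits = [1.2, -2.0, 1.5]  # hypothetical scores for "Great screen, awful battery."
print(multiclass(logits))
print(multilabel(logits))
```

With these logits the multi-class head reports only "positive", whereas the multi-label head reports both "negative" and "positive". A downstream rule can then map that combination to "mixed".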
⚠️
Sentiment models can encode biases from training data. Regularly audit model predictions across demographic groups and retrain with balanced datasets.
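A simple starting point for such an audit is to compare accuracy per group on a labeled evaluation set. The sketch below assumes each record carries a group tag. The group names, records, and function names are hypothetical. Comparing per-class error rates, such as how often negative feedback is missed, across groups is a natural extension.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def accuracy_by_group(records: Iterable[Tuple[str, str, str]]) -> Dict[str, float]:
    """records: (group, true_label, predicted_label) triples."""
    correct: Dict[str, int] = defaultdict(int)
    total: Dict[str, int] = defaultdict(int)
    for group, y_true, y_pred in records:
        total[group] += 1
        correct[group] += int(y_true == y_pred)
    return {g: correct[g] / total[g] for g in total}


def max_gap(per_group: Dict[str, float]) -> float:
    """Largest accuracy difference between any two groups."""
    return max(per_group.values()) - min(per_group.values())


records = [
    ("group_a", "positive", "positive"),
    ("group_a", "negative", "negative"),
    ("group_b", "positive", "negative"),
    ("group_b", "neutral", "neutral"),
]
per_group = accuracy_by_group(records)
print(per_group, max_gap(per_group))
```

Tracking `max_gap` on every retraining run, with an alert threshold chosen by the team, turns the audit into a regression check rather than a one-off review.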
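ℹ️
For production deployment, consider DistilBERT. Its authors report that it is about 40% smaller and 60% faster than BERT-base while retaining roughly 97% of BERT's language-understanding performance. Confirm the accuracy drop on your own evaluation set before switching.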
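To check a latency target like the sub-30 ms goal, or to compare BERT against a distilled model, time each candidate on the same texts and compare percentiles rather than averages. The sketch below uses the nearest-rank percentile. The function names are hypothetical, and `predict` can be any callable wrapping a model.

```python
import math
import time
from typing import Callable, Dict, List, Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile, with pct in (0, 100]."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def benchmark(predict: Callable[[str], object], texts: List[str], warmup: int = 5) -> Dict[str, float]:
    # Untimed warmup calls let caches and lazy initialization (e.g. CUDA kernels) settle
    for text in texts[:warmup]:
        predict(text)
    latencies_ms = []
    for text in texts:
        start = time.perf_counter()
        predict(text)
        latencies_ms.append((time.perf_counter() - start) * 1000)
    return {"p50_ms": percentile(latencies_ms, 50), "p95_ms": percentile(latencies_ms, 95)}


print(benchmark(str.upper, ["a sample review"] * 20))
```

Tail latency (p95) is usually what a service-level target should be checked against, since a small fraction of slow requests can hide behind a good average.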