# Implementation Overview

This document outlines the development approach and technical implementation details for the Pebble Orchestrator (Pebble IQ).
## Architecture Reference

For a detailed deep dive into the event-driven system design, data flow diagrams, and scalability strategies, see the High-Level Architecture document.
## Technology Stack

### Backend & Orchestration

| Layer | Technology | Purpose |
| --- | --- | --- |
| Language | Python 3.11+ | Core orchestration logic |
| API Framework | FastAPI | REST API for UI & webhooks |
| Async Engine | Celery + Redis | Background jobs & ERP sync |
| Agentic Layer | LangGraph | Stateful AI workflows (decision support) |
| Workflow | Temporal.io | Long-running business processes |
### Frontend & UI

| Layer | Technology | Purpose |
| --- | --- | --- |
| Framework | React 18+ | Native CRM UI |
| State Mgmt | TanStack Query | Server state sync |
| Kanban | Plane.so (OSS) | Visual board (self-hosted) |
| Styling | TailwindCSS | Utility-first CSS |
### AI/ML Stack

| Component | Technology | Purpose |
| --- | --- | --- |
| Email Classification | BERT/RoBERTa | Intent routing (CRM vs ERP vs Tender) |
| LLM Orchestration | LangGraph + LangChain | Multi-step AI workflows |
| LLM Runtime | Azure OpenAI / Ollama (local) | Grounding & draft generation |
| Entity Extraction | spaCy + custom NER | GST/PAN/CIN/Order ID extraction |
| OCR | Azure Document Intelligence | Tender NIT parsing |
| Vector Store | Qdrant / Chroma | Semantic search (Phase 4) |
### Data Stores

| Store | Technology | Purpose |
| --- | --- | --- |
| Operational DB | PostgreSQL 15+ | CRM master, emails, activity stream |
| Cache/Queue | Redis 7+ | Celery broker, session state |
| Vector DB | Qdrant | Semantic indexing (post-MVP) |
| Search | Meilisearch | Fast card/lead lookup |
| File Storage | MinIO (S3-compatible) | Email attachments & PDFs |
### ERP/External Integration

| System | Integration Method | Phase |
| --- | --- | --- |
| Tally Prime | XML over HTTP (port 9000) | Phase 1 |
| Outlook/Gmail | IMAP + Microsoft Graph API | MVP |
| Focus RT | REST/SOAP (current system) | Phase 1 |
## Project Structure

```text
pebble-orchestrator/
├── backend/
│   ├── core/              # Business logic & domain models
│   ├── agents/            # LangGraph AI agents
│   ├── connectors/        # Tally XML, ERP, Email adapters
│   ├── workers/           # Celery async tasks
│   ├── api/               # FastAPI routes
│   └── tests/             # pytest suites
├── crm-ui/                # Native CRM UI (React)
│   ├── src/
│   │   ├── components/    # Reusable UI components
│   │   ├── pages/         # CRM tabs (Company, Contact, Leads...)
│   │   ├── hooks/         # TanStack Query hooks
│   │   └── utils/         # API clients
│   └── tailwind.config.js
├── plane-kanban/          # Plane.so Docker setup
│   └── docker-compose.yml
├── scripts/               # DB migrations, seed data
└── docker-compose.dev.yml # Full-stack local dev
```
## Development Workflow

### Git Flow

```mermaid
gitGraph
    commit id: "main (production)"
    branch develop
    commit id: "feature-base"
    branch feature/tally-integration
    commit id: "XML connector"
    commit id: "Stock lookup tool"
    checkout develop
    merge feature/tally-integration
    branch feature/langgraph-agents
    commit id: "Draft reply agent"
    commit id: "Confidence scoring"
    checkout develop
    merge feature/langgraph-agents
    checkout main
    merge develop tag: "v1.0-mvp"
```
### Branching Strategy

| Branch | Purpose | Merge To |
| --- | --- | --- |
| `main` | Production-ready releases | - |
| `develop` | Integration & staging | `main` |
| `feature/*` | New features | `develop` |
| `fix/*` | Bug fixes | `develop` |
| `release/*` | Release prep & QA | `main` |
## API Design

### Core Endpoints (FastAPI)

#### Email & Kanban

| Method | Endpoint | Purpose |
| --- | --- | --- |
| POST | `/api/v1/emails/ingest` | Manual email upload (Outlook plugin) |
| GET | `/api/v1/cards` | List Kanban cards (filtered by stream) |
| PATCH | `/api/v1/cards/:id` | Update card status/column |
| POST | `/api/v1/cards/:id/enrich` | Trigger AI intent re-classification |
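The `PATCH /api/v1/cards/:id` endpoint moves a card between Kanban columns. A minimal sketch of a server-side transition guard the route handler could call before persisting; the column names and the `can_move` helper are illustrative (the real column set lives in the Plane.so board configuration):

```python
# Hypothetical Kanban columns and allowed transitions; the real column
# set comes from the Plane.so board configuration.
ALLOWED_TRANSITIONS = {
    "inbox": {"in_progress", "done"},
    "in_progress": {"waiting", "done"},
    "waiting": {"in_progress", "done"},
    "done": set(),  # terminal column
}


def can_move(current: str, target: str) -> bool:
    """Return True if a card may move from `current` to `target`."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```

A PATCH handler would reject the update (e.g. HTTP 409) when `can_move` returns `False`, keeping board-state rules on the server rather than in the UI.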
#### CRM Master

| Method | Endpoint | Purpose |
| --- | --- | --- |
| GET | `/api/v1/companies` | List companies with filters |
| POST | `/api/v1/companies` | Create new company |
| GET | `/api/v1/companies/:id` | Get company detail (8 tabs) |
| PATCH | `/api/v1/companies/:id` | Update company fields |
| POST | `/api/v1/companies/:id/validate` | Trigger CIN/PAN/GST validation |
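The validate endpoint triggers CIN/PAN/GST checks. A format-level sketch using the standard shapes of Indian identifiers (PAN: 10 characters, GSTIN: 15, CIN: 21); this checks shape only, not registry existence, and the function name is illustrative:

```python
import re

# Standard Indian identifier shapes (format check only, no registry lookup)
PAN_RE = re.compile(r"^[A-Z]{5}[0-9]{4}[A-Z]$")                       # e.g. AAPFU0939F
GSTIN_RE = re.compile(r"^[0-9]{2}[A-Z]{5}[0-9]{4}[A-Z][1-9A-Z]Z[0-9A-Z]$")
CIN_RE = re.compile(r"^[LU][0-9]{5}[A-Z]{2}[0-9]{4}[A-Z]{3}[0-9]{6}$")


def validate_identifiers(pan=None, gstin=None, cin=None):
    """Return a {field: bool} map for every identifier supplied."""
    patterns = {"pan": PAN_RE, "gstin": GSTIN_RE, "cin": CIN_RE}
    values = {"pan": pan, "gstin": gstin, "cin": cin}
    return {
        field: bool(patterns[field].match(value))
        for field, value in values.items()
        if value is not None
    }
```

A real validation worker would layer registry lookups (MCA for CIN, GSTN for GSTIN) on top of this cheap format gate.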
#### AI & Decision Support (Phase 1)

| Method | Endpoint | Purpose |
| --- | --- | --- |
| POST | `/api/v1/ai/draft-reply` | Generate Tally-grounded email draft |
| POST | `/api/v1/ai/classify-email` | Manual re-classification |
| GET | `/api/v1/ai/confidence/:card_id` | Get AI confidence score |
```jsonc
// POST /api/v1/ai/draft-reply
// Request
{
  "email_id": "eml_abc123",
  "context": {
    "product": "Zinc Oxide",
    "enquiry_type": "price_availability"
  }
}

// Response
{
  "draft": {
    "subject": "Re: Zinc Oxide Pricing",
    "body": "Dear Customer,\n\nAs per our latest stock, Zinc Oxide is available at ₹125/kg...",
    "grounding": {
      "tally_stock": "500kg",
      "last_price": "₹120/kg",
      "confidence": 0.92
    }
  },
  "requires_review": false
}
```
### Error Handling

```json
{
  "error": {
    "code": "TALLY_CONNECTION_FAILED",
    "message": "Unable to connect to Tally XML server on Port 9000",
    "details": {
      "attempted_url": "http://localhost:9000",
      "retry_after": 30
    }
  }
}
```
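All API errors share this envelope. One way to keep the shape consistent is a small builder that route handlers (or a global FastAPI exception handler) reuse; the helper name is illustrative:

```python
def error_envelope(code: str, message: str, **details) -> dict:
    """Build the standard error envelope returned by all API endpoints."""
    body = {"error": {"code": code, "message": message}}
    if details:
        # Optional machine-readable context (retry hints, attempted URLs, ...)
        body["error"]["details"] = details
    return body


# Example: the Tally connection failure shown above
payload = error_envelope(
    "TALLY_CONNECTION_FAILED",
    "Unable to connect to Tally XML server on Port 9000",
    attempted_url="http://localhost:9000",
    retry_after=30,
)
```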
## Database Schema (PostgreSQL)

### CRM Master Tables

```sql
-- Companies (CRM Core)
CREATE TABLE companies (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    cin CHAR(21) UNIQUE,
    pan CHAR(10),
    gstin VARCHAR(15)[],          -- Array for multi-state registrations
    legal_entity_id UUID NOT NULL,
    group_id UUID,                -- For multi-entity linking
    risk_score DECIMAL(5,2),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Contacts
CREATE TABLE contacts (
    id UUID PRIMARY KEY,
    company_id UUID REFERENCES companies(id),
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255),
    phone VARCHAR(20),
    designation VARCHAR(100),
    is_primary BOOLEAN DEFAULT false
);

-- Email Cards (Activity Stream)
CREATE TABLE email_cards (
    id UUID PRIMARY KEY,
    message_id VARCHAR(255) UNIQUE NOT NULL,
    subject TEXT,
    from_email VARCHAR(255),
    to_emails TEXT[],
    body_text TEXT,
    body_html TEXT,
    received_at TIMESTAMPTZ,
    stream VARCHAR(50),           -- 'crm', 'erp', 'tender'
    ai_intent VARCHAR(100),
    ai_confidence DECIMAL(5,2),
    company_id UUID REFERENCES companies(id),
    kanban_status VARCHAR(50) DEFAULT 'inbox',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Activity Stream (Event Sourcing)
CREATE TABLE activity_stream (
    id UUID PRIMARY KEY,
    entity_type VARCHAR(50),      -- 'email', 'lead', 'order'
    entity_id UUID NOT NULL,
    event_type VARCHAR(100),      -- 'EmailReceived', 'LeadClassified'
    payload JSONB,
    user_id UUID,
    occurred_at TIMESTAMPTZ DEFAULT NOW()
);

-- CRM lookups
CREATE INDEX idx_companies_cin ON companies(cin);
CREATE INDEX idx_companies_gstin ON companies USING GIN(gstin);
CREATE INDEX idx_companies_group ON companies(group_id);

-- Email stream routing
CREATE INDEX idx_emails_stream ON email_cards(stream);
CREATE INDEX idx_emails_status ON email_cards(kanban_status);
CREATE INDEX idx_emails_received ON email_cards(received_at DESC);

-- Activity audit
CREATE INDEX idx_activity_entity ON activity_stream(entity_type, entity_id);
CREATE INDEX idx_activity_occurred ON activity_stream(occurred_at DESC);
```
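Every state change is appended to `activity_stream` as an immutable event. A small sketch of assembling one event row before insertion; the function name and client-side timestamp are illustrative (the table itself defaults `occurred_at` to `NOW()`):

```python
import json
import uuid
from datetime import datetime, timezone


def build_activity_event(entity_type, entity_id, event_type, payload, user_id=None):
    """Build one append-only row for the activity_stream table."""
    return {
        "id": str(uuid.uuid4()),
        "entity_type": entity_type,            # 'email', 'lead', 'order'
        "entity_id": entity_id,
        "event_type": event_type,              # 'EmailReceived', 'LeadClassified', ...
        "payload": json.dumps(payload),        # stored in the JSONB column
        "user_id": user_id,                    # None for system-generated events
        "occurred_at": datetime.now(timezone.utc).isoformat(),
    }
```

Because rows are append-only, the current state of any entity can be rebuilt by replaying its events ordered by `occurred_at`.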
## LangGraph Agent Implementation

### Decision Support Agent (Phase 1)

```python
# agents/decision_support.py
from typing import Optional, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph

from connectors.tally import tally_client  # project-local Tally XML client


class ReplyDraftState(TypedDict):
    email_id: str
    product: str
    customer_name: str
    tally_stock: Optional[int]
    tally_price: Optional[float]
    draft: Optional[str]
    confidence: float


def fetch_tally_data(state: ReplyDraftState) -> ReplyDraftState:
    """Tool node -- fetches live stock and price via the Tally XML API."""
    stock = tally_client.get_stock(state["product"])
    price = tally_client.get_price(state["product"])
    return {**state, "tally_stock": stock, "tally_price": price}


def generate_draft(state: ReplyDraftState) -> ReplyDraftState:
    """LLM node -- generates a reply grounded in the fetched Tally data."""
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"""
    Generate a professional email reply:
    Product: {state['product']}
    Stock: {state['tally_stock']} kg
    Price: ₹{state['tally_price']}/kg
    """
    draft = llm.invoke(prompt)
    return {**state, "draft": draft.content, "confidence": 0.9}


# Build the graph: fetch Tally data first, then draft the grounded reply
workflow = StateGraph(ReplyDraftState)
workflow.add_node("fetch_tally", fetch_tally_data)
workflow.add_node("generate", generate_draft)
workflow.add_edge("fetch_tally", "generate")
workflow.set_entry_point("fetch_tally")
agent = workflow.compile()
```
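The draft-reply API also returns a `requires_review` flag. One way to derive it inside the graph is a conditional edge after `generate` that routes low-confidence drafts to a human-review node. A sketch of the routing function; the threshold and node names are illustrative, and in the real graph it would be registered via `workflow.add_conditional_edges`:

```python
CONFIDENCE_THRESHOLD = 0.8  # illustrative cutoff; tune against real drafts


def route_after_draft(state: dict) -> str:
    """Conditional-edge router: send low-confidence drafts to human review."""
    if state.get("confidence", 0.0) >= CONFIDENCE_THRESHOLD:
        return "auto_send"
    return "human_review"


# Wiring sketch (node names hypothetical):
# workflow.add_conditional_edges(
#     "generate", route_after_draft,
#     {"auto_send": "send", "human_review": "review"},
# )
```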
## Worker Implementation (Celery)

### Email Classification Worker

```python
# workers/email_classifier.py
from celery import shared_task
from transformers import pipeline

from core import db                            # project-local data-access layer
from core.activity import log_activity         # activity-stream logger
from workers.routing import map_intent_to_stream

# Loaded once at import time so the model is reused across tasks.
# "bert-base-uncased" is a placeholder; production uses a fine-tuned checkpoint.
classifier = pipeline("text-classification", model="bert-base-uncased")


@shared_task(bind=True, max_retries=3)
def classify_email(self, email_id: str):
    """Asynchronously classify an email and route it to a stream."""
    try:
        email = db.get_email(email_id)

        # Run AI classification on subject + body
        result = classifier(email.subject + " " + email.body_text)

        # Map the predicted intent to a stream ('crm', 'erp', 'tender')
        stream = map_intent_to_stream(result[0]["label"])
        confidence = result[0]["score"]

        # Update the email card
        db.update_email(
            email_id,
            stream=stream,
            ai_confidence=confidence,
            ai_intent=result[0]["label"],
        )

        # Append to the activity stream (event sourcing)
        log_activity(
            entity_type="email",
            entity_id=email_id,
            event_type="EmailClassified",
            payload={"stream": stream, "confidence": confidence},
        )
    except Exception as e:
        # Retry up to max_retries with a 60-second backoff
        raise self.retry(exc=e, countdown=60)
```
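The worker above calls `map_intent_to_stream`, which is not defined in this document. A minimal sketch, assuming illustrative intent labels; the real label set comes from the fine-tuned classifier:

```python
# Illustrative intent labels -> Kanban streams; the real label set is
# defined by the fine-tuned BERT/RoBERTa classifier.
INTENT_TO_STREAM = {
    "price_enquiry": "crm",
    "order_status": "erp",
    "invoice_query": "erp",
    "tender_notice": "tender",
}


def map_intent_to_stream(label: str) -> str:
    """Map a classifier label to a stream, defaulting to 'crm' for triage."""
    return INTENT_TO_STREAM.get(label, "crm")
```

Defaulting unknown labels to `crm` keeps unclassified mail visible on the main board instead of silently dropping it.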
Tally Sync Worker (Phase 1)
# workers/tally_sync.py
import xmltodict
import requests
@shared_task
def sync_tally_stock():
"""Periodic stock sync from Tally"""
xml_payload = """
<ENVELOPE>
<HEADER><VERSION>1</VERSION></HEADER>
<BODY>
<DESC><STATICVARIABLES><SVEXPORTFORMAT>XML</SVEXPORTFORMAT></STATICVARIABLES></DESC>
<DATA><COLLECTION name="Stock Items"><FETCH>Name, ClosingBalance</FETCH></COLLECTION></DATA>
</BODY>
</ENVELOPE>
"""
response = requests.post(
"http://localhost:9000",
data=xml_payload,
headers={"Content-Type": "application/xml"}
)
data = xmltodict.parse(response.text)
# Upsert to local cache for fast AI lookups
for item in data["ENVELOPE"]["BODY"]["DATA"]["COLLECTION"]["ITEM"]:
redis_client.hset(
"tally:stock",
item["NAME"],
item["CLOSINGBALANCE"]
)
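If you'd rather avoid the `xmltodict` dependency, the same envelope can be parsed with the standard library. A sketch of a parser returning a name-to-closing-balance map; the function name is illustrative:

```python
import xml.etree.ElementTree as ET


def parse_stock_items(xml_text: str) -> dict:
    """Parse a Tally stock-collection envelope into {name: closing_balance}."""
    root = ET.fromstring(xml_text)
    items = {}
    # Iterate every <ITEM> regardless of nesting depth
    for item in root.iter("ITEM"):
        name = item.findtext("NAME")
        balance = item.findtext("CLOSINGBALANCE")
        if name is not None:
            items[name] = balance
    return items
```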
## Testing Strategy

### Test Pyramid

| Level | Coverage Target | Tools |
| --- | --- | --- |
| Unit | 85%+ | pytest, pytest-cov |
| Integration | 70%+ | pytest + testcontainers |
| E2E | Critical user flows | Playwright |
| LangGraph Agents | 90%+ (node logic) | LangGraph testing utils |
### Example Tests

```python
# tests/test_tally_integration.py
from unittest.mock import patch

from core.cache import redis_client  # shared Redis connection (project-local)


@patch("workers.tally_sync.requests.post")
def test_tally_stock_lookup_success(mock_post):
    """Tally XML stock lookup populates the Redis cache."""
    mock_post.return_value.text = """
    <ENVELOPE>
      <BODY>
        <DATA>
          <COLLECTION>
            <ITEM><NAME>Zinc Oxide</NAME><CLOSINGBALANCE>500</CLOSINGBALANCE></ITEM>
          </COLLECTION>
        </DATA>
      </BODY>
    </ENVELOPE>
    """
    from workers.tally_sync import sync_tally_stock

    sync_tally_stock.apply()  # run the task eagerly, in-process

    # Verify the Redis cache (assumes a client with decode_responses=True)
    stock = redis_client.hget("tally:stock", "Zinc Oxide")
    assert stock == "500"
```

```python
# tests/test_langgraph_agents.py
def test_decision_support_agent():
    """LangGraph agent produces a grounded draft with usable confidence."""
    from agents.decision_support import agent

    result = agent.invoke({
        "email_id": "test_123",
        "product": "Zinc Oxide",
        "customer_name": "ABC Corp",
    })

    assert result["draft"] is not None
    assert result["confidence"] > 0.7
    assert "Zinc Oxide" in result["draft"]
```
## Deployment

### Docker Compose (Full Stack Dev)

```yaml
version: '3.8'

services:
  api:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:pass@postgres/pebble
      - REDIS_URL=redis://redis:6379
      - TALLY_XML_URL=http://host.docker.internal:9000
    depends_on:
      - postgres
      - redis

  worker:
    build: ./backend
    command: celery -A workers worker -l INFO -Q email,tally,ai
    depends_on:
      - redis
      - postgres

  crm-ui:
    build: ./crm-ui
    ports:
      - "3000:3000"
    environment:
      - VITE_API_URL=http://localhost:8000

  plane-web:
    image: makeplane/plane-frontend:latest
    ports:
      - "3001:3000"
    depends_on:
      - plane-api

  plane-api:
    image: makeplane/plane-backend:latest
    environment:
      - DATABASE_URL=postgresql://postgres:pass@postgres/plane
      - SECRET_KEY=${PLANE_SECRET}

  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: pebble
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  meilisearch:
    image: getmeili/meilisearch:latest
    ports:
      - "7700:7700"

volumes:
  pgdata:
```
### CI/CD Pipeline (GitHub Actions)

```yaml
# .github/workflows/deploy.yml
name: CI/CD

on:
  push:
    branches: [main, develop]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run tests
        run: |
          cd backend
          pip install -r requirements-dev.txt
          pytest --cov=core --cov=agents

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker images
        run: |
          docker build -t pebble-api:${{ github.sha }} ./backend
          docker build -t pebble-crm:${{ github.sha }} ./crm-ui

  deploy-staging:
    needs: build
    if: github.ref == 'refs/heads/develop'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to staging
        run: ./scripts/deploy-staging.sh
```
## Environment Configuration

### Required Variables

```bash
# .env.example

# === Database ===
DATABASE_URL=postgresql://user:password@localhost:5432/pebble
REDIS_URL=redis://localhost:6379/0

# === Email Ingestion ===
MS_CLIENT_ID=your_azure_app_id
MS_CLIENT_SECRET=your_azure_secret
MS_TENANT_ID=your_tenant_id

# === Tally/ERP (Phase 1) ===
TALLY_XML_URL=http://localhost:9000
TALLY_COMPANY=your_company_name
CURRENT_ERP_PROVIDER=focus_rt

# === AI/LLM ===
LLM_PROVIDER=azure  # or 'ollama' for local
AZURE_OPENAI_KEY=your_openai_key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
OLLAMA_BASE_URL=http://localhost:11434

# === Authentication ===
JWT_SECRET_KEY=your_secret_key_here
JWT_ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

# === Feature Flags ===
ENABLE_TALLY_SYNC=true
ENABLE_AI_DRAFTS=false        # Phase 1
ENABLE_LANGGRAPH_AGENTS=false # Phase 1
```
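These variables can be loaded into a typed settings object at startup so a missing value fails fast rather than mid-request. A stdlib sketch covering a subset of the variables; in the real service pydantic's `BaseSettings` would be a natural fit, and the class and function names here are illustrative:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Typed view over the environment (subset of .env.example)."""
    database_url: str
    redis_url: str
    tally_xml_url: str
    llm_provider: str
    enable_tally_sync: bool


def load_settings() -> Settings:
    """Read variables at startup; KeyError if DATABASE_URL is missing."""
    return Settings(
        database_url=os.environ["DATABASE_URL"],  # required, no default
        redis_url=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        tally_xml_url=os.environ.get("TALLY_XML_URL", "http://localhost:9000"),
        llm_provider=os.environ.get("LLM_PROVIDER", "azure"),
        enable_tally_sync=os.environ.get("ENABLE_TALLY_SYNC", "true").lower() == "true",
    )
```

Freezing the dataclass keeps configuration immutable after startup, so workers and API processes can share one `Settings` instance safely.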
## Key Implementation Milestones

- **MVP (POC) - Production Ready:** 14-16 weeks
- **Phase 1 (Tally Integration):** 6-8 weeks
- **Phase 4 (Advanced Intelligence)**
← Back to Use Cases | View Architecture