
Implementation Overview

This document outlines the development approach and technical implementation details for the Pebble Orchestrator (Pebble IQ).

Architecture Reference

For a detailed deep-dive into the event-driven system design, data flow diagrams, and scalability strategies, see the High-Level Architecture document.


Technology Stack

Backend & Orchestration

| Layer | Technology | Purpose |
| --- | --- | --- |
| Language | Python 3.11+ | Core orchestration logic |
| API Framework | FastAPI | REST API for UI & Webhooks |
| Async Engine | Celery + Redis | Background jobs & ERP sync |
| Agentic Layer | LangGraph | Stateful AI workflows (decision support) |
| Workflow | Temporal.io | Long-running business processes |

Frontend & UI

| Layer | Technology | Purpose |
| --- | --- | --- |
| Framework | React 18+ | Native CRM UI |
| State Mgmt | TanStack Query | Server state sync |
| Kanban | Plane.so (OSS) | Visual board (self-hosted) |
| Styling | TailwindCSS | Utility-first CSS |

AI/ML Stack

| Component | Technology | Purpose |
| --- | --- | --- |
| Email Classification | BERT/RoBERTa | Intent routing (CRM vs ERP vs Tender) |
| LLM Orchestration | LangGraph + LangChain | Multi-step AI workflows |
| LLM Runtime | Azure OpenAI / Ollama (local) | Grounding & draft generation |
| Entity Extraction | spaCy + Custom NER | GST/PAN/CIN/Order ID extraction |
| OCR | Azure Document Intelligence | Tender NIT parsing |
| Vector Store | Qdrant / Chroma | Semantic search (Phase 4) |
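Before the custom spaCy NER model exists, the entity-extraction layer can be prototyped with plain regular expressions encoding the published GSTIN/PAN/CIN formats. A minimal sketch; the function name `extract_entities` is illustrative and not part of the codebase:

```python
import re

# Published identifier shapes: PAN (5 letters, 4 digits, 1 letter),
# GSTIN (2-digit state code + embedded PAN + entity code + 'Z' + checksum),
# CIN (L/U listing flag + 5-digit industry code + state + year + type + number).
PATTERNS = {
    "pan": re.compile(r"\b[A-Z]{5}[0-9]{4}[A-Z]\b"),
    "gstin": re.compile(r"\b[0-9]{2}[A-Z]{5}[0-9]{4}[A-Z][0-9A-Z]Z[0-9A-Z]\b"),
    "cin": re.compile(r"\b[LU][0-9]{5}[A-Z]{2}[0-9]{4}[A-Z]{3}[0-9]{6}\b"),
}

def extract_entities(text: str) -> dict[str, list[str]]:
    """Return every GSTIN/PAN/CIN-shaped token found in an email body."""
    return {name: pat.findall(text) for name, pat in PATTERNS.items()}
```

Regexes only validate shape, not checksums; a real validator would also verify the GSTIN check digit and look the identifier up against the registry.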

Data Stores

| Store | Technology | Purpose |
| --- | --- | --- |
| Operational DB | PostgreSQL 15+ | CRM Master, Emails, Activity Stream |
| Cache/Queue | Redis 7+ | Celery broker, session state |
| Vector DB | Qdrant | Semantic indexing (Post-MVP) |
| Search | Meilisearch | Fast card/lead lookup |
| File Storage | MinIO (S3-compatible) | Email attachments & PDFs |

ERP/External Integration

| System | Integration Method | Phase |
| --- | --- | --- |
| Tally Prime | XML over HTTP (Port 9000) | Phase 1 |
| Outlook/Gmail | IMAP + Microsoft Graph API | MVP |
| Focus RT | REST/SOAP (Current System) | Phase 1 |

Project Structure

pebble-orchestrator/
├── backend/
│   ├── core/                   # Business logic & domain models
│   ├── agents/                 # LangGraph AI agents
│   ├── connectors/             # Tally XML, ERP, Email adapters
│   ├── workers/                # Celery async tasks
│   ├── api/                    # FastAPI routes
│   └── tests/                  # pytest suites
├── crm-ui/                     # Native CRM UI (React)
│   ├── src/
│   │   ├── components/         # Reusable UI components
│   │   ├── pages/              # CRM tabs (Company, Contact, Leads...)
│   │   ├── hooks/              # TanStack Query hooks
│   │   └── utils/              # API clients
│   └── tailwind.config.js
├── plane-kanban/               # Plane.so Docker setup
│   └── docker-compose.yml
├── scripts/                    # DB migrations, seed data
└── docker-compose.dev.yml      # Full-stack local dev

Development Workflow

Git Flow

gitGraph
    commit id: "main (production)"
    branch develop
    commit id: "feature-base"
    branch feature/tally-integration
    commit id: "XML connector"
    commit id: "Stock lookup tool"
    checkout develop
    merge feature/tally-integration
    branch feature/langgraph-agents
    commit id: "Draft reply agent"
    commit id: "Confidence scoring"
    checkout develop
    merge feature/langgraph-agents
    checkout main
    merge develop tag: "v1.0-mvp"

Branching Strategy

| Branch | Purpose | Merge To |
| --- | --- | --- |
| main | Production-ready releases | - |
| develop | Integration & staging | main |
| feature/* | New features | develop |
| fix/* | Bug fixes | develop |
| release/* | Release prep & QA | main |

API Design

Core Endpoints (FastAPI)

Email & Kanban

| Method | Endpoint | Purpose |
| --- | --- | --- |
| POST | /api/v1/emails/ingest | Manual email upload (Outlook plugin) |
| GET | /api/v1/cards | List Kanban cards (filtered by stream) |
| PATCH | /api/v1/cards/:id | Update card status/column |
| POST | /api/v1/cards/:id/enrich | Trigger AI intent re-classification |

CRM Master

| Method | Endpoint | Purpose |
| --- | --- | --- |
| GET | /api/v1/companies | List companies with filters |
| POST | /api/v1/companies | Create new company |
| GET | /api/v1/companies/:id | Get company detail (8 tabs) |
| PATCH | /api/v1/companies/:id | Update company fields |
| POST | /api/v1/companies/:id/validate | Trigger CIN/PAN/GST validation |

AI & Decision Support (Phase 1)

| Method | Endpoint | Purpose |
| --- | --- | --- |
| POST | /api/v1/ai/draft-reply | Generate Tally-grounded email draft |
| POST | /api/v1/ai/classify-email | Manual re-classification |
| GET | /api/v1/ai/confidence/:card_id | Get AI confidence score |

Request/Response Format

// POST /api/v1/ai/draft-reply
// Request
{
  "email_id": "eml_abc123",
  "context": {
    "product": "Zinc Oxide",
    "enquiry_type": "price_availability"
  }
}

// Response
{
  "draft": {
    "subject": "Re: Zinc Oxide Pricing",
    "body": "Dear Customer,\n\nAs per our latest stock, Zinc Oxide is available at ₹125/kg...",
    "grounding": {
      "tally_stock": "500kg",
      "last_price": "₹120/kg",
      "confidence": 0.92
    }
  },
  "requires_review": false
}
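The `requires_review` flag in the response above implies a confidence gate between AI drafts and outgoing mail. One way that gate might look; the 0.8 threshold is an assumption to tune against real classification data, not a documented setting:

```python
REVIEW_THRESHOLD = 0.8  # assumed cut-off, not a documented value

def needs_human_review(grounding: dict) -> bool:
    """Route low-confidence or ungrounded drafts to a human before sending."""
    confidence = grounding.get("confidence", 0.0)
    has_stock_data = grounding.get("tally_stock") is not None
    # A draft with no Tally grounding should never auto-send, however confident the LLM is.
    return confidence < REVIEW_THRESHOLD or not has_stock_data
```

For the example response above (confidence 0.92, stock present) this yields `requires_review: false`.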

Error Handling

{
  "error": {
    "code": "TALLY_CONNECTION_FAILED",
    "message": "Unable to connect to Tally XML server on Port 9000",
    "details": {
      "attempted_url": "http://localhost:9000",
      "retry_after": 30
    }
  }
}
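Callers can honor the `retry_after` hint in the error envelope above. A sketch with injected `post`/`sleep` callables for testability; all names here are illustrative, not part of the API surface:

```python
import json

def call_with_retry(post, sleep, max_attempts=3):
    """Retry a Tally-backed call, honoring the server's retry_after hint.

    post  -> callable returning (status_code, body_text)
    sleep -> callable taking seconds (injected so tests need not wait)
    """
    for attempt in range(max_attempts):
        status, body = post()
        if status == 200:
            return json.loads(body)
        error = json.loads(body).get("error", {})
        # Only TALLY_CONNECTION_FAILED is retryable; give up on the last attempt.
        if error.get("code") != "TALLY_CONNECTION_FAILED" or attempt == max_attempts - 1:
            raise RuntimeError(error.get("message", "unrecoverable error"))
        sleep(error.get("details", {}).get("retry_after", 30))
```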

Database Schema (PostgreSQL)

CRM Master Tables

-- Companies (CRM Core)
CREATE TABLE companies (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    cin CHAR(21) UNIQUE,
    pan CHAR(10),
    gstin VARCHAR(15)[],  -- Array for multi-state
    legal_entity_id UUID NOT NULL,
    group_id UUID,  -- For multi-entity linking
    risk_score DECIMAL(5,2),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Contacts
CREATE TABLE contacts (
    id UUID PRIMARY KEY,
    company_id UUID REFERENCES companies(id),
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255),
    phone VARCHAR(20),
    designation VARCHAR(100),
    is_primary BOOLEAN DEFAULT false
);

-- Email Cards (Activity Stream)
CREATE TABLE email_cards (
    id UUID PRIMARY KEY,
    message_id VARCHAR(255) UNIQUE NOT NULL,
    subject TEXT,
    from_email VARCHAR(255),
    to_emails TEXT[],
    body_text TEXT,
    body_html TEXT,
    received_at TIMESTAMPTZ,
    stream VARCHAR(50),  -- 'crm', 'erp', 'tender'
    ai_intent VARCHAR(100),
    ai_confidence DECIMAL(5,2),
    company_id UUID REFERENCES companies(id),
    kanban_status VARCHAR(50) DEFAULT 'inbox',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Activity Stream (Event Sourcing)
CREATE TABLE activity_stream (
    id UUID PRIMARY KEY,
    entity_type VARCHAR(50),  -- 'email', 'lead', 'order'
    entity_id UUID NOT NULL,
    event_type VARCHAR(100),  -- 'EmailReceived', 'LeadClassified'
    payload JSONB,
    user_id UUID,
    occurred_at TIMESTAMPTZ DEFAULT NOW()
);
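The `activity_stream` table is an append-only event log: events are only ever inserted, never updated. The write pattern can be sketched with stdlib sqlite (JSONB stored as serialized TEXT here; production uses the PostgreSQL schema above):

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE activity_stream (
        id TEXT PRIMARY KEY,
        entity_type TEXT,
        entity_id TEXT NOT NULL,
        event_type TEXT,
        payload TEXT,          -- JSONB in PostgreSQL; serialized JSON here
        occurred_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

def log_activity(entity_type: str, entity_id: str, event_type: str, payload: dict):
    """Append-only write: no UPDATE or DELETE ever touches this table."""
    conn.execute(
        "INSERT INTO activity_stream (id, entity_type, entity_id, event_type, payload) "
        "VALUES (?, ?, ?, ?, ?)",
        (str(uuid.uuid4()), entity_type, entity_id, event_type, json.dumps(payload)),
    )

log_activity("email", "eml_abc123", "EmailClassified",
             {"stream": "crm", "confidence": 0.92})
rows = conn.execute("SELECT event_type, payload FROM activity_stream").fetchall()
```

Because rows are immutable, the stream doubles as an audit trail and can be replayed to rebuild derived state.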

Indexes (Performance Optimization)

-- CRM lookups
CREATE INDEX idx_companies_cin ON companies(cin);
CREATE INDEX idx_companies_gstin ON companies USING GIN(gstin);
CREATE INDEX idx_companies_group ON companies(group_id);

-- Email stream routing
CREATE INDEX idx_emails_stream ON email_cards(stream);
CREATE INDEX idx_emails_status ON email_cards(kanban_status);
CREATE INDEX idx_emails_received ON email_cards(received_at DESC);

-- Activity audit
CREATE INDEX idx_activity_entity ON activity_stream(entity_type, entity_id);
CREATE INDEX idx_activity_occurred ON activity_stream(occurred_at DESC);

LangGraph Agent Implementation

Decision Support Agent (Phase 1)

# agents/decision_support.py
from typing import Optional, TypedDict

from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI

class ReplyDraftState(TypedDict):
    email_id: str
    product: str
    customer_name: str
    tally_stock: Optional[int]
    tally_price: Optional[float]
    draft: Optional[str]
    confidence: float

def fetch_tally_data(state: ReplyDraftState):
    """Tool node - calls Tally XML API"""
    stock = tally_client.get_stock(state["product"])
    price = tally_client.get_price(state["product"])
    return {
        **state,
        "tally_stock": stock,
        "tally_price": price
    }

def generate_draft(state: ReplyDraftState):
    """LLM node - generates grounded reply"""
    llm = ChatOpenAI(model="gpt-4")
    prompt = f"""
    Generate a professional email reply:
    Product: {state['product']}
    Stock: {state['tally_stock']} kg
    Price: ₹{state['tally_price']}/kg
    """
    draft = llm.invoke(prompt)
    # Confidence is a static placeholder here; a real scorer belongs in its own node
    return {**state, "draft": draft.content, "confidence": 0.9}

# Build graph
workflow = StateGraph(ReplyDraftState)
workflow.add_node("fetch_tally", fetch_tally_data)
workflow.add_node("generate", generate_draft)
workflow.add_edge("fetch_tally", "generate")
workflow.set_entry_point("fetch_tally")

agent = workflow.compile()

Worker Implementation (Celery)

Email Classification Worker

# workers/email_classifier.py
from celery import shared_task
from transformers import pipeline

# NOTE: "bert-base-uncased" is an untuned base checkpoint; swap in the
# fine-tuned intent model before relying on these labels in production.
classifier = pipeline("text-classification", model="bert-base-uncased")

@shared_task(bind=True, max_retries=3)
def classify_email(self, email_id: str):
    """Async AI classification"""
    try:
        email = db.get_email(email_id)

        # Run AI classification
        result = classifier(email.subject + " " + email.body_text)

        # Map to stream
        stream = map_intent_to_stream(result[0]["label"])
        confidence = result[0]["score"]

        # Update DB
        db.update_email(
            email_id,
            stream=stream,
            ai_confidence=confidence,
            ai_intent=result[0]["label"]
        )

        # Log to activity stream
        log_activity(
            entity_type="email",
            entity_id=email_id,
            event_type="EmailClassified",
            payload={"stream": stream, "confidence": confidence}
        )

    except Exception as e:
        self.retry(exc=e, countdown=60)
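The worker above calls `map_intent_to_stream` without showing it. One plausible implementation is a lookup table from classifier labels to the three Kanban streams; the label names below are assumptions about the fine-tuned model's output, not its actual label set:

```python
# Assumed classifier labels -> stream routing; the real fine-tuned model
# defines its own label vocabulary.
INTENT_TO_STREAM = {
    "price_enquiry": "crm",
    "new_lead": "crm",
    "order_status": "erp",
    "invoice_query": "erp",
    "tender_notice": "tender",
}

def map_intent_to_stream(label: str) -> str:
    """Route an AI intent label to a Kanban stream, defaulting to CRM triage."""
    return INTENT_TO_STREAM.get(label.lower(), "crm")
```

Defaulting unknown labels to the CRM stream keeps unclassifiable mail visible to a human rather than silently dropped.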

Tally Sync Worker (Phase 1)

# workers/tally_sync.py
import xmltodict
import requests

@shared_task
def sync_tally_stock():
    """Periodic stock sync from Tally"""
    xml_payload = """
    <ENVELOPE>
        <HEADER><VERSION>1</VERSION></HEADER>
        <BODY>
            <DESC><STATICVARIABLES><SVEXPORTFORMAT>XML</SVEXPORTFORMAT></STATICVARIABLES></DESC>
            <DATA><COLLECTION name="Stock Items"><FETCH>Name, ClosingBalance</FETCH></COLLECTION></DATA>
        </BODY>
    </ENVELOPE>
    """

    response = requests.post(
        "http://localhost:9000",  # read TALLY_XML_URL from the environment in practice
        data=xml_payload,
        headers={"Content-Type": "application/xml"},
        timeout=30,
    )

    data = xmltodict.parse(response.text)
    # xmltodict yields a dict (not a list) when the collection has a single ITEM
    items = data["ENVELOPE"]["BODY"]["DATA"]["COLLECTION"]["ITEM"]
    if isinstance(items, dict):
        items = [items]
    # Upsert to local cache for fast AI lookups
    for item in items:
        redis_client.hset(
            "tally:stock",
            item["NAME"],
            item["CLOSINGBALANCE"]
        )

Testing Strategy

Test Pyramid

| Level | Coverage Target | Tools |
| --- | --- | --- |
| Unit | 85%+ | pytest, pytest-cov |
| Integration | 70%+ | pytest + testcontainers |
| E2E | Critical user flows | Playwright |
| LangGraph Agents | 90%+ (node logic) | LangGraph testing utils |

Example Tests

# tests/test_tally_integration.py
import pytest
from unittest.mock import patch

@patch('workers.tally_sync.requests.post')
def test_tally_stock_lookup_success(mock_post):
    """Test Tally XML stock lookup"""
    mock_post.return_value.text = """
    <ENVELOPE>
        <BODY>
            <DATA>
                <COLLECTION>
                    <ITEM><NAME>Zinc Oxide</NAME><CLOSINGBALANCE>500</CLOSINGBALANCE></ITEM>
                </COLLECTION>
            </DATA>
        </BODY>
    </ENVELOPE>
    """

    from workers.tally_sync import sync_tally_stock
    sync_tally_stock.apply()

    # Verify Redis cache (assumes a client created with decode_responses=True,
    # otherwise hget returns bytes)
    stock = redis_client.hget("tally:stock", "Zinc Oxide")
    assert stock == "500"

# tests/test_langgraph_agents.py
def test_decision_support_agent():
    """Test LangGraph draft generation"""
    from agents.decision_support import agent

    result = agent.invoke({
        "email_id": "test_123",
        "product": "Zinc Oxide",
        "customer_name": "ABC Corp"
    })

    assert result["draft"] is not None
    assert result["confidence"] > 0.7
    assert "Zinc Oxide" in result["draft"]

Deployment

Docker Compose (Full Stack Dev)

version: '3.8'
services:
  api:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:pass@postgres/pebble
      - REDIS_URL=redis://redis:6379
      - TALLY_XML_URL=http://host.docker.internal:9000
    depends_on:
      - postgres
      - redis

  worker:
    build: ./backend
    command: celery -A workers worker -l INFO -Q email,tally,ai
    depends_on:
      - redis
      - postgres

  crm-ui:
    build: ./crm-ui
    ports:
      - "3000:3000"
    environment:
      - VITE_API_URL=http://localhost:8000

  plane-web:
    image: makeplane/plane-frontend:latest
    ports:
      - "3001:3000"
    depends_on:
      - plane-api

  plane-api:
    image: makeplane/plane-backend:latest
    environment:
      - DATABASE_URL=postgresql://postgres:pass@postgres/plane
      - SECRET_KEY=${PLANE_SECRET}

  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: pebble
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  meilisearch:
    image: getmeili/meilisearch:latest
    ports:
      - "7700:7700"

volumes:
  pgdata:

CI/CD Pipeline (GitHub Actions)

# .github/workflows/deploy.yml
name: CI/CD
on:
  push:
    branches: [main, develop]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run tests
        run: |
          cd backend
          pip install -r requirements-dev.txt
          pytest --cov=core --cov=agents

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - name: Build Docker images
        run: |
          docker build -t pebble-api:${{ github.sha }} ./backend
          docker build -t pebble-crm:${{ github.sha }} ./crm-ui

  deploy-staging:
    needs: build
    if: github.ref == 'refs/heads/develop'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: ./scripts/deploy-staging.sh

Environment Configuration

Required Variables

# .env.example

# === Database ===
DATABASE_URL=postgresql://user:password@localhost:5432/pebble
REDIS_URL=redis://localhost:6379/0

# === Email Ingestion ===
MS_CLIENT_ID=your_azure_app_id
MS_CLIENT_SECRET=your_azure_secret
MS_TENANT_ID=your_tenant_id

# === Tally/ERP (Phase 1) ===
TALLY_XML_URL=http://localhost:9000
TALLY_COMPANY=your_company_name
CURRENT_ERP_PROVIDER=focus_rt

# === AI/LLM ===
LLM_PROVIDER=azure  # or 'ollama' for local
AZURE_OPENAI_KEY=your_openai_key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
OLLAMA_BASE_URL=http://localhost:11434

# === Authentication ===
JWT_SECRET_KEY=your_secret_key_here
JWT_ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

# === Feature Flags ===
ENABLE_TALLY_SYNC=true
ENABLE_AI_DRAFTS=false  # Phase 1
ENABLE_LANGGRAPH_AGENTS=false  # Phase 1
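The JWT settings above can be exercised with a minimal stdlib HS256 signer/verifier. This is a sketch of the token shape only; production code would use a maintained library such as PyJWT, and the function names here are illustrative:

```python
import base64
import hashlib
import hmac
import json
import time

def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

def make_token(claims: dict, secret: str, expire_minutes: int = 30) -> str:
    """Build a signed HS256 JWT matching JWT_ALGORITHM / ACCESS_TOKEN_EXPIRE_MINUTES."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(
        {**claims, "exp": int(time.time()) + expire_minutes * 60}).encode())
    sig = hmac.new(secret.encode(), f"{header}.{payload}".encode(),
                   hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(sig)}"

def verify_token(token: str, secret: str) -> dict:
    """Check signature and expiry; return the claims on success."""
    header, payload, sig = token.split(".")
    expected = hmac.new(secret.encode(), f"{header}.{payload}".encode(),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(_b64url(expected), sig):
        raise ValueError("bad signature")
    claims = json.loads(base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))
    if claims["exp"] < time.time():
        raise ValueError("token expired")
    return claims
```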

Key Implementation Milestones

MVP (Proof of Concept to Production-Ready) - 14-16 Weeks

  • Email ingestion (IMAP + Graph API)
  • AI classification (CRM vs ERP streams)
  • Plane.so Kanban integration
  • Native CRM Master (8 tabs)
  • Entity validation (CIN/PAN/GST)
  • Activity Stream logging

Phase 1 (Tally Integration) - 6-8 Weeks

  • Tally XML connector (stock, orders, ledger)
  • Dispatch & Logistics Management Screens
  • LangGraph decision support agents
  • AI-grounded draft replies
  • Multi-entity inventory dashboard

Phase 4 (Advanced Intelligence)

  • LlamaIndex knowledge grounding
  • Semantic email threading
  • Unified search (emails + docs + ERP)

← Back to Use Cases | View Architecture