
Implementation Overview

This document outlines the development approach and technical implementation details for the Pebble Business Orchestrator.

Architecture Reference

For a detailed deep-dive into the event-driven system design, data flow diagrams, and scalability strategies, see the High-Level Architecture document.


Technology Stack

Backend & Sync

Layer | Technology | Purpose
Language | Python 3.11+ | Core orchestration logic
API Framework | FastAPI | REST API for UI & Webhooks
Sync Engine | Celery + Redis | Async CRM/ERP synchronization
Connectors | Odoo/Zoho SDKs | Third-party system integration

AI/ML (Orchestration)

Component | Technology
Classification | Transformers (BERT/RoBERTa)
Intent / LLM | Azure OpenAI / Ollama
NLP | spaCy (Entity Extraction)
OCR | Azure/Tesseract (Tender IQ)

Data Stores

Store | Technology | Purpose
Communications | PostgreSQL 15 | Email cards & Audit logs
Metadata | Redis | Real-time state / Sessions
Search | Meilisearch | Quick card/lead lookup

Project Structure

pebble-orchestrator/
├── core/                # Business logic & workflows
├── connectors/          # Odoo, Zoho, Focus RT integration
├── workers/             # Async sync & AI classification
├── api/                 # FastAPI routes for Kanban UI
├── web/                 # React-based Kanban frontend
├── scripts/             # Migration & master data tools
├── tests/               # Unit & integration tests
└── docker-compose.yml

Development Workflow

Git Flow

gitGraph
    commit id: "main"
    branch develop
    commit id: "feature-base"
    branch feature/ingestion
    commit id: "upload-api"
    commit id: "batch-upload"
    checkout develop
    merge feature/ingestion
    branch feature/dedup
    commit id: "hash-dedup"
    commit id: "content-dedup"
    checkout develop
    merge feature/dedup
    checkout main
    merge develop tag: "v0.1.0"

Branching Strategy

Branch | Purpose | Merge To
main | Production-ready | -
develop | Integration | main
feature/* | New features | develop
fix/* | Bug fixes | develop
release/* | Release prep | main
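The branching table can be enforced mechanically, for example by a CI step that checks a pull request's target branch. The sketch below is a minimal illustration that assumes branch names follow exactly the prefixes in the table; `merge_target` and `is_valid_merge` are hypothetical helpers, not part of the repository.

```python
from typing import Optional

# Prefix rules mirroring the branching table.
PREFIX_TARGETS = {
    "feature/": "develop",
    "fix/": "develop",
    "release/": "main",
}
# Long-lived branches matched by exact name.
EXACT_TARGETS = {
    "develop": "main",
    "main": None,  # production-ready; not merged anywhere
}


def merge_target(branch: str) -> Optional[str]:
    """Return the branch that `branch` should merge into (None for main)."""
    if branch in EXACT_TARGETS:
        return EXACT_TARGETS[branch]
    for prefix, target in PREFIX_TARGETS.items():
        # Require a non-empty name after the prefix, e.g. "feature/dedup".
        if branch.startswith(prefix) and len(branch) > len(prefix):
            return target
    raise ValueError(f"branch {branch!r} does not follow the naming convention")


def is_valid_merge(source: str, target: str) -> bool:
    """True if merging `source` into `target` matches the branching strategy."""
    try:
        return merge_target(source) == target
    except ValueError:
        return False
```

A CI job could call `is_valid_merge(head_branch, base_branch)` and fail the pipeline when it returns False.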

API Design

RESTful Conventions

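Method | Endpoint | Purpose
POST | /documents | Upload document
GET | /documents | List documents
GET | /documents/:id | Get document
DELETE | /documents/:id | Delete document
POST | /search | Search documents
GET | /tags | List tags

The tests in the Testing Strategy section call these endpoints under the /api/v1 prefix (for example, POST /api/v1/documents).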

Request/Response Format

// POST /documents
// Request (multipart/form-data; fields shown as JSON for readability)
{
  "file": "<binary>",
  "tags": ["manual-tag-1"]
}

// Response (202 Accepted)
{
  "id": "doc_abc123",
  "filename": "invoice.pdf",
  "status": "processing",
  "created_at": "2024-01-15T10:30:00Z"
}
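A minimal sketch of how the handler behind POST /documents might validate an upload and build the 202 response shown above. The allowed MIME types, the `doc_` plus hex ID scheme, and the names `accept_upload` and `UnsupportedFormatError` are assumptions for illustration; a real handler would also store the file and enqueue the OCR task.

```python
import uuid
from datetime import datetime, timezone

# Assumed allow-list; the tests only require that executables are rejected with 400.
ALLOWED_MIME_TYPES = {"application/pdf", "image/png", "image/jpeg", "image/tiff"}


class UnsupportedFormatError(ValueError):
    """Raised for uploads that the API should answer with HTTP 400."""


def accept_upload(filename: str, mime_type: str) -> dict:
    """Validate an upload and return the body of the 202 Accepted response."""
    if mime_type not in ALLOWED_MIME_TYPES:
        raise UnsupportedFormatError(f"Unsupported file type: {mime_type}")
    created_at = datetime.now(timezone.utc).replace(microsecond=0)
    return {
        "id": f"doc_{uuid.uuid4().hex[:12]}",  # hypothetical public ID format
        "filename": filename,
        "status": "processing",  # OCR and embedding continue asynchronously
        # ISO 8601 in UTC with a trailing "Z", as in the example response
        "created_at": created_at.isoformat().replace("+00:00", "Z"),
    }
```

Returning 202 rather than 201 signals that the document exists but its processing has not finished yet.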

Error Handling

{
  "error": {
    "code": "DOCUMENT_NOT_FOUND",
    "message": "Document with ID 'doc_xyz' not found",
    "details": {}
  }
}
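One way to keep every error in the envelope above is a single helper that pairs each error code with its HTTP status. This is a minimal sketch: apart from DOCUMENT_NOT_FOUND, the error codes, the status mapping, and the `error_response` name are assumptions.

```python
from typing import Any, Optional

# Assumed mapping from error codes to HTTP status codes.
ERROR_STATUS = {
    "DOCUMENT_NOT_FOUND": 404,
    "UNSUPPORTED_FORMAT": 400,
    "VALIDATION_ERROR": 422,
}


def error_response(
    code: str, message: str, details: Optional[dict[str, Any]] = None
) -> tuple[int, dict]:
    """Return (http_status, body) using the standard error envelope."""
    status = ERROR_STATUS.get(code, 500)  # unknown codes are treated as server errors
    body = {"error": {"code": code, "message": message, "details": details or {}}}
    return status, body
```

In a FastAPI route, the pair could be returned as `JSONResponse(status_code=status, content=body)` from `fastapi.responses`.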

Database Schema

Core Tables

-- Documents
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    filename VARCHAR(255) NOT NULL,
    original_filename VARCHAR(255),
    mime_type VARCHAR(100),
    size_bytes BIGINT,
    storage_path TEXT,
    hash_md5 CHAR(32),
    hash_sha256 CHAR(64),
    status VARCHAR(50) DEFAULT 'pending',
    doc_type VARCHAR(100),
    category VARCHAR(100),
    confidence DECIMAL(5,2),
    extracted_text TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Tags
CREATE TABLE tags (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) UNIQUE NOT NULL,
    type VARCHAR(50) DEFAULT 'custom',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Document-Tag relationship
CREATE TABLE document_tags (
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    tag_id UUID REFERENCES tags(id) ON DELETE CASCADE,
    source VARCHAR(50) DEFAULT 'manual',
    confidence DECIMAL(5,2),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (document_id, tag_id)
);

-- Processing jobs
CREATE TABLE processing_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    job_type VARCHAR(50),
    status VARCHAR(50) DEFAULT 'pending',
    result JSONB,
    error TEXT,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
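The processing_jobs table records each pipeline step for a document. The sketch below models how a job row might move through its lifecycle in memory. Only 'pending' appears in the schema, so the 'running', 'completed', and 'failed' states, the allowed transitions, and the `ProcessingJob` class are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional

# Assumed state machine for processing_jobs.status.
TRANSITIONS = {
    "pending": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),
    "failed": {"pending"},  # a retry may re-queue a failed job
}


@dataclass
class ProcessingJob:
    document_id: str
    job_type: str  # e.g. "ocr" or "embedding"
    status: str = "pending"
    result: Optional[dict[str, Any]] = None
    error: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    def _move(self, new_status: str) -> None:
        """Change status, rejecting transitions the state machine does not allow."""
        if new_status not in TRANSITIONS[self.status]:
            raise ValueError(f"cannot go from {self.status!r} to {new_status!r}")
        self.status = new_status

    def start(self) -> None:
        self._move("running")
        self.started_at = datetime.now(timezone.utc)

    def complete(self, result: dict[str, Any]) -> None:
        self._move("completed")
        self.result = result  # stored in the JSONB result column
        self.completed_at = datetime.now(timezone.utc)

    def fail(self, error: str) -> None:
        self._move("failed")
        self.error = error
        self.completed_at = datetime.now(timezone.utc)
```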

Indexes

CREATE INDEX idx_documents_status ON documents(status);
CREATE INDEX idx_documents_type ON documents(doc_type);
CREATE INDEX idx_documents_hash_md5 ON documents(hash_md5);
CREATE INDEX idx_documents_created ON documents(created_at);
-- No separate index on tags(name): its UNIQUE constraint already creates one.
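The hash_md5 index supports exact-duplicate detection (the "hash-dedup" work in the Git history). A minimal sketch: Python's hashlib produces the 32- and 64-character hex digests the hash_md5 and hash_sha256 columns expect, and an in-memory SQLite database stands in for PostgreSQL so the example runs anywhere. `file_hashes` and `find_exact_duplicate` are hypothetical names; with psycopg the `?` placeholders would be `%s`. Checking SHA-256 as well guards against MD5 collisions.

```python
import hashlib
import sqlite3
from typing import BinaryIO, Optional


def file_hashes(stream: BinaryIO, chunk_size: int = 1 << 20) -> tuple[str, str]:
    """Read a file once in chunks and return (md5_hex, sha256_hex)."""
    md5, sha256 = hashlib.md5(), hashlib.sha256()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        md5.update(chunk)
        sha256.update(chunk)
    return md5.hexdigest(), sha256.hexdigest()


def find_exact_duplicate(
    conn: sqlite3.Connection, md5_hex: str, sha256_hex: str
) -> Optional[str]:
    """Return the id of an existing document with identical content, if any."""
    row = conn.execute(
        # The MD5 filter can use idx_documents_hash_md5; SHA-256 confirms the match.
        "SELECT id FROM documents WHERE hash_md5 = ? AND hash_sha256 = ? LIMIT 1",
        (md5_hex, sha256_hex),
    ).fetchone()
    return row[0] if row else None
```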

Worker Implementation

OCR Worker

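# workers/ocr.py
import os

import pytesseract
from celery import shared_task
from pdf2image import convert_from_path  # needs the poppler utilities installed
from PIL import Image

# Project helpers; these import paths are assumptions, adjust to the real modules.
from core.documents import update_document
from workers.embedding import process_embedding

# Honour TESSERACT_CMD from the environment template.
pytesseract.pytesseract.tesseract_cmd = os.environ.get("TESSERACT_CMD", "tesseract")


def load_pages(file_path: str) -> list:
    """Return page images to OCR. Pillow cannot read PDFs, so rasterise them first."""
    if file_path.lower().endswith(".pdf"):
        return convert_from_path(file_path)
    with Image.open(file_path) as image:
        image.load()
        return [image.copy()]


@shared_task(bind=True, max_retries=3)
def process_ocr(self, document_id: str, file_path: str):
    try:
        # Load image/PDF
        pages = load_pages(file_path)

        # Run OCR page by page (--oem 3: default engine; --psm 6: one uniform text block)
        text = "\n\n".join(
            pytesseract.image_to_string(page, lang="eng", config="--oem 3 --psm 6")
            for page in pages
        )

        # Save result
        update_document(document_id, extracted_text=text)

        # Trigger next step
        process_embedding.delay(document_id)

    except Exception as exc:
        # Re-queue after 60 s; Celery stops retrying after max_retries attempts.
        raise self.retry(exc=exc, countdown=60)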

Embedding Worker

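# workers/embedding.py
import os

from celery import shared_task
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from sentence_transformers import SentenceTransformer

# Project helpers; these import paths are assumptions, adjust to the real modules.
from core.documents import get_document
from workers.dedup import check_duplicates

# Loaded once per worker process. all-MiniLM-L6-v2 produces 384-dimensional
# vectors, so the "documents" collection must be created with size=384.
model = SentenceTransformer("all-MiniLM-L6-v2")

# QDRANT_URL is an assumed variable; 6333 is the port exposed in docker-compose.
qdrant_client = QdrantClient(url=os.environ.get("QDRANT_URL", "http://localhost:6333"))


@shared_task
def process_embedding(document_id: str):
    doc = get_document(document_id)
    if not doc.extracted_text:
        return  # nothing to embed (OCR produced no text)

    # Generate embedding
    embedding = model.encode(doc.extracted_text)

    # Store in Qdrant; point IDs must be unsigned integers or UUIDs,
    # which matches the UUID primary key of the documents table.
    qdrant_client.upsert(
        collection_name="documents",
        points=[PointStruct(
            id=document_id,
            vector=embedding.tolist(),
            payload={"filename": doc.filename},
        )],
    )

    # Trigger dedup check
    check_duplicates.delay(document_id)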
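The check_duplicates task is not shown here. A minimal sketch of the near-duplicate ("content-dedup") decision it might make: compare the new document's embedding with candidate vectors, such as the nearest neighbours returned by a Qdrant search, using cosine similarity. The 0.95 threshold and the names `cosine_similarity` and `near_duplicates` are assumptions to be tuned against real data.

```python
import math
from typing import Iterable, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means the same direction)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # an all-zero vector is similar to nothing
    return dot / (norm_a * norm_b)


def near_duplicates(
    vector: Sequence[float],
    candidates: Iterable[tuple[str, Sequence[float]]],
    threshold: float = 0.95,  # assumed cut-off
) -> list[tuple[str, float]]:
    """Return (document_id, score) pairs at or above the threshold, best first."""
    scored = [(doc_id, cosine_similarity(vector, other)) for doc_id, other in candidates]
    hits = [(doc_id, score) for doc_id, score in scored if score >= threshold]
    return sorted(hits, key=lambda item: item[1], reverse=True)
```

If the Qdrant collection is configured with cosine distance, the scores returned by its search are already cosine similarities and can be compared with the threshold directly.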

Testing Strategy

Test Pyramid

Level | Coverage | Tools
Unit | 80%+ | pytest
Integration | 60%+ | pytest + testcontainers
E2E | Critical paths | Playwright

Example Tests

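# tests/test_ingestion.py
# `client` (a fastapi.testclient.TestClient) and `sample_pdf` are pytest fixtures
# assumed to be defined in tests/conftest.py; `sample_pdf` should be a
# (filename, file_bytes, content_type) tuple, the same shape used for test.exe below.
import pytest
from fastapi.testclient import TestClient

def test_upload_pdf(client: TestClient, sample_pdf):
    response = client.post(
        "/api/v1/documents",
        files={"file": sample_pdf}
    )
    # 202 Accepted: the document is queued and processed asynchronously
    assert response.status_code == 202
    assert "id" in response.json()
    assert response.json()["status"] == "processing"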

def test_upload_unsupported_format(client: TestClient):
    response = client.post(
        "/api/v1/documents",
        files={"file": ("test.exe", b"binary", "application/x-msdownload")}
    )
    assert response.status_code == 400

Deployment

Docker Compose (Development)

version: '3.8'
services:
  api:
    build: ./api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres/pebble
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis

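  worker:
    build: ./api
    command: celery -A workers worker -l INFO
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres/pebble
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis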

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: pebble
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass

  redis:
    image: redis:7

  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"

  meilisearch:
    image: getmeili/meilisearch
    ports:
      - "7700:7700"

  minio:
    image: minio/minio
    command: server /data
    ports:
      - "9000:9000"

CI/CD Pipeline

flowchart LR
    A[Push] --> B[Lint]
    B --> C[Unit Tests]
    C --> D[Build Images]
    D --> E[Integration Tests]
    E --> F{Branch?}
    F -->|develop| G[Deploy Staging]
    F -->|main| H[Deploy Production]

Environment Configuration

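The application reads its configuration from environment variables. Copy the template below to a .env file for local development, and supply the same variables (with real secrets) in production.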

# Database & Core
DATABASE_URL=postgresql://user:password@localhost:5432/pebble
REDIS_URL=redis://localhost:6379/0
SECRET_KEY=yoursecretkeyhere

# Microsoft Graph (Ingestion)
MS_CLIENT_ID=your_client_id
MS_CLIENT_SECRET=your_client_secret
MS_TENANT_ID=your_tenant_id
MS_REDIRECT_URI=https://pebble.company.com/api/auth/callback

# CRM/ERP Integrations
ODOO_URL=https://your-odoo-instance.com
ODOO_DB=your_db
ODOO_USERNAME=your_user
ODOO_PASSWORD=your_password

ZOHO_AUTH_TOKEN=your_zoho_token
FOCUS_RT_API_ENDPOINT=https://your-focus-instance.com/api

# AI & Vision
LLM_PROVIDER=azure  # or 'ollama'
AZURE_OPENAI_KEY=your_key
AZURE_OPENAI_ENDPOINT=your_endpoint
OLLAMA_BASE_URL=http://localhost:11434
TESSERACT_CMD=/usr/bin/tesseract

# Storage
MINIO_ROOT_USER=admin
MINIO_ROOT_PASSWORD=password
MINIO_ENDPOINT=localhost:9000
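A minimal sketch of loading and validating a subset of these variables at startup with only the standard library, so a missing value fails fast instead of surfacing mid-request. The `Settings` class, the `load_settings` function, the default provider, and which variables count as required are assumptions; a library such as pydantic-settings could replace this.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    secret_key: str
    llm_provider: str
    azure_openai_key: Optional[str]
    azure_openai_endpoint: Optional[str]
    ollama_base_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read configuration from environment variables and validate it."""
    required = ("DATABASE_URL", "REDIS_URL", "SECRET_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required settings: {', '.join(missing)}")

    provider = env.get("LLM_PROVIDER", "azure").lower()  # assumed default
    if provider not in ("azure", "ollama"):
        raise RuntimeError(f"LLM_PROVIDER must be 'azure' or 'ollama', got {provider!r}")
    if provider == "azure" and not (
        env.get("AZURE_OPENAI_KEY") and env.get("AZURE_OPENAI_ENDPOINT")
    ):
        raise RuntimeError("LLM_PROVIDER=azure requires AZURE_OPENAI_KEY and AZURE_OPENAI_ENDPOINT")

    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
        secret_key=env["SECRET_KEY"],
        llm_provider=provider,
        azure_openai_key=env.get("AZURE_OPENAI_KEY"),
        azure_openai_endpoint=env.get("AZURE_OPENAI_ENDPOINT"),
        ollama_base_url=env.get("OLLAMA_BASE_URL", "http://localhost:11434"),
    )
```

The .env file itself still has to be exported into the process environment, for example through Docker Compose's `env_file` option or python-dotenv's `load_dotenv()`.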

Key Live Screens

The Pebble Business Orchestrator provides four primary user interfaces, each built around a specific operational workflow.

1. Visual Kanban Board

A real-time, drag-and-drop workspace in which each card represents a customer email thread. Reps can classify, move, and act on leads directly from the board.

2. Quality Control Dashboard

A centralized analytics view for tracking batch purity, sample throughput, and COA (Certificate of Analysis) status, with automated alerts when values fail threshold validation.

3. Prelead Screening Queue

A fast review interface where the marketing team screens raw data batches and moves qualified entities into the CRM pipeline.

4. Email Stream Router

A configuration interface for mapping incoming email mailboxes to specific CRM or ERP functional streams.
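Conceptually, the mailbox-to-stream mapping is a lookup table with a fallback. A minimal sketch; the mailbox addresses, stream names, fallback queue, and the `route_email` function are hypothetical placeholders, since the real mapping is whatever is configured in the router screen.

```python
from typing import Mapping

# Hypothetical mapping, as it might be saved from the Email Stream Router screen.
# Keys are stored lowercase so lookups are case-insensitive.
MAILBOX_STREAMS: Mapping[str, str] = {
    "sales@example.com": "crm.leads",
    "orders@example.com": "erp.sales_orders",
    "qc@example.com": "erp.quality",
}
DEFAULT_STREAM = "triage"  # unmapped mailboxes go to a manual review queue


def route_email(recipients: list[str], mapping: Mapping[str, str] = MAILBOX_STREAMS) -> str:
    """Return the stream for the first recipient mailbox that has a mapping."""
    for address in recipients:
        stream = mapping.get(address.strip().lower())
        if stream:
            return stream
    return DEFAULT_STREAM
```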

