# Implementation Overview
This document outlines the development approach and technical implementation details for the Pebble Business Orchestrator.
## Architecture Reference
For a detailed deep-dive into the event-driven system design, data flow diagrams, and scalability strategies, see the High-Level Architecture document.
## Technology Stack

### Backend & Sync
| Layer | Technology | Purpose |
|---|---|---|
| Language | Python 3.11+ | Core orchestration logic |
| API Framework | FastAPI | REST API for UI & Webhooks |
| Sync Engine | Celery + Redis | Async CRM/ERP synchronization |
| Connectors | Odoo/Zoho SDKs | Third-party system integration |
### AI/ML (Orchestration)
| Component | Technology |
|---|---|
| Classification | Transformers (BERT/RoBERTa) |
| Intent / LLM | Azure OpenAI / Ollama |
| NLP | spaCy (Entity Extraction) |
| OCR | Azure/Tesseract (Tender IQ) |
### Data Stores
| Store | Technology | Purpose |
|---|---|---|
| Communications | PostgreSQL 15 | Email cards & Audit logs |
| Metadata | Redis | Real-time state / Sessions |
| Search | Meilisearch | Quick card/lead lookup |
## Project Structure

```text
pebble-orchestrator/
├── core/        # Business logic & workflows
├── connectors/  # Odoo, Zoho, Focus RT integration
├── workers/     # Async sync & AI classification
├── api/         # FastAPI routes for Kanban UI
├── web/         # React-based Kanban frontend
├── scripts/     # Migration & master data tools
├── tests/       # Unit & integration tests
└── docker-compose.yml
```
## Development Workflow

### Git Flow

```mermaid
gitGraph
    commit id: "main"
    branch develop
    commit id: "feature-base"
    branch feature/ingestion
    commit id: "upload-api"
    commit id: "batch-upload"
    checkout develop
    merge feature/ingestion
    branch feature/dedup
    commit id: "hash-dedup"
    commit id: "content-dedup"
    checkout develop
    merge feature/dedup
    checkout main
    merge develop tag: "v0.1.0"
```
### Branching Strategy

| Branch | Purpose | Merge To |
|---|---|---|
| `main` | Production-ready | - |
| `develop` | Integration | `main` |
| `feature/*` | New features | `develop` |
| `fix/*` | Bug fixes | `develop` |
| `release/*` | Release prep | `main` |
## API Design

### RESTful Conventions

| Method | Endpoint | Purpose |
|---|---|---|
| POST | `/documents` | Upload document |
| GET | `/documents` | List documents |
| GET | `/documents/:id` | Get document |
| DELETE | `/documents/:id` | Delete document |
| POST | `/search` | Search documents |
| GET | `/tags` | List tags |
### Request/Response Format

```json
// POST /documents
// Request (multipart/form-data)
{
  "file": "<binary>",
  "tags": ["manual-tag-1"]
}

// Response
{
  "id": "doc_abc123",
  "filename": "invoice.pdf",
  "status": "processing",
  "created_at": "2024-01-15T10:30:00Z"
}
```
### Error Handling

```json
{
  "error": {
    "code": "DOCUMENT_NOT_FOUND",
    "message": "Document with ID 'doc_xyz' not found",
    "details": {}
  }
}
```
## Database Schema

### Core Tables

```sql
-- Documents
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    filename VARCHAR(255) NOT NULL,
    original_filename VARCHAR(255),
    mime_type VARCHAR(100),
    size_bytes BIGINT,
    storage_path TEXT,
    hash_md5 CHAR(32),
    hash_sha256 CHAR(64),
    status VARCHAR(50) DEFAULT 'pending',
    doc_type VARCHAR(100),
    category VARCHAR(100),
    confidence DECIMAL(5,2),
    extracted_text TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Tags
CREATE TABLE tags (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) UNIQUE NOT NULL,
    type VARCHAR(50) DEFAULT 'custom',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Document-Tag relationship
CREATE TABLE document_tags (
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    tag_id UUID REFERENCES tags(id) ON DELETE CASCADE,
    source VARCHAR(50) DEFAULT 'manual',
    confidence DECIMAL(5,2),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (document_id, tag_id)
);

-- Processing jobs
CREATE TABLE processing_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID REFERENCES documents(id),
    job_type VARCHAR(50),
    status VARCHAR(50) DEFAULT 'pending',
    result JSONB,
    error TEXT,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
```
### Indexes

```sql
CREATE INDEX idx_documents_status ON documents(status);
CREATE INDEX idx_documents_type ON documents(doc_type);
CREATE INDEX idx_documents_hash_md5 ON documents(hash_md5);
CREATE INDEX idx_documents_created ON documents(created_at);
CREATE INDEX idx_tags_name ON tags(name);
```
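The `hash_md5` and `hash_sha256` columns back the hash-dedup step from the Git history above. Both digests can be computed in one pass over the file; a sketch (the `compute_hashes` helper name is illustrative):

```python
# Illustrative sketch: compute the hash_md5 / hash_sha256 values stored
# in the documents table in a single pass over the file.
import hashlib


def compute_hashes(path: str, chunk_size: int = 1 << 20) -> tuple[str, str]:
    """Return (md5_hex, sha256_hex) for the file at `path`."""
    md5 = hashlib.md5()
    sha256 = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            md5.update(chunk)
            sha256.update(chunk)
    return md5.hexdigest(), sha256.hexdigest()
```

An exact-duplicate check is then a lookup via `idx_documents_hash_md5`, confirmed against the stored SHA-256 value to rule out MD5 collisions.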
## Worker Implementation

### OCR Worker

```python
# workers/ocr.py
from celery import shared_task
import pytesseract
from PIL import Image


@shared_task(bind=True, max_retries=3)
def process_ocr(self, document_id: str, file_path: str):
    try:
        # Load image/PDF
        image = Image.open(file_path)

        # Run OCR
        text = pytesseract.image_to_string(
            image,
            lang='eng',
            config='--oem 3 --psm 6'
        )

        # Save result (update_document is a project DB helper)
        update_document(document_id, extracted_text=text)

        # Trigger next step (defined in workers/embedding.py)
        process_embedding.delay(document_id)
    except Exception as e:
        self.retry(exc=e, countdown=60)
```
### Embedding Worker

```python
# workers/embedding.py
from celery import shared_task
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')


@shared_task
def process_embedding(document_id: str):
    doc = get_document(document_id)

    # Generate embedding
    embedding = model.encode(doc.extracted_text)

    # Store in Qdrant
    qdrant_client.upsert(
        collection_name="documents",
        points=[{
            "id": document_id,
            "vector": embedding.tolist(),
            "payload": {"filename": doc.filename}
        }]
    )

    # Trigger dedup check
    check_duplicates.delay(document_id)
```
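`check_duplicates` is referenced but not shown. The content-dedup step from the Git history amounts to a nearest-neighbor search over the stored vectors; a minimal, dependency-free sketch of the underlying similarity test (the cosine helpers and the 0.95 threshold are assumptions, not taken from the codebase):

```python
# Sketch of the content-dedup idea behind check_duplicates: two documents
# are flagged as near-duplicates when their embedding vectors are close.
# The 0.95 threshold is an assumed tuning value.
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


def is_near_duplicate(a: list[float], b: list[float], threshold: float = 0.95) -> bool:
    return cosine_similarity(a, b) >= threshold
```

In production the comparison would be delegated to Qdrant's vector search over the `documents` collection rather than computed in Python.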
## Testing Strategy

### Test Pyramid
| Level | Coverage | Tools |
|---|---|---|
| Unit | 80%+ | pytest |
| Integration | 60%+ | pytest + testcontainers |
| E2E | Critical paths | Playwright |
### Example Tests

```python
# tests/test_ingestion.py
import pytest
from fastapi.testclient import TestClient


def test_upload_pdf(client: TestClient, sample_pdf):
    response = client.post(
        "/api/v1/documents",
        files={"file": sample_pdf}
    )
    assert response.status_code == 202
    assert "id" in response.json()
    assert response.json()["status"] == "processing"


def test_upload_unsupported_format(client: TestClient):
    response = client.post(
        "/api/v1/documents",
        files={"file": ("test.exe", b"binary", "application/x-msdownload")}
    )
    assert response.status_code == 400
```
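The second test implies a server-side MIME-type guard that answers 400 for anything outside an allow-list. A hedged sketch of that guard (the allow-list contents and the `is_supported` helper are illustrative, not the project's actual list):

```python
# Illustrative server-side guard implied by test_upload_unsupported_format.
# The allow-list below is an assumption, not the project's actual list.
ALLOWED_MIME_TYPES = {
    "application/pdf",
    "image/png",
    "image/jpeg",
    "text/plain",
}


def is_supported(mime_type: str) -> bool:
    """Return True if the upload should be accepted (else respond 400)."""
    return mime_type in ALLOWED_MIME_TYPES
```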
## Deployment

### Docker Compose (Development)

```yaml
version: '3.8'

services:
  api:
    build: ./api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres/pebble
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis

  worker:
    build: ./api
    command: celery -A workers worker -l INFO
    depends_on:
      - redis

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: pebble
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass

  redis:
    image: redis:7

  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"

  meilisearch:
    image: getmeili/meilisearch
    ports:
      - "7700:7700"

  minio:
    image: minio/minio
    command: server /data
    ports:
      - "9000:9000"
```
### CI/CD Pipeline

```mermaid
flowchart LR
    A[Push] --> B[Lint]
    B --> C[Unit Tests]
    C --> D[Build Images]
    D --> E[Integration Tests]
    E --> F{Branch?}
    F -->|develop| G[Deploy Staging]
    F -->|main| H[Deploy Production]
```
### Environment Configuration

A .env template is required for both local development and production deployment.

```bash
# Database & Core
DATABASE_URL=postgresql://user:password@localhost:5432/pebble
REDIS_URL=redis://localhost:6379/0
SECRET_KEY=yoursecretkeyhere

# Microsoft Graph (Ingestion)
MS_CLIENT_ID=your_client_id
MS_CLIENT_SECRET=your_client_secret
MS_TENANT_ID=your_tenant_id
MS_REDIRECT_URI=https://pebble.company.com/api/auth/callback

# CRM/ERP Integrations
ODOO_URL=https://your-odoo-instance.com
ODOO_DB=your_db
ODOO_USERNAME=your_user
ODOO_PASSWORD=your_password
ZOHO_AUTH_TOKEN=your_zoho_token
FOCUS_RT_API_ENDPOINT=https://your-focus-instance.com/api

# AI & Vision
LLM_PROVIDER=azure  # or 'ollama'
AZURE_OPENAI_KEY=your_key
AZURE_OPENAI_ENDPOINT=your_endpoint
OLLAMA_BASE_URL=http://localhost:11434
TESSERACT_CMD=/usr/bin/tesseract

# Storage
MINIO_ROOT_USER=admin
MINIO_ROOT_PASSWORD=password
MINIO_ENDPOINT=localhost:9000
```
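These values can be read into a single settings object at startup. A minimal stdlib sketch (the `Settings` dataclass and its field selection are illustrative; in a FastAPI project, pydantic-settings would be the more idiomatic choice):

```python
# Illustrative loader for a subset of the .env values above, stdlib only.
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    llm_provider: str


def load_settings() -> Settings:
    return Settings(
        # DATABASE_URL is required: fail fast if it is missing.
        database_url=os.environ["DATABASE_URL"],
        redis_url=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        llm_provider=os.environ.get("LLM_PROVIDER", "azure"),  # or 'ollama'
    )
```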
## Key Live Screens

The Pebble Business Orchestrator features several high-impact user interfaces designed for operational efficiency.

### 1. Visual Kanban Board

A real-time, drag-and-drop workspace where every card represents a customer email thread. Reps can classify, move, and action leads directly from the board.

### 2. Quality Control Dashboard

A centralized analytics view for batch purity, sample throughput, and COA status tracking. Includes automated threshold validation alerts.

### 3. Prelead Screening Queue

A high-velocity interface for the marketing team to adjudicate raw data batches and move qualified entities into the CRM pipeline.

### 4. Email Stream Router

A configuration interface for mapping incoming mailboxes to specific CRM or ERP functional streams.