Data Pipelines & Reliability
This document describes the flow of critical data across the Pebble Orchestrator, highlighting asynchronous processing steps and reliability strategies.
Core Pipelines
1. Email Ingestion Pipeline (Sync → Async)
The journey from an Outlook Mailbox to a Native Kanban Card with structured and semantic persistence.
```mermaid
graph LR
A["O365 Mailbox"] -->|Webhook| B["Pebble Listener"]
B -->|Push| C[("Redis Queue")]
C -->|Worker 1| D["Metadata Extraction"]
D -->|Persistence| E[("PostgreSQL Core")]
E -->|Semantic Sync| V[("Vector DB - Shadow Memory")]
E -->|Worker 2| F["AI Classifier"]
F -->|Internal API| G["Native Kanban State"]
```
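The Sync → Async split can be sketched as a webhook handler that only enqueues and acknowledges, and workers that drain the queue afterwards. This is a minimal in-process sketch under several assumptions. A `collections.deque` stands in for the Redis Queue. The payload is assumed to carry `message_id` and `subject`. `handle_webhook`, `drain`, `extract_metadata` and `classify` are hypothetical names. The persistence and Vector DB sync steps between the two workers are omitted.

```python
from collections import deque
from typing import Callable

# Hypothetical stand-in for the Redis Queue (production would use a Redis list).
queue: deque[dict] = deque()

def handle_webhook(payload: dict) -> int:
    """Synchronous part: accept the O365 webhook, enqueue, and acknowledge fast."""
    queue.append({"message_id": payload["message_id"], "subject": payload.get("subject", "")})
    return 202  # HTTP 202 Accepted: the remaining work happens asynchronously

def drain(stages: list[Callable[[dict], dict]]) -> list[dict]:
    """Asynchronous part: run every queued record through the worker stages in order."""
    done = []
    while queue:
        record = queue.popleft()
        for stage in stages:
            record = stage(record)
        done.append(record)
    return done

# Hypothetical stages loosely mirroring Worker 1 (metadata) and Worker 2 (classifier).
def extract_metadata(r: dict) -> dict:
    return {**r, "subject_len": len(r["subject"])}

def classify(r: dict) -> dict:
    label = "quote_request" if "quote" in r["subject"].lower() else "unclassified"
    return {**r, "label": label}
```

Keeping the webhook path down to a single enqueue means a slow classifier never delays the acknowledgement sent back to the mailbox provider.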
2. Clean Room Ingestion Sequence
Detailed flow for multi-entity validation during data entry.
```mermaid
sequenceDiagram
participant S as "Source (Excel/Email)"
participant E as "Ingestion Service"
participant AI as "AI Enrichment"
participant DB as "PostgreSQL Core"
participant K as "Kanban UI"
S->>E: Raw Record
E->>DB: Check GST/CIN/Email
DB-->>E: Match/No Match
E->>AI: Scored Metadata
AI-->>E: Urgency + Sentiment
E->>DB: Commit Native Record
DB->>K: Broadcast State Update
```
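The "Check GST/CIN/Email" step can be sketched as normalisation, a format check, and a lookup that returns Match or No Match. This is a sketch only. The patterns check the shape of a GSTIN, a CIN and an email address, not their checksums. The `index` dict is a hypothetical stand-in for the PostgreSQL lookup. `normalise` and `check_record` are illustrative names.

```python
import re
from typing import Optional

# Shape-only patterns; they do not verify the GSTIN check character.
GSTIN_RE = re.compile(r"^\d{2}[A-Z]{5}\d{4}[A-Z][1-9A-Z]Z[0-9A-Z]$")
CIN_RE = re.compile(r"^[LU]\d{5}[A-Z]{2}\d{4}[A-Z]{3}\d{6}$")
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def normalise(record: dict) -> dict:
    """Trim identifiers; GSTIN/CIN are upper-cased, email is lower-cased."""
    return {
        "gstin": record.get("gstin", "").strip().upper(),
        "cin": record.get("cin", "").strip().upper(),
        "email": record.get("email", "").strip().lower(),
    }

def check_record(record: dict, index: dict[tuple[str, str], str]) -> tuple[str, Optional[str]]:
    """Return ("match", company_id), ("no_match", None) or ("invalid", field_name)."""
    ids = normalise(record)
    for name, pattern in (("gstin", GSTIN_RE), ("cin", CIN_RE), ("email", EMAIL_RE)):
        if ids[name] and not pattern.match(ids[name]):
            return "invalid", name
    # Try the strongest identifier first; email is the weakest signal.
    for name in ("gstin", "cin", "email"):
        if ids[name] and (name, ids[name]) in index:
            return "match", index[(name, ids[name])]
    return "no_match", None
```

Rejecting malformed identifiers before the lookup stops an OCR typo from quietly turning into a "No Match" and a duplicate company record.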
3. CRM Master State Management (Event-Driven)
Internal synchronization between the Kanban state and Customer context.
- Trigger: User moves a card from `Qualify` → `Quote` in the Native Kanban UI.
- Action:
  - Frontend emits a `WORKFLOW_STATE_CHANGE` signal.
  - Pebble Core validates CRM prerequisites (e.g., GSTIN verification).
  - Internal Service updates the Company status in the Unified Database.
  - Triggers the AI Assistant to suggest "Landed Cost" templates.
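A `WORKFLOW_STATE_CHANGE` handler with a prerequisite guard might look like the sketch below. Several details are assumptions. The `PREREQUISITES` table is hypothetical. Copying the new stage name onto the Company status is an illustrative mapping. The template suggestion is modelled as an appended event rather than a call to the real AI Assistant.

```python
from dataclasses import dataclass, field

@dataclass
class Company:
    name: str
    gstin_verified: bool = False
    status: str = "Lead"

@dataclass
class Card:
    card_id: str
    stage: str
    company: Company
    events: list[dict] = field(default_factory=list)

# Hypothetical prerequisite table: (from_stage, to_stage) -> check on the company.
PREREQUISITES = {
    ("Qualify", "Quote"): lambda c: c.gstin_verified,
}

def on_workflow_state_change(card: Card, to_stage: str) -> bool:
    """Apply a card move only if its CRM prerequisites hold; return whether it was applied."""
    check = PREREQUISITES.get((card.stage, to_stage))
    if check is not None and not check(card.company):
        return False  # the card stays in its current column
    card.stage = to_stage
    card.company.status = to_stage  # assumed stage-to-status mapping
    if to_stage == "Quote":
        card.events.append({"type": "SUGGEST_TEMPLATE", "template": "Landed Cost"})
    return True
```

Because the guard runs before any state is written, a rejected move leaves both the card and the Company record untouched.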
4. AI Enrichment Pipeline
Enriches cards with extracted identifiers, an urgency grade, and a routing decision.
- Steps:
  - OCR Stage: Scans attachments for CIN/PAN/GST.
  - Sentiment Stage: Grades lead urgency (P0-P3).
  - Routing Stage: Uses agentic reasoning to determine the next-best action. Following the Lead State Machine (linked to US-WFL-010), it sends low-confidence cards to the Global Unclassified Queue and high-confidence cards to the HOD Pending Queue.
> [!NOTE]
> Optimization Strategy: While Phase 1 uses zero-shot LLMs for Agentic Triage, we may introduce smaller, faster models like DistilBERT for specific classification tasks in later phases to optimize for cost and latency.
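The Routing Stage's confidence split can be sketched as a single threshold check. The document does not give a cut-off, so `CONFIDENCE_THRESHOLD = 0.80` is a hypothetical value. Treating P0 as the most urgent grade is also an assumption. `Triage` and `route` are illustrative names, not part of the Lead State Machine.

```python
from dataclasses import dataclass

# Hypothetical cut-off; the real value would come from the Lead State Machine config.
CONFIDENCE_THRESHOLD = 0.80
URGENCY_LEVELS = ("P0", "P1", "P2", "P3")  # assumed: P0 is the most urgent

@dataclass
class Triage:
    next_action: str   # next-best action proposed by the agent
    confidence: float  # model confidence in [0, 1]
    urgency: str       # grade from the Sentiment Stage

def route(t: Triage) -> str:
    """Pick the landing queue for a card from the Routing Stage output."""
    if t.urgency not in URGENCY_LEVELS:
        raise ValueError(f"unknown urgency grade: {t.urgency}")
    if not 0.0 <= t.confidence <= 1.0:
        raise ValueError("confidence must be in [0, 1]")
    if t.confidence >= CONFIDENCE_THRESHOLD:
        return "HOD Pending Queue"
    return "Global Unclassified Queue"
```

A single explicit threshold makes the "Low Confidence" triage rate easy to measure, which is the metric the Learning Loop aims to reduce.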
5. Process Automation Pipeline (n8n)
Driven by the Master Sheet (deterministic SOPs).
- Trigger: New card in Processing or Human Feedback received.
- Action:
1. n8n fetches the Decision Matrix from the Master Sheet.
2. Executes multi-platform steps: "Upload to Nextcloud -> Tag in Tally -> Move card".
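The Decision Matrix lookup and the step sequence can be sketched in Python, even though n8n performs them in practice. The matrix layout, a `(trigger, document type)` key mapping to ordered step names, is a hypothetical shape for the Master Sheet rows. The step callables stand in for the Nextcloud, Tally and Kanban nodes.

```python
from typing import Callable

# Hypothetical Decision Matrix rows as read from the Master Sheet.
DECISION_MATRIX: dict[tuple[str, str], list[str]] = {
    ("new_card_in_processing", "purchase_order"): ["upload_nextcloud", "tag_tally", "move_card"],
}

def run_sop(trigger: str, doc_type: str, actions: dict[str, Callable[[], bool]]) -> list[str]:
    """Execute the matrix steps in order and stop at the first step that fails."""
    steps = DECISION_MATRIX.get((trigger, doc_type))
    if steps is None:
        raise KeyError(f"no SOP defined for {(trigger, doc_type)}")
    completed = []
    for step in steps:
        if not actions[step]():
            break  # later steps depend on earlier ones, so halt here
        completed.append(step)
    return completed
```

Stopping at the first failure means a card is never moved when its document failed to upload or tag.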
6. Learning Loop: The "Why?" Collector
Captures the delta between AI prediction and human intent.
- Trigger: User overrides an AI move.
- Capture: The UI prompts for "Why?".
- Storage: User response + original email text is embedded and saved in the Vector DB (Shadow Memory).
- Optimization: Once a week, these "Why" pairs are used to fine-tune the classifier, reducing the "Low Confidence" triage rate.
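The weekly batch that collects override pairs can be sketched as a filter over stored overrides. The `Override` record shape and the seven-day window are assumptions. Embedding into the Vector DB is left out. The sketch keeps the rationale in a separate field instead of mixing it into the model input, because no "Why?" answer exists at inference time.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Override:
    email_text: str
    ai_column: str     # where the AI placed the card
    human_column: str  # where the user moved it
    why: str           # answer to the "Why?" prompt
    at: datetime

def weekly_training_rows(overrides: list[Override], now: datetime) -> list[dict]:
    """Collect last week's genuine overrides as fine-tuning rows."""
    cutoff = now - timedelta(days=7)
    rows = []
    for o in overrides:
        if o.at < cutoff or o.ai_column == o.human_column:
            continue  # outside the window, or the user confirmed the AI's choice
        rows.append({"text": o.email_text, "label": o.human_column, "rationale": o.why})
    return rows
```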
7. Sales-to-Operations Handover
The critical transition from "Promising Lead" to "Committed Order".
```mermaid
graph LR
Quote[Approved Quote] -->|User Trigger| SO[Sales Order Gen]
SO -->|Async Event| ERP[Tally XML Push]
ERP -->|Ack Received| Status[Update: Placed]
Status -->|Stock Check| Inv[Invoice Gen]
Inv -->|Email| Cust[Customer Notified]
```
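The handover can be read as an ordered list of stages, each gated by one signal. The sketch below encodes that reading. The stage labels "Tally Pushed" and "Invoiced" and the signal names are hypothetical. The key property it shows is that an order never reaches "Placed" until the ERP acknowledgement arrives.

```python
# Ordered handover stages, following the diagram.
STAGES = ["Approved Quote", "Sales Order", "Tally Pushed", "Placed", "Invoiced", "Customer Notified"]

# Hypothetical gate signal required to enter each stage.
GATES = {
    "Sales Order": "user_trigger",
    "Tally Pushed": "async_event",
    "Placed": "erp_ack",
    "Invoiced": "stock_available",
    "Customer Notified": "email_sent",
}

def advance(current: str, signals: set[str]) -> str:
    """Move forward through as many stages as the received signals allow."""
    i = STAGES.index(current)
    while i + 1 < len(STAGES) and GATES[STAGES[i + 1]] in signals:
        i += 1
    return STAGES[i]
```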
8. Quality & LIMS Integration
Connecting physical lab samples to digital approvals.
- Trigger: Technician scans Sample QR Code.
- Action:
1. Fetch Spec.Thresholds from Master Data.
2. Compare the observed values against those thresholds.
3. Auto-Grade: If Pass, update CRM Opportunity Status to Sample Approved.
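The Auto-Grade comparison can be sketched as a check of each observed value against optional minimum and maximum limits. `Threshold`, `auto_grade` and the example parameter names are hypothetical. A parameter with no reading is treated as a failure, which is an assumption. On a pass, the caller would set the CRM Opportunity Status to Sample Approved.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Threshold:
    min_value: Optional[float] = None
    max_value: Optional[float] = None

    def passes(self, observed: float) -> bool:
        """True if the observed value lies within the (optional) limits."""
        if self.min_value is not None and observed < self.min_value:
            return False
        if self.max_value is not None and observed > self.max_value:
            return False
        return True

def auto_grade(spec: dict[str, Threshold], observed: dict[str, float]) -> tuple[bool, list[str]]:
    """Pass only if every spec parameter was measured and is within limits."""
    failures = [p for p, t in spec.items() if p not in observed or not t.passes(observed[p])]
    return (not failures, failures)
```

Returning the failing parameter names alongside the verdict gives the technician something concrete to act on when a sample fails.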
9. Tender (GEM) Ingestion
- Source: `tender@` email box receiving GEM notifications.
- Extraction: AI parses "Bid Number", "Deadline", and "Item Name".
- Action: Creates a card in the Tender Kanban Board with `Deadline` as the Due Date.
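The mapping from AI-extracted fields to a Tender board card can be sketched as follows. The extraction itself is out of scope. The input is assumed to be a dict keyed by the three field names above. The deadline format `%d-%m-%Y %H:%M:%S` is a guess and may not match real GEM notifications. `TenderCard` and `to_tender_card` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime

# Assumed deadline format; real GEM notifications may use a different one.
DEADLINE_FORMAT = "%d-%m-%Y %H:%M:%S"
REQUIRED = ("Bid Number", "Deadline", "Item Name")

@dataclass
class TenderCard:
    title: str
    bid_number: str
    due_date: datetime
    board: str = "Tender"

def to_tender_card(extracted: dict[str, str]) -> TenderCard:
    """Map AI-extracted fields onto a Tender board card, using Deadline as the due date."""
    missing = [k for k in REQUIRED if not extracted.get(k, "").strip()]
    if missing:
        raise ValueError(f"missing fields: {missing}")  # e.g. send for manual review
    return TenderCard(
        title=extracted["Item Name"].strip(),
        bid_number=extracted["Bid Number"].strip(),
        due_date=datetime.strptime(extracted["Deadline"].strip(), DEADLINE_FORMAT),
    )
```

Parsing the deadline strictly turns an unparseable date into an error rather than a card with a wrong due date.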
Reliability & Fault Tolerance
Dead Letter Queues (DLQ)
Any message failing > 3 times is moved to a DLQ.
- Alert: IT team notified via OPS-006.
- Action: Manual inspection or "Re-ingest" after fixing the upstream issue (e.g., ERP API downtime).
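The retry-then-park behaviour and the manual "Re-ingest" can be sketched with an in-memory queue. This is a minimal sketch. The `failures` and `last_error` fields and both function names are hypothetical. Alerting via OPS-006 is marked only as a hook point.

```python
from collections import deque
from typing import Callable

MAX_RETRIES = 3  # a message failing more than 3 times is moved to the DLQ

def process_with_dlq(messages: deque, handler: Callable[[dict], None], dlq: list) -> None:
    """Re-queue failed messages; park them in the DLQ after MAX_RETRIES failures."""
    while messages:
        msg = messages.popleft()
        try:
            handler(msg)
        except Exception as exc:
            msg["failures"] = msg.get("failures", 0) + 1
            msg["last_error"] = repr(exc)
            if msg["failures"] > MAX_RETRIES:
                dlq.append(msg)  # OPS-006 alerting would hook in here
            else:
                messages.append(msg)

def reingest(dlq: list, messages: deque) -> None:
    """Manual re-ingest after the upstream fix: reset counters and requeue."""
    while dlq:
        msg = dlq.pop(0)
        msg["failures"] = 0
        messages.append(msg)
```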
Idempotency
To prevent "Duplicate Lead Burnout", every email message ID is hashed and cached in Redis. If the same ID is seen again, the pipeline drops it before processing.
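The hash-and-check step can be sketched with a dict of expiry times standing in for Redis. In Redis the same check-and-set is one atomic command, `SET <key> 1 NX EX <ttl>`, so two workers cannot both claim the same message. The seven-day TTL and the `seen:` key prefix are hypothetical choices. The document does not specify them.

```python
import hashlib
import time
from typing import Optional

# In-memory stand-in for Redis keys with expiry (key -> expiry timestamp).
_seen: dict[str, float] = {}
TTL_SECONDS = 7 * 24 * 3600  # hypothetical retention window

def dedupe_key(message_id: str) -> str:
    """Hash the email message ID into a fixed-length cache key."""
    return "seen:" + hashlib.sha256(message_id.encode("utf-8")).hexdigest()

def first_time(message_id: str, now: Optional[float] = None) -> bool:
    """True if the ID is new (and mark it seen); False means drop as a duplicate."""
    now = time.time() if now is None else now
    key = dedupe_key(message_id)
    expires = _seen.get(key)
    if expires is not None and expires > now:
        return False
    _seen[key] = now + TTL_SECONDS
    return True
```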
Sync Retries
When syncing with external ERPs (Focus RT/Business Central):
- Strategy: Exponential backoff (1m, 5m, 15m, 1h).
- Hard Limit: After 24 hours of failure, the card status in Kanban is updated to SYNC_ERROR.
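The retry schedule and the hard limit can be combined in one function. The 1m, 5m, 15m, 1h schedule and the 24-hour limit come from the list above. Repeating the final one-hour interval until the limit is an assumption. `next_retry` is a hypothetical name.

```python
from datetime import timedelta
from typing import Optional

BACKOFF = [timedelta(minutes=1), timedelta(minutes=5), timedelta(minutes=15), timedelta(hours=1)]
HARD_LIMIT = timedelta(hours=24)

def next_retry(attempt: int, elapsed: timedelta) -> Optional[timedelta]:
    """Delay before retry `attempt` (0-based), or None once the card should become SYNC_ERROR."""
    delay = BACKOFF[min(attempt, len(BACKOFF) - 1)]  # assumed: keep retrying hourly
    if elapsed + delay > HARD_LIMIT:
        return None  # caller updates the Kanban card status to SYNC_ERROR
    return delay
```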
Pipeline User Stories (DevOps)
US-DVO-005: Pipeline Pulse Monitoring
As a DevOps Engineer, I want to see real-time queue depths so that I can scale worker pods before ingestion lag impacts sales.
- AC: Grafana dashboard showing Redis queue length and worker processing time.
US-DVO-006: Bulk Re-classification
As an Admin, I want to re-run the classification engine on a batch of cards so that I can update the board after a model retraining.
- AC: Management command/API to trigger batch re-classification for a specific date range.
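The core of such a command could look like the sketch below. It selects cards by date range, re-runs a classifier in fixed-size batches, and counts changed labels. `CardRow`, `batches`, `reclassify` and the injected `classify` callable are hypothetical. A real management command would read from and write to the database, committing once per batch.

```python
from dataclasses import dataclass
from datetime import date
from typing import Callable, Iterator

@dataclass
class CardRow:
    card_id: str
    received_on: date
    text: str
    label: str

def batches(rows: list[CardRow], start: date, end: date, size: int) -> Iterator[list[CardRow]]:
    """Yield cards received within [start, end] (inclusive) in fixed-size batches."""
    selected = [r for r in rows if start <= r.received_on <= end]
    for i in range(0, len(selected), size):
        yield selected[i:i + size]

def reclassify(rows: list[CardRow], start: date, end: date,
               classify: Callable[[str], str], size: int = 100) -> int:
    """Re-run the classifier over the date range; return how many labels changed."""
    changed = 0
    for batch in batches(rows, start, end, size):
        for row in batch:
            new_label = classify(row.text)
            if new_label != row.label:
                row.label = new_label
                changed += 1
        # a real implementation would commit this batch here
    return changed
```

Returning the number of changed labels gives the Admin a quick way to judge how much the retrained model actually moved the board.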