
Data Pipelines & Reliability

This document describes the flow of critical data across the Pebble Orchestrator, highlighting asynchronous processing steps and reliability strategies.


Core Pipelines

1. Email Ingestion Pipeline (Sync → Async)

How an email travels from an Outlook (O365) mailbox to a Native Kanban card, with structured persistence in PostgreSQL and semantic persistence in the Vector DB along the way.

```mermaid
graph LR
    A["O365 Mailbox"] -->|Webhook| B["Pebble Listener"]
    B -->|Push| C[("Redis Queue")]
    C -->|Worker 1| D["Metadata Extraction"]
    D -->|Persistence| E[("PostgreSQL Core")]
    E -->|Semantic Sync| V[("Vector DB - Shadow Memory")]
    E -->|Worker 2| F["AI Classifier"]
    F -->|Internal API| G["Native Kanban State"]
```
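The "Sync → Async" split means the webhook listener does almost no work. It pushes the notification onto the queue and returns immediately, so slow extraction or classification never delays the webhook response. The following minimal Python sketch shows that split. It assumes a simplified payload with `id`, `from` and `subject` fields; real O365 change notifications are shaped differently. An in-memory `deque` and `dict` stand in for Redis and PostgreSQL, and all function names are hypothetical.

```python
from __future__ import annotations

import json
from collections import deque
from dataclasses import dataclass

# In-memory stand-in for the Redis queue (assumption: production would use a
# Redis list, e.g. LPUSH from the listener and BRPOP in the workers).
ingest_queue: deque[str] = deque()


@dataclass
class EmailMetadata:
    message_id: str
    sender: str
    subject: str


def handle_webhook(payload: dict) -> int:
    """Listener: enqueue the raw payload and return at once (the sync boundary)."""
    ingest_queue.append(json.dumps(payload))
    return 202  # HTTP 202 Accepted: the rest of the work happens asynchronously


def extract_metadata(raw: str) -> EmailMetadata:
    """Worker 1: pull out the structured fields needed for persistence."""
    data = json.loads(raw)
    return EmailMetadata(
        message_id=data["id"],
        sender=data["from"],
        subject=data.get("subject", ""),
    )


def run_worker_once(store: dict[str, EmailMetadata]) -> EmailMetadata | None:
    """Pop one message, extract metadata and persist it (the dict stands in for PostgreSQL)."""
    if not ingest_queue:
        return None
    meta = extract_metadata(ingest_queue.popleft())
    store[meta.message_id] = meta
    return meta
```

In the real pipeline, Worker 2 (the AI Classifier) would consume from the persisted record in the same way, so a slow model call only grows the queue and does not block ingestion.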

2. Clean Room Ingestion Sequence

Detailed flow for multi-entity validation during data entry.

```mermaid
sequenceDiagram
    participant S as "Source (Excel/Email)"
    participant E as "Ingestion Service"
    participant AI as "AI Enrichment"
    participant DB as "PostgreSQL Core"
    participant K as "Kanban UI"

    S->>E: Raw Record
    E->>DB: Check GST/CIN/Email
    DB-->>E: Match/No Match
    E->>AI: Scored Metadata
    AI-->>E: Urgency + Sentiment
    E->>DB: Commit Native Record
    DB->>K: Broadcast State Update
```
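The "Check GST/CIN/Email" step can be sketched as normalization followed by a match lookup. This illustrative sketch is not the Ingestion Service's actual code. `CompanyRecord`, `find_match` and the in-memory list of existing records are hypothetical. The GSTIN regex encodes the standard 15-character layout (2-digit state code, 10-character PAN, entity number, `Z`, check character) but does not verify the check character.

```python
from __future__ import annotations

import re
from dataclasses import dataclass

# Standard 15-character GSTIN layout; the final check character is not verified here.
GSTIN_RE = re.compile(r"^[0-9]{2}[A-Z]{5}[0-9]{4}[A-Z][1-9A-Z]Z[0-9A-Z]$")


@dataclass(frozen=True)
class CompanyRecord:
    gstin: str | None = None
    cin: str | None = None
    email: str | None = None


def normalize(rec: CompanyRecord) -> CompanyRecord:
    """Uppercase GSTIN/CIN and lowercase the email so comparisons are case-insensitive."""
    return CompanyRecord(
        gstin=rec.gstin.strip().upper() if rec.gstin else None,
        cin=rec.cin.strip().upper() if rec.cin else None,
        email=rec.email.strip().lower() if rec.email else None,
    )


def find_match(raw: CompanyRecord, existing: list[CompanyRecord]) -> str | None:
    """Return the first key shared with an existing (already normalized) record, or None."""
    rec = normalize(raw)
    if rec.gstin and not GSTIN_RE.match(rec.gstin):
        raise ValueError(f"malformed GSTIN: {rec.gstin}")
    for other in existing:
        for key in ("gstin", "cin", "email"):
            value = getattr(rec, key)
            if value and value == getattr(other, key):
                return key
    return None
```

A `None` result corresponds to "No Match" in the diagram. A returned key tells the service which identifier linked the raw record to an existing company.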

3. CRM Master State Management (Event-Driven)

Internal synchronization between the Kanban state and Customer context.

  • Trigger: User moves a card from Qualify to Quote in the Native Kanban UI.
  • Action:
      1. The frontend emits a WORKFLOW_STATE_CHANGE signal.
      2. Pebble Core validates CRM prerequisites (e.g., GSTIN verification).
      3. An internal service updates the Company status in the Unified Database.
      4. The AI Assistant is triggered to suggest "Landed Cost" templates.
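The following sketch shows one way to handle the WORKFLOW_STATE_CHANGE signal. Prerequisites are checked before the card moves, and the company update and template suggestion run only once the move is accepted. The `PREREQUISITES` table, the `"Quoting"` status value and all class and function names are hypothetical. Only the Qualify/Quote stages, the GSTIN check and the "Landed Cost" suggestion come from the steps above.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Company:
    name: str
    gstin_verified: bool
    status: str = "Prospect"  # hypothetical initial status


@dataclass
class Card:
    card_id: str
    stage: str
    company: Company
    suggestions: list[str] = field(default_factory=list)


# Hypothetical prerequisite table: checks a card must pass to enter a stage.
PREREQUISITES = {
    "Quote": [("GSTIN verified", lambda card: card.company.gstin_verified)],
}


def on_workflow_state_change(card: Card, target: str) -> list[str]:
    """Handle a WORKFLOW_STATE_CHANGE signal; return unmet prerequisites (empty = accepted)."""
    unmet = [name for name, check in PREREQUISITES.get(target, []) if not check(card)]
    if unmet:
        return unmet  # rejected: the card stays in its current stage
    card.stage = target
    if target == "Quote":
        card.company.status = "Quoting"  # hypothetical status written to the Unified Database
        card.suggestions.append("Landed Cost")  # AI Assistant template suggestion
    return []
```

Returning the unmet prerequisites instead of raising lets the UI snap the card back and show the user why the move was refused.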

4. AI Enrichment Pipeline

Enriches each card with extracted identifiers, a lead urgency grade, and a routing decision.

  • Steps:
      1. OCR Stage: Scans attachments for CIN/PAN/GST numbers.
      2. Sentiment Stage: Grades lead urgency (P0-P3).
      3. Routing Stage: Uses agentic reasoning to determine the next-best action. This stage decides whether a card lands in the Global Unclassified Queue (low confidence) or the HOD Pending Queue (high confidence), as defined by the Lead State Machine (linked to US-WFL-010).
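The following sketch covers two deterministic edges of this pipeline: post-processing OCR text to find identifiers, and the final confidence-based routing. The regexes follow the standard GSTIN, CIN and PAN layouts. `CONFIDENCE_THRESHOLD = 0.8` is a hypothetical value, because the real cut-off lives in the Lead State Machine. The OCR and LLM calls themselves are out of scope.

```python
from __future__ import annotations

import re

# Standard layouts: GSTIN (15 chars), CIN (21 chars), PAN (10 chars).
# \b stops the PAN pattern from matching the PAN embedded inside a GSTIN.
PATTERNS = {
    "GSTIN": re.compile(r"\b[0-9]{2}[A-Z]{5}[0-9]{4}[A-Z][1-9A-Z]Z[0-9A-Z]\b"),
    "CIN": re.compile(r"\b[LU][0-9]{5}[A-Z]{2}[0-9]{4}[A-Z]{3}[0-9]{6}\b"),
    "PAN": re.compile(r"\b[A-Z]{5}[0-9]{4}[A-Z]\b"),
}

CONFIDENCE_THRESHOLD = 0.8  # hypothetical cut-off between the two queues


def extract_identifiers(ocr_text: str) -> dict[str, list[str]]:
    """OCR stage post-processing: find candidate identifiers in the recognized text."""
    text = ocr_text.upper()
    return {kind: pattern.findall(text) for kind, pattern in PATTERNS.items()}


def route(confidence: float) -> str:
    """Routing stage: low confidence goes to triage, the rest to the HOD."""
    if confidence >= CONFIDENCE_THRESHOLD:
        return "HOD Pending Queue"
    return "Global Unclassified Queue"
```

Usage: `extract_identifiers("GSTIN: 27AAPFU0939F1ZV")` returns the GSTIN under `"GSTIN"` and empty lists for the other two kinds.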

[!NOTE] Optimization Strategy: While Phase 1 uses zero-shot LLMs for Agentic Triage, we may introduce smaller, faster models like DistilBERT for specific classification tasks in later phases to optimize for cost and latency.

5. Process Automation Pipeline (n8n)

Driven by the Master Sheet (deterministic SOPs).

  • Trigger: A new card enters Processing, or Human Feedback is received.
  • Action:
      1. n8n fetches the Decision Matrix from the Master Sheet.
      2. n8n executes the multi-platform steps, e.g., "Upload to Nextcloud -> Tag in Tally -> Move card".
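n8n runs this logic as a visual workflow, not as Python. The following sketch only illustrates the decision logic it applies: look up the matching Decision Matrix row, then run its steps in order and stop at the first failure so that a later step (such as moving the card) never runs after an earlier one failed. The `doc_type` column, the step names and the matrix rows are hypothetical.

```python
from __future__ import annotations

from typing import Callable

# Hypothetical Decision Matrix rows, as they might be read from the Master Sheet.
DECISION_MATRIX = [
    {"stage": "Processing", "doc_type": "Invoice", "steps": "upload_nextcloud,tag_tally,move_card"},
    {"stage": "Processing", "doc_type": "PO", "steps": "upload_nextcloud,move_card"},
]


def lookup_steps(stage: str, doc_type: str) -> list[str]:
    """Return the ordered SOP steps for a card, or [] if no row matches."""
    for row in DECISION_MATRIX:
        if row["stage"] == stage and row["doc_type"] == doc_type:
            return row["steps"].split(",")
    return []  # no matching SOP: leave the card for a human


def execute(steps: list[str], actions: dict[str, Callable[[], bool]]) -> list[str]:
    """Run steps in order, stop at the first failure, and return the steps that completed."""
    done: list[str] = []
    for step in steps:
        if not actions[step]():
            break
        done.append(step)
    return done
```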

6. Learning Loop: The "Why?" Collector

Captures the delta between the AI prediction and human intent.

  • Trigger: User overrides an AI move.
  • Capture: The UI prompts the user for a "Why?".
  • Storage: The user's response and the original email text are embedded and saved in the Vector DB (Shadow Memory).
  • Optimization: Once a week, these "Why" pairs are used to fine-tune the classifier, reducing the "Low Confidence" triage rate.
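The following sketch covers the capture and weekly-batch halves of the loop, assuming a simple in-memory log. The embedding and Vector DB write are only marked by a comment. The record fields, the `"\nWhy: "` join format and the 7-day window boundary are illustrative choices, not a documented schema.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class OverrideRecord:
    email_text: str
    predicted: str
    chosen: str
    why: str
    at: datetime


def capture_override(log: list[OverrideRecord], email_text: str, predicted: str,
                     chosen: str, why: str, at: datetime) -> OverrideRecord | None:
    """Record a human override; agreeing with the AI is not an override and is ignored."""
    if predicted == chosen:
        return None
    record = OverrideRecord(email_text, predicted, chosen, why, at)
    log.append(record)  # production: embed email_text + why and upsert into the Vector DB
    return record


def weekly_training_pairs(log: list[OverrideRecord], now: datetime) -> list[tuple[str, str]]:
    """Build (input text, corrected label) pairs from the last 7 days of overrides."""
    cutoff = now - timedelta(days=7)
    return [(f"{r.email_text}\nWhy: {r.why}", r.chosen) for r in log if r.at >= cutoff]
```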


7. Sales-to-Operations Handover

The critical transition from "Promising Lead" to "Committed Order".

```mermaid
graph LR
    Quote[Approved Quote] -->|User Trigger| SO[Sales Order Gen]
    SO -->|Async Event| ERP[Tally XML Push]
    ERP -->|Ack Received| Status[Update: Placed]
    Status -->|Stock Check| Inv[Invoice Gen]
    Inv -->|Email| Cust[Customer Notified]
```
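The diagram implies an ordering constraint: a card should show Placed only after Tally acknowledges the XML push, not when the push is sent. A small transition table makes that explicit and rejects out-of-order events. The state and event names below are hypothetical labels for the diagram's nodes and arrows. The Tally XML payload itself is out of scope.

```python
from __future__ import annotations

# (current state, event) -> next state, one entry per arrow in the handover diagram.
TRANSITIONS: dict[tuple[str, str], str] = {
    ("Approved Quote", "user_trigger"): "Sales Order",
    ("Sales Order", "tally_ack"): "Placed",
    ("Placed", "stock_ok"): "Invoiced",
    ("Invoiced", "email_sent"): "Customer Notified",
}


def advance(state: str, event: str) -> str:
    """Return the next state, rejecting events the diagram does not allow from `state`."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"event {event!r} is not allowed in state {state!r}") from None
```

For example, `advance("Sales Order", "stock_ok")` raises because no stock check is allowed before the ERP acknowledgement arrives.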

8. Quality & LIMS Integration

Connecting physical lab samples to digital approvals.

  • Trigger: Technician scans the Sample QR Code.
  • Action:
      1. Fetch Spec.Thresholds from Master Data.
      2. Compare the observed values against those thresholds.
      3. Auto-Grade: If the sample passes, update the CRM Opportunity Status to Sample Approved.
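The auto-grade step is a range check per specification parameter. The following sketch assumes that each entry in Spec.Thresholds can be expressed as an inclusive `(min, max)` pair and that a parameter missing from the observations counts as a failure. Both are assumptions, as are the parameter names in the example. A failed sample leaves the opportunity status unchanged, because the steps above only define the Pass path.

```python
from __future__ import annotations


def auto_grade(thresholds: dict[str, tuple[float, float]],
               observed: dict[str, float]) -> tuple[str, list[str]]:
    """Return ("Pass", []) if every parameter is within [min, max], else ("Fail", failing names)."""
    failures = [
        name
        for name, (low, high) in thresholds.items()
        if name not in observed or not (low <= observed[name] <= high)
    ]
    return ("Pass" if not failures else "Fail", failures)


def update_opportunity(opportunity: dict, grade: str) -> dict:
    """Only a Pass changes the CRM status; a Fail is left for human review."""
    if grade == "Pass":
        opportunity["status"] = "Sample Approved"
    return opportunity
```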

9. Tender (GEM) Ingestion

  • Source: tender@ email box receiving GEM notifications.
  • Extraction: AI parses "Bid Number", "Deadline", and "Item Name".
  • Action: Creates a card in the Tender Kanban Board with Deadline as the Due Date.
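The pipeline uses AI for extraction. When a notification happens to use plain "Label: value" lines, a deterministic parser can act as a cheap first pass or a cross-check. The following sketch assumes that line format and a `DD-MM-YYYY HH:MM` deadline format. Both are assumptions about the GEM emails, and the sample values in the test are made up. It builds the Tender board card with the deadline as the due date and rejects notifications that are missing any of the three fields.

```python
from __future__ import annotations

import re
from datetime import datetime

# Assumes "Label: value" lines in the notification body.
FIELD_RE = re.compile(r"^(Bid Number|Deadline|Item Name)[ \t]*:[ \t]*(.+)$", re.MULTILINE)
REQUIRED = {"Bid Number", "Deadline", "Item Name"}


def parse_tender(body: str) -> dict[str, str]:
    """Map each recognized label to its (stripped) value."""
    return {label: value.strip() for label, value in FIELD_RE.findall(body)}


def tender_card(body: str, date_format: str = "%d-%m-%Y %H:%M") -> dict:
    """Build a Tender board card whose due date is the bid deadline."""
    fields = parse_tender(body)
    missing = REQUIRED - set(fields)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")  # hand over to AI extraction or a human
    return {
        "board": "Tender",
        "title": f"{fields['Bid Number']} - {fields['Item Name']}",
        "due_date": datetime.strptime(fields["Deadline"], date_format),
    }
```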

Reliability & Fault Tolerance

Dead Letter Queues (DLQ)

Any message that fails more than three times is moved to a DLQ.

  • Alert: IT team notified via OPS-006.
  • Action: Manual inspection or "Re-ingest" after fixing the upstream issue (e.g., ERP API downtime).
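The following sketch shows the DLQ rule with in-memory structures in place of Redis. A message is retried until its fourth failure (that is, "> 3 times") and then parked with its last error for inspection. `reingest` resets the counter once the upstream issue is fixed. The message shape (`body`, `failures`, `last_error`) and the function names are hypothetical, and the OPS-006 alert is only marked by a comment.

```python
from __future__ import annotations

from collections import deque
from typing import Callable

MAX_FAILURES = 3  # a message is dead-lettered on its 4th failure


def process_with_dlq(queue: deque, dlq: list, handler: Callable[[dict], None]) -> None:
    """Drain the queue, requeueing failed messages until they exceed MAX_FAILURES."""
    while queue:
        msg = queue.popleft()
        try:
            handler(msg["body"])
        except Exception as exc:
            msg["failures"] = msg.get("failures", 0) + 1
            if msg["failures"] > MAX_FAILURES:
                msg["last_error"] = repr(exc)
                dlq.append(msg)  # production: raise the OPS-006 alert here
            else:
                queue.append(msg)  # retry later


def reingest(dlq: list, queue: deque) -> None:
    """After fixing the upstream issue, reset counters and requeue dead-lettered messages."""
    while dlq:
        msg = dlq.pop(0)
        msg["failures"] = 0
        queue.append(msg)
```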

Idempotency

To prevent "Duplicate Lead Burnout", every email message ID is hashed and cached in Redis. If the same ID is seen again, the pipeline drops it before processing.
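The check-and-record must be atomic, or two workers could both see an ID as new. In Redis that is a single `SET key value NX`, which in redis-py is `r.set(key, 1, nx=True)` and returns `True` only for the first caller. The following sketch mirrors those semantics with an in-memory class so that it runs without a server. The SHA-256 hash and the `pebble:seen:` key prefix are assumptions.

```python
from __future__ import annotations

import hashlib


class SeenIds:
    """In-memory stand-in for Redis; add_if_absent mirrors `SET key 1 NX` semantics."""

    def __init__(self) -> None:
        self._keys: set[str] = set()

    def add_if_absent(self, key: str) -> bool:
        if key in self._keys:
            return False
        self._keys.add(key)
        return True


def dedup_key(message_id: str) -> str:
    """Hash the email Message-ID into a fixed-length cache key (prefix is hypothetical)."""
    return "pebble:seen:" + hashlib.sha256(message_id.encode("utf-8")).hexdigest()


def should_process(seen: SeenIds, message_id: str) -> bool:
    """True the first time a Message-ID is seen; False for every duplicate."""
    return seen.add_if_absent(dedup_key(message_id))
```

In production, an expiry on the key (for example, `ex=` in redis-py) would bound memory use. How long duplicates must be remembered is a policy decision that this page does not specify.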

Sync Retries

When syncing with external ERPs (Focus RT/Business Central):

  • Strategy: Exponential backoff (1m, 5m, 15m, 1h).
  • Hard Limit: After 24 hours of failure, the card status in Kanban is updated to SYNC_ERROR.
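The following sketch turns the schedule into concrete retry times. It assumes that once the listed intervals are exhausted, retries continue every hour, and that the 24-hour limit is measured from the first failure. The page does not specify either point.

```python
from __future__ import annotations

from datetime import timedelta

SCHEDULE = [timedelta(minutes=1), timedelta(minutes=5), timedelta(minutes=15), timedelta(hours=1)]
HARD_LIMIT = timedelta(hours=24)


def next_delay(attempt: int) -> timedelta:
    """Delay before retry `attempt` (1-based); repeats the last interval afterwards (assumption)."""
    return SCHEDULE[min(attempt, len(SCHEDULE)) - 1]


def retry_offsets() -> list[timedelta]:
    """Offsets from the first failure at which retries run before the hard limit."""
    offsets: list[timedelta] = []
    elapsed, attempt = timedelta(0), 1
    while True:
        elapsed += next_delay(attempt)
        if elapsed >= HARD_LIMIT:
            return offsets
        offsets.append(elapsed)
        attempt += 1


def sync_status(since_first_failure: timedelta) -> str:
    """Status the Kanban card should show for a failing sync."""
    return "SYNC_ERROR" if since_first_failure >= HARD_LIMIT else "RETRYING"
```

Under these assumptions, retries run at 1m, 6m, 21m and 1h21m, then hourly, giving 26 attempts (the last at 23h21m) before the card flips to SYNC_ERROR.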


Pipeline User Stories (DevOps)

US-DVO-005: Pipeline Pulse Monitoring

As a DevOps Engineer, I want to see real-time queue depths so that I can scale worker pods before ingestion lag impacts sales.

  • AC: Grafana dashboard showing Redis queue length and worker processing time.
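The two metrics on this dashboard are enough for a first-cut scaling rule. A worker that takes `avg_processing_s` per message clears `target_drain_s / avg_processing_s` messages in the target window, so the backlog divided by that number gives the pod count. The following sketch is a hypothetical helper. It ignores new arrivals during the window, and its `min_workers`/`max_workers` defaults are placeholders.

```python
from __future__ import annotations

import math


def desired_workers(queue_depth: int, avg_processing_s: float, target_drain_s: float,
                    min_workers: int = 1, max_workers: int = 20) -> int:
    """Workers needed to drain the current backlog within target_drain_s, clamped to [min, max]."""
    if queue_depth <= 0:
        return min_workers
    per_worker = target_drain_s / avg_processing_s  # messages one worker finishes in the window
    needed = math.ceil(queue_depth / per_worker)
    return max(min_workers, min(max_workers, needed))
```

For example, a backlog of 90 messages at 2 s each with a 60 s target needs `ceil(90 / 30) = 3` workers.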

US-DVO-006: Bulk Re-classification

As an Admin, I want to re-run the classification engine on a batch of cards so that I can update the board after a model retraining.

  • AC: Management command/API to trigger batch re-classification for a specific date range.
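The following sketch shows the core of such a command. It selects cards created within an inclusive date range, re-runs a classifier over them in batches, and reports how many labels changed. The card shape (`created`, `text`, `label`), the batch size and the function names are hypothetical. A small local `batched` helper is defined because `itertools.batched` only exists from Python 3.12.

```python
from __future__ import annotations

from datetime import date
from typing import Callable, Iterable, Iterator


def batched(items: list, size: int) -> Iterator[list]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def reclassify_range(cards: Iterable[dict], start: date, end: date,
                     classify: Callable[[str], str], batch_size: int = 100) -> int:
    """Re-run `classify` on cards created in [start, end]; return how many labels changed."""
    selected = [card for card in cards if start <= card["created"] <= end]
    changed = 0
    for batch in batched(selected, batch_size):
        for card in batch:
            new_label = classify(card["text"])
            if new_label != card["label"]:
                card["label"] = new_label
                changed += 1
        # production: commit each batch here so a crash does not lose finished work
    return changed
```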

