Kafka Job Orchestrator (KJO)

Technical Documentation and Architecture Overview

Producer Flow Visualization

[Diagram: Producer architecture. The producer orchestrates a batched flow between the MongoDB data source (PENDING items), Redis (get excluded IDs via PROD:PREFIX:*, check hash, save item ID and hash), and the Kafka topic (JSON message queue). Steps: 1) get excluded IDs, 2) query items, 3) check & save, 4) send to Kafka. TTL settings: ID 20 seconds, hash 20 seconds.]

Detailed Process Steps

Step 1: Get Excluded IDs from Redis

Queries Redis for active processing keys and recently sent items to build an exclusion list.

// Get processing IDs (from consumers)
const processingIds = await redisService.getProcessingIds();
// Example: ['id1', 'id2'] from CONS:PREFIX:*

// Get recently sent IDs (from producers)
const sentIds = await redisService.getSentIds();
// Example: ['id3', 'id4'] from PREFIX_SENT:*

const excludedIds = [...new Set([...processingIds, ...sentIds])];
Key patterns: Processing CONS:PREFIX:*, Sent PREFIX_SENT:*
Step 2: Query Data Source with Criteria

Executes database query with specified criteria, excluding items found in Redis.

// Abstract method - you implement this
async getNextProcessingItems(criteria, limit, excludedIds) {
  // Example MongoDB implementation
  return await collection.find({
    ...criteria,  // { state: STATES.PENDING }
    _id: { $nin: excludedIds }
  })
  .limit(limit)
  .toArray();
}

// Usage in your producer
await this.produceMessages(
  { state: STATES.PENDING, priority: 'high' },
  50,
  "new"
);
Compatible with MongoDB, MySQL, PostgreSQL, and other data sources.
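
For a SQL data source, the same contract can be implemented with a parameterized query. A minimal sketch, assuming a PostgreSQL pool from the pg package and an items table with id and state columns (both placeholders):

const { Pool } = require('pg');
const pool = new Pool(); // connection settings come from the standard PG* environment variables

// Same contract as the abstract method above, backed by PostgreSQL
async function getNextProcessingItems(criteria, limit, excludedIds) {
  const result = await pool.query(
    `SELECT *
       FROM items                          -- placeholder table name
      WHERE state = $1
        AND NOT (id = ANY($2::text[]))     -- skip IDs tracked in Redis
      LIMIT $3`,
    [criteria.state, excludedIds, limit]
  );
  return result.rows;
}
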
Step 3: Generate & Check Message Hash

Generates MD5 hash of item content and compares with stored hash values.

// Generate hash from item content
const messageHash = redisService.generateMessageHash(item);
// Uses: md5(id + URL/email + state)

// Check if already sent with same content
const isRecentlySent = await redisService.isSentRecently(
  itemId,
  messageHash
);

// Compare stored hash with current
if (storedHash === messageHash) {
  skip(); // Same content already sent
}
Content-based deduplication using MD5 hashing.
Step 4: Create & Send Kafka Messages

Constructs Kafka messages with deterministic keys and sends to configured topic.

// Create messages with stable keys
const messages = messageService.createKafkaMessages(
  itemsWithHashes,
  messageType
);

// Message structure
{
  key: "new-a1b2c3d4e5f6",  // Stable key
  value: JSON.stringify({
    id: item._id.toString(),
    data: item,
    type: messageType
  }),
  headers: {
    'message-type': 'new',
    'item-id': 'id123',
    'content-hash': 'abc123...',
    'timestamp': '1234567890'
  }
}

// Send with acks=-1 (all replicas)
await kafkaProducer.send({
  topic: this.topic,
  messages,
  acks: -1,
  timeout: 30000
});
  • Acknowledgment: acks=-1 (all replicas)
  • Idempotent: false (for acks flexibility)
  • Timeout: 30s (configurable)
Step 5: Mark Items as Sent in Redis

Records item ID and content hash in Redis with configured TTL.

// For each successfully sent item
for (const { item, messageHash } of itemsToProcess) {
  const itemId = this.getItemId(item);

  // Store item ID with message hash
  await redisService.setSentMessage(itemId, messageHash);
  // Key: PREFIX_SENT:itemId
  // Value: messageHash
  // TTL: 20 seconds (default, 2x processing TTL)

  // Call success handler
  await this.onSuccess(itemId);
}

Storage Components

  • ID tracking: Item identifier storage
  • Hash storage: Content fingerprint for comparison
  • TTL configuration: 20 second expiration (2x processing TTL)

Producer Specifications

  • Deduplication: Hybrid (ID + content hash)
  • Flexibility: Any DB (MongoDB, MySQL, API)
  • Configuration: Flexible (criteria-based queries)

Deduplication Mechanisms Explained

1. Item ID Deduplication

Tracks sent item identifiers to prevent duplicate transmission.

// Redis key pattern
EMAIL_SENT:item123 → "hash123..."
// TTL: 20 seconds (default, 2x processing TTL)

// Check during query
const sentIds = await redisService.getSentIds();
// Query excludes these IDs

// After sending
await redisService.setSentMessage(
  itemId,
  messageHash
);

2. Content Hash Deduplication

Compares message hashes to detect duplicate content across different IDs.

// Generate hash from content: md5(id + URL/email + state)
const hash = redisService.generateMessageHash(item);

// Check if same content was sent
const storedHash = await redisService
  .getSentMessageHash(itemId);

if (storedHash === hash) {
  skip(); // Same content already sent
}

Operation Sequence

  • Query Time: Excludes IDs found in Redis
  • Before Send: Validates content hash against stored values
  • After Send: Records ID and hash with TTL
  • TTL Strategy: 20 second expiration for sent items (2x processing TTL)
  • Processing Keys: CONS:PREFIX:* pattern for active processing
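
Put together, one producer cycle composes these operations roughly as follows. This is a simplified sketch, not the library's internal implementation; it reuses the service calls shown in the steps above:

// Simplified sketch of one producer cycle (illustrative only)
async function runProducerCycle(criteria, limit, messageType) {
  // 1. Query time: exclude IDs already processing or recently sent
  const excludedIds = [
    ...new Set([
      ...(await redisService.getProcessingIds()),
      ...(await redisService.getSentIds())
    ])
  ];
  const items = await producer.getNextProcessingItems(criteria, limit, excludedIds);

  for (const item of items) {
    const itemId = producer.getItemId(item);
    const messageHash = redisService.generateMessageHash(item);

    // 2. Before send: skip if the same content was already sent
    if (await redisService.isSentRecently(itemId, messageHash)) continue;

    // 3. Send to Kafka (batching omitted for brevity)
    await kafkaProducer.send({
      topic,
      acks: -1,
      messages: [{
        key: `${messageType}-${itemId}`,
        value: JSON.stringify({ id: itemId, data: item, type: messageType })
      }]
    });

    // 4. After send: record ID + hash with TTL
    await redisService.setSentMessage(itemId, messageHash);
  }
}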

Abstract Producer Implementation

Provided Functionality

  • Redis deduplication (ID + content hash)
  • Kafka connection and message sending
  • Stable message key generation
  • Message headers and metadata
  • Error handling and logging

Required Implementation

class MyProducer extends AbstractProducer {
  // 1. Query your data source
  async getNextProcessingItems(criteria, limit, excludedIds) {
    return await myDB.find(criteria, excludedIds);
  }

  // 2. Get item ID
  getItemId(item) {
    return item._id.toString();
  }

  // 3. Get display key
  getItemKey(item) {
    return item.name || item.email;
  }
}
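
A minimal sketch of wiring the producer into a schedule. The connect() call and the interval-based trigger are assumptions (the architecture diagram shows a cron scheduler; any scheduler that periodically calls produceMessages will do):

// Hypothetical wiring: run a batch every minute
const producer = new MyProducer();

async function main() {
  await producer.connect(); // assumed connection method, mirroring the consumer example

  setInterval(async () => {
    try {
      // criteria, batch size, message type (see produceMessages above)
      await producer.produceMessages({ state: STATES.PENDING }, 100, 'new');
    } catch (err) {
      console.error('Producer cycle failed:', err);
    }
  }, 60_000);
}

main().catch(console.error);
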

Consumer Flow Visualization

[Diagram: Consumer architecture. The consumer pulls messages from Kafka, runs two status checks (Redis: is it processing? CONS:PREFIX:id; database: is it completed? skip if true), sets a virtual processing state in Redis (TTL: 5 min), executes the business logic, updates the database state (COMPLETED/FAILED), and removes the processing key. The virtual processing state exists only in Redis: it auto-cleans on crash and the TTL prevents stuck items. Flow summary: 1. Pull → 2. Check Redis → 3. Check DB → 4. Set Processing → 5. Execute → 6. Update → 7. Cleanup.]

Consumer Processing Steps

Step 1: Configure & Pull Messages from Kafka

Consumer establishes connection to Kafka broker and begins message consumption.

// Consumer configuration
const consumer = kafkaClient.consumer({
  groupId: this.consumerGroup,
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576, // 1MB
  retry: {
    retries: 10,
    initialRetryTime: 1000,
    maxRetryTime: 30000
  }
});

// Start consuming with concurrency control
await consumer.run({
  partitionsConsumedConcurrently: 1, // configurable
  eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
    await this._handleIncomingMessage(
      topic, partition, message, heartbeat, pause
    );
  }
});
  • Max Concurrent: 3 messages (env: MAX_CONCURRENT_MESSAGES)
  • Partition Concurrency: 1 (sequential by default)
  • Status Report: every 30s (processing stats)
Step 2: Process Message with Concurrency Control

Manages concurrent message processing within configured limits.

// Concurrent processing with queue management
await this.processor.processWithConcurrency(async () => {
  await this._processItemWithDuplicationChecks(
    messageData,
    pause  // Can pause partition on error
  );
}, heartbeat);

// Automatic heartbeat during long processing
// Prevents consumer timeout/rebalancing
  • Active Tasks: 2/3 processing
  • Queue Length: 5 waiting
  • Processed: 147 total
Step 3: Check Processing & Completion Status

Verifies processing status in Redis and completion status in database before proceeding.

// Step 2: Check Redis for processing
const messageId = await this.getMessageId(messageData);
const redisKey = `CONS:${prefix}:${messageId}`;

if (await redis.exists(redisKey)) {
  console.log('⏭️ Already processing in Redis');
  return; // Skip this message
}
// Step 3: Check DB for completion
if (await this.isItemCompleted(messageId)) {
  console.log('✅ Already completed in DB');
  return; // Skip this message
}

// Both checks passed - proceed
console.log('🆕 New item to process');
  • Redis check (processing?): prevents concurrent work on the same item
  • DB check (completed?): prevents re-processing completed items
Step 4: Set Virtual Processing State in Redis

Sets temporary processing flag in Redis with configured TTL value.

// Mark as processing with configurable TTL
await this._startProcessing(messageId);

// Sets Redis key: CONS:PREFIX:messageId
await this.redisService.setProcessingKey(
  messageId,
  300  // 5 minute TTL (configurable)
);

// Virtual state benefits:
// ✅ No DB pollution
// ✅ Auto-expires on crash
// ✅ Clean recovery

Virtual State Pattern

Processing state maintained in Redis memory without database persistence

Step 5: Execute Business Logic & Handle Errors

Executes business logic with error handling and recovery mechanisms.

try {
  // Execute your business logic
  const result = await this.process(messageData);
  
  // Handle success
  await this._handleProcessingSuccess(messageId, result);
  
} catch (error) {
  // Intelligent error handling
  if (this._shouldPausePartitionOnError(error)) {
    // Pause partition for critical errors
    await pause();
    console.log(`⏸️ Pausing partition: ${error.message}`);
  }
  
  // Handle failure
  await this._handleProcessingFailure(messageId, error);
  
  // Recovery delay based on error type
  const delay = this._getErrorRecoveryDelay(error);
  await new Promise(resolve => setTimeout(resolve, delay));
}
  • Auto-pause on errors
  • Smart recovery delays
  • Error categorization
Step 6: Update Database State & Handle Results

Updates database with processing outcome and result data.

// Success path
if (success) {
  // Mark as completed (your implementation)
  await this.markItemAsCompleted(messageId);
  
  // Handle processing result (optional)
  if (result) {
    await this.handleProcessingResult?.(messageId, result);
  }
  
  // Success callback
  await this.onSuccess?.(messageId);
}

// Failure path
else {
  // Mark as failed (your implementation)
  await this.markItemAsFailed(messageId);
  
  // Failure callback with error details
  await this.onFailed?.(messageId, error);
}
  • Success: markItemAsCompleted
  • Failure: markItemAsFailed
Step 7: Cleanup Virtual Processing State

Removes processing flag from Redis after operation completion.

// Cleanup in finally block - always executes
await this._cleanupProcessing(messageId);

// Removes Redis key: CONS:PREFIX:messageId
await this.redisService.deleteProcessingKey(messageId);

// Ensures:
// ✅ No stuck items after crashes (TTL backup)
// ✅ Failed items can be retried
// ✅ Clean Redis state
// ✅ Prevents memory leaks

Cleanup Mechanism

Redis TTL ensures automatic key expiration on process failure

Real-time Status Reporting

Status reports generated at 30-second intervals (configurable parameter).

    📊 EmailConsumer Status Report:

    ╔═══════════════════════════════════════════════════════════╗
    ║ Active Tasks:    2/3        (66.67% capacity)             ║
    ║ Queue Length:    5          (waiting for slot)            ║
    ║ Total Processed: 1,247      (41.57/min average)           ║
    ║ Success Rate:    98.2%      (23 failures)                 ║
    ║ Uptime:         00:30:00   (started 10:45 AM)             ║
    ╚═══════════════════════════════════════════════════════════╝

    Active Partitions: [0, 1, 2, 3]
    Last Error: "SMTP timeout" (2 minutes ago)
    Memory Usage: 124MB / 512MB (24.2%)
                        
  • Throughput: 41.57/min messages processed
  • Success Rate: 98.2% (23 failures)
  • Capacity: 66.67% (2 of 3 slots)
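
A report like the one above can be produced with a simple timer. A minimal sketch, assuming the consumer exposes counters such as activeTasks, maxConcurrent, queueLength, totalProcessed, and failures (these names are illustrative, not the library's API):

// Illustrative periodic status report (field names are assumptions)
const REPORT_INTERVAL_MS = 30_000;

setInterval(() => {
  const { activeTasks, maxConcurrent, queueLength, totalProcessed, failures } = consumer.stats;
  const successRate = totalProcessed === 0
    ? 100
    : (((totalProcessed - failures) / totalProcessed) * 100).toFixed(1);

  console.log(`📊 Status: ${activeTasks}/${maxConcurrent} active, ` +
              `${queueLength} queued, ${totalProcessed} processed, ` +
              `success ${successRate}%`);
}, REPORT_INTERVAL_MS);
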

Consumer Specifications

  • State Management: Virtual (Redis-only processing state)
  • Concurrency: Managed (configurable parallelism)
  • Recovery: Auto (TTL-based cleanup)

Error Handling Configuration

Pausable Errors

Critical errors trigger automatic partition pause

  • DatabaseConnectionError
  • ServiceUnavailableError
  • RateLimitExceededError
  • CircuitBreakerOpenError
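
Whether an error is pausable is typically decided by its type or name. A minimal illustration of such a check (not the library's actual _shouldPausePartitionOnError implementation):

// Illustrative check: pause the partition only for critical error types
const PAUSABLE_ERRORS = [
  'DatabaseConnectionError',
  'ServiceUnavailableError',
  'RateLimitExceededError',
  'CircuitBreakerOpenError'
];

function shouldPausePartitionOnError(error) {
  return PAUSABLE_ERRORS.includes(error?.constructor?.name) ||
         PAUSABLE_ERRORS.includes(error?.name);
}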

Recovery Delays

Error-specific delay intervals configured by type

// Error type detection & delays
if (isRateLimitError(error)) {
  delay = 30000; // 30 seconds
} else if (isNetworkError(error)) {
  delay = 10000; // 10 seconds
} else if (isDatabaseError(error)) {
  delay = 5000;  // 5 seconds
} else {
  delay = 2000;  // 2 seconds default
}

Partition Pause Flow

Critical error detected → partition paused → recovery delay → automatic resume
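
A sketch of how this flow could be wired with kafkajs, assuming (as in recent kafkajs versions) that the pause callback passed to eachMessage pauses the current topic-partition and returns a resume function; handle, shouldPausePartitionOnError, and getErrorRecoveryDelay are placeholders:

// Sketch: pause on a critical error, resume after a recovery delay
await consumer.run({
  eachMessage: async ({ topic, partition, message, pause }) => {
    try {
      await handle(message); // your processing (hypothetical helper)
    } catch (error) {
      if (shouldPausePartitionOnError(error)) {
        const resumeThisPartition = pause();        // pauses this topic-partition
        const delay = getErrorRecoveryDelay(error); // e.g. 30s for rate limits
        setTimeout(resumeThisPartition, delay);     // automatic resume
      }
      throw error; // surface the failure to the framework's error handling
    }
  }
});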

Virtual Processing State Deep Dive

How It Works

1. Message Received: consumer pulls the message from Kafka
2. Redis Key Set: CONS:PREFIX:ID with TTL (e.g., 5 min)
3. Processing: business logic executes
4. Key Removed: cleanup on completion or TTL expiry

Benefits Over DB State

// Traditional DB approach ❌
await db.updateOne(
  { _id: id },
  { state: 'PROCESSING' }
);
// Problem: Stuck forever on crash

// Virtual state approach ✅
await redis.setex(
  `CONS:EMAIL:${id}`,
  300, // 5 min TTL
  '1'
);
// Auto-cleanup on crash!
TTL should be longer than max processing time

Understanding Virtual Processing State

Traditional Approach ❌

  • Stores PROCESSING state in database
  • Items can get stuck if system crashes
  • Requires cleanup jobs for orphaned items
  • Extra database writes impact performance

Virtual State Approach ✅

  • PROCESSING state only in Redis (memory)
  • Automatic cleanup with Redis TTL
  • No stuck items after crashes
  • Better performance, less DB load

System Architecture Diagram

[Diagram: System architecture overview. Producer flow: 1. Cron scheduler (* * * * *) → 2. Producer start (batch: 100) → 3. Redis get excluded IDs (CONS:*, SENT:*) → 4. Database query (PENDING) → 5. Redis hash check (compare, skip if same) → 6. Kafka send (acks=-1) → 7. Redis mark sent (SETEX, TTL: 20s). Kafka topic with partitions 0-3. Consumer flow: 8. Consumer group pulls messages from partitions → 9. Redis check 1 (EXISTS CONS:*:id) → 10. DB check 2 (already completed?) → 11. Redis set state (SETEX, TTL: 300s) → 12. Process business logic → 13. DB update (COMPLETED or FAILED) → 14. Redis cleanup (DEL CONS:*:id). Redis shared state: producer keys EMAIL_SENT:id123 → "hash" (TTL: 20 seconds), consumer keys CONS:EMAIL:id789 → "1" (TTL: 300 seconds). Coordination: producer reads CONS:*, consumer reads SENT:*, no direct communication needed.]

System Components Deep Dive

Apache Kafka Cluster Configuration

# Broker Configuration
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Replication
default.replication.factor=2
min.insync.replicas=2
unclean.leader.election.enable=false

# Log Configuration
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

Key Design Decisions

  • Partitions: 4 per topic for parallelism
  • Replication: Factor 2 for high availability
  • Retention: 7 days for replay capability
  • Compression: GZIP for network efficiency
Ensure min.insync.replicas ≤ replication.factor
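
These decisions map directly onto topic creation. A minimal sketch using the kafkajs admin client; the broker address and topic name are placeholders:

const { Kafka } = require('kafkajs');

async function createEmailTopic() {
  const kafka = new Kafka({ brokers: ['localhost:9092'] }); // placeholder broker
  const admin = kafka.admin();
  await admin.connect();
  await admin.createTopics({
    topics: [{
      topic: 'email-queue',        // placeholder topic name
      numPartitions: 4,            // 4 partitions for parallelism
      replicationFactor: 2,        // factor 2 for availability
      configEntries: [
        { name: 'retention.ms', value: '604800000' }, // 7 days for replay
        { name: 'compression.type', value: 'gzip' },  // network efficiency
        { name: 'min.insync.replicas', value: '2' }
      ]
    }]
  });
  await admin.disconnect();
}
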

Redis Cluster Architecture

Key Patterns Used

Processing Keys

CONS:PREFIX:itemId

TTL: 300s (5 minutes)

Sent Message Keys

PREFIX_SENT:itemId

TTL: 20s (default, 2x processing)

Message Hash Storage

Value: md5(id+content+state)
TTL Logic: SENT TTL defaults to 2x PROCESSING TTL to ensure sent messages are tracked longer than processing operations.

Cluster Benefits

  • Sub-millisecond latency for checks
  • Automatic failover with Sentinel
  • Optional persistence for recovery
  • TTL-based automatic cleanup
  • Typical Memory: ~500MB for 1M keys
  • Operations: 100K/s per instance

Horizontal Scaling Strategy

Producer Scaling

  • Add producer instances
  • No coordination needed
  • Redis prevents duplicates
  • Linear throughput increase

Kafka Scaling

  • Add partitions (careful!)
  • Add brokers for storage
  • Increase replication
  • Topic-level tuning

Consumer Scaling

  • Max = partition count
  • Auto-rebalancing
  • Increase concurrency
  • Add consumer groups

Scaling Rule of Thumb

Start with partitions = 2x expected consumer instances for growth room

Complete Flow Step-by-Step

Producer Flow

  • Query Database

    Fetch PENDING items with criteria

  • Check Redis Exclusions

    Get processing & sent IDs to exclude

  • Verify Content Hash

    Compare MD5 hash to prevent duplicates

  • Send to Kafka

    Batch send with stable keys & headers

  • Mark as Sent in Redis

    Store ID + hash with 20s TTL (default)

Consumer Flow

  • Pull from Kafka

    Consumer group coordinates partitions

  • Check Processing State

    Verify not already processing in Redis

  • Check Completion State

    Verify not already completed in DB

  • Set Processing in Redis

    Virtual state with 300s TTL

  • Execute Business Logic

    Process message & update DB

  • Cleanup Redis

    Remove processing key

Redis State Management Deep Dive

Producer Redis Operations

# Step 1: Get excluded IDs
KEYS CONS:EMAIL:*     → ["id1", "id2"]  # Processing
KEYS EMAIL_SENT:*     → ["id3", "id4"]  # Recently sent
excludedIds = [...new Set([...ids1, ...ids2])]

# Step 2: Check if content changed
GET EMAIL_SENT:id5    → "abc123..."     # Old hash
newHash = md5(id + email + state)       # New hash
if (oldHash === newHash) skip()

# Step 3: After sending to Kafka
SETEX EMAIL_SENT:id5 20 "xyz789..."     # Save new hash
# Expires in 20 seconds (default TTL)

Consumer Redis Operations

# Step 1: Check if processing
EXISTS CONS:EMAIL:id5 → 0               # Not processing

# Step 2: Set processing state
SETEX CONS:EMAIL:id5 300 "1"           # Mark processing
# Expires in 5 minutes (virtual state)

# Step 3: After processing complete
DEL CONS:EMAIL:id5                      # Remove key

# If consumer crashes:
# Key auto-expires after 5 minutes
# Item becomes available for retry

Key Insight: Coordination Through Redis

Redis acts as the coordination layer between Producer and Consumer:

  • Producer writes PREFIX_SENT:* keys to mark items as sent
  • Consumer reads these keys to avoid re-processing recently sent items
  • Consumer writes CONS:PREFIX:* keys to mark items as processing
  • Producer reads these keys to avoid sending items being processed
  • TTLs ensure automatic cleanup without manual intervention
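
A minimal sketch of the two lookups behind the producer's exclusion list, matching the key patterns above (KEYS is used for clarity, as in the earlier examples; the client is assumed to be ioredis-style):

// Illustrative exclusion lookups on the producer side
async function getExcludedIds(redis) {
  const processingKeys = await redis.keys('CONS:EMAIL:*'); // items consumers are working on
  const sentKeys = await redis.keys('EMAIL_SENT:*');       // items sent recently

  const processingIds = processingKeys.map(k => k.split(':').pop());
  const sentIds = sentKeys.map(k => k.split(':').pop());

  return [...new Set([...processingIds, ...sentIds])];
}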

Kafka Partitioning & Message Distribution

Key Generation

// Stable key format
key = `${messageType}-${itemId}`
// Example: "new-507f1f77bcf86cd799439011"

// Partition assignment
partition = hash(key) % partitionCount

Benefits

  • Same item always goes to same partition
  • Ordered processing per item
  • Even distribution across partitions

Consumer Assignment

Consumer 1 → Partitions 0, 1
Consumer 2 → Partitions 2, 3
Consumer 3 → Idle (standby)

Monitoring & Observability

Metrics to Monitor

  • Message production rate
  • Consumer lag per partition
  • Processing success/failure rates
  • Redis memory usage
  • Kafka disk usage
  • Processing duration p50/p95/p99

Recommended Stack

  • Metrics: Prometheus + Grafana
  • Logs: ELK Stack or Loki
  • Traces: Jaeger or Zipkin
  • Kafka: Kafka Manager/UI
  • Alerts: AlertManager

Key Dashboards

  • System Overview
  • Consumer Lag
  • Processing Stats
  • Error Tracking
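
As a starting point for the metrics side, a hedged sketch using prom-client; the metric names and the idea of calling the recorders from the onSuccess/onFailed hooks are assumptions, not part of KJO:

const http = require('http');
const client = require('prom-client');

// Hypothetical metric names
const processed = new client.Counter({
  name: 'kjo_messages_processed_total',
  help: 'Total messages processed'
});
const failed = new client.Counter({
  name: 'kjo_messages_failed_total',
  help: 'Total messages that failed'
});

// Call these from your consumer's onSuccess / onFailed hooks
function recordSuccess() { processed.inc(); }
function recordFailure() { failed.inc(); }

// Minimal /metrics endpoint for Prometheus to scrape
http.createServer(async (req, res) => {
  if (req.url === '/metrics') {
    res.setHeader('Content-Type', client.register.contentType);
    res.end(await client.register.metrics());
  } else {
    res.statusCode = 404;
    res.end();
  }
}).listen(9100);

module.exports = { recordSuccess, recordFailure };
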

Implementation Examples

const { AbstractConsumer } = require('@jonaskahn/kafka-job-orchestrator');

class EmailConsumer extends AbstractConsumer {
    // Define your states (PROCESSING is virtual - not in DB)
    static get STATES() {
        return {
            PENDING: 1,    // Ready to process
            COMPLETED: 2,  // Successfully sent
            FAILED: 3      // Failed to send
        };
    }

    constructor() {
        const config = {
            // REQUIRED: Kafka topic name
            topic: 'email-queue',
            
            // REQUIRED: Consumer group ID
            consumerGroup: 'email-processors',
            
            // REQUIRED: Redis configuration
            redisOptions: {
                // REQUIRED: Redis key prefix for deduplication
                keyPrefix: 'EMAIL',
                
                // OPTIONAL: TTL for processing keys (default: 300 seconds)
                processingTtl: 300,
                
                // OPTIONAL: TTL for sent message keys (default: 20 seconds)
                sentTtl: 20
            },
            
            // OPTIONAL: Topic auto-creation configuration
            topicOptions: {
                // OPTIONAL: Number of partitions (default: 1)
                partitions: 4,
                
                // OPTIONAL: Replication factor (default: 1)
                replicationFactor: 2,
                
                // OPTIONAL: Auto-create topic if not exists (default: true)
                autoCreate: true,
                
                // OPTIONAL: Additional Kafka topic configurations
                configEntries: {
                    'retention.ms': '604800000', // 7 days retention
                    'compression.type': 'gzip',
                    'cleanup.policy': 'delete'
                },
                
                // OPTIONAL: Additional topic options
                retentionMs: '604800000',
                segmentMs: '86400000', // 1 day
                compressionType: 'gzip',
                cleanupPolicy: 'delete'
            }
        };
        super(config);
        
        // Initialize your services
        this.emailService = new EmailService();
    }

    // REQUIRED: Extract ID from Kafka message
    getMessageId(messageData) {
        return messageData.id;
    }

    // REQUIRED: Extract display key for logging
    getMessageKey(messageData) {
        return messageData.data.recipient;
    }

    // REQUIRED: Your business logic - send the email
    async process(messageData) {
        const { recipient, subject, body } = messageData.data;
        
        // Send email using your service
        const result = await this.emailService.send({
            to: recipient,
            subject: subject,
            html: body
        });
        
        return result; // Return result for handleProcessingResult
    }

    // REQUIRED: Update database state to COMPLETED
    async markItemAsCompleted(emailId) {
        await EmailModel.updateOne(
            { _id: emailId },
            { state: EmailConsumer.STATES.COMPLETED, sentAt: new Date() }
        );
    }

    // REQUIRED: Update database state to FAILED
    async markItemAsFailed(emailId) {
        await EmailModel.updateOne(
            { _id: emailId },
            { state: EmailConsumer.STATES.FAILED, lastError: new Date() }
        );
    }

    // REQUIRED: Check if email was already sent
    async isItemCompleted(emailId) {
        const email = await EmailModel.findById(emailId);
        return email?.state === EmailConsumer.STATES.COMPLETED;
    }

    // OPTIONAL: Save email provider response
    async handleProcessingResult(emailId, result) {
        await EmailModel.updateOne(
            { _id: emailId },
            { messageId: result.messageId, provider: result.provider }
        );
    }

    // OPTIONAL: Called on successful processing
    async onSuccess(emailId) {
        console.log(`✅ Successfully processed email: ${emailId}`);
        // Add custom success handling, metrics, etc.
    }

    // OPTIONAL: Called on failed processing
    async onFailed(emailId, error) {
        console.error(`❌ Failed to process email: ${emailId}`, error);
        // Add custom error handling, alerts, etc.
    }
}

// Start the consumer
const consumer = new EmailConsumer();
await consumer.connect();
await consumer.startConsuming();
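
For completeness, a matching producer sketch. It assumes the package also exports AbstractProducer (as used in the "Required Implementation" section above), that the producer config mirrors the consumer's shape, and that connect() and produceMessages() behave as shown earlier; EmailModel is the same model as in the consumer example:

const { AbstractProducer } = require('@jonaskahn/kafka-job-orchestrator'); // assumed export

class EmailProducer extends AbstractProducer {
    constructor() {
        // Config shape mirrors the consumer example; exact producer options may differ
        super({
            topic: 'email-queue',
            redisOptions: {
                keyPrefix: 'EMAIL', // produces EMAIL_SENT:* keys
                sentTtl: 20
            }
        });
    }

    // REQUIRED: fetch PENDING emails, excluding IDs tracked in Redis
    async getNextProcessingItems(criteria, limit, excludedIds) {
        return await EmailModel.find({
            ...criteria,                 // e.g. { state: 1 } (PENDING)
            _id: { $nin: excludedIds }
        }).limit(limit);
    }

    // REQUIRED: unique identifier used for deduplication keys
    getItemId(item) {
        return item._id.toString();
    }

    // REQUIRED: human-readable key for logging
    getItemKey(item) {
        return item.recipient;
    }

    // OPTIONAL: success hook
    async onSuccess(itemId) {
        console.log(`📤 Queued email: ${itemId}`);
    }
}

// Start the producer and send a batch of up to 100 PENDING emails
const producer = new EmailProducer();
await producer.connect();                          // assumed, mirroring the consumer
await producer.produceMessages({ state: 1 }, 100, 'new');
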

Consumer Methods to Implement

getMessageId(messageData) Required

Extract unique identifier from Kafka message.

async getMessageId(messageData) {
    return messageData.id;
}
getMessageKey(messageData) Required

Extract human-readable key for logging purposes.

async getMessageKey(messageData) {
    return messageData.email || messageData.name || messageData.id;
}
process(messageData) Required

Your main business logic. Process the message and return result.

async process(messageData) {
    // Your business logic here
    const result = await doWork(messageData);
    return result;
}
markItemAsCompleted(itemId) Required

Update database state to COMPLETED. Called after successful processing.

async markItemAsCompleted(itemId) {
    await this.collection.updateOne(
        { _id: itemId },
        { $set: { state: 'COMPLETED', completedAt: new Date() } }
    );
}
markItemAsFailed(itemId) Required

Update database state to FAILED. Called when processing fails.

async markItemAsFailed(itemId) {
    await this.collection.updateOne(
        { _id: itemId },
        { $set: { state: 'FAILED', failedAt: new Date() } }
    );
}
isItemCompleted(itemId) Required

Check if item is already completed to prevent reprocessing.

async isItemCompleted(itemId) {
    const item = await this.collection.findOne(
        { _id: itemId, state: 'COMPLETED' }
    );
    return item !== null;
}
handleProcessingResult(id, result) Optional

Handle processing result after successful business logic. Use to save content, update additional fields, etc.

async handleProcessingResult(id, result) {
    // Save processed content, update additional fields
    await this.saveProcessedContent(id, result);
}
onSuccess(itemId) Optional

Hook called after successful processing. Use for logging, metrics, etc.

async onSuccess(itemId) {
    console.log(`Successfully processed: ${itemId}`);
    // Add custom success handling
}
onFailed(itemId, error) Optional

Hook called when processing fails. Use for error handling, alerts, etc.

async onFailed(itemId, error) {
    console.error(`Failed to process: ${itemId}`, error);
    // Add custom error handling, alerts, etc.
}

Producer Methods to Implement

getNextProcessingItems(criteria, limit, excludedIds) Required

Fetch items from your data source based on criteria.

async getNextProcessingItems(criteria, limit, excludedIds) {
    const { state, priority } = criteria;
    return await Model.find({
        state: state,
        _id: { $nin: excludedIds }
    }).limit(limit);
}
getItemId(item) Required

Extract unique identifier from database item.

getItemId(item) {
    return item._id.toString();
}
getItemKey(item) Required

Extract human-readable key for logging purposes.

getItemKey(item) {
    return item.name || item.email || item._id;
}
onSuccess(itemId) Optional

Hook called after successfully sending message to Kafka.

async onSuccess(itemId) {
    console.log(`Successfully sent: ${itemId}`);
    // Add custom success handling
}
onFailed(itemId, error) Optional

Hook called when message sending fails.

async onFailed(itemId, error) {
    console.error(`Failed to send: ${itemId}`, error);
    // Add custom error handling, retry logic, etc.
}

Consumer Implementation Summary

Required Methods (6)

getMessageId(messageData) - Extract unique ID
getMessageKey(messageData) - Extract display key
process(messageData) - Business logic
markItemAsCompleted(itemId) - Update state to COMPLETED
markItemAsFailed(itemId) - Update state to FAILED
isItemCompleted(itemId) - Check completion status

Optional Hook Methods (3)

handleProcessingResult(id, result) - Handle results
onSuccess(id) - Success callback
onFailed(id, error) - Failure callback

Producer Implementation Summary

Required Methods (3)

getNextProcessingItems(criteria, limit, excludedIds) - Fetch data
getItemId(item) - Extract unique ID
getItemKey(item) - Extract display key

Optional Hook Methods (2)

onSuccess(itemId) - Success callback
onFailed(itemId, error) - Failure callback

Implementation Requirements

  • Consumer: 6 required methods, 3 optional hook methods
  • Producer: 3 required methods, 2 optional hook methods
  • Processing state management is handled automatically by Redis
  • Virtual state pattern: no database persistence is required by the framework
  • Environment variables are required for configuration
  • Automatic topic creation is supported on connection
  • All async methods should use the async/await pattern