Technical Documentation and Architecture Overview
Queries Redis for active processing keys and recently sent items to build an exclusion list.
// Get processing IDs (from consumers)
const processingIds = await redisService.getProcessingIds();
// Example: ['id1', 'id2'] from CONS:PREFIX:*
// Get recently sent IDs (from producers)
const sentIds = await redisService.getSentIds();
// Example: ['id3', 'id4'] from PREFIX_SENT:*
const excludedIds = [...new Set([...processingIds, ...sentIds])];
Executes database query with specified criteria, excluding items found in Redis.
// Abstract method - you implement this
async getNextProcessingItems(criteria, limit, excludedIds) {
// Example MongoDB implementation
return await collection.find({
...criteria, // { state: STATES.PENDING }
_id: { $nin: excludedIds }
})
.limit(limit)
.toArray();
}
// Usage in your producer
await this.produceMessages(
{ state: STATES.PENDING, priority: 'high' },
50,
"new"
);
Generates MD5 hash of item content and compares with stored hash values.
// Generate hash from item content
const messageHash = redisService.generateMessageHash(item);
// Uses: md5(id + URL/email + state)
// Check if already sent with same content
const isRecentlySent = await redisService.isSentRecently(
itemId,
messageHash
);
// Compare stored hash with current
if (storedHash === messageHash) {
skip(); // Same content already sent
}
Constructs Kafka messages with deterministic keys and sends to configured topic.
// Create messages with stable keys
const messages = messageService.createKafkaMessages(
itemsWithHashes,
messageType
);
// Message structure
{
key: "new-a1b2c3d4e5f6", // Stable key
value: JSON.stringify({
id: item._id.toString(),
data: item,
type: messageType
}),
headers: {
'message-type': 'new',
'item-id': 'id123',
'content-hash': 'abc123...',
'timestamp': '1234567890'
}
}
// Send with acks=-1 (all replicas)
await kafkaProducer.send({
topic: this.topic,
messages,
acks: -1,
timeout: 30000
});
Records item ID and content hash in Redis with configured TTL.
// For each successfully sent item
for (const { item, messageHash } of itemsToProcess) {
const itemId = this.getItemId(item);
// Store item ID with message hash
await redisService.setSentMessage(itemId, messageHash);
// Key: PREFIX_SENT:itemId
// Value: messageHash
// TTL: 20 seconds (default, 2x processing TTL)
// Call success handler
await this.onSuccess(itemId);
}
Storage Components
Tracks sent item identifiers to prevent duplicate transmission.
// Redis key pattern
EMAIL_SENT:item123 → "hash123..."
// TTL: 20 seconds (default, 2x processing TTL)
// Check during query
const sentIds = await redisService.getSentIds();
// Query excludes these IDs
// After sending
await redisService.setSentMessage(
itemId,
messageHash
);
Compares message hashes to detect duplicate content across different IDs.
// Generate hash from content
const hash = md5(id + urlOrEmail + state); // urlOrEmail: the item's URL or email field
// Check if same content was sent
const storedHash = await redisService
.getSentMessageHash(itemId);
if (storedHash === hash) {
skip(); // Same content already sent
}
Operation Sequence
class MyProducer extends AbstractProducer {
// 1. Query your data source
async getNextProcessingItems(criteria, limit, excludedIds) {
return await myDB.find(criteria, excludedIds);
}
// 2. Get item ID
getItemId(item) {
return item._id.toString();
}
// 3. Get display key
getItemKey(item) {
return item.name || item.email;
}
}
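A usage sketch for the skeleton above; the connect() call and the default constructor are assumptions modeled on the consumer example later in this document, while produceMessages(criteria, limit, messageType) matches the earlier usage snippet.
// Illustrative usage - constructor config and connect() are assumptions
// modeled on the EmailConsumer example shown later in this document
const producer = new MyProducer();
await producer.connect();
// criteria, batch limit, message type - same shape as the earlier snippet
await producer.produceMessages(
  { state: STATES.PENDING },
  50,
  "new"
);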
Consumer establishes connection to Kafka broker and begins message consumption.
// Consumer configuration
const consumer = kafkaClient.consumer({
groupId: this.consumerGroup,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
retry: {
retries: 10,
initialRetryTime: 1000,
maxRetryTime: 30000
}
});
// Start consuming with concurrency control
await consumer.run({
partitionsConsumedConcurrently: 1, // configurable
eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
await this._handleIncomingMessage(
topic, partition, message, heartbeat, pause
);
}
});
Manages concurrent message processing within configured limits.
// Concurrent processing with queue management
await this.processor.processWithConcurrency(async () => {
await this._processItemWithDuplicationChecks(
messageData,
pause // Can pause partition on error
);
}, heartbeat);
// Automatic heartbeat during long processing
// Prevents consumer timeout/rebalancing
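To illustrate the heartbeat behaviour, a sketch of awaiting the heartbeat callback (the one kafkajs passes to eachMessage, shown in the consumer configuration above) between slow steps; the chunked-work helpers are hypothetical.
// Illustrative sketch - item.chunks and processChunk are hypothetical helpers;
// heartbeat is the kafkajs eachMessage callback shown in the configuration above
async function longRunningWork(item, heartbeat) {
  for (const chunk of item.chunks) {
    await processChunk(chunk); // one slow unit of work
    await heartbeat();         // signal liveness so the group does not rebalance
  }
}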
Verifies processing status in Redis and completion status in database before proceeding.
// Step 2: Check Redis for processing
const messageId = await this.getMessageId(messageData);
const redisKey = `CONS:${prefix}:${messageId}`;
if (await redis.exists(redisKey)) {
console.log('⏭️ Already processing in Redis');
return; // Skip this message
}
// Step 3: Check DB for completion
if (await this.isItemCompleted(messageId)) {
console.log('✅ Already completed in DB');
return; // Skip this message
}
// Both checks passed - proceed
console.log('🆕 New item to process');
Sets temporary processing flag in Redis with configured TTL value.
// Mark as processing with configurable TTL
await this._startProcessing(messageId);
// Sets Redis key: CONS:PREFIX:messageId
await this.redisService.setProcessingKey(
messageId,
300 // 5 minute TTL (configurable)
);
// Virtual state benefits:
// ✅ No DB pollution
// ✅ Auto-expires on crash
// ✅ Clean recovery
Virtual State Pattern
Processing state maintained in Redis memory without database persistence
Executes business logic with error handling and recovery mechanisms.
try {
// Execute your business logic
const result = await this.process(messageData);
// Handle success
await this._handleProcessingSuccess(messageId, result);
} catch (error) {
// Intelligent error handling
if (this._shouldPausePartitionOnError(error)) {
// Pause partition for critical errors
await pause();
console.log(`⏸️ Pausing partition: ${error.message}`);
}
// Handle failure
await this._handleProcessingFailure(messageId, error);
// Recovery delay based on error type
const delay = this._getErrorRecoveryDelay(error);
await new Promise(resolve => setTimeout(resolve, delay));
}
Updates database with processing outcome and result data.
// Success path
if (success) {
// Mark as completed (your implementation)
await this.markItemAsCompleted(messageId);
// Handle processing result (optional)
if (result) {
await this.handleProcessingResult?.(messageId, result);
}
// Success callback
await this.onSuccess?.(messageId);
}
// Failure path
else {
// Mark as failed (your implementation)
await this.markItemAsFailed(messageId);
// Failure callback with error details
await this.onFailed?.(messageId, error);
}
Removes processing flag from Redis after operation completion.
// Cleanup in finally block - always executes
await this._cleanupProcessing(messageId);
// Removes Redis key: CONS:PREFIX:messageId
await this.redisService.deleteProcessingKey(messageId);
// Ensures:
// ✅ No stuck items after crashes (TTL backup)
// ✅ Failed items can be retried
// ✅ Clean Redis state
// ✅ Prevents memory leaks
Cleanup Mechanism
Redis TTL ensures automatic key expiration on process failure
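A minimal sketch of why the TTL acts as a crash backstop, using ioredis directly; this illustrates the mechanism only and is not the library's internal code.
// Illustration only - the library manages these keys itself
const Redis = require('ioredis');
const redis = new Redis();
// Mark the item as processing with a 300-second TTL
await redis.set('CONS:EMAIL:id5', '1', 'EX', 300);
// Normal path: explicit cleanup in the finally block
await redis.del('CONS:EMAIL:id5');
// Crash path: the DEL above never runs, but the key still expires after 300s,
// so the item drops out of the producer's exclusion list and can be retried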
Status reports generated at 30-second intervals (configurable parameter).
📊 EmailConsumer Status Report:
╔═══════════════════════════════════════════════════════════╗
║ Active Tasks: 2/3 (66.67% capacity) ║
║ Queue Length: 5 (waiting for slot) ║
║ Total Processed: 1,247 (41.57/min average) ║
║ Success Rate: 98.2% (23 failures) ║
║ Uptime: 00:30:00 (started 10:45 AM) ║
╚═══════════════════════════════════════════════════════════╝
Active Partitions: [0, 1, 2, 3]
Last Error: "SMTP timeout" (2 minutes ago)
Memory Usage: 124MB / 512MB (24.2%)
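The throughput and success-rate figures in the report are simple derived values; an illustrative calculation using the numbers shown above:
// Illustrative arithmetic for the figures above (not library code)
const totalProcessed = 1247;
const failures = 23;
const uptimeMinutes = 30;
const perMinute = (totalProcessed / uptimeMinutes).toFixed(2);                          // "41.57"
const successRate = (((totalProcessed - failures) / totalProcessed) * 100).toFixed(1);  // "98.2"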
Critical errors trigger automatic partition pause
Error-specific delay intervals configured by type
// Error type detection & delays
if (isRateLimitError(error)) {
delay = 30000; // 30 seconds
} else if (isNetworkError(error)) {
delay = 10000; // 10 seconds
} else if (isDatabaseError(error)) {
delay = 5000; // 5 seconds
} else {
delay = 2000; // 2 seconds default
}
Critical Error Detected → Partition Paused → Recovery Delay → Auto Resume
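A sketch of that cycle using the kafkajs eachMessage API (in kafkajs, pause() returns a function that resumes the paused topic-partition); handle, isCritical and getDelay are hypothetical helpers.
await consumer.run({
  eachMessage: async ({ message, pause }) => {
    try {
      await handle(message);                 // hypothetical business logic
    } catch (error) {
      if (isCritical(error)) {               // hypothetical error classifier
        const resume = pause();              // stop fetching from this topic-partition
        setTimeout(resume, getDelay(error)); // auto-resume after an error-specific delay
      }
      throw error;                           // rethrow so kafkajs does not commit the offset
    }
  }
});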
1. Message Received: Consumer pulls message from Kafka
2. Redis Key Set: CONS:PREFIX:ID with TTL (e.g., 5 min)
3. Processing: Business logic executes
4. Key Removed: Cleanup on completion or TTL expiry
// Traditional DB approach ❌
await db.updateOne(
{ _id: id },
{ state: 'PROCESSING' }
);
// Problem: Stuck forever on crash
// Virtual state approach ✅
await redis.setex(
`CONS:EMAIL:${id}`,
300, // 5 min TTL
'1'
);
// Auto-cleanup on crash!
# Broker Configuration
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Replication
default.replication.factor=2
min.insync.replicas=2
unclean.leader.election.enable=false
# Log Configuration
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
Processing Keys: CONS:PREFIX:itemId (TTL: 300s / 5 minutes)
Sent Message Keys: PREFIX_SENT:itemId (TTL: 20s default, 2x processing)
Message Hash Storage: value is md5(id + content + state)
Scaling Rule of Thumb
Start with partitions = 2x expected consumer instances for growth room
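For example, with 3 expected consumer instances the rule suggests 2 × 3 = 6 partitions, which maps directly onto the topicOptions shown in the consumer example below:
// 3 expected consumer instances -> 2 x 3 = 6 partitions
topicOptions: {
  partitions: 6,        // room to scale out to 6 consumers before repartitioning
  replicationFactor: 2
}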
Producer Flow
1. Query Database: Fetch PENDING items with criteria
2. Check Redis Exclusions: Get processing & sent IDs to exclude
3. Verify Content Hash: Compare MD5 hash to prevent duplicates
4. Send to Kafka: Batch send with stable keys & headers
5. Mark as Sent in Redis: Store ID + hash with 20s TTL (default)
Consumer Flow
1. Pull from Kafka: Consumer group coordinates partitions
2. Check Processing State: Verify not already processing in Redis
3. Check Completion State: Verify not already completed in DB
4. Set Processing in Redis: Virtual state with 300s TTL
5. Execute Business Logic: Process message & update DB
6. Cleanup Redis: Remove processing key
# Step 1: Get excluded IDs
KEYS CONS:EMAIL:* → ["id1", "id2"] # Processing
KEYS EMAIL_SENT:* → ["id3", "id4"] # Recently sent
excludedIds = [...new Set([...processingIds, ...sentIds])]
# Step 2: Check if content changed
GET EMAIL_SENT:id5 → "abc123..." # Old hash
newHash = md5(id + email + state) # New hash
if (oldHash === newHash) skip()
# Step 3: After sending to Kafka
SETEX EMAIL_SENT:id5 20 "xyz789..." # Save new hash
# Expires in 20 seconds (default TTL)
# Step 1: Check if processing
EXISTS CONS:EMAIL:id5 → 0 # Not processing
# Step 2: Set processing state
SETEX CONS:EMAIL:id5 300 "1" # Mark processing
# Expires in 5 minutes (virtual state)
# Step 3: After processing complete
DEL CONS:EMAIL:id5 # Remove key
# If consumer crashes:
# Key auto-expires after 5 minutes
# Item becomes available for retry
Key Insight: Coordination Through Redis
Redis acts as the coordination layer between Producer and Consumer:
PREFIX_SENT:* keys to mark items as sent
CONS:PREFIX:* keys to mark items as processing
// Stable key format
key = `${messageType}-${itemId}`
// Example: "new-507f1f77bcf86cd799439011"
// Partition assignment
partition = hash(key) % partitionCount
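To make the stable-key property concrete, a toy illustration; Kafka's default partitioner actually uses murmur2 rather than this hash, but the guarantee is the same: equal keys always land on the same partition.
// Toy hash for illustration only - not Kafka's real partitioner
function toyPartition(key, partitionCount) {
  let h = 0;
  for (const ch of key) {
    h = (h * 31 + ch.charCodeAt(0)) >>> 0; // simple rolling hash, kept unsigned 32-bit
  }
  return h % partitionCount;
}
toyPartition('new-507f1f77bcf86cd799439011', 4); // same key -> same partition, every time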
const { AbstractConsumer } = require('@jonaskahn/kafka-job-orchestrator');
class EmailConsumer extends AbstractConsumer {
// Define your states (PROCESSING is virtual - not in DB)
static get STATES() {
return {
PENDING: 1, // Ready to process
COMPLETED: 2, // Successfully sent
FAILED: 3 // Failed to send
};
}
constructor() {
const config = {
// REQUIRED: Kafka topic name
topic: 'email-queue',
// REQUIRED: Consumer group ID
consumerGroup: 'email-processors',
// REQUIRED: Redis configuration
redisOptions: {
// REQUIRED: Redis key prefix for deduplication
keyPrefix: 'EMAIL',
// OPTIONAL: TTL for processing keys (default: 300 seconds)
processingTtl: 300,
// OPTIONAL: TTL for sent message keys (default: 20 seconds)
sentTtl: 20
},
// OPTIONAL: Topic auto-creation configuration
topicOptions: {
// OPTIONAL: Number of partitions (default: 1)
partitions: 4,
// OPTIONAL: Replication factor (default: 1)
replicationFactor: 2,
// OPTIONAL: Auto-create topic if not exists (default: true)
autoCreate: true,
// OPTIONAL: Additional Kafka topic configurations
configEntries: {
'retention.ms': '604800000', // 7 days retention
'compression.type': 'gzip',
'cleanup.policy': 'delete'
},
// OPTIONAL: Additional topic options
retentionMs: '604800000',
segmentMs: '86400000', // 1 day
compressionType: 'gzip',
cleanupPolicy: 'delete'
}
};
super(config);
// Initialize your services
this.emailService = new EmailService();
}
// REQUIRED: Extract ID from Kafka message
getMessageId(messageData) {
return messageData.id;
}
// REQUIRED: Extract display key for logging
getMessageKey(messageData) {
return messageData.data.recipient;
}
// REQUIRED: Your business logic - send the email
async process(messageData) {
const { recipient, subject, body } = messageData.data;
// Send email using your service
const result = await this.emailService.send({
to: recipient,
subject: subject,
html: body
});
return result; // Return result for handleProcessingResult
}
// REQUIRED: Update database state to COMPLETED
async markItemAsCompleted(emailId) {
await EmailModel.updateOne(
{ _id: emailId },
{ state: EmailConsumer.STATES.COMPLETED, sentAt: new Date() }
);
}
// REQUIRED: Update database state to FAILED
async markItemAsFailed(emailId) {
await EmailModel.updateOne(
{ _id: emailId },
{ state: EmailConsumer.STATES.FAILED, failedAt: new Date() }
);
}
// REQUIRED: Check if email was already sent
async isItemCompleted(emailId) {
const email = await EmailModel.findById(emailId);
return email?.state === EmailConsumer.STATES.COMPLETED;
}
// OPTIONAL: Save email provider response
async handleProcessingResult(emailId, result) {
await EmailModel.updateOne(
{ _id: emailId },
{ messageId: result.messageId, provider: result.provider }
);
}
// OPTIONAL: Called on successful processing
async onSuccess(emailId) {
console.log(`✅ Successfully processed email: ${emailId}`);
// Add custom success handling, metrics, etc.
}
// OPTIONAL: Called on failed processing
async onFailed(emailId, error) {
console.error(`❌ Failed to process email: ${emailId}`, error);
// Add custom error handling, alerts, etc.
}
}
// Start the consumer
const consumer = new EmailConsumer();
await consumer.connect();
await consumer.startConsuming();
getMessageId(messageData)
Required
Extract unique identifier from Kafka message.
async getMessageId(messageData) {
return messageData.id;
}
getMessageKey(messageData)
Required
Extract human-readable key for logging purposes.
async getMessageKey(messageData) {
return messageData.email || messageData.name || messageData.id;
}
process(messageData)
Required
Your main business logic. Process the message and return result.
async process(messageData) {
// Your business logic here
const result = await doWork(messageData);
return result;
}
markItemAsCompleted(itemId)
Required
Update database state to COMPLETED. Called after successful processing.
async markItemAsCompleted(itemId) {
await this.collection.updateOne(
{ _id: itemId },
{ $set: { state: 'COMPLETED', completedAt: new Date() } }
);
}
markItemAsFailed(itemId)
Required
Update database state to FAILED. Called when processing fails.
async markItemAsFailed(itemId) {
await this.collection.updateOne(
{ _id: itemId },
{ $set: { state: 'FAILED', failedAt: new Date() } }
);
}
isItemCompleted(itemId)
Required
Check if item is already completed to prevent reprocessing.
async isItemCompleted(itemId) {
const item = await this.collection.findOne(
{ _id: itemId, state: 'COMPLETED' }
);
return item !== null;
}
handleProcessingResult(id, result)
Optional
Handle processing result after successful business logic. Use to save content, update additional fields, etc.
async handleProcessingResult(id, result) {
// Save processed content, update additional fields
await this.saveProcessedContent(id, result);
}
onSuccess(itemId)
Optional
Hook called after successful processing. Use for logging, metrics, etc.
async onSuccess(itemId) {
console.log(`Successfully processed: ${itemId}`);
// Add custom success handling
}
onFailed(itemId, error)
Optional
Hook called when processing fails. Use for error handling, alerts, etc.
async onFailed(itemId, error) {
console.error(`Failed to process: ${itemId}`, error);
// Add custom error handling, alerts, etc.
}
getNextProcessingItems(criteria, limit, excludedIds)
Required
Fetch items from your data source based on criteria.
async getNextProcessingItems(criteria, limit, excludedIds) {
// Spread criteria (e.g. { state, priority }) into the query
return await Model.find({
...criteria,
_id: { $nin: excludedIds }
}).limit(limit);
}
getItemId(item)
Required
Extract unique identifier from database item.
getItemId(item) {
return item._id.toString();
}
getItemKey(item)
Required
Extract human-readable key for logging purposes.
getItemKey(item) {
return item.name || item.email || item._id;
}
onSuccess(itemId)
Optional
Hook called after successfully sending message to Kafka.
async onSuccess(itemId) {
console.log(`Successfully sent: ${itemId}`);
// Add custom success handling
}
onFailed(itemId, error)
Optional
Hook called when message sending fails.
async onFailed(itemId, error) {
console.error(`Failed to send: ${itemId}`, error);
// Add custom error handling, retry logic, etc.
}
Consumer methods (AbstractConsumer):
getMessageId(messageData) - Extract unique ID
getMessageKey(messageData) - Extract display key
process(messageData) - Business logic
markItemAsCompleted(itemId) - Update state to COMPLETED
markItemAsFailed(itemId) - Update state to FAILED
isItemCompleted(itemId) - Check completion status
handleProcessingResult(id, result) - Handle results
onSuccess(id) - Success callback
onFailed(id, error) - Failure callback
Producer methods (AbstractProducer):
getNextProcessingItems(criteria, limit, excludedIds) - Fetch data
getItemId(item) - Extract unique ID
getItemKey(item) - Extract display key
onSuccess(itemId) - Success callback
onFailed(itemId, error) - Failure callback
All methods follow the async/await pattern.