Skip to main content
Version: Next

Advanced Features

This guide covers advanced features of the CommandKit Queue system, including custom drivers, error handling, and performance optimizations.

Custom Drivers

You can create custom drivers to work with any message queue backend. A driver must implement the MessageQueue interface.

Creating a Custom Driver

import type { MessageQueue, Awaitable } from '@commandkit/queue';

/**
 * Template for a custom queue driver.
 *
 * The three private hooks at the bottom (`sendToBackend`, `startListening`,
 * `cleanup`) are intentionally empty: fill them in with the calls your
 * message queue backend requires.
 */
class CustomQueueDriver implements MessageQueue {
  /** Handlers registered through `receive`, grouped by topic. */
  private subscribers = new Map<
    string,
    Set<(message: any) => Awaitable<void>>
  >();

  /**
   * Logs the outgoing message, then forwards it to the backend hook.
   *
   * @param topic - Topic the message is published to.
   * @param message - Payload to publish.
   */
  async send(topic: string, message: any): Promise<void> {
    console.log(`Sending message to topic ${topic}:`, message);
    await this.sendToBackend(topic, message);
  }

  /**
   * Registers `handler` for `topic` and asks the backend hook to start
   * listening on that topic.
   *
   * @param topic - Topic to subscribe to.
   * @param handler - Callback stored for the topic.
   */
  async receive(
    topic: string,
    handler: (message: any) => Awaitable<void>,
  ): Promise<void> {
    let topicHandlers = this.subscribers.get(topic);

    // First subscription for this topic: create its handler set
    if (topicHandlers === undefined) {
      topicHandlers = new Set();
      this.subscribers.set(topic, topicHandlers);
    }

    topicHandlers.add(handler);

    await this.startListening(topic);
  }

  /** Forgets every registered handler, then runs the backend cleanup hook. */
  async close(): Promise<void> {
    this.subscribers.clear();
    await this.cleanup();
  }

  /** Backend hook: deliver `message` to `topic` (RabbitMQ, Apache Kafka, AWS SQS, etc.). */
  private async sendToBackend(topic: string, message: any): Promise<void> {
    // Implement your backend communication
  }

  /** Backend hook: start consuming `topic` and notify its subscribers when a message arrives. */
  private async startListening(topic: string): Promise<void> {
    // Start listening for messages from your backend
  }

  /** Backend hook: release any connections or other resources. */
  private async cleanup(): Promise<void> {
    // Clean up any connections or resources
  }
}

RabbitMQ Driver Example

import amqp from 'amqplib';
import type { MessageQueue, Awaitable } from '@commandkit/queue';

/**
 * Queue driver backed by RabbitMQ (via amqplib).
 *
 * Every topic maps to a non-durable `fanout` exchange of the same name.
 * Call `connect()` before `send()` or `receive()`; both throw
 * 'Driver not initialized' while no channel is open.
 */
class RabbitMQDriver implements MessageQueue {
  private connection?: amqp.Connection;
  private channel?: amqp.Channel;

  /**
   * @param url - AMQP connection URL passed to `amqp.connect`.
   */
  constructor(private url: string) {}

  /**
   * Publishes `message`, JSON-encoded, to the exchange named `topic`.
   *
   * @throws Error if `connect()` has not been called (or the driver was closed).
   */
  async send(topic: string, message: any): Promise<void> {
    const channel = this.requireChannel();

    await channel.assertExchange(topic, 'fanout', { durable: false });
    channel.publish(topic, '', Buffer.from(JSON.stringify(message)));
  }

  /**
   * Binds a fresh exclusive queue to the `topic` exchange and passes every
   * JSON-decoded message to `handler`.
   *
   * A delivery is acked only after `handler` resolves. If decoding or the
   * handler fails, the error is logged and the delivery is rejected without
   * requeueing, so a failing message cannot crash the process through an
   * unhandled rejection or be redelivered in a tight loop.
   *
   * @throws Error if `connect()` has not been called (or the driver was closed).
   */
  async receive(
    topic: string,
    handler: (message: any) => Awaitable<void>,
  ): Promise<void> {
    const channel = this.requireChannel();

    await channel.assertExchange(topic, 'fanout', { durable: false });
    const queue = await channel.assertQueue('', { exclusive: true });

    await channel.bindQueue(queue.queue, topic, '');

    // Awaited so that a failed subscription rejects receive() instead of
    // surfacing as an unhandled rejection
    await channel.consume(queue.queue, async (msg) => {
      // amqplib delivers null when the consumer is cancelled by the server
      if (!msg) return;

      try {
        const content = JSON.parse(msg.content.toString());
        await handler(content);
        channel.ack(msg);
      } catch (error) {
        console.error(`Failed to process message on topic ${topic}:`, error);
        // allUpTo = false, requeue = false: reject only this delivery
        channel.nack(msg, false, false);
      }
    });
  }

  /** Opens the connection and the channel used by `send` and `receive`. */
  async connect(): Promise<void> {
    this.connection = await amqp.connect(this.url);
    this.channel = await this.connection.createChannel();
  }

  /**
   * Closes the channel, then the connection. The references are dropped
   * first so that later calls fail with 'Driver not initialized' rather than
   * operating on a closed channel.
   */
  async close(): Promise<void> {
    const channel = this.channel;
    const connection = this.connection;
    this.channel = undefined;
    this.connection = undefined;

    if (channel) {
      await channel.close();
    }
    if (connection) {
      await connection.close();
    }
  }

  /** Returns the open channel or throws when the driver is not connected. */
  private requireChannel(): amqp.Channel {
    if (!this.channel) {
      throw new Error('Driver not initialized');
    }
    return this.channel;
  }
}

// Usage
const driver = new RabbitMQDriver('amqp://localhost');
// connect() has to complete first: send() and receive() throw
// 'Driver not initialized' until the channel has been created
await driver.connect();
// NOTE(review): setDriver is not imported in this snippet; the Testing
// section imports it from '@commandkit/queue'
setDriver(driver);

AWS SQS Driver Example

import {
SQSClient,
SendMessageCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
} from '@aws-sdk/client-sqs';
import type { MessageQueue, Awaitable } from '@commandkit/queue';

/**
 * Queue driver backed by AWS SQS.
 *
 * Each topic maps to one queue URL. `receive()` starts a polling loop per
 * subscription; `close()` stops every loop, including ones that are waiting
 * on a long poll at the time it is called.
 */
class SQSDriver implements MessageQueue {
  private client: SQSClient;
  private queueUrls = new Map<string, string>();
  /** Pending re-poll timer per topic. */
  private polling = new Map<string, NodeJS.Timeout>();
  /** One token per receive() call; flipped to inactive by close(). */
  private subscriptions = new Set<{ active: boolean }>();

  /**
   * @param region - AWS region; kept on the instance because queue URLs are
   *   built from it in `getQueueUrl`.
   */
  constructor(private readonly region: string) {
    this.client = new SQSClient({ region });
  }

  /** Sends `message`, JSON-encoded, to the queue that backs `topic`. */
  async send(topic: string, message: any): Promise<void> {
    const queueUrl = await this.getQueueUrl(topic);

    await this.client.send(
      new SendMessageCommand({
        QueueUrl: queueUrl,
        MessageBody: JSON.stringify(message),
      }),
    );
  }

  /**
   * Starts polling the queue that backs `topic` and passes each JSON-decoded
   * message body to `handler`. A message is deleted only after `handler`
   * resolves; failures are logged and the message is left on the queue.
   *
   * Resolves once polling has been started, not when it ends.
   */
  async receive(
    topic: string,
    handler: (message: any) => Awaitable<void>,
  ): Promise<void> {
    const queueUrl = await this.getQueueUrl(topic);

    const subscription = { active: true };
    this.subscriptions.add(subscription);

    const pollMessages = async (): Promise<void> => {
      if (!subscription.active) return;

      try {
        const response = await this.client.send(
          new ReceiveMessageCommand({
            QueueUrl: queueUrl,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds: 20,
          }),
        );

        for (const message of response.Messages ?? []) {
          // close() may have been called while the long poll was waiting;
          // leave the remaining messages on the queue
          if (!subscription.active) break;

          try {
            if (message.Body === undefined) {
              throw new Error('Received SQS message without a body');
            }

            const content = JSON.parse(message.Body);
            await handler(content);

            // Delete the message after successful processing
            await this.client.send(
              new DeleteMessageCommand({
                QueueUrl: queueUrl,
                ReceiptHandle: message.ReceiptHandle!,
              }),
            );
          } catch (error) {
            console.error('Failed to process message:', error);
          }
        }
      } catch (error) {
        console.error('Error polling messages:', error);
      }

      // Continue polling, unless close() ran while this poll was in flight
      if (subscription.active) {
        this.polling.set(topic, setTimeout(pollMessages, 1000));
      }
    };

    // Fire and forget: pollMessages handles its own errors and never rejects
    void pollMessages();
  }

  /** Stops all polling loops and cancels every pending re-poll timer. */
  async close(): Promise<void> {
    for (const subscription of this.subscriptions) {
      subscription.active = false;
    }
    this.subscriptions.clear();

    for (const timeout of this.polling.values()) {
      clearTimeout(timeout);
    }
    this.polling.clear();
  }

  /** Returns the cached queue URL for `topic`, building and caching it on first use. */
  private async getQueueUrl(topic: string): Promise<string> {
    const cached = this.queueUrls.get(topic);
    if (cached !== undefined) {
      return cached;
    }

    // Create queue if it doesn't exist
    // Implementation depends on your AWS setup
    const queueUrl = `https://sqs.${this.region}.amazonaws.com/123456789012/${topic}`;
    this.queueUrls.set(topic, queueUrl);
    return queueUrl;
  }
}

Error Handling Strategies

Retry Logic

Implement retry logic for failed message processing:

import { receive } from '@commandkit/queue';

/**
 * Runs `fn`, retrying with exponential backoff when it rejects.
 *
 * @param fn - Operation to attempt.
 * @param maxRetries - Total number of attempts (not extra retries); must be at least 1.
 * @param delay - Wait before the second attempt, in milliseconds. It doubles
 *   after every further failure (`delay`, `2 * delay`, `4 * delay`, ...).
 * @returns The value of the first attempt that resolves.
 * @throws RangeError if `maxRetries` is less than 1 or not a number.
 * @throws Whatever the final attempt rejected with.
 */
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number = 3,
  delay: number = 1000,
): Promise<T> {
  // Written as a negated comparison so that NaN is rejected as well
  if (!(maxRetries >= 1)) {
    throw new RangeError(
      `maxRetries must be at least 1, received ${maxRetries}`,
    );
  }

  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= maxRetries) {
        throw error;
      }

      // Exponential backoff: delay, 2 * delay, 4 * delay, ...
      const backoff = delay * 2 ** (attempt - 1);
      await new Promise<void>((resolve) => setTimeout(resolve, backoff));
    }
  }
}

// Usage with retry: every 'user-events' message is attempted through
// withRetry with maxRetries = 3 and delay = 1000
await receive('user-events', async (message) => {
  const attemptProcessing = async () => {
    await processUserEvent(message);
  };

  await withRetry(attemptProcessing, 3, 1000);
});

Dead Letter Queue

Implement a dead letter queue for failed messages:

import { send, receive } from '@commandkit/queue';

// Process messages with dead letter queue: anything that fails processing is
// wrapped in an envelope and forwarded to 'dead-letter-queue'
await receive('user-events', async (message) => {
  try {
    await processUserEvent(message);
  } catch (error) {
    console.error('Failed to process message:', error);

    // The catch variable is `unknown` under strict mode and any value can be
    // thrown, so the text is derived defensively instead of reading
    // error.message directly
    const reason = error instanceof Error ? error.message : String(error);

    // Send to dead letter queue
    await send('dead-letter-queue', {
      originalTopic: 'user-events',
      message,
      error: reason,
      timestamp: Date.now(),
      retryCount: 0,
    });
  }
});

// Process dead letter queue: each envelope gets up to 3 further attempts
// before it is reported as permanently failed
await receive('dead-letter-queue', async (message) => {
  const { message: originalMessage, retryCount } = message;

  if (retryCount >= 3) {
    // Log final failure
    console.error('Message permanently failed:', originalMessage);
    return;
  }

  // Retry processing
  try {
    await processUserEvent(originalMessage);
  } catch (error) {
    // Report the failed attempt instead of swallowing it
    console.error('Dead letter retry failed:', error);

    // Increment retry count and send back to dead letter queue, keeping the
    // most recent failure reason in the envelope
    await send('dead-letter-queue', {
      ...message,
      error: error instanceof Error ? error.message : String(error),
      retryCount: retryCount + 1,
    });
  }
});

Circuit Breaker Pattern

Implement a circuit breaker to prevent cascading failures:

/**
 * Circuit breaker around an async operation.
 *
 * - CLOSED: calls run normally.
 * - OPEN: calls are rejected without running until more than `timeout`
 *   milliseconds have passed since the last failure.
 * - HALF_OPEN: the next call is let through; success closes the circuit.
 */
class CircuitBreaker {
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  private failures = 0;
  private lastFailureTime = 0;

  /**
   * @param threshold - Failure count at which the circuit opens.
   * @param timeout - Milliseconds an open circuit waits before letting a call through.
   */
  constructor(
    private threshold: number = 5,
    private timeout: number = 60000,
  ) {}

  /**
   * Runs `fn` through the breaker.
   *
   * @returns The value `fn` resolves with.
   * @throws Error 'Circuit breaker is OPEN' while the circuit is open and the
   *   timeout has not elapsed; otherwise whatever `fn` rejects with.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      const coolDownElapsed =
        Date.now() - this.lastFailureTime > this.timeout;

      if (!coolDownElapsed) {
        throw new Error('Circuit breaker is OPEN');
      }

      this.state = 'HALF_OPEN';
    }

    let outcome: T;
    try {
      outcome = await fn();
    } catch (error) {
      // Count the failure and open the circuit once the threshold is reached
      this.failures += 1;
      this.lastFailureTime = Date.now();
      if (this.failures >= this.threshold) {
        this.state = 'OPEN';
      }
      throw error;
    }

    // Any success resets the failure count and closes the circuit
    this.failures = 0;
    this.state = 'CLOSED';
    return outcome;
  }
}

// Usage: one breaker (threshold 5, timeout 60000 ms) guards all
// 'user-events' processing
const circuitBreaker = new CircuitBreaker(5, 60000);

await receive('user-events', async (message) => {
  const guardedProcessing = async () => {
    await processUserEvent(message);
  };

  await circuitBreaker.execute(guardedProcessing);
});

Performance Optimizations

Message Batching

Batch multiple messages for better performance:

/**
 * Collects messages per topic and sends them together to `<topic>-batch`.
 *
 * A batch is flushed as soon as it holds `batchSize` messages, or once
 * `batchTimeout` milliseconds pass without a new message for that topic.
 */
class MessageBatcher {
  /** Messages waiting to be sent, per topic. */
  private batches = new Map<string, any[]>();
  /** Idle-flush timer per topic. */
  private timers = new Map<string, NodeJS.Timeout>();

  /**
   * @param batchSize - Number of messages that triggers an immediate flush.
   * @param batchTimeout - Idle time in milliseconds after which a partial batch is flushed.
   */
  constructor(
    private batchSize: number = 100,
    private batchTimeout: number = 1000,
  ) {}

  /**
   * Queues `message` for `topic`. Resolves after the flush completes when
   * this message filled the batch, otherwise immediately.
   *
   * @throws Whatever `send` rejects with when this call triggers the flush.
   */
  async addToBatch(topic: string, message: any): Promise<void> {
    let pending = this.batches.get(topic);
    if (!pending) {
      pending = [];
      this.batches.set(topic, pending);
    }

    pending.push(message);

    // Flush batch if it's full
    if (pending.length >= this.batchSize) {
      await this.flushBatch(topic);
      return;
    }

    // Restart the idle timer for this topic
    this.clearTimer(topic);
    this.timers.set(
      topic,
      setTimeout(() => {
        // Nobody awaits a timer-driven flush, so report failures here
        // instead of leaving an unhandled rejection
        this.flushBatch(topic).catch((error) => {
          console.error(`Failed to flush batch for topic ${topic}:`, error);
        });
      }, this.batchTimeout),
    );
  }

  /**
   * Sends everything currently queued for `topic` as one batched message.
   * If sending fails, the messages are put back at the front of the queue
   * and the error is rethrown.
   */
  private async flushBatch(topic: string): Promise<void> {
    // Cancel the timer before awaiting: a timer armed by a message that
    // arrives during the send belongs to the next batch and must survive
    this.clearTimer(topic);

    const batch = this.batches.get(topic);
    if (!batch || batch.length === 0) return;

    // Detach the batch before awaiting so that messages added while the send
    // is in flight go into a fresh array rather than being re-sent or dropped
    this.batches.set(topic, []);

    try {
      // Send batched message
      await send(`${topic}-batch`, {
        messages: batch,
        timestamp: Date.now(),
      });
    } catch (error) {
      const queuedMeanwhile = this.batches.get(topic) ?? [];
      this.batches.set(topic, [...batch, ...queuedMeanwhile]);
      throw error;
    }
  }

  /** Cancels and forgets the idle timer for `topic`, if one is armed. */
  private clearTimer(topic: string): void {
    const timer = this.timers.get(topic);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.timers.delete(topic);
    }
  }
}

// Usage
// batchSize = 50, batchTimeout = 500 ms
const batcher = new MessageBatcher(50, 500);

// Instead of sending individual messages
// (`message` stands for whatever payload your application produces)
await send('user-events', message);

// Use batcher
// (batched messages are sent to the 'user-events-batch' topic)
await batcher.addToBatch('user-events', message);

Connection Pooling

Implement connection pooling for better resource management:

import Redis from 'ioredis';

/**
 * Fixed-size pool of Redis connections handed out in round-robin order.
 *
 * Call `initialize()` before `getConnection()`, and `close()` when done.
 */
class RedisConnectionPool {
  private pool: Redis[] = [];
  private currentIndex = 0;

  /**
   * @param size - Number of connections to open.
   * @param config - Options passed to every `new Redis(...)` call.
   */
  constructor(
    private size: number = 10,
    private config: any = {},
  ) {}

  /**
   * Opens connections until the pool holds `size` of them. Calling it again
   * on a full pool is a no-op, so repeated calls never create connections
   * that would not be handed out.
   */
  async initialize(): Promise<void> {
    while (this.pool.length < this.size) {
      this.pool.push(new Redis(this.config));
    }
  }

  /**
   * Returns the next connection in round-robin order.
   *
   * @throws Error if the pool is empty, i.e. before `initialize()` or after `close()`.
   */
  getConnection(): Redis {
    const connection = this.pool[this.currentIndex];
    if (connection === undefined) {
      throw new Error(
        'RedisConnectionPool is not initialized; call initialize() first',
      );
    }

    // Wrap on the real pool length rather than the configured size
    this.currentIndex = (this.currentIndex + 1) % this.pool.length;
    return connection;
  }

  /**
   * Quits every connection and empties the pool. The pool is detached first
   * so that no closing connection can be handed out in the meantime.
   */
  async close(): Promise<void> {
    const connections = this.pool;
    this.pool = [];
    this.currentIndex = 0;

    await Promise.all(connections.map((redis) => redis.quit()));
  }
}

// Usage
// A pool of 10 connections; the second argument is passed to each `new Redis(...)`
const pool = new RedisConnectionPool(10, {
host: 'localhost',
port: 6379,
});

// The connections are created here, not in the constructor
await pool.initialize();

// Use pooled connections
// getConnection() hands out the pooled connections in round-robin order
const redis = pool.getConnection();
// NOTE(review): PubSubRedisBroker is neither defined nor imported in this snippet — confirm where it comes from
const broker = new PubSubRedisBroker(redis);

Monitoring and Metrics

Message Processing Metrics

Track message processing performance:

/**
 * In-memory counters for queue traffic plus a sliding window of the most
 * recent processing times.
 */
class QueueMetrics {
  private metrics = {
    messagesSent: 0,
    messagesReceived: 0,
    processingTimes: [] as number[],
    errors: 0,
  };

  /** Counts one sent message. */
  recordMessageSent(): void {
    this.metrics.messagesSent += 1;
  }

  /** Counts one received message. */
  recordMessageReceived(): void {
    this.metrics.messagesReceived += 1;
  }

  /**
   * Adds one processing time to the window.
   *
   * @param time - Duration to record.
   */
  recordProcessingTime(time: number): void {
    const { processingTimes } = this.metrics;
    processingTimes.push(time);

    // Keep only last 1000 times
    if (processingTimes.length > 1000) {
      processingTimes.shift();
    }
  }

  /** Counts one processing error. */
  recordError(): void {
    this.metrics.errors += 1;
  }

  /**
   * Returns the raw counters together with two derived values:
   * `avgProcessingTime` (mean of the recorded window, 0 when empty) and
   * `errorRate` (errors divided by received messages, 0 when none received).
   */
  getMetrics() {
    const { processingTimes, messagesReceived, errors } = this.metrics;

    let totalTime = 0;
    for (const time of processingTimes) {
      totalTime += time;
    }

    const avgProcessingTime =
      processingTimes.length > 0 ? totalTime / processingTimes.length : 0;
    const errorRate = messagesReceived > 0 ? errors / messagesReceived : 0;

    return {
      ...this.metrics,
      avgProcessingTime,
      errorRate,
    };
  }
}

// Usage
const metrics = new QueueMetrics();

await receive('user-events', async (message) => {
  const startedAt = Date.now();
  metrics.recordMessageReceived();

  try {
    await processUserEvent(message);
  } catch (error) {
    // Count the failure, then rethrow it to the caller of this handler
    metrics.recordError();
    throw error;
  }

  // Only successful runs contribute a processing time
  metrics.recordProcessingTime(Date.now() - startedAt);
});

// Log metrics periodically (every 60000 ms)
const logMetrics = () => {
  console.log('Queue metrics:', metrics.getMetrics());
};
setInterval(logMetrics, 60000);

Testing

Unit Testing

Test your queue implementations:

import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { setDriver, send, receive } from '@commandkit/queue';
import type { MessageQueue, Awaitable } from '@commandkit/queue';

/**
 * In-memory driver for tests: records every sent message and delivers it
 * straight to the handler registered for its topic.
 */
class MockDriver implements MessageQueue {
  /** Every `{ topic, message }` pair passed to `send`, in order. */
  private messages: any[] = [];
  /** One handler per topic; a later `receive` replaces the earlier one. */
  private handlers = new Map<string, (message: any) => Awaitable<void>>();

  /**
   * Records the message and, if a handler is registered for `topic`, waits
   * for it to finish. Awaiting matters for async handlers: their work is
   * complete when `send` resolves, and a rejection reaches the caller rather
   * than becoming an unhandled rejection.
   */
  async send(topic: string, message: any): Promise<void> {
    this.messages.push({ topic, message });

    // Trigger handler if exists
    const handler = this.handlers.get(topic);
    if (handler) {
      await handler(message);
    }
  }

  /** Registers `handler` as the handler for `topic`. */
  async receive(
    topic: string,
    handler: (message: any) => Awaitable<void>,
  ): Promise<void> {
    this.handlers.set(topic, handler);
  }

  /**
   * Drops all handlers; recorded messages are kept so tests can still
   * inspect them. Added to match the other drivers on this page, which all
   * implement `close()`.
   */
  async close(): Promise<void> {
    this.handlers.clear();
  }

  /** Returns the recorded `{ topic, message }` pairs. */
  getMessages() {
    return this.messages;
  }
}

describe('Queue', () => {
  let driver: MockDriver;

  // Every test starts with a fresh mock driver passed to setDriver
  beforeEach(() => {
    driver = new MockDriver();
    setDriver(driver);
  });

  afterEach(() => {
    // Clean up
  });

  it('should send and receive messages', async () => {
    const inbox: any[] = [];
    const payload = { test: 'data' };

    await receive('test-topic', (message) => {
      inbox.push(message);
    });

    await send('test-topic', payload);

    // Exactly one delivery, carrying the payload that was sent
    expect(inbox).toHaveLength(1);
    expect(inbox[0]).toEqual({ test: 'data' });
  });
});

Next Steps

You now have a comprehensive understanding of CommandKit Queue's advanced features. You can:

  • Create custom drivers for your specific message queue backend
  • Implement robust error handling with retries and dead letter queues
  • Optimize performance with batching and connection pooling
  • Monitor your queue system with metrics
  • Test your queue implementations thoroughly