Skip to main content
Version: Next

Basic Operations

This guide covers the fundamental operations of the CommandKit Queue system: sending and receiving messages.

Setting Up the Driver

Before you can send or receive messages, you need to set up a driver. The driver handles the actual message queue backend.

import { setDriver } from '@commandkit/queue';
import { RedisPubSubDriver } from '@commandkit/queue/discordjs';
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

// Create a Redis connection
const redis = new Redis();

// Create a broker
const broker = new PubSubRedisBroker(redis);

// Create a driver
const driver = new RedisPubSubDriver(broker);

// Set the driver
setDriver(driver);

Sending Messages

Use the send function to publish messages to a topic.

import { send } from '@commandkit/queue';

// Send a simple message
await send('user-events', { userId: '123', action: 'login' });

// Send different types of data
await send('notifications', {
type: 'welcome',
userId: '123',
message: 'Welcome to our platform!',
});

await send('analytics', {
event: 'page_view',
page: '/dashboard',
timestamp: Date.now(),
});

Receiving Messages

Use the receive function to subscribe to messages from a topic.

import { receive } from '@commandkit/queue';

// Basic message handling
await receive('user-events', (message) => {
console.log(`User ${message.userId} performed ${message.action}`);
});

// Handle different message types
await receive('notifications', (message) => {
switch (message.type) {
case 'welcome':
console.log(`Welcome message for user ${message.userId}`);
break;
case 'reminder':
console.log(`Reminder: ${message.message}`);
break;
}
});

// Async message handling
await receive('analytics', async (message) => {
await processAnalyticsEvent(message);
});

Type Safety

You can define types for your messages to get better TypeScript support.

interface UserEvent {
userId: string;
action: 'login' | 'logout' | 'register';
timestamp?: number;
}

interface Notification {
type: 'welcome' | 'reminder' | 'alert';
userId: string;
message: string;
}

// Type-safe sending
await send('user-events', {
userId: '123',
action: 'login',
timestamp: Date.now(),
} as UserEvent);

// Type-safe receiving
await receive('user-events', (message: UserEvent) => {
console.log(`User ${message.userId} ${message.action}`);
});

Multiple Subscribers

You can have multiple subscribers listening to the same topic.

// Subscriber 1: Log all user events
await receive('user-events', (message) => {
console.log(`[LOG] User event: ${message.userId} - ${message.action}`);
});

// Subscriber 2: Update user analytics
await receive('user-events', async (message) => {
await updateUserAnalytics(message.userId, message.action);
});

// Subscriber 3: Send notifications
await receive('user-events', (message) => {
if (message.action === 'login') {
sendNotification(message.userId, 'Welcome back!');
}
});

Error Handling

Always handle errors when sending or receiving messages.

// Error handling for sending
try {
await send('user-events', { userId: '123', action: 'login' });
} catch (error) {
console.error('Failed to send message:', error);
}

// Error handling for receiving
await receive('user-events', (message) => {
try {
processUserEvent(message);
} catch (error) {
console.error('Failed to process message:', error);
}
});

Best Practices

1. Use Descriptive Topic Names

// Good
await send('user-authentication-events', message);
await send('order-processing-updates', message);

// Avoid
await send('events', message);
await send('data', message);

2. Structure Your Messages

// Good - structured message
await send('user-events', {
type: 'login',
userId: '123',
timestamp: Date.now(),
});

// Avoid - unstructured
await send('user-events', 'user logged in');

3. Handle Message Processing Gracefully

await receive('user-events', async (message) => {
try {
// Process the message
await processUserEvent(message);

// Acknowledge successful processing
console.log(`Processed event for user ${message.userId}`);
} catch (error) {
// Log error but don't crash
console.error(`Failed to process event for user ${message.userId}:`, error);

// Optionally retry or send to dead letter queue
await handleFailedMessage(message, error);
}
});

4. Use Appropriate Message Sizes

// Good - reasonable message size
await send('user-profile-updates', {
userId: '123',
changes: {
displayName: 'New Name',
avatar: 'https://example.com/avatar.jpg',
},
});

// Avoid - very large messages
await send('user-profile-updates', {
userId: '123',
fullProfile: {
/* massive object */
},
});

Next Steps