This example demonstrates how to build production-grade background workers that process tasks asynchronously with:
- Queue-based task distribution
- Multiple worker instances
- State-driven workflows
- Cron-triggered maintenance
Use cases
- Image processing pipelines
- Email batch sending
- Data import/export
- Report generation
- Cleanup and maintenance tasks
Architecture
This example processes messages through a pipeline:
- HTTP endpoint: Receives messages and enqueues them
- Queue worker: Processes messages in the background
- Cron job: Handles periodic cleanup
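The decoupling between the endpoint and the worker can be sketched without any framework. The following is a minimal in-memory illustration, not the Motia or iii API: `InMemoryQueue`, `receiveMessage`, and `store` are hypothetical names, and jobs are delivered only when `drain()` is called so that the ordering is easy to see.

```typescript
// Hypothetical in-memory stand-in for a topic-based queue (not the Motia/iii API).
type Handler<T> = (data: T) => void

class InMemoryQueue<T> {
  private handlers = new Map<string, Handler<T>[]>()
  private jobs: { topic: string; data: T }[] = []

  subscribe(topic: string, handler: Handler<T>): void {
    const list = this.handlers.get(topic) ?? []
    list.push(handler)
    this.handlers.set(topic, list)
  }

  // Buffers the job and returns immediately; nothing is processed yet.
  enqueue(topic: string, data: T): void {
    this.jobs.push({ topic, data })
  }

  // Delivers every buffered job to its subscribers; returns the number of jobs delivered.
  drain(): number {
    let delivered = 0
    while (this.jobs.length > 0) {
      const job = this.jobs.shift()!
      for (const handler of this.handlers.get(job.topic) ?? []) {
        handler(job.data)
      }
      delivered++
    }
    return delivered
  }
}

type Message = { messageId: string; text: string }

const store = new Map<string, { text: string; status: string }>()
const queue = new InMemoryQueue<Message>()

// "Queue worker": runs later, after the endpoint has already responded.
queue.subscribe('message.sent', ({ messageId, text }) => {
  store.set(messageId, { text: text.toUpperCase(), status: 'processed' })
})

// "HTTP endpoint": records the message, enqueues it, and returns at once.
function receiveMessage(messageId: string, text: string): { messageId: string; status: string } {
  store.set(messageId, { text, status: 'pending' })
  queue.enqueue('message.sent', { messageId, text })
  return { messageId, status: 'enqueued' }
}
```

Calling `receiveMessage` leaves the stored status at `pending` until `queue.drain()` runs the worker, which mirrors how the HTTP response is returned before background processing finishes.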
Step 1: Create the message endpoint
Create steps/send-message.step.ts to receive and enqueue messages:
steps/send-message.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'SendMessage',
  description: 'Receives messages and enqueues for background processing',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/messages',
      bodySchema: z.object({
        text: z.string().min(1).max(500),
        priority: z.enum(['low', 'normal', 'high']).default('normal'),
        metadata: z.record(z.string()).optional(),
      }),
      responseSchema: {
        200: z.object({
          messageId: z.string(),
          status: z.string(),
        }),
        400: z.object({ error: z.string() }),
      },
    },
  ],
  enqueues: ['message.sent'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { enqueue, logger, state }
) => {
  const { text, priority, metadata } = request.body

  // Timestamp plus a short random suffix: adequate for a demo, not a guaranteed-unique ID
  const messageId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`

  logger.info('Received message', { messageId, priority })

  // Store the message, including its ID so the cleanup step can delete it by key
  await state.set('messages', messageId, {
    messageId,
    text,
    priority,
    metadata,
    status: 'pending',
    createdAt: new Date().toISOString(),
  })

  // Enqueue for processing
  await enqueue({
    topic: 'message.sent',
    data: { messageId, text, priority },
  })

  return {
    status: 200,
    body: { messageId, status: 'enqueued' },
  }
}
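To make the rules in `bodySchema` concrete, here is a dependency-free sketch of the same checks written by hand. It is only an illustration of what the schema accepts and rejects; `parseMessageBody` and `ParseResult` are hypothetical names, and the real step relies on zod rather than this function.

```typescript
type Priority = 'low' | 'normal' | 'high'

interface MessageBody {
  text: string
  priority: Priority
  metadata?: Record<string, string>
}

type ParseResult =
  | { ok: true; value: MessageBody }
  | { ok: false; error: string }

const PRIORITIES: readonly Priority[] = ['low', 'normal', 'high']

// Hand-written approximation of the bodySchema rules (illustrative only).
function parseMessageBody(input: unknown): ParseResult {
  if (typeof input !== 'object' || input === null || Array.isArray(input)) {
    return { ok: false, error: 'body must be an object' }
  }
  const body = input as Record<string, unknown>

  // text: a string of 1 to 500 characters
  const text = body.text
  if (typeof text !== 'string' || text.length < 1 || text.length > 500) {
    return { ok: false, error: 'text must be a string of 1 to 500 characters' }
  }

  // priority: falls back to 'normal' when omitted, like .default('normal')
  const priority = body.priority === undefined ? 'normal' : body.priority
  if (typeof priority !== 'string' || PRIORITIES.indexOf(priority as Priority) === -1) {
    return { ok: false, error: 'priority must be low, normal, or high' }
  }

  // metadata: optional object whose values are all strings
  const metadata = body.metadata
  if (metadata !== undefined) {
    if (typeof metadata !== 'object' || metadata === null || Array.isArray(metadata)) {
      return { ok: false, error: 'metadata must be an object' }
    }
    const record = metadata as Record<string, unknown>
    if (!Object.keys(record).every((key) => typeof record[key] === 'string')) {
      return { ok: false, error: 'metadata values must be strings' }
    }
  }

  const value: MessageBody = { text, priority: priority as Priority }
  if (metadata !== undefined) {
    value.metadata = metadata as Record<string, string>
  }
  return { ok: true, value }
}
```

A body of `{ "text": "hi" }` parses with `priority` set to `normal`, while an empty `text` or a `priority` of `urgent` is rejected.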
Step 2: Create the background worker
Create steps/process-message.step.ts to process messages:
steps/process-message.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const inputSchema = z.object({
  messageId: z.string(),
  text: z.string(),
  priority: z.enum(['low', 'normal', 'high']),
})

export const config = {
  name: 'ProcessMessage',
  description: 'Background worker for message processing',
  triggers: [
    {
      type: 'queue',
      topic: 'message.sent',
      input: inputSchema,
    },
  ],
  enqueues: ['message.processed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state, enqueue }
) => {
  const { messageId, text, priority } = input

  logger.info('Processing message', { messageId, priority })

  try {
    // Simulate processing work (e.g., API calls, transformations)
    const processedText = await processText(text)
    const processingTime = await simulateWork(priority)

    // Update state
    await state.update('messages', messageId, {
      status: 'processed',
      processedAt: new Date().toISOString(),
      processedText,
      processingTime,
    })

    logger.info('Message processed successfully', {
      messageId,
      processingTime,
    })

    // Enqueue next step
    await enqueue({
      topic: 'message.processed',
      data: { messageId, processedText },
    })
  } catch (error) {
    // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
    const errorMessage = error instanceof Error ? error.message : String(error)

    logger.error('Message processing failed', {
      messageId,
      error: errorMessage,
    })

    // Update state with error
    await state.update('messages', messageId, {
      status: 'failed',
      error: errorMessage,
      failedAt: new Date().toISOString(),
    })

    throw error // Rethrow so the queue's retry mechanism can handle it
  }
}

// Simulate text processing
async function processText(text: string): Promise<string> {
  // Example: uppercase, word count, etc.
  return text.toUpperCase()
}

// Simulate processing time based on priority; returns the delay in milliseconds
async function simulateWork(priority: string): Promise<number> {
  const delay = priority === 'high' ? 100 : priority === 'normal' ? 500 : 1000
  await new Promise((resolve) => setTimeout(resolve, delay))
  return delay
}
Worker features
- Error handling: Catches errors, logs them, and records the failure in state
- Retry support: Rethrowing the error hands the message back to the queue's retry mechanism
- Priority handling: Varies the simulated work duration by priority; it does not reorder the queue
- State tracking: Updates the message status at each stage of its lifecycle
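Because a retried message runs the handler again, it can help to make the work idempotent so that a redelivery does not repeat side effects. The sketch below shows one way to guard on the stored status. It uses a plain `Map` as a hypothetical stand-in for the state store, and `processOnce` is an illustrative name rather than part of the example above.

```typescript
type Status = 'pending' | 'processed' | 'failed'

interface StoredMessage {
  text: string
  status: Status
  processedText?: string
}

// Hypothetical stand-in for the 'messages' state namespace.
const messages = new Map<string, StoredMessage>()

// Counts how often the "expensive" work actually ran.
let sideEffects = 0

// Processes a message unless it is already marked as processed.
function processOnce(messageId: string): 'processed' | 'skipped' | 'missing' {
  const current = messages.get(messageId)
  if (!current) return 'missing'

  // A redelivered message that already succeeded is skipped.
  if (current.status === 'processed') return 'skipped'

  sideEffects++
  messages.set(messageId, {
    ...current,
    status: 'processed',
    processedText: current.text.toUpperCase(),
  })
  return 'processed'
}
```

Messages in the `failed` state are still processed again, which is what a retry is expected to do; only completed work is skipped.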
Step 3: Add periodic cleanup
Create steps/cleanup-messages.step.ts for maintenance:
steps/cleanup-messages.step.ts
import type { Handlers, StepConfig } from 'motia'

interface Message {
  messageId: string
  status: string
  createdAt: string
  processedAt?: string
}

export const config = {
  name: 'CleanupMessages',
  description: 'Clean up old processed messages',
  triggers: [
    {
      type: 'cron',
      expression: '0 0 2 * * * *', // Every day at 2 AM
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  _input,
  { logger, state }
) => {
  logger.info('Starting message cleanup')

  const messages = await state.list<Message>('messages')
  const cutoffDate = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) // 30 days ago
  let deletedCount = 0

  for (const message of messages) {
    const messageDate = new Date(message.processedAt || message.createdAt)
    if (
      message.status === 'processed' &&
      messageDate < cutoffDate
    ) {
      await state.delete('messages', message.messageId)
      deletedCount++
    }
  }

  logger.info('Cleanup completed', {
    totalMessages: messages.length,
    deletedCount,
  })
}
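The retention rule in the cleanup handler is easier to test when it is pulled out into a pure function that takes the current time as an argument. This is a minimal sketch of that idea; `selectExpired` and `StoredMessage` are hypothetical names, and the 30-day default simply mirrors the handler above.

```typescript
interface StoredMessage {
  messageId: string
  status: string
  createdAt: string
  processedAt?: string
}

const DAY_MS = 24 * 60 * 60 * 1000

// Returns the IDs of processed messages older than the retention window.
function selectExpired(
  messages: StoredMessage[],
  now: Date,
  retentionDays: number = 30
): string[] {
  const cutoff = now.getTime() - retentionDays * DAY_MS
  return messages
    .filter((message) => {
      // Fall back to createdAt when processedAt is absent, as the handler does.
      const timestamp = new Date(message.processedAt || message.createdAt).getTime()
      return message.status === 'processed' && timestamp < cutoff
    })
    .map((message) => message.messageId)
}
```

Passing `now` explicitly keeps the function deterministic, so the cutoff logic can be checked with fixed dates instead of depending on when the test runs.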
Advanced: Multiple workers with state trigger
Create steps/notify-completion.step.ts to react to state changes:
steps/notify-completion.step.ts
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'NotifyCompletion',
  description: 'Send notification when message is processed',
  triggers: [
    {
      type: 'state',
      namespace: 'messages',
      filter: {
        status: 'processed',
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger }
) => {
  logger.info('Message completed', {
    messageId: input.key,
    data: input.value,
  })

  // Send notification (e.g., email, webhook, etc.)
  await sendNotification({
    type: 'message_completed',
    messageId: input.key,
    completedAt: input.value.processedAt,
  })
}

// Placeholder: replace with a call to your notification service
async function sendNotification(notification: Record<string, unknown>): Promise<void> {
  console.log('Notification sent:', notification)
}
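This page does not describe how the engine evaluates the `filter` on a state trigger, so the following is only a conceptual sketch under two assumptions: the filter is compared with shallow equality, and a notification should fire only when a value newly starts matching. `matchesFilter` and `shouldNotify` are hypothetical helpers, not Motia functions.

```typescript
type StateValue = Record<string, unknown>

// True when every key in the filter has the same value in the state entry
// (assumed shallow, strict equality).
function matchesFilter(value: StateValue, filter: StateValue): boolean {
  return Object.keys(filter).every((key) => value[key] === filter[key])
}

// True only on a transition into a matching state, so repeated updates to an
// already-processed message do not notify twice.
function shouldNotify(
  previous: StateValue | undefined,
  next: StateValue,
  filter: StateValue
): boolean {
  const matchedBefore = previous !== undefined && matchesFilter(previous, filter)
  return !matchedBefore && matchesFilter(next, filter)
}
```

If the real trigger fires on every matching write rather than only on transitions, a guard like `shouldNotify` inside the handler is one way to avoid duplicate notifications.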
Testing the worker
Send a message
curl -X POST http://localhost:3000/messages \
  -H "Content-Type: application/json" \
  -d '{
    "text": "Process this message",
    "priority": "high",
    "metadata": {
      "source": "api",
      "userId": "user-123"
    }
  }'
Response:
{
  "messageId": "msg-1234567890-abc123",
  "status": "enqueued"
}
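The same request can be issued from TypeScript. The sketch below only builds the request description so that it stays testable without a running server; `buildSendMessageRequest`, `SendMessagePayload`, and `HttpRequest` are hypothetical names, and the base URL is the one used in the curl command above.

```typescript
interface SendMessagePayload {
  text: string
  priority?: 'low' | 'normal' | 'high'
  metadata?: Record<string, string>
}

interface HttpRequest {
  url: string
  method: 'POST'
  headers: Record<string, string>
  body: string
}

// Builds the POST /messages request without sending it.
function buildSendMessageRequest(baseUrl: string, payload: SendMessagePayload): HttpRequest {
  return {
    // Strip trailing slashes so the path is not doubled.
    url: baseUrl.replace(/\/+$/, '') + '/messages',
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  }
}
```

To send it, destructure the result and pass it to `fetch`, for example `const { url, ...init } = buildSendMessageRequest('http://localhost:3000', { text: 'Process this message', priority: 'high' })` followed by `await fetch(url, init)`.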
Monitor processing
Check the iii Console to see:
- Message received by HTTP endpoint
- Message enqueued to the message.sent topic
- Background worker processing the message
- State updated with processing results
- Notification sent via state trigger
Configuration: Queue strategies
Configure queue behavior in iii-config.yaml:
queues:
  message.sent:
    max_retries: 3
    retry_delay: 5000
    concurrency: 5
    dead_letter_queue: message.failed
Queue options
- max_retries: Number of retry attempts for failed messages
- retry_delay: Delay between retries in milliseconds
- concurrency: Number of concurrent workers
- dead_letter_queue: Queue for permanently failed messages
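How these options interact can be illustrated with a small simulation. It is a sketch under stated assumptions, not the iii engine's implementation: it assumes `max_retries` counts retries after the first delivery, that `retry_delay` is a fixed wait between attempts, and that a message moves to the dead letter queue once retries are exhausted. `simulateDelivery` and its types are hypothetical, and delays are summed rather than actually slept.

```typescript
interface QueueOptions {
  maxRetries: number
  retryDelayMs: number
  deadLetterQueue?: string
}

interface DeliveryResult {
  outcome: 'succeeded' | 'dead-lettered' | 'dropped'
  attempts: number
  waitedMs: number
}

// Runs the handler until it succeeds or the retry budget is used up.
function simulateDelivery(
  handler: (attempt: number) => void,
  options: QueueOptions
): DeliveryResult {
  // Assumption: one initial delivery plus maxRetries retries.
  const maxAttempts = options.maxRetries + 1
  let waitedMs = 0

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(attempt)
      return { outcome: 'succeeded', attempts: attempt, waitedMs }
    } catch {
      // Wait before the next attempt, but not after the final one.
      if (attempt < maxAttempts) waitedMs += options.retryDelayMs
    }
  }

  return {
    outcome: options.deadLetterQueue ? 'dead-lettered' : 'dropped',
    attempts: maxAttempts,
    waitedMs,
  }
}
```

Under these assumptions and the configuration above, a handler that always throws is attempted four times over 15 seconds of accumulated delay before the message is routed to `message.failed`.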
Scaling workers
The iii engine automatically distributes work across multiple instances. To scale:
# Start multiple instances
iii -c iii-config.yaml --instances 3
Each instance will process messages from the same queue, providing:
- Load distribution
- High availability
- Fault tolerance
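The effect of adding instances can be estimated with a simple model. This page does not say how the engine assigns messages to instances, so the sketch assumes each message goes to whichever instance becomes free first; `simulateInstances` is a hypothetical helper, and the durations reuse the per-priority delays from the worker (100, 500, and 1000 ms).

```typescript
interface SimulationResult {
  makespanMs: number
  jobsPerInstance: number[]
}

// Assigns each job, in order, to the instance that becomes free first.
function simulateInstances(jobDurationsMs: number[], instanceCount: number): SimulationResult {
  if (instanceCount < 1) throw new Error('instanceCount must be at least 1')

  const freeAt: number[] = []
  const jobsPerInstance: number[] = []
  for (let i = 0; i < instanceCount; i++) {
    freeAt.push(0)
    jobsPerInstance.push(0)
  }

  for (const duration of jobDurationsMs) {
    // Find the earliest-free instance; the lowest index wins ties.
    let next = 0
    for (let i = 1; i < instanceCount; i++) {
      if (freeAt[i] < freeAt[next]) next = i
    }
    freeAt[next] += duration
    jobsPerInstance[next]++
  }

  // The makespan is the time at which the last instance finishes.
  return { makespanMs: Math.max(...freeAt), jobsPerInstance }
}
```

For six messages with durations 100, 500, 1000, 100, 500, and 1000 ms, this model finishes in 3200 ms on one instance and 1500 ms on three, which shows why throughput does not scale perfectly when job sizes are uneven.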
What you learned
- Queue triggers: Process messages asynchronously with queues
- Error handling: Handle failures and implement retry logic
- State triggers: React to state changes automatically
- Cron jobs: Schedule periodic maintenance tasks
Next steps
- ChessArena example: See a full production app with background workers
- Workflows guide: Build complex multi-step workflows