Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/motiadev/motia/llms.txt

Use this file to discover all available pages before exploring further.

Workflows in Motia are built by connecting Steps together through queue events. Each Step processes data and enqueues events for the next Step, creating powerful event-driven architectures.

Simple workflow pattern

The most basic workflow connects an API endpoint to a background worker:
// Step 1: API receives request
export const config = {
  name: 'SendMessage',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/messages',
    }
  ],
  enqueues: ['message.sent']
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (req, { enqueue }) => {
  await enqueue({
    topic: 'message.sent',
    data: { text: req.body.text }
  })
  return { status: 200, body: { ok: true } }
}
// Step 2: Worker processes message
export const config = {
  name: 'ProcessMessage',
  triggers: [
    {
      type: 'queue',
      topic: 'message.sent',
    }
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger }) => {
  logger.info('Processing message', input)
}

Multi-step workflow

Build complex workflows by chaining multiple Steps:
1

Step 1: Create the order

Start with an API endpoint that creates an order:
// steps/create-order.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'CreateOrder',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/orders',
      bodySchema: z.object({
        pet: z.object({ name: z.string(), photoUrl: z.string() }),
        foodOrder: z.object({ quantity: z.number() }),
      }),
    },
  ],
  enqueues: ['order.created'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (request, { enqueue, logger, traceId }) => {
  const { pet, foodOrder } = request.body
  
  logger.info('Creating order', { pet, foodOrder, traceId })
  
  const petRecord = await createPet(pet)
  
  await enqueue({
    topic: 'order.created',
    data: {
      petId: petRecord.id,
      quantity: foodOrder.quantity,
      email: 'customer@example.com',
    },
  })
  
  return { status: 200, body: petRecord }
}
2

Step 2: Process the order

Process the order and create a shipping record:
// steps/process-order.step.ts
import { queue } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  petId: z.string(),
  quantity: z.number(),
  email: z.string(),
})

export const config = {
  name: 'ProcessOrder',
  triggers: [queue('order.created', { input: orderSchema })],
  enqueues: ['order.processed'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue, traceId }) => {
  logger.info('Processing order', { input, traceId })
  
  const order = await createOrder({
    ...input,
    shipDate: new Date().toISOString(),
    status: 'placed',
  })
  
  await state.set('orders', order.id, order)
  
  await enqueue({
    topic: 'order.processed',
    data: {
      orderId: order.id,
      email: input.email,
      status: order.status,
    },
  })
}
3

Step 3: Send notification

Notify the customer that their order was processed:
// steps/send-notification.step.ts
import { queue, jsonSchema } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'SendNotification',
  triggers: [
    queue('order.processed', {
      input: jsonSchema(
        z.object({
          orderId: z.string(),
          email: z.string(),
          status: z.string(),
        }),
      ),
    }),
  ],
  enqueues: [],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, traceId }) => {
  logger.info('Sending notification', {
    orderId: input.orderId,
    email: input.email.replace(/(?<=.{2}).(?=.*@)/g, '*'),
    traceId,
  })
  
  // Send email notification
  await emailService.send({
    to: input.email,
    template: 'order-confirmation',
    data: {
      orderId: input.orderId,
      status: input.status,
    },
  })
}

Conditional workflows

Use trigger conditions to create branching logic:
import type { TriggerCondition } from 'motia'

const isHighValue: TriggerCondition<{ amount: number }> = (input) => {
  return input.amount > 1000
}

const isVerifiedUser: TriggerCondition<ApiRequest<{ user: { verified: boolean } }>> = (input, ctx) => {
  if (ctx.trigger.type !== 'http') return false
  return input.body.user.verified === true
}

export const config = {
  name: 'ProcessOrder',
  triggers: [
    {
      type: 'queue',
      topic: 'order.created',
      input: z.object({ amount: z.number() }),
      condition: isHighValue, // Only process high-value orders
    },
    {
      type: 'http',
      method: 'POST',
      path: '/orders/manual',
      condition: isVerifiedUser, // Only allow verified users
    },
  ],
  enqueues: ['order.processed'],
} as const satisfies StepConfig

Multi-trigger workflows

Handle different trigger types with pattern matching:
export const config = {
  name: 'MultiTriggerOrder',
  triggers: [
    queue('order.created', { input: orderSchema }),
    http('POST', '/orders/manual', { bodySchema: orderSchema }),
    {
      type: 'cron',
      expression: '0 */6 * * *', // Every 6 hours
    },
  ],
  enqueues: ['order.processed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      ctx.logger.info('Processing manual order', { body: request.body })
      
      const order = await processOrder(request.body)
      
      await ctx.enqueue({
        topic: 'order.processed',
        data: { orderId: order.id, source: 'manual' },
      })
      
      return { status: 200, body: { order } }
    },
    
    queue: async (input) => {
      ctx.logger.info('Processing queued order', { input })
      
      const order = await processOrder(input)
      
      await ctx.enqueue({
        topic: 'order.processed',
        data: { orderId: order.id, source: 'queue' },
      })
    },
    
    cron: async () => {
      ctx.logger.info('Processing batch orders')
      
      const pendingOrders = await ctx.state.list('pending-orders')
      
      for (const order of pendingOrders) {
        await ctx.enqueue({
          topic: 'order.created',
          data: order,
        })
      }
    },
  })
}

Parallel workflows

Enqueue multiple events to process work in parallel:
export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  logger.info('Starting parallel workflow')
  
  // Enqueue multiple tasks to run in parallel
  await Promise.all([
    enqueue({ topic: 'process-payment', data: { orderId: input.orderId } }),
    enqueue({ topic: 'check-inventory', data: { items: input.items } }),
    enqueue({ topic: 'send-confirmation', data: { email: input.email } }),
  ])
  
  logger.info('All tasks enqueued')
}

State-driven workflows

Use state triggers to react to state changes:
export const config = {
  name: 'OnOrderStateChange',
  triggers: [
    {
      type: 'state',
      namespace: 'orders',
      event: 'set',
    },
  ],
  enqueues: ['order.status.changed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  const { key, new_value, old_value } = input
  
  if (old_value?.status !== new_value?.status) {
    logger.info('Order status changed', {
      orderId: key,
      oldStatus: old_value?.status,
      newStatus: new_value?.status,
    })
    
    await enqueue({
      topic: 'order.status.changed',
      data: {
        orderId: key,
        status: new_value.status,
      },
    })
  }
}

Workflow observability

Track workflows across Steps using traceId:
export const handler: Handlers<typeof config> = async (input, { logger, traceId, enqueue }) => {
  logger.info('Step started', { traceId, input })
  
  // The traceId is automatically propagated to enqueued events
  await enqueue({
    topic: 'next-step',
    data: input,
  })
  
  logger.info('Step completed', { traceId })
}
View the complete workflow trace in the iii Console dashboard.

Error handling in workflows

Handle errors at each Step:
export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue }) => {
  try {
    const result = await processOrder(input)
    
    await state.set('orders', result.id, {
      ...result,
      status: 'completed',
    })
    
    await enqueue({
      topic: 'order.completed',
      data: result,
    })
    
  } catch (error) {
    logger.error('Order processing failed', { error, input })
    
    await state.set('orders', input.orderId, {
      status: 'failed',
      error: error.message,
    })
    
    await enqueue({
      topic: 'order.failed',
      data: { orderId: input.orderId, error: error.message },
    })
  }
}

Queue triggers

Learn about queue trigger configuration

Background jobs

Process work asynchronously

State triggers

React to state changes

Context API

Access workflow context