Documentation Index Fetch the complete documentation index at: https://mintlify.com/motiadev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Workflows in Motia are built by connecting Steps together through queue events. Each Step processes data and enqueues events for the next Step, creating powerful event-driven architectures.
Simple workflow pattern
The most basic workflow connects an API endpoint to a background worker:
// Step 1: API receives request
export const config = {
name: 'SendMessage' ,
triggers: [
{
type: 'http' ,
method: 'POST' ,
path: '/messages' ,
}
],
enqueues: [ 'message.sent' ]
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async ( req , { enqueue }) => {
await enqueue ({
topic: 'message.sent' ,
data: { text: req . body . text }
})
return { status: 200 , body: { ok: true } }
}
// Step 2: Worker processes message
export const config = {
name: 'ProcessMessage' ,
triggers: [
{
type: 'queue' ,
topic: 'message.sent' ,
}
],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async ( input , { logger }) => {
logger . info ( 'Processing message' , input )
}
Multi-step workflow
Build complex workflows by chaining multiple Steps:
Step 1: Create the order
Start with an API endpoint that creates an order: // steps/create-order.step.ts
import type { Handlers , StepConfig } from 'motia'
import { z } from 'zod'
export const config = {
name: 'CreateOrder' ,
triggers: [
{
type: 'http' ,
method: 'POST' ,
path: '/orders' ,
bodySchema: z . object ({
pet: z . object ({ name: z . string (), photoUrl: z . string () }),
foodOrder: z . object ({ quantity: z . number () }),
}),
},
],
enqueues: [ 'order.created' ],
flows: [ 'order-workflow' ],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async ( request , { enqueue , logger , traceId }) => {
const { pet , foodOrder } = request . body
logger . info ( 'Creating order' , { pet , foodOrder , traceId })
const petRecord = await createPet ( pet )
await enqueue ({
topic: 'order.created' ,
data: {
petId: petRecord . id ,
quantity: foodOrder . quantity ,
email: 'customer@example.com' ,
},
})
return { status: 200 , body: petRecord }
}
Step 2: Process the order
Process the order and create a shipping record: // steps/process-order.step.ts
import { queue } from 'motia'
import { z } from 'zod'
const orderSchema = z . object ({
petId: z . string (),
quantity: z . number (),
email: z . string (),
})
export const config = {
name: 'ProcessOrder' ,
triggers: [ queue ( 'order.created' , { input: orderSchema })],
enqueues: [ 'order.processed' ],
flows: [ 'order-workflow' ],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async ( input , { logger , state , enqueue , traceId }) => {
logger . info ( 'Processing order' , { input , traceId })
const order = await createOrder ({
... input ,
shipDate: new Date (). toISOString (),
status: 'placed' ,
})
await state . set ( 'orders' , order . id , order )
await enqueue ({
topic: 'order.processed' ,
data: {
orderId: order . id ,
email: input . email ,
status: order . status ,
},
})
}
Step 3: Send notification
Notify the customer that their order was processed: // steps/send-notification.step.ts
import { queue , jsonSchema } from 'motia'
import { z } from 'zod'
export const config = {
name: 'SendNotification' ,
triggers: [
queue ( 'order.processed' , {
input: jsonSchema (
z . object ({
orderId: z . string (),
email: z . string (),
status: z . string (),
}),
),
}),
],
enqueues: [],
flows: [ 'order-workflow' ],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async ( input , { logger , traceId }) => {
logger . info ( 'Sending notification' , {
orderId: input . orderId ,
email: input . email . replace ( / (?<= . {2} ) . (?= . * @ ) / g , '*' ),
traceId ,
})
// Send email notification
await emailService . send ({
to: input . email ,
template: 'order-confirmation' ,
data: {
orderId: input . orderId ,
status: input . status ,
},
})
}
Conditional workflows
Use trigger conditions to create branching logic:
import type { TriggerCondition } from 'motia'
const isHighValue : TriggerCondition <{ amount : number }> = ( input ) => {
return input . amount > 1000
}
const isVerifiedUser : TriggerCondition < ApiRequest <{ user : { verified : boolean } }>> = ( input , ctx ) => {
if ( ctx . trigger . type !== 'http' ) return false
return input . body . user . verified === true
}
export const config = {
name: 'ProcessOrder' ,
triggers: [
{
type: 'queue' ,
topic: 'order.created' ,
input: z . object ({ amount: z . number () }),
condition: isHighValue , // Only process high-value orders
},
{
type: 'http' ,
method: 'POST' ,
path: '/orders/manual' ,
condition: isVerifiedUser , // Only allow verified users
},
],
enqueues: [ 'order.processed' ],
} as const satisfies StepConfig
Multi-trigger workflows
Handle different trigger types with pattern matching:
export const config = {
name: 'MultiTriggerOrder' ,
triggers: [
queue ( 'order.created' , { input: orderSchema }),
http ( 'POST' , '/orders/manual' , { bodySchema: orderSchema }),
{
type: 'cron' ,
expression: '0 */6 * * *' , // Every 6 hours
},
],
enqueues: [ 'order.processed' ],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async ( _ , ctx ) => {
return ctx . match ({
http : async ({ request }) => {
ctx . logger . info ( 'Processing manual order' , { body: request . body })
const order = await processOrder ( request . body )
await ctx . enqueue ({
topic: 'order.processed' ,
data: { orderId: order . id , source: 'manual' },
})
return { status: 200 , body: { order } }
},
queue : async ( input ) => {
ctx . logger . info ( 'Processing queued order' , { input })
const order = await processOrder ( input )
await ctx . enqueue ({
topic: 'order.processed' ,
data: { orderId: order . id , source: 'queue' },
})
},
cron : async () => {
ctx . logger . info ( 'Processing batch orders' )
const pendingOrders = await ctx . state . list ( 'pending-orders' )
for ( const order of pendingOrders ) {
await ctx . enqueue ({
topic: 'order.created' ,
data: order ,
})
}
},
})
}
Parallel workflows
Enqueue multiple events to process work in parallel:
export const handler : Handlers < typeof config > = async ( input , { enqueue , logger }) => {
logger . info ( 'Starting parallel workflow' )
// Enqueue multiple tasks to run in parallel
await Promise . all ([
enqueue ({ topic: 'process-payment' , data: { orderId: input . orderId } }),
enqueue ({ topic: 'check-inventory' , data: { items: input . items } }),
enqueue ({ topic: 'send-confirmation' , data: { email: input . email } }),
])
logger . info ( 'All tasks enqueued' )
}
State-driven workflows
Use state triggers to react to state changes:
export const config = {
name: 'OnOrderStateChange' ,
triggers: [
{
type: 'state' ,
namespace: 'orders' ,
event: 'set' ,
},
],
enqueues: [ 'order.status.changed' ],
} as const satisfies StepConfig
export const handler : Handlers < typeof config > = async ( input , { logger , enqueue }) => {
const { key , new_value , old_value } = input
if ( old_value ?. status !== new_value ?. status ) {
logger . info ( 'Order status changed' , {
orderId: key ,
oldStatus: old_value ?. status ,
newStatus: new_value ?. status ,
})
await enqueue ({
topic: 'order.status.changed' ,
data: {
orderId: key ,
status: new_value . status ,
},
})
}
}
Workflow observability
Track workflows across Steps using traceId:
export const handler : Handlers < typeof config > = async ( input , { logger , traceId , enqueue }) => {
logger . info ( 'Step started' , { traceId , input })
// The traceId is automatically propagated to enqueued events
await enqueue ({
topic: 'next-step' ,
data: input ,
})
logger . info ( 'Step completed' , { traceId })
}
View the complete workflow trace in the iii Console dashboard.
Error handling in workflows
Handle errors at each Step:
export const handler : Handlers < typeof config > = async ( input , { logger , state , enqueue }) => {
try {
const result = await processOrder ( input )
await state . set ( 'orders' , result . id , {
... result ,
status: 'completed' ,
})
await enqueue ({
topic: 'order.completed' ,
data: result ,
})
} catch ( error ) {
logger . error ( 'Order processing failed' , { error , input })
await state . set ( 'orders' , input . orderId , {
status: 'failed' ,
error: error . message ,
})
await enqueue ({
topic: 'order.failed' ,
data: { orderId: input . orderId , error: error . message },
})
}
}
Queue triggers Learn about queue trigger configuration
Background jobs Process work asynchronously
State triggers React to state changes
Context API Access workflow context