Publishing
There are three ways to publish realtime messages, each suited to different scenarios.
| Method | Durable | Retries re-fire | Step ID | Best for |
|---|---|---|---|---|
| publish() | No | Yes | - | High-frequency streaming (tokens, progress) |
| step.realtime.publish() | Yes | No (memoized) | Required | State transitions, final results |
| inngest.realtime.publish() | No | - | - | Publishing outside a function |
publish(topicRef, data)
Available in the function handler arguments. Publishes immediately with no memoization - if the function retries, the publish fires again. Best for high-frequency, streaming-style updates where duplicates are acceptable.
- topicRef (TopicRef<TData>, required): A topic accessor from a channel instance (e.g. ch.status). Carries the channel name, topic name, and schema.
- data (TData, required): The message payload. Must match the topic's schema. Validated at runtime if the schema supports it.
inngest.createFunction(
  { id: "stream-tokens", triggers: [{ event: "app/generate" }] },
  async ({ event, step, publish }) => {
    const ch = pipelineChannel({ runId: event.data.runId });
    await step.run("generate", async () => {
      const stream = await openai.chat.completions.create({
        model: "gpt-4o-mini",
        stream: true,
        messages: [{ role: "user", content: event.data.prompt }],
      });
      let full = "";
      for await (const chunk of stream) {
        const token = chunk.choices[0]?.delta?.content ?? "";
        if (token) {
          full += token;
          // Stream each token to subscribers
          await publish(ch.tokens, { token });
        }
      }
      return full;
    });
  },
);
publish() runs inside step.run() blocks as well as at the top level of your function handler. When used inside a step, the publish still fires on every retry of that step.
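To make the retry behavior concrete, here is a small self-contained simulation. It does not use the Inngest SDK: `runWithRetries`, the local `publish` function, and the in-memory `published` array are hypothetical stand-ins for the step retry loop, the handler's `publish()`, and the subscribers. The step body fails once on purpose so the retry path is exercised.

```typescript
// Toy model of a step that publishes without memoization.
// Nothing here is Inngest API; every name is illustrative.

type TokenMessage = { token: string };

// Stand-in for what subscribers receive.
const published: TokenMessage[] = [];

// Stand-in for publish(): fires immediately and remembers nothing.
function publish(data: TokenMessage): void {
  published.push(data);
}

// Runs a step body, retrying on failure up to maxAttempts times.
function runWithRetries<T>(body: (attempt: number) => T, maxAttempts: number): T {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return body(attempt);
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

const tokens = ["Hello", ",", " world"];

const full = runWithRetries((attempt) => {
  let text = "";
  for (const token of tokens) {
    publish({ token });
    text += token;
    // Simulate a transient failure after the second token on the first attempt.
    if (attempt === 1 && text === "Hello,") {
      throw new Error("transient failure");
    }
  }
  return text;
}, 3);

console.log(full);             // Hello, world
console.log(published.length); // 5 (2 from the failed attempt + 3 from the retry)
```

In this model the subscribers see "Hello" and "," twice. That is the duplicate-on-retry behavior described above, and it is why this method suits updates where repeats are harmless.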
step.realtime.publish(id, topicRef, data)
A durable step that memoizes the publish. If the function retries past this step, the publish won't re-fire. The message appears in the function's execution graph. Best for important state transitions and final results.
- id (string, required): A unique step ID. Used for memoization and appears in function logs.
- topicRef (TopicRef<TData>, required): A topic accessor from a channel instance.
- data (TData, required): The message payload. Must match the topic's schema.
Returns Promise<TData>, the published data.
inngest.createFunction(
  { id: "process-upload", triggers: [{ event: "app/upload" }] },
  async ({ event, step, publish }) => {
    const ch = uploadsChannel({ uploadId: event.data.uploadId });
    // Not durable - fires again each time the handler re-executes
    await publish(ch.status, { message: "Processing..." });
    const result = await step.run("process", async () => {
      return processUpload(event.data);
    });
    // Durable - won't re-publish on retry
    await step.realtime.publish("publish-result", ch.result, {
      success: true,
      url: result.url,
    });
  },
);
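The memoization described in this section can be pictured with a toy model. This is a sketch under stated assumptions, not how the SDK is implemented: `memoizedStep`, the `memo` map, and the `delivered` array are hypothetical names, and the model only captures the idea that a completed step ID returns its saved output instead of running again.

```typescript
// Toy model of step memoization. Not Inngest API; every name is illustrative.

type UploadResult = { success: boolean; url: string };

const delivered: UploadResult[] = [];     // what subscribers would receive
const memo = new Map<string, unknown>();  // step ID -> saved step output

// Runs fn once per step ID; later executions return the saved output.
function memoizedStep<T>(id: string, fn: () => T): T {
  if (memo.has(id)) {
    return memo.get(id) as T;
  }
  const output = fn();
  memo.set(id, output);
  return output;
}

let failAfterPublish = true;

// One execution of the handler. On retry it is re-run from the top.
function handler(): void {
  const result = memoizedStep("process", () => ({
    success: true,
    url: "https://example.com/file",
  }));
  memoizedStep("publish-result", () => {
    delivered.push(result);
    return result;
  });
  // A failure after the publish step forces the handler to run again.
  if (failAfterPublish) {
    failAfterPublish = false;
    throw new Error("later work failed");
  }
}

let attempts = 0;
while (attempts < 5) {
  attempts++;
  try {
    handler();
    break;
  } catch (_err) {
    // Retry: loop around and execute the handler again.
  }
}

console.log(attempts);         // 2
console.log(delivered.length); // 1
```

The handler executes twice, but the "publish-result" step body runs only once because its ID is already recorded on the second pass. This is also why the step ID has to be unique within the function.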
inngest.realtime.publish(topicRef, data)
Publishes from server-side code outside of a function. Useful for triggering realtime updates from API routes or webhooks.
- topicRef (TopicRef<TData>, required): A topic accessor from a channel instance.
- data (TData, required): The message payload. Must match the topic's schema.
Returns Promise<void>.
import { inngest } from "./client";
import { alertsChannel } from "./channels";

// In an API route, webhook handler, etc.
export async function POST(req: Request) {
  const body = await req.json();
  await inngest.realtime.publish(alertsChannel.alert, {
    message: body.message,
    severity: body.severity,
  });
  return new Response("OK");
}
Choosing a publish method
Use publish() when:
- Streaming tokens, progress percentages, or log lines
- The data is ephemeral and duplicates on retry are fine
- You want minimum latency (no step overhead)
Use step.realtime.publish() when:
- Publishing a final result or state transition
- The publish must not re-fire when the function retries past the step (it is memoized)
- The publish should appear in the function's execution graph
Use inngest.realtime.publish() when:
- Publishing from outside a function (API routes, webhooks, cron jobs)
- Sending updates that aren't tied to a specific function run
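The guidance above boils down to two questions: is the code running inside a function, and is it acceptable for the message to fire again on retry? A minimal sketch of that decision follows. `choosePublishMethod` and `PublishContext` are hypothetical helpers written for this illustration and are not part of the SDK.

```typescript
// Hypothetical helper that encodes the selection rules from this section.

type PublishMethod =
  | "publish"
  | "step.realtime.publish"
  | "inngest.realtime.publish";

interface PublishContext {
  // True when the code runs inside a function handler.
  insideFunction: boolean;
  // True when the message must not be sent again if the function retries.
  mustNotRefireOnRetry: boolean;
}

function choosePublishMethod(ctx: PublishContext): PublishMethod {
  // Outside a function there is no handler publish() and no step tooling.
  if (!ctx.insideFunction) {
    return "inngest.realtime.publish";
  }
  // Inside a function, memoize only when a repeat would be a problem.
  return ctx.mustNotRefireOnRetry ? "step.realtime.publish" : "publish";
}

// Streaming tokens: duplicates are fine, latency matters.
console.log(choosePublishMethod({ insideFunction: true, mustNotRefireOnRetry: false }));
// publish

// Final result of a run: should be sent once.
console.log(choosePublishMethod({ insideFunction: true, mustNotRefireOnRetry: true }));
// step.realtime.publish

// Webhook handler: not tied to a function run.
console.log(choosePublishMethod({ insideFunction: false, mustNotRefireOnRetry: false }));
// inngest.realtime.publish
```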
Type safety
All three methods check data against the topic's schema. TypeScript rejects mismatched payloads at compile time, and the payload is validated again at runtime when the schema supports it:
const ch = pipelineChannel({ runId: "abc" });
// Compile-time checking - the payload must match the topic's type
await publish(ch.status, { message: "ok" }); // ✓
await publish(ch.status, { wrong: "field" }); // ✗ compile error
// Runtime validation - throws if data doesn't match Zod schema
await publish(ch.status, someUntypedData); // validated at runtime
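The two layers of checking can be sketched without the SDK. The shapes below are assumptions made for illustration: the real `TopicRef` type is not shown in this document, `TopicSchema` is a hand-written stand-in for a Zod schema, and `publishTo` is a hypothetical function, not one of the three publish methods.

```typescript
// Hypothetical shapes for illustration only; not the SDK's actual types.

interface TopicSchema<T> {
  parse(data: unknown): T; // throws when data does not match
}

interface TopicRef<T> {
  channel: string;
  topic: string;
  schema: TopicSchema<T>;
}

type StatusMessage = { message: string };

// Hand-written validator standing in for a schema library such as Zod.
const statusSchema: TopicSchema<StatusMessage> = {
  parse(data: unknown): StatusMessage {
    if (typeof data === "object" && data !== null) {
      const candidate = (data as { message?: unknown }).message;
      if (typeof candidate === "string") {
        return { message: candidate };
      }
    }
    throw new Error("status: expected { message: string }");
  },
};

const statusTopic: TopicRef<StatusMessage> = {
  channel: "pipeline:abc",
  topic: "status",
  schema: statusSchema,
};

const sent: unknown[] = [];

// Compile time: data must be assignable to T.
// Runtime: schema.parse re-checks the value before it is sent.
function publishTo<T>(ref: TopicRef<T>, data: T): T {
  const checked = ref.schema.parse(data);
  sent.push(checked);
  return checked;
}

publishTo(statusTopic, { message: "ok" }); // compiles and passes validation

// A value of unknown shape gets past the compiler via a cast,
// so only the runtime check can reject it.
const untyped: unknown = JSON.parse('{"wrong":"field"}');
let rejected = false;
try {
  publishTo(statusTopic, untyped as StatusMessage);
} catch (_err) {
  rejected = true;
}

console.log(sent.length, rejected); // 1 true
```

Tying the payload type to the topic reference through a shared type parameter is what lets the compiler catch a wrong field name, while the runtime parse covers data that arrives untyped, such as a parsed request body.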