Publishing

There are three ways to publish realtime messages, each suited to different scenarios.

Method                      Durable  Retries re-fire  Step ID   Best for
publish()                   No       Yes              -         High-frequency streaming (tokens, progress)
step.realtime.publish()     Yes      No (memoized)    Required  State transitions, final results
inngest.realtime.publish()  No       -                -         Publishing outside a function

publish(topicRef, data)

Available in the function handler arguments. Publishes immediately with no memoization - if the function retries, the publish fires again. Best for high-frequency, streaming-style updates where duplicates are acceptable.

  • topicRef (TopicRef<TData>, required)
    A topic accessor from a channel instance (e.g. ch.status). Carries the channel name, topic name, and schema.

  • data (TData, required)
    The message payload. Must match the topic's schema. Validated at runtime if the schema supports it.

inngest.createFunction(
  { id: "stream-tokens", triggers: [{ event: "app/generate" }] },
  async ({ event, step, publish }) => {
    const ch = pipelineChannel({ runId: event.data.runId });

    await step.run("generate", async () => {
      const stream = await openai.chat.completions.create({
        model: "gpt-4o-mini",
        stream: true,
        messages: [{ role: "user", content: event.data.prompt }],
      });

      let full = "";
      for await (const chunk of stream) {
        const token = chunk.choices[0]?.delta?.content ?? "";
        if (token) {
          full += token;
          // Stream each token to subscribers
          await publish(ch.tokens, { token });
        }
      }
      return full;
    });
  },
);

publish() runs inside step.run() blocks as well as at the top level of your function handler. When used inside a step, the publish still fires on every retry of that step.
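To make the retry behavior concrete, here is a minimal in-memory sketch. It does not use the Inngest SDK: publish, runWithRetries, and simulateStreamingStep are hypothetical stand-ins, and the retry loop only approximates how a failed step is re-run. It illustrates why subscribers should expect duplicates from a non-memoized publish.

```typescript
// Hypothetical in-memory stand-ins; none of these come from the Inngest SDK.
type Message = { topic: string; data: unknown };

const delivered: Message[] = [];

// Non-memoized publish: every call is delivered, including calls made on a retry.
function publish(topic: string, data: unknown): void {
  delivered.push({ topic, data });
}

// Runs `body` until it succeeds or `maxAttempts` is reached, like a step retry.
function runWithRetries<T>(body: (attempt: number) => T, maxAttempts: number): T {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return body(attempt);
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

// The step publishes two tokens, fails on its first attempt, then succeeds.
function simulateStreamingStep(): string {
  return runWithRetries((attempt) => {
    publish("tokens", { token: "Hello" });
    publish("tokens", { token: " world" });
    if (attempt === 1) throw new Error("transient failure");
    return "Hello world";
  }, 3);
}

const result = simulateStreamingStep();
console.log(result, delivered.length); // prints: Hello world 4
```

The step returns once, but each token was delivered twice because the first attempt had already published before it failed.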

step.realtime.publish(id, topicRef, data)

A durable step that memoizes the publish. If the function retries past this step, the publish won't re-fire. The message appears in the function's execution graph. Best for important state transitions and final results.

  • id (string, required)
    A unique step ID. Used for memoization and appears in function logs.

  • topicRef (TopicRef<TData>, required)
    A topic accessor from a channel instance.

  • data (TData, required)
    The message payload. Must match the topic's schema.

Returns Promise<TData>, the published data.

inngest.createFunction(
  { id: "process-upload", triggers: [{ event: "app/upload" }] },
  async ({ event, step, publish }) => {
    const ch = uploadsChannel({ uploadId: event.data.uploadId });

    await publish(ch.status, { message: "Processing..." });

    const result = await step.run("process", async () => {
      return processUpload(event.data);
    });

    // Durable - won't re-publish on retry
    await step.realtime.publish("publish-result", ch.result, {
      success: true,
      url: result.url,
    });
  },
);
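The effect of memoization can be modeled with a small simulation. This is a sketch under simplifying assumptions, not the SDK's implementation: memoizedStep, runFunctionBody, and the in-memory Map are hypothetical, and a real run persists step results outside the process.

```typescript
// Hypothetical model of step memoization; not the Inngest SDK implementation.
const sent: string[] = [];
const memo = new Map<string, unknown>();

// Runs `fn` once per step ID; later calls with the same ID return the stored result.
function memoizedStep<T>(id: string, fn: () => T): T {
  if (memo.has(id)) return memo.get(id) as T;
  const value = fn();
  memo.set(id, value);
  return value;
}

// One pass through the handler. `failAfterPublish` simulates a failure later in the run.
function runFunctionBody(failAfterPublish: boolean): void {
  memoizedStep("publish-result", () => {
    sent.push("result"); // the actual publish happens here
    return { success: true };
  });
  if (failAfterPublish) throw new Error("later step failed");
}

try {
  runFunctionBody(true); // first attempt publishes, then fails
} catch {
  runFunctionBody(false); // the retry replays the handler; the publish is skipped
}

console.log(sent.length); // prints: 1
```

Because the step ID keys the stored result, reusing one ID for two different publishes in this model would cause the second to be skipped, which is why the ID has to be unique.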

inngest.realtime.publish(topicRef, data)

Publishes from server-side code outside of a function. Useful for triggering realtime updates from API routes or webhooks.

  • topicRef (TopicRef<TData>, required)
    A topic accessor from a channel instance.

  • data (TData, required)
    The message payload. Must match the topic's schema.

Returns Promise<void>.

import { inngest } from "./client";
import { alertsChannel } from "./channels";

// In an API route, webhook handler, etc.
export async function POST(req: Request) {
  const body = await req.json();

  await inngest.realtime.publish(alertsChannel.alert, {
    message: body.message,
    severity: body.severity,
  });

  return new Response("OK");
}
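Because a request body is untyped, it can help to narrow it before publishing. The sketch below is one possible approach, not part of the SDK: AlertPayload, parseAlert, and handleAlert are hypothetical, the payload shape is assumed from the message and severity fields in the example above, and the publish function is passed in as a stand-in so the code runs on its own.

```typescript
// Hypothetical payload shape; the real alert topic schema is not shown in this document.
type AlertPayload = { message: string; severity: string };

// Narrows an untrusted request body to AlertPayload, or returns null if it does not fit.
function parseAlert(body: unknown): AlertPayload | null {
  if (typeof body !== "object" || body === null) return null;
  const { message, severity } = body as Record<string, unknown>;
  if (typeof message !== "string" || typeof severity !== "string") return null;
  return { message, severity };
}

// Stand-in for the real publish call so the sketch runs without the SDK.
type Publish = (data: AlertPayload) => Promise<void>;

// Returns the HTTP status the route would respond with.
async function handleAlert(body: unknown, publish: Publish): Promise<number> {
  const alert = parseAlert(body);
  if (alert === null) return 400; // reject before publishing anything
  await publish(alert);
  return 200;
}
```

In a real route, the publish argument could be a closure such as (data) => inngest.realtime.publish(alertsChannel.alert, data).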

Choosing a publish method

Use publish() when:

  • Streaming tokens, progress percentages, or log lines
  • The data is ephemeral and duplicates on retry are fine
  • You want minimum latency (no step overhead)

Use step.realtime.publish() when:

  • Publishing a final result or state transition
  • The message must not be re-published when the function retries (memoized)
  • The publish should appear in the function's execution graph

Use inngest.realtime.publish() when:

  • Publishing from outside a function (API routes, webhooks, cron jobs)
  • Sending updates that aren't tied to a specific function run
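The guidance above can be condensed into a small decision helper. This is purely illustrative: choosePublishMethod and PublishContext are hypothetical names, and the two flags simplify the criteria listed above.

```typescript
type PublishMethod =
  | "publish"
  | "step.realtime.publish"
  | "inngest.realtime.publish";

interface PublishContext {
  insideFunction: boolean; // is the code running inside a function handler?
  duplicatesOk: boolean; // can subscribers tolerate repeats after a retry?
}

// Maps the two questions from the lists above to a publish method.
function choosePublishMethod(ctx: PublishContext): PublishMethod {
  if (!ctx.insideFunction) return "inngest.realtime.publish";
  return ctx.duplicatesOk ? "publish" : "step.realtime.publish";
}

// Token streaming inside a function: duplicates are acceptable.
console.log(choosePublishMethod({ insideFunction: true, duplicatesOk: true })); // prints: publish
```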

Type safety

All three methods check data against the topic's schema. TypeScript checks the payload type at compile time, and the payload is validated at runtime if the schema supports it:

const ch = pipelineChannel({ runId: "abc" });

// Compile-time check - TypeScript rejects payloads that don't match the schema
await publish(ch.status, { message: "ok" }); // ✓
await publish(ch.status, { wrong: "field" }); // ✗ compile error

// Runtime validation - throws if data doesn't match Zod schema
await publish(ch.status, someUntypedData); // validated at runtime
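One way to picture how a single topic accessor supports both kinds of check is the simplified model below. It is an assumption-laden sketch: this TopicRef interface, publishChecked, the hand-written validator (standing in for a Zod schema), and the "pipeline:abc" channel name are all hypothetical, and the SDK's real types may differ.

```typescript
// Hypothetical, simplified model of a topic accessor; the SDK's real TopicRef may differ.
interface TopicRef<TData> {
  channel: string;
  topic: string;
  validate: (data: unknown) => data is TData; // stands in for a Zod schema
}

type Status = { message: string };

const statusTopic: TopicRef<Status> = {
  channel: "pipeline:abc", // hypothetical channel name
  topic: "status",
  validate: (data: unknown): data is Status =>
    typeof data === "object" &&
    data !== null &&
    typeof (data as Record<string, unknown>).message === "string",
};

const outbox: unknown[] = [];

// Compile time: `data` must be assignable to TData. Runtime: the validator runs as well.
function publishChecked<TData>(ref: TopicRef<TData>, data: TData): void {
  if (!ref.validate(data)) {
    throw new Error(`Invalid payload for ${ref.channel}/${ref.topic}`);
  }
  outbox.push(data);
}

publishChecked(statusTopic, { message: "ok" }); // accepted by both checks

// A cast bypasses the compiler, so only the runtime check can catch this payload.
const untyped: unknown = JSON.parse('{"wrong":"field"}');
let rejected = false;
try {
  publishChecked(statusTopic, untyped as Status);
} catch {
  rejected = true;
}

console.log(outbox.length, rejected); // prints: 1 true
```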