Realtime

The v4 SDK includes a built-in realtime system for streaming updates from your Inngest functions to client applications. There's no separate package to install: channels, publishing, and subscriptions are all part of the inngest package.

This page covers the v4 SDK. If you're using v3, see the v3 realtime docs.

Key concepts

  • Channels define a named scope for messages. They can be static or parameterized with runtime values like a runId.
  • Topics are typed message streams within a channel, each with a schema.
  • Publishing sends data to a topic from inside a function (with publish or step.realtime.publish) or from server-side code (inngest.realtime.publish).
  • Subscribing consumes messages via a React hook (useRealtime) or a server-side stream or callback (subscribe).
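To make the relationship between these concepts concrete, here is a minimal in-memory sketch. It is a toy model, not the Inngest implementation: `ToyBroker`, `Message`, `Listener`, and `pipelineName` are hypothetical names invented for this illustration, and none of them exist in the SDK. The sketch only shows that a message is scoped by both its channel name and its topic, and that a subscriber receives only the topics it asked for.

```typescript
// Toy in-memory model of channels and topics. NOT the Inngest implementation;
// every name below is hypothetical and exists only for illustration.
type Message = { channel: string; topic: string; data: unknown };
type Listener = (msg: Message) => void;

class ToyBroker {
  // Listeners keyed by "channel/topic".
  private listeners = new Map<string, Listener[]>();

  // Register a listener for some topics on one channel.
  // Returns a function that removes the listener again.
  subscribe(channel: string, topics: string[], listener: Listener): () => void {
    for (const topic of topics) {
      const key = `${channel}/${topic}`;
      this.listeners.set(key, [...(this.listeners.get(key) ?? []), listener]);
    }
    return () => {
      for (const topic of topics) {
        const key = `${channel}/${topic}`;
        const remaining = (this.listeners.get(key) ?? []).filter((l) => l !== listener);
        this.listeners.set(key, remaining);
      }
    };
  }

  // Deliver a message to every listener on that exact channel and topic.
  publish(channel: string, topic: string, data: unknown): void {
    const msg: Message = { channel, topic, data };
    for (const listener of this.listeners.get(`${channel}/${topic}`) ?? []) {
      listener(msg);
    }
  }
}

// A parameterized channel name, mirroring the `pipeline:${runId}` pattern.
const pipelineName = (runId: string): string => `pipeline:${runId}`;

const broker = new ToyBroker();
const received: Message[] = [];
const unsubscribe = broker.subscribe(pipelineName("run-1"), ["status"], (m) => {
  received.push(m);
});

broker.publish(pipelineName("run-1"), "status", { message: "Starting..." }); // delivered
broker.publish(pipelineName("run-1"), "tokens", { token: "hi" }); // topic not subscribed
broker.publish(pipelineName("run-2"), "status", { message: "Other run" }); // other channel
unsubscribe();
broker.publish(pipelineName("run-1"), "status", { message: "Late" }); // after unsubscribe
```

Only the first publish reaches the listener, because it is the only one that matches both the channel for run-1 and a subscribed topic while the subscription is active.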

Imports

What                                 Import from
realtime, staticSchema               "inngest"
useRealtime, getSubscriptionToken    "inngest/react"
subscribe                            "inngest/realtime"
publish, step.realtime.publish       Function handler context (no import needed)

Quick start

1. Define a channel

src/inngest/channels.ts

import { realtime, staticSchema } from "inngest";
import { z } from "zod";

export const pipelineChannel = realtime.channel({
  name: ({ runId }: { runId: string }) => `pipeline:${runId}`,
  topics: {
    status: {
      schema: z.object({ message: z.string(), step: z.string().optional() }),
    },
    tokens: {
      schema: staticSchema<{ token: string }>(),
    },
  },
});

2. Publish from a function

src/inngest/functions.ts

import { inngest } from "./client";
import { pipelineChannel } from "./channels";

export const generate = inngest.createFunction(
  { id: "generate", triggers: [{ event: "app/generate" }] },
  async ({ event, step, publish }) => {
    const ch = pipelineChannel({ runId: event.data.runId });

    // Fire-and-forget status update
    await publish(ch.status, { message: "Starting..." });

    // doWork is a placeholder for your own application logic
    const result = await step.run("do-work", async () => {
      return doWork();
    });

    // Durable publish: memoized, won't re-fire on retry
    await step.realtime.publish("final-status", ch.status, {
      message: "Done!",
      step: "complete",
    });

    return result;
  },
);
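The difference between the two publish calls above is easiest to see under a retry. The following is a toy simulation, not the Inngest runtime: `toyPublish`, `toyStepPublish`, `completedSteps`, and `handler` are hypothetical names, and the sketch assumes that a plain publish call executes again whenever the handler body is re-run, while a step-based publish is skipped once a step with the same ID has completed.

```typescript
// Toy model of step memoization. NOT the Inngest runtime; all names are hypothetical.
const sent: string[] = []; // stands in for messages delivered to subscribers
const completedSteps = new Set<string>(); // step IDs that finished in an earlier attempt

// Plain publish: runs every time the handler body runs.
function toyPublish(message: string): void {
  sent.push(message);
}

// Durable publish: skipped when a step with the same ID has already completed.
function toyStepPublish(stepId: string, message: string): void {
  if (completedSteps.has(stepId)) return;
  sent.push(message);
  completedSteps.add(stepId);
}

// A handler whose first attempt fails after both publishes have run.
function handler(attempt: number): void {
  toyPublish("Starting...");
  toyStepPublish("final-status", "Done!");
  if (attempt === 1) throw new Error("simulated failure after publishing");
}

for (let attempt = 1; attempt <= 2; attempt++) {
  try {
    handler(attempt);
    break; // success, stop retrying
  } catch (_err) {
    // Retry: the handler body runs again from the top.
  }
}

console.log(sent); // ["Starting...", "Done!", "Starting..."]
```

In this model "Starting..." is delivered twice but "Done!" only once, which is the trade-off to weigh: plain publishes suit progress updates where a duplicate is harmless, and step-based publishes suit messages that subscribers should see exactly once.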

3. Subscribe from React

src/app/page.tsx

"use client";
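import { useRealtime, getSubscriptionToken } from "inngest/react";
// The same client that functions.ts imports from "./client"
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";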

export default function PipelinePage({ runId }: { runId: string }) {
  const ch = pipelineChannel({ runId });

  const { status, latest, history, runStatus } = useRealtime({
    channel: ch,
    topics: ["status", "tokens"],
    token: () =>
      getSubscriptionToken(inngest, { channel: ch, topics: ["status", "tokens"] })
        .then((t) => t.key!),
  });

  return (
    <div>
      <p>Connection: {status} | Run: {runStatus}</p>
      {latest.status && <p>Status: {latest.status.data.message}</p>}
      <ul>
        {history.map((msg, i) => (
          <li key={i}>[{msg.topic}] {JSON.stringify(msg.data)}</li>
        ))}
      </ul>
    </div>
  );
}
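The component reads both `latest` and `history` from the hook. One way to picture how they relate, assuming `latest` holds the most recent message per topic and `history` is the ordered list of all messages, is the reduction below. `ToyMessage` and `latestByTopic` are hypothetical names for this sketch, and the message shape is inferred only from the fields the component uses (`topic` and `data`); the hook's real internals may differ.

```typescript
// Hypothetical message shape, inferred from the fields the component reads.
type ToyMessage = { topic: string; data: unknown };

// Reduce an ordered message log into the most recent message per topic.
function latestByTopic(log: ToyMessage[]): Record<string, ToyMessage> {
  const result: Record<string, ToyMessage> = {};
  for (const msg of log) {
    result[msg.topic] = msg; // later messages overwrite earlier ones
  }
  return result;
}

const messageLog: ToyMessage[] = [
  { topic: "status", data: { message: "Starting..." } },
  { topic: "tokens", data: { token: "Hel" } },
  { topic: "status", data: { message: "Done!" } },
];

const latestMessages = latestByTopic(messageLog);
console.log(latestMessages.status.data); // { message: "Done!" }
```

Under this reading, `latest` is convenient for rendering a single current value such as a status line, while `history` suits append-only views such as a log or a token stream.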

Next steps

  • Channels & topics: defining channels, parameterized names, schemas, and type inference
  • Publishing: the three publish modes and when to use each
  • useRealtime: full React hook API reference
  • Subscribing: server-side tokens, streams, and callback subscriptions