Subscribing

Server-side subscription covers two operations: minting tokens (for client-side use) and consuming streams (for server-side processing). Tokens are short-lived JWTs scoped to a specific channel and set of topics.


getSubscriptionToken(app, options)

Mints a scoped subscription token on the server. The token authorizes a client to subscribe to specific topics on a specific channel.

  • Name
    app
    Type
    Inngest
    Required
    required
    Description

    Your Inngest client instance.

  • Name
    options.channel
    Type
    ChannelInstance | string
    Required
    required
    Description

    The channel to authorize. Can be a channel instance or a plain string.

  • Name
    options.topics
    Type
    string[]
    Required
    required
    Description

    The topics the token grants access to. The client can only subscribe to these topics.

Returns Promise<Token> with key (the JWT string), channel, and topics.

import { getSubscriptionToken } from "inngest/react";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const token = await getSubscriptionToken(inngest, {
  channel: ch,
  topics: ["status", "tokens"],
});

// token.key     - JWT string for the client
// token.channel  - the channel instance/name
// token.topics   - ["status", "tokens"]

Always mint tokens on the server. Never expose your Inngest signing key to the client. The token is scoped to the specified channel and topics, so a client cannot use it to access other channels.

Framework examples

// app/actions.ts
"use server";
import { getSubscriptionToken } from "inngest/react";
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";

export async function getRealtimeToken(runId: string) {
  const ch = pipelineChannel({ runId });
  const token = await getSubscriptionToken(inngest, {
    channel: ch,
    topics: ["status", "tokens"],
  });
  return token.key;
}

subscribe(options)

Creates a server-side subscription to a realtime channel. Returns a ReadableStream by default, or accepts callbacks for event-driven consumption.

Stream subscription

Without onMessage, subscribe returns an async-iterable ReadableStream.

  • Name
    app
    Type
    Inngest
    Required
    optional
    Description

    Your Inngest client instance. Used to resolve connection details.

  • Name
    channel
    Type
    ChannelInstance | string
    Required
    required
    Description

    The channel to subscribe to.

  • Name
    topics
    Type
    string[]
    Required
    required
    Description

    The topics to subscribe to.

  • Name
    key
    Type
    string
    Required
    optional
    Description

    A pre-minted JWT token key. If not provided, app is used to mint a token automatically.

  • Name
    validate
    Type
    boolean
    Required
    optional
    Description

    Enable schema validation on incoming messages. Defaults to true.

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const stream = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status", "tokens"],
});

// ReadableStream - use getReader()
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  console.log(value.topic, value.data);
}

Stream methods

The returned stream has additional helper methods:

  • Name
    getJsonStream()
    Type
    ReadableStream<Message>
    Required
    optional
    Description

    Returns a new ReadableStream that emits parsed JSON messages. Use this when you want to pipe messages to another consumer.

  • Name
    getEncodedStream()
    Type
    ReadableStream<Uint8Array>
    Required
    optional
    Description

    Returns a new ReadableStream with SSE-formatted Uint8Array chunks (data: {...}\n\n). Useful for streaming responses to clients via Server-Sent Events.

  • Name
    close(reason?)
    Type
    (reason?: string) => void
    Required
    optional
    Description

    Closes the underlying WebSocket connection.

  • Name
    unsubscribe(reason?)
    Type
    (reason?: string) => void
    Required
    optional
    Description

    Alias for close().

// Stream JSON to another consumer
const jsonStream = stream.getJsonStream();
const response = new Response(jsonStream);

// Stream as SSE
const sseStream = stream.getEncodedStream();
return new Response(sseStream, {
  headers: { "Content-Type": "text/event-stream" },
});

// Clean up
stream.close();

Callback subscription

Pass onMessage to use an event-driven pattern instead of streams.

  • Name
    onMessage
    Type
    (message: Message) => void
    Required
    required
    Description

    Called for each incoming message.

  • Name
    onError
    Type
    (error: unknown) => void
    Required
    optional
    Description

    Called when a connection error occurs.

Returns Promise<{ close, unsubscribe }>.

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const sub = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status"],
  onMessage: (message) => {
    console.log(`[${message.topic}]`, message.data);
  },
  onError: (err) => {
    console.error("Subscription error:", err);
  },
});

// Clean up when done
sub.close();

Message shape

Each message received from a subscription includes:

  • Name
    topic
    Type
    string
    Required
    optional
    Description

    The topic name this message was published to.

  • Name
    channel
    Type
    string
    Required
    optional
    Description

    The resolved channel name.

  • Name
    data
    Type
    TData
    Required
    optional
    Description

    The message payload, typed according to the topic's schema.

  • Name
    kind
    Type
    "data" | "run" | "datastream-start" | "datastream-end" | "chunk"
    Required
    optional
    Description

    The message kind. Most messages are "data". Run lifecycle updates are "run".

  • Name
    runId
    Type
    string | undefined
    Required
    optional
    Description

    The Inngest function run ID, if the message was published from within a function.

  • Name
    fnId
    Type
    string | undefined
    Required
    optional
    Description

    The Inngest function ID.

  • Name
    createdAt
    Type
    Date
    Required
    optional
    Description

    When the message was created.

Server-side stream example

A complete example using subscribe to monitor a long-running function and react to its output:

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { workflowChannel } from "./channels";

async function monitorWorkflow(runId: string) {
  const ch = workflowChannel({ runId });

  const stream = await subscribe({
    app: inngest,
    channel: ch,
    topics: ["status", "result"],
  });

  const reader = stream.getReader();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    if (value.topic === "status") {
      console.log("Status:", value.data.message);
    }

    if (value.topic === "result") {
      console.log("Result:", value.data);
      stream.close();
    }
  }
}