Docs
CodeRabbit
Cloudflare
AG Grid
SerpAPI
Netlify
OpenRouter
Neon
WorkOS
Clerk
Electric
PowerSync
Sentry
Railway
Prisma
Strapi
Unkey
CodeRabbit
Cloudflare
AG Grid
SerpAPI
Netlify
OpenRouter
Neon
WorkOS
Clerk
Electric
PowerSync
Sentry
Railway
Prisma
Strapi
Unkey
Class References
Function References
Interface References
Type Alias References
Variable References
Chat & Streaming

Connection Adapters

A connection adapter is the piece that decides how chunks get from your server to the ChatClient (and through it, to your framework's useChat). Everything else in TanStack AI — chunk processing, message reassembly, tool calls, UI updates — is transport-agnostic. The adapter is the only thing that touches the network.

This page covers every supported transport, when to pick which, and how to build a custom one.

Pick a Transport

You have…Use
A normal HTTP server and want the defaultfetchServerSentEvents
An environment that blocks SSE (some edge runtimes, RN, strict proxies)fetchHttpStream
A TanStack Start (or other) server function that already returns an async iterablestream
An RPC framework like Cap'n Web, gRPC-Web, or tRPCrpcStream
A single long-lived WebSocket (or BroadcastChannel, postMessage, shared worker) serving many runsCustom subscribe / send adapter
Standard SSE but with custom fetch wrapping (auth refresh, retries)fetchServerSentEvents with fetchClient
Something else entirely (HTTP/3, Server-Sent Events over a different protocol, etc.)Custom connect adapter

All adapters produce the same StreamChunk events (AG-UI Protocol) — the choice is purely about transport.

Server-Sent Events (SSE)

The default. SSE is well-supported across browsers, transparent through most proxies, and easy to debug. Pair it with toServerSentEventsResponse() on the server.

typescript
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages, sendMessage } = useChat({
  connection: fetchServerSentEvents("/api/chat"),
});
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages, sendMessage } = useChat({
  connection: fetchServerSentEvents("/api/chat"),
});

Dynamic URL and headers. Pass functions when the value depends on per-request state (current user, fresh token):

typescript
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents(
    () => `/api/chat?user=${currentUserId}`,
    () => ({
      headers: { Authorization: `Bearer ${getToken()}` },
    }),
  ),
});
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents(
    () => `/api/chat?user=${currentUserId}`,
    () => ({
      headers: { Authorization: `Bearer ${getToken()}` },
    }),
  ),
});

Static body. Anything in options.body is merged into the AG-UI forwardedProps payload sent to your server. Per-message data passed to sendMessage wins over this:

typescript
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", {
    body: { provider: "openai", model: "gpt-5.1" },
  }),
});
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", {
    body: { provider: "openai", model: "gpt-5.1" },
  }),
});

Tip: body and forwardedProps populate the same wire field. Use body for static defaults, the forwardedProps constructor option (or per-sendMessage data) for dynamic values. Runtime values always win.

HTTP Streaming (NDJSON)

For environments that don't speak SSE — some edge runtimes, certain mobile WebViews, or anywhere a proxy strips text/event-stream — use raw newline-delimited JSON. The wire format is one JSON StreamChunk per line:

typescript
import { useChat, fetchHttpStream } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchHttpStream("/api/chat"),
});
import { useChat, fetchHttpStream } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchHttpStream("/api/chat"),
});

Server-side, write each chunk as JSON.stringify(chunk) + "\n" to the response body. Options (url, headers, body, fetchClient, dynamic functions) match fetchServerSentEvents exactly.

Server Functions and Direct Async Iterables

When your client can call into your server without going over HTTP — TanStack Start server functions, RSC streams, in-process tests — skip the transport entirely. stream() takes a factory that returns an AsyncIterable<StreamChunk> and wires it straight into the client:

typescript
import { useChat, stream } from "@tanstack/ai-react";
import { chatServerFn } from "./server/chat.server";

// `chatServerFn` is a server function that returns an AsyncIterable<StreamChunk>,
// e.g. the result of `chat({ adapter, model, messages })` on the server.
const { messages } = useChat({
  connection: stream((messages, data) => chatServerFn({ messages, ...data })),
});
import { useChat, stream } from "@tanstack/ai-react";
import { chatServerFn } from "./server/chat.server";

// `chatServerFn` is a server function that returns an AsyncIterable<StreamChunk>,
// e.g. the result of `chat({ adapter, model, messages })` on the server.
const { messages } = useChat({
  connection: stream((messages, data) => chatServerFn({ messages, ...data })),
});

The factory receives the conversation messages plus any per-request data you passed to sendMessage. Return any async iterable that yields StreamChunk objects — a generator, the output of chat() on the server, a transformed stream, anything.

Tip: stream() is request-scoped. The factory is invoked once per sendMessage, the iterable runs to completion, and the connection closes. If you need a single long-lived channel that multiplexes many sends — for example a WebSocket — use subscribe / send instead.

RPC Streams

rpcStream() is identical in behavior to stream() but reads better at call sites that hand off to an RPC client. Use it when integrating with Cap'n Web, gRPC-Web, tRPC subscriptions, or any RPC framework that already returns an async iterable:

typescript
import { useChat, rpcStream } from "@tanstack/ai-react";
import { api } from "./rpc-client";

// `api.chat.stream` is your RPC method; it must return an AsyncIterable<StreamChunk>.
const { messages } = useChat({
  connection: rpcStream((messages, data) =>
    api.chat.stream({ messages, ...data }),
  ),
});
import { useChat, rpcStream } from "@tanstack/ai-react";
import { api } from "./rpc-client";

// `api.chat.stream` is your RPC method; it must return an AsyncIterable<StreamChunk>.
const { messages } = useChat({
  connection: rpcStream((messages, data) =>
    api.chat.stream({ messages, ...data }),
  ),
});

Persistent Transports (WebSockets and Friends)

A persistent transport — WebSocket, BroadcastChannel, postMessage between iframes, a shared worker — is fundamentally different from request/response. You open the channel once, then send and receive over it for the lifetime of the client. stream()/connect() can't model this cleanly because they assume one async iterable per request.

For these cases, implement the SubscribeConnectionAdapter interface directly. The shape (full definition in The Adapter Interface):

typescript
import type { SubscribeConnectionAdapter } from "@tanstack/ai-react";

// subscribe(abortSignal?): AsyncIterable<StreamChunk>   — long-lived
// send(messages, data?, abortSignal?, runContext?): Promise<void> — one per user message
import type { SubscribeConnectionAdapter } from "@tanstack/ai-react";

// subscribe(abortSignal?): AsyncIterable<StreamChunk>   — long-lived
// send(messages, data?, abortSignal?, runContext?): Promise<void> — one per user message
  • subscribe() is called once by the ChatClient and returns a long-lived async iterable of every chunk the channel produces.
  • send() is called once per user message to push a request frame onto the channel. It returns when the frame has been written — chunks arrive separately through subscribe().

The runtime correlates them: chunks emitted on the subscription queue between send() and the next terminal event (RUN_FINISHED / RUN_ERROR) are attributed to that run.

WebSocket example

typescript
import { useChat, type SubscribeConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk } from "@tanstack/ai";

function websocketConnection(url: string): SubscribeConnectionAdapter {
  const ws = new WebSocket(url);
  const queue: Array<StreamChunk> = [];
  let pending: ((chunk: StreamChunk | null) => void) | null = null;
  let closed = false;

  const ready = new Promise<void>((resolve) => {
    ws.addEventListener("open", () => resolve(), { once: true });
  });

  function deliver(chunk: StreamChunk | null) {
    const resolve = pending;
    if (resolve) {
      pending = null;
      resolve(chunk);
    } else if (chunk !== null) {
      queue.push(chunk);
    }
  }

  ws.addEventListener("message", (event) => {
    deliver(JSON.parse(event.data) as StreamChunk);
  });
  ws.addEventListener("close", () => {
    closed = true;
    deliver(null);
  });

  return {
    async *subscribe(abortSignal) {
      while (!abortSignal?.aborted && !closed) {
        const buffered = queue.shift();
        if (buffered !== undefined) {
          yield buffered;
          continue;
        }
        const chunk = await new Promise<StreamChunk | null>((resolve) => {
          pending = resolve;
          abortSignal?.addEventListener("abort", () => resolve(null), {
            once: true,
          });
        });
        if (chunk === null) return;
        yield chunk;
      }
    },

    async send(messages, data, _abortSignal, runContext) {
      await ready;
      ws.send(
        JSON.stringify({
          threadId: runContext?.threadId,
          runId: runContext?.runId,
          messages,
          data,
        }),
      );
    },
  };
}

const { messages } = useChat({
  connection: websocketConnection("wss://example.com/chat"),
});
import { useChat, type SubscribeConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk } from "@tanstack/ai";

function websocketConnection(url: string): SubscribeConnectionAdapter {
  const ws = new WebSocket(url);
  const queue: Array<StreamChunk> = [];
  let pending: ((chunk: StreamChunk | null) => void) | null = null;
  let closed = false;

  const ready = new Promise<void>((resolve) => {
    ws.addEventListener("open", () => resolve(), { once: true });
  });

  function deliver(chunk: StreamChunk | null) {
    const resolve = pending;
    if (resolve) {
      pending = null;
      resolve(chunk);
    } else if (chunk !== null) {
      queue.push(chunk);
    }
  }

  ws.addEventListener("message", (event) => {
    deliver(JSON.parse(event.data) as StreamChunk);
  });
  ws.addEventListener("close", () => {
    closed = true;
    deliver(null);
  });

  return {
    async *subscribe(abortSignal) {
      while (!abortSignal?.aborted && !closed) {
        const buffered = queue.shift();
        if (buffered !== undefined) {
          yield buffered;
          continue;
        }
        const chunk = await new Promise<StreamChunk | null>((resolve) => {
          pending = resolve;
          abortSignal?.addEventListener("abort", () => resolve(null), {
            once: true,
          });
        });
        if (chunk === null) return;
        yield chunk;
      }
    },

    async send(messages, data, _abortSignal, runContext) {
      await ready;
      ws.send(
        JSON.stringify({
          threadId: runContext?.threadId,
          runId: runContext?.runId,
          messages,
          data,
        }),
      );
    },
  };
}

const { messages } = useChat({
  connection: websocketConnection("wss://example.com/chat"),
});

Tip: Your server is responsible for emitting RUN_FINISHED (or RUN_ERROR) at the end of each run. Without it, the client will not know the assistant turn has ended and will wait indefinitely. See Streaming for the full event lifecycle.

When to choose persistent over request-scoped

Pick subscribe / send when any of these are true:

  • A single connection multiplexes many runs (chat thread keeps the socket open across messages).
  • The server pushes chunks outside of a request (presence updates, server-initiated tool calls, broadcast notifications).
  • You want to share one connection across multiple tabs (BroadcastChannel) or workers.

Otherwise, prefer fetchServerSentEvents or stream() — they're simpler and require no connection lifecycle management.

Custom Fetch Client

If you're keeping SSE or HTTP streaming but need to wrap fetch — for auth refresh, retries, logging, or routing through an edge proxy — pass a fetchClient:

typescript
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

async function authedFetch(input: RequestInfo | URL, init?: RequestInit) {
  let response = await fetch(input, init);
  if (response.status === 401) {
    await refreshToken();
    response = await fetch(input, init);
  }
  return response;
}

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", {
    fetchClient: authedFetch,
  }),
});
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

async function authedFetch(input: RequestInfo | URL, init?: RequestInit) {
  let response = await fetch(input, init);
  if (response.status === 401) {
    await refreshToken();
    response = await fetch(input, init);
  }
  return response;
}

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", {
    fetchClient: authedFetch,
  }),
});

The fetchClient must satisfy the standard fetch signature. fetchHttpStream accepts the same option.

Custom Request-Scoped Adapters

When none of the built-ins fit but the transport is still request-scoped (one request per user message), implement ConnectConnectionAdapter directly. This is the lowest-level escape hatch short of going persistent:

typescript
import { useChat, type ConnectConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk } from "@tanstack/ai";

const myAdapter: ConnectConnectionAdapter = {
  async *connect(messages, data, abortSignal, runContext) {
    const response = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        threadId: runContext?.threadId,
        runId: runContext?.runId,
        messages,
        ...data,
      }),
      ...(abortSignal ? { signal: abortSignal } : {}),
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    if (!response.body) throw new Error("Response has no body");

    // Example: newline-delimited JSON. Replace this loop with whatever
    // framing your wire format uses, yielding one `StreamChunk` per event.
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        if (line.trim()) yield JSON.parse(line) as StreamChunk;
      }
    }
  },
};

const { messages } = useChat({ connection: myAdapter });
import { useChat, type ConnectConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk } from "@tanstack/ai";

const myAdapter: ConnectConnectionAdapter = {
  async *connect(messages, data, abortSignal, runContext) {
    const response = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        threadId: runContext?.threadId,
        runId: runContext?.runId,
        messages,
        ...data,
      }),
      ...(abortSignal ? { signal: abortSignal } : {}),
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    if (!response.body) throw new Error("Response has no body");

    // Example: newline-delimited JSON. Replace this loop with whatever
    // framing your wire format uses, yielding one `StreamChunk` per event.
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        if (line.trim()) yield JSON.parse(line) as StreamChunk;
      }
    }
  },
};

const { messages } = useChat({ connection: myAdapter });

runContext carries threadId, runId, clientTools, and forwardedProps. Include them in your request payload so the server can build an AG-UI-compliant response. If your connect stream completes without emitting RUN_FINISHED, the runtime synthesizes one for you; if it throws, a RUN_ERROR is synthesized.

The Adapter Interface

A ConnectionAdapter is a union — provide either connect, or both subscribe and send. Never both modes.

typescript
export interface RunAgentInputContext {
  threadId: string;
  runId: string;
  parentRunId?: string;
  clientTools?: Array<{ name: string; description: string; parameters: unknown }>;
  forwardedProps?: Record<string, unknown>;
}

export interface ConnectConnectionAdapter {
  connect(
    messages: UIMessage[] | ModelMessage[],
    data?: Record<string, any>,
    abortSignal?: AbortSignal,
    runContext?: RunAgentInputContext,
  ): AsyncIterable<StreamChunk>;
}

export interface SubscribeConnectionAdapter {
  subscribe(abortSignal?: AbortSignal): AsyncIterable<StreamChunk>;
  send(
    messages: UIMessage[] | ModelMessage[],
    data?: Record<string, any>,
    abortSignal?: AbortSignal,
    runContext?: RunAgentInputContext,
  ): Promise<void>;
}

export type ConnectionAdapter =
  | ConnectConnectionAdapter
  | SubscribeConnectionAdapter;
export interface RunAgentInputContext {
  threadId: string;
  runId: string;
  parentRunId?: string;
  clientTools?: Array<{ name: string; description: string; parameters: unknown }>;
  forwardedProps?: Record<string, unknown>;
}

export interface ConnectConnectionAdapter {
  connect(
    messages: UIMessage[] | ModelMessage[],
    data?: Record<string, any>,
    abortSignal?: AbortSignal,
    runContext?: RunAgentInputContext,
  ): AsyncIterable<StreamChunk>;
}

export interface SubscribeConnectionAdapter {
  subscribe(abortSignal?: AbortSignal): AsyncIterable<StreamChunk>;
  send(
    messages: UIMessage[] | ModelMessage[],
    data?: Record<string, any>,
    abortSignal?: AbortSignal,
    runContext?: RunAgentInputContext,
  ): Promise<void>;
}

export type ConnectionAdapter =
  | ConnectConnectionAdapter
  | SubscribeConnectionAdapter;

Internally, ChatClient normalizes both shapes to a single subscribe/send pair via normalizeConnectionAdapter(). If you provide connect, it gets wrapped in an async queue; if you provide subscribe + send natively, they're used as-is.

Authentication

Static headers go in options.headers:

typescript
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", {
    headers: { Authorization: `Bearer ${token}` },
  }),
});
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", {
    headers: { Authorization: `Bearer ${token}` },
  }),
});

For tokens that change per request (refresh tokens, short-lived JWTs), pass a function — it's called on every send, so the header always reflects the latest token:

typescript
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", () => ({
    headers: { Authorization: `Bearer ${getToken()}` },
  })),
});
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";

const { messages } = useChat({
  connection: fetchServerSentEvents("/api/chat", () => ({
    headers: { Authorization: `Bearer ${getToken()}` },
  })),
});

Cookies are sent automatically when credentials is "same-origin" (default) or "include".

Cancellation

Every adapter — built-in or custom — receives an AbortSignal. Built-ins propagate it to fetch; custom adapters must honor it themselves. useChat's stop() aborts the current run by triggering the signal:

typescript
const { stop } = useChat({ connection: fetchServerSentEvents("/api/chat") });
stop(); // aborts the active stream
const { stop } = useChat({ connection: fetchServerSentEvents("/api/chat") });
stop(); // aborts the active stream

For SubscribeConnectionAdapter, the signal in subscribe() ends the entire subscription (component unmount); the signal in send() ends just the in-flight send.

Error Handling

Adapters should throw on transport errors (HTTP non-2xx, parse failures, dropped sockets). The ChatClient catches the throw, emits a RUN_ERROR chunk if none has been emitted yet, and surfaces it via onError / the error state:

typescript
const { error } = useChat({
  connection: fetchServerSentEvents("/api/chat"),
  onError: (err) => console.error("Chat failed:", err),
});
const { error } = useChat({
  connection: fetchServerSentEvents("/api/chat"),
  onError: (err) => console.error("Chat failed:", err),
});

Don't swallow AbortError — let it propagate so the client knows the abort succeeded.

Best Practices

  • Default to SSE. It's the most compatible and the easiest to debug. Switch only when something blocks it.
  • Use stream() when you can. If you control both sides and don't need HTTP semantics, server functions are faster to wire up than building a custom adapter.
  • Reach for subscribe/send only when you need persistence. WebSockets are powerful but require you to handle reconnection, run correlation, and lifecycle yourself.
  • Always honor abortSignal. It's how the client cleans up on unmount and on stop().
  • Emit RUN_FINISHED from the server. Without it, the client never knows the turn ended.

Next Steps