A connection adapter is the piece that decides how chunks get from your server to the ChatClient (and through it, to your framework's useChat). Everything else in TanStack AI — chunk processing, message reassembly, tool calls, UI updates — is transport-agnostic. The adapter is the only thing that touches the network.
This page covers every supported transport, when to pick which, and how to build a custom one.
| You have… | Use |
|---|---|
| A normal HTTP server and want the default | fetchServerSentEvents |
| An environment that blocks SSE (some edge runtimes, RN, strict proxies) | fetchHttpStream |
| A TanStack Start (or other) server function that already returns an async iterable | stream |
| An RPC framework like Cap'n Web, gRPC-Web, or tRPC | rpcStream |
| A single long-lived WebSocket (or BroadcastChannel, postMessage, shared worker) serving many runs | Custom subscribe / send adapter |
| Standard SSE but with custom fetch wrapping (auth refresh, retries) | fetchServerSentEvents with fetchClient |
| Something else entirely (HTTP/3, Server-Sent Events over a different protocol, etc.) | Custom connect adapter |
All adapters produce the same StreamChunk events (AG-UI Protocol) — the choice is purely about transport.
fetchServerSentEvents is the default. SSE is well supported across browsers, passes transparently through most proxies, and is easy to debug. Pair it with toServerSentEventsResponse() on the server.
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
const { messages, sendMessage } = useChat({
connection: fetchServerSentEvents("/api/chat"),
});
Dynamic URL and headers. Pass functions when the value depends on per-request state (current user, fresh token):
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchServerSentEvents(
() => `/api/chat?user=${currentUserId}`,
() => ({
headers: { Authorization: `Bearer ${getToken()}` },
}),
),
});
Static body. Anything in options.body is merged into the AG-UI forwardedProps payload sent to your server. Per-message data passed to sendMessage wins over this:
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", {
body: { provider: "openai", model: "gpt-5.1" },
}),
});
Tip: body and forwardedProps populate the same wire field. Use body for static defaults, the forwardedProps constructor option (or per-sendMessage data) for dynamic values. Runtime values always win.
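The precedence rule in the tip can be pictured as a shallow merge in which later sources override earlier ones. This is only an illustration of the documented behavior, not the library's internal code: the helper name mergeForwardedProps, the shallow-merge assumption, and the temperature key are all hypothetical.

```typescript
type Props = Record<string, unknown>;

// Hypothetical helper: static defaults first, runtime values last,
// so a runtime key overrides a static key of the same name.
function mergeForwardedProps(staticBody: Props, runtimeValues: Props): Props {
  return { ...staticBody, ...runtimeValues };
}

const mergedProps = mergeForwardedProps(
  { provider: "openai", model: "gpt-5.1" }, // options.body
  { model: "another-model", temperature: 0.2 }, // per-sendMessage data (illustrative keys)
);
// mergedProps.provider comes from the static body;
// mergedProps.model is the runtime value.
```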
For environments that don't speak SSE (some edge runtimes, certain mobile WebViews, or anywhere a proxy strips text/event-stream), use fetchHttpStream, which reads raw newline-delimited JSON. The wire format is one JSON StreamChunk per line:
import { useChat, fetchHttpStream } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchHttpStream("/api/chat"),
});
Server-side, write each chunk as JSON.stringify(chunk) + "\n" to the response body. Options (url, headers, body, fetchClient, dynamic functions) match fetchServerSentEvents exactly.
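On the server, the newline-delimited format described above could be produced with standard Web APIs. The sketch below is an illustration under assumptions: encodeNdjsonLine and toNdjsonResponse are hypothetical names, the application/x-ndjson content type is a common convention rather than something this page specifies, and the chunk type is left generic.

```typescript
// One chunk per line: JSON followed by a single newline.
function encodeNdjsonLine(chunk: unknown): string {
  return JSON.stringify(chunk) + "\n";
}

// Hypothetical helper: turn any async iterable of chunks into a streaming Response.
function toNdjsonResponse(chunks: AsyncIterable<unknown>): Response {
  const encoder = new TextEncoder();
  const iterator = chunks[Symbol.asyncIterator]();
  const body = new ReadableStream<Uint8Array>({
    // pull() runs whenever the consumer wants more data, so backpressure is respected.
    async pull(controller) {
      const { done, value } = await iterator.next();
      if (done) {
        controller.close();
        return;
      }
      controller.enqueue(encoder.encode(encodeNdjsonLine(value)));
    },
    // If the client disconnects, give the source a chance to clean up.
    async cancel() {
      await iterator.return?.();
    },
  });
  return new Response(body, {
    headers: { "Content-Type": "application/x-ndjson" },
  });
}
```

A route handler would return toNdjsonResponse(chunks), where chunks is whatever async iterable of StreamChunk objects the server produces.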
When your client can call into your server without going over HTTP — TanStack Start server functions, RSC streams, in-process tests — skip the transport entirely. stream() takes a factory that returns an AsyncIterable<StreamChunk> and wires it straight into the client:
import { useChat, stream } from "@tanstack/ai-react";
import { chatServerFn } from "./server/chat.server";
// `chatServerFn` is a server function that returns an AsyncIterable<StreamChunk>,
// e.g. the result of `chat({ adapter, model, messages })` on the server.
const { messages } = useChat({
connection: stream((messages, data) => chatServerFn({ messages, ...data })),
});
The factory receives the conversation messages plus any per-request data you passed to sendMessage. Return any async iterable that yields StreamChunk objects: a generator, the output of chat() on the server, a transformed stream, anything.
Tip: stream() is request-scoped. The factory is invoked once per sendMessage, the iterable runs to completion, and the connection closes. If you need a single long-lived channel that multiplexes many sends — for example a WebSocket — use subscribe / send instead.
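Because the factory may return any async iterable, a thin wrapper around the server function's output is enough to observe or transform chunks. A minimal sketch, assuming a generic chunk type: tapChunks and demoSource are hypothetical names that are not part of TanStack AI, and the event names in demoSource are illustrative.

```typescript
// Hypothetical helper: pass every chunk through unchanged while calling
// `onChunk` for side effects such as logging or metrics.
async function* tapChunks<T>(
  source: AsyncIterable<T>,
  onChunk: (chunk: T) => void,
): AsyncGenerator<T> {
  for await (const chunk of source) {
    onChunk(chunk);
    yield chunk;
  }
}

// Stand-in for a server function result; a real factory would return
// the iterable produced on the server instead.
async function* demoSource(): AsyncGenerator<{ type: string }> {
  yield { type: "RUN_STARTED" };
  yield { type: "RUN_FINISHED" };
}
```

With the factory shown earlier, this could be wired in as stream((messages, data) => tapChunks(chatServerFn({ messages, ...data }), console.log)).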
rpcStream() is identical in behavior to stream() but reads better at call sites that hand off to an RPC client. Use it when integrating with Cap'n Web, gRPC-Web, tRPC subscriptions, or any RPC framework that already returns an async iterable:
import { useChat, rpcStream } from "@tanstack/ai-react";
import { api } from "./rpc-client";
// `api.chat.stream` is your RPC method; it must return an AsyncIterable<StreamChunk>.
const { messages } = useChat({
connection: rpcStream((messages, data) =>
api.chat.stream({ messages, ...data }),
),
});
A persistent transport (WebSocket, BroadcastChannel, postMessage between iframes, a shared worker) is fundamentally different from request/response. You open the channel once, then send and receive over it for the lifetime of the client. stream()/connect() can't model this cleanly because they assume one async iterable per request.
For these cases, implement the SubscribeConnectionAdapter interface directly. The shape (full definition in The Adapter Interface):
import type { SubscribeConnectionAdapter } from "@tanstack/ai-react";
// subscribe(abortSignal?): AsyncIterable<StreamChunk> (long-lived)
// send(messages, data?, abortSignal?, runContext?): Promise<void> (one per user message)
The runtime correlates them: chunks emitted on the subscription queue between send() and the next terminal event (RUN_FINISHED / RUN_ERROR) are attributed to that run.
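The correlation rule can be made concrete with a small grouping function. This is a sketch of the idea only, not the runtime's implementation: Chunk is a simplified stand-in for StreamChunk, and groupByRun is a hypothetical name.

```typescript
// Simplified stand-in for the library's StreamChunk type.
type Chunk = { type: string; [key: string]: unknown };

const TERMINAL_TYPES = new Set(["RUN_FINISHED", "RUN_ERROR"]);

// Split a flat chunk sequence into runs: each run ends at (and includes)
// its terminal event. Chunks after the last terminal event form an open run.
function groupByRun(chunks: Chunk[]): Chunk[][] {
  const runs: Chunk[][] = [];
  let current: Chunk[] = [];
  for (const chunk of chunks) {
    current.push(chunk);
    if (TERMINAL_TYPES.has(chunk.type)) {
      runs.push(current);
      current = [];
    }
  }
  if (current.length > 0) runs.push(current);
  return runs;
}
```

An open run at the end of the sequence corresponds to a turn whose terminal event has not arrived yet.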
import { useChat, type SubscribeConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk } from "@tanstack/ai";
function websocketConnection(url: string): SubscribeConnectionAdapter {
const ws = new WebSocket(url);
const queue: Array<StreamChunk> = [];
let pending: ((chunk: StreamChunk | null) => void) | null = null;
let closed = false;
const ready = new Promise<void>((resolve) => {
ws.addEventListener("open", () => resolve(), { once: true });
});
function deliver(chunk: StreamChunk | null) {
const resolve = pending;
if (resolve) {
pending = null;
resolve(chunk);
} else if (chunk !== null) {
queue.push(chunk);
}
}
ws.addEventListener("message", (event) => {
deliver(JSON.parse(event.data) as StreamChunk);
});
ws.addEventListener("close", () => {
closed = true;
deliver(null);
});
return {
async *subscribe(abortSignal) {
while (!abortSignal?.aborted && !closed) {
const buffered = queue.shift();
if (buffered !== undefined) {
yield buffered;
continue;
}
const chunk = await new Promise<StreamChunk | null>((resolve) => {
pending = resolve;
abortSignal?.addEventListener("abort", () => resolve(null), {
once: true,
});
});
if (chunk === null) return;
yield chunk;
}
},
async send(messages, data, _abortSignal, runContext) {
await ready;
ws.send(
JSON.stringify({
threadId: runContext?.threadId,
runId: runContext?.runId,
messages,
data,
}),
);
},
};
}
const { messages } = useChat({
connection: websocketConnection("wss://example.com/chat"),
});
Tip: Your server is responsible for emitting RUN_FINISHED (or RUN_ERROR) at the end of each run. Without it, the client will not know the assistant turn has ended and will wait indefinitely. See Streaming for the full event lifecycle.
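One way a server might guarantee the terminal event is to wrap whatever produces the chunks. The following is a sketch under assumptions: withTerminalEvent is a hypothetical helper, Chunk is a simplified stand-in for StreamChunk, and the payload fields on the synthesized events (threadId, runId, message) are assumed rather than taken from this page.

```typescript
type Chunk = { type: string; [key: string]: unknown };

// Hypothetical server-side wrapper: forwards every chunk and makes sure
// the run ends with a terminal event even if the source never sends one.
async function* withTerminalEvent(
  source: AsyncIterable<Chunk>,
  ids: { threadId?: string; runId?: string },
): AsyncGenerator<Chunk> {
  let finished = false;
  try {
    for await (const chunk of source) {
      if (chunk.type === "RUN_FINISHED" || chunk.type === "RUN_ERROR") {
        finished = true;
      }
      yield chunk;
    }
    if (!finished) yield { type: "RUN_FINISHED", ...ids };
  } catch (err) {
    // Assumed payload shape: a human-readable message plus the run identifiers.
    const message = err instanceof Error ? err.message : String(err);
    if (!finished) yield { type: "RUN_ERROR", message, ...ids };
  }
}
```

The WebSocket handler would then serialize each yielded chunk onto the socket, so the client always sees the end of the turn.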
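Pick subscribe / send when you need a single long-lived channel that serves many runs, such as a WebSocket, BroadcastChannel, postMessage, or a shared worker.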
Otherwise, prefer fetchServerSentEvents or stream() — they're simpler and require no connection lifecycle management.
If you're keeping SSE or HTTP streaming but need to wrap fetch — for auth refresh, retries, logging, or routing through an edge proxy — pass a fetchClient:
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
async function authedFetch(input: RequestInfo | URL, init?: RequestInit) {
let response = await fetch(input, init);
if (response.status === 401) {
await refreshToken();
response = await fetch(input, init);
}
return response;
}
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", {
fetchClient: authedFetch,
}),
});
The fetchClient must satisfy the standard fetch signature. fetchHttpStream accepts the same option.
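Retries fit the same pattern as the auth-refresh wrapper above. A minimal sketch, assuming that 5xx responses and network errors are worth retrying and that a fixed delay is acceptable; withRetry and its parameters are hypothetical and not part of the library.

```typescript
// Hypothetical wrapper: retry on 5xx responses and network errors,
// with a fixed delay between attempts.
function withRetry(
  baseFetch: typeof fetch,
  maxAttempts = 3,
  delayMs = 250,
): typeof fetch {
  return async (input, init) => {
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        const response = await baseFetch(input, init);
        // Return anything that is not a server error, and the last attempt regardless.
        if (response.status < 500 || attempt === maxAttempts) return response;
        await response.body?.cancel(); // discard the failed body before retrying
      } catch (err) {
        // Never retry an abort: the caller asked for the request to stop.
        if (init?.signal?.aborted) throw err;
        lastError = err;
        if (attempt === maxAttempts) throw err;
      }
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
    throw lastError; // only reached when maxAttempts is less than 1
  };
}
```

It would be passed as fetchClient: withRetry(fetch). Retrying is only safe here because the request body is a string that can be sent again unchanged.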
When none of the built-ins fit but the transport is still request-scoped (one request per user message), implement ConnectConnectionAdapter directly. This is the lowest-level escape hatch short of going persistent:
import { useChat, type ConnectConnectionAdapter } from "@tanstack/ai-react";
import type { StreamChunk } from "@tanstack/ai";
const myAdapter: ConnectConnectionAdapter = {
async *connect(messages, data, abortSignal, runContext) {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
threadId: runContext?.threadId,
runId: runContext?.runId,
messages,
...data,
}),
...(abortSignal ? { signal: abortSignal } : {}),
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
if (!response.body) throw new Error("Response has no body");
// Example: newline-delimited JSON. Replace this loop with whatever
// framing your wire format uses, yielding one `StreamChunk` per event.
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (line.trim()) yield JSON.parse(line) as StreamChunk;
}
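}
// Flush the decoder and parse a final line that arrived without a trailing newline.
buffer += decoder.decode();
if (buffer.trim()) yield JSON.parse(buffer) as StreamChunk;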
},
};
const { messages } = useChat({ connection: myAdapter });
runContext carries threadId, runId, clientTools, and forwardedProps. Include them in your request payload so the server can build an AG-UI-compliant response. If your connect stream completes without emitting RUN_FINISHED, the runtime synthesizes one for you; if it throws, a RUN_ERROR is synthesized.
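The connect example above frames chunks as newline-delimited JSON. If a custom transport framed them as SSE-style events instead, the line-splitting step could be swapped for something like the sketch below. parseSseBuffer is a hypothetical helper that handles only data: fields and LF or CRLF line endings; it is not how the built-in adapter parses SSE.

```typescript
// Split buffered text into complete SSE events and return the unfinished tail.
// Only `data:` fields are handled; `event:`, `id:`, and `retry:` are ignored.
function parseSseBuffer(buffer: string): { payloads: string[]; rest: string } {
  const normalized = buffer.replace(/\r\n/g, "\n");
  const frames = normalized.split("\n\n");
  // The last piece is incomplete until a blank line arrives.
  const rest = frames.pop() ?? "";
  const payloads: string[] = [];
  for (const frame of frames) {
    const dataLines = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      // Per the SSE format, one optional space after the colon is stripped.
      .map((line) => line.slice(5).replace(/^ /, ""));
    // Multiple data lines in one event are joined with newlines.
    if (dataLines.length > 0) payloads.push(dataLines.join("\n"));
  }
  return { payloads, rest };
}
```

Inside connect, the read loop would append decoded text to the previous rest, call parseSseBuffer, and yield JSON.parse(payload) for each payload.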
A ConnectionAdapter is a union — provide either connect, or both subscribe and send. Never both modes.
export interface RunAgentInputContext {
threadId: string;
runId: string;
parentRunId?: string;
clientTools?: Array<{ name: string; description: string; parameters: unknown }>;
forwardedProps?: Record<string, unknown>;
}
export interface ConnectConnectionAdapter {
connect(
messages: UIMessage[] | ModelMessage[],
data?: Record<string, any>,
abortSignal?: AbortSignal,
runContext?: RunAgentInputContext,
): AsyncIterable<StreamChunk>;
}
export interface SubscribeConnectionAdapter {
subscribe(abortSignal?: AbortSignal): AsyncIterable<StreamChunk>;
send(
messages: UIMessage[] | ModelMessage[],
data?: Record<string, any>,
abortSignal?: AbortSignal,
runContext?: RunAgentInputContext,
): Promise<void>;
}
export type ConnectionAdapter =
| ConnectConnectionAdapter
| SubscribeConnectionAdapter;
Internally, ChatClient normalizes both shapes to a single subscribe/send pair via normalizeConnectionAdapter(). If you provide connect, it gets wrapped in an async queue; if you provide subscribe + send natively, they're used as-is.
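To give a feel for what wrapping connect in an async queue can look like, here is an illustrative sketch. It is not the source of normalizeConnectionAdapter(): wrapConnect, ConnectFn, and Chunk are hypothetical, the runContext parameter is omitted for brevity, and the real implementation may differ in every detail.

```typescript
type Chunk = { type: string; [key: string]: unknown };
type ConnectFn = (
  messages: unknown[],
  data?: Record<string, unknown>,
  abortSignal?: AbortSignal,
) => AsyncIterable<Chunk>;

// Illustrative only: adapt a request-scoped `connect` into a subscribe/send
// pair by pushing each run's chunks onto a shared queue.
function wrapConnect(connect: ConnectFn) {
  const queue: Chunk[] = [];
  let wake: (() => void) | null = null;

  return {
    async *subscribe(abortSignal?: AbortSignal): AsyncGenerator<Chunk> {
      while (!abortSignal?.aborted) {
        const next = queue.shift();
        if (next !== undefined) {
          yield next;
          continue;
        }
        // Queue is empty: sleep until send() pushes a chunk or the signal aborts.
        await new Promise<void>((resolve) => {
          wake = resolve;
          abortSignal?.addEventListener("abort", () => resolve(), { once: true });
        });
      }
    },
    async send(
      messages: unknown[],
      data?: Record<string, unknown>,
      abortSignal?: AbortSignal,
    ): Promise<void> {
      for await (const chunk of connect(messages, data, abortSignal)) {
        queue.push(chunk);
        const resume = wake;
        wake = null;
        resume?.(); // wake the subscriber if it is waiting
      }
    },
  };
}
```

The queue is what lets a single long-lived subscribe() loop consume chunks from many separate connect() calls.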
Static headers go in options.headers:
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", {
headers: { Authorization: `Bearer ${token}` },
}),
});
For tokens that change per request (refresh tokens, short-lived JWTs), pass a function. It's called on every send, so the header always reflects the latest token:
import { useChat, fetchServerSentEvents } from "@tanstack/ai-react";
const { messages } = useChat({
connection: fetchServerSentEvents("/api/chat", () => ({
headers: { Authorization: `Bearer ${getToken()}` },
})),
});
Cookies are sent automatically when credentials is "same-origin" (default) or "include".
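The difference between a static headers object and an options function comes down to when the token is read. The sketch below isolates that behavior without the library; currentToken and getToken are hypothetical stand-ins for whatever token store the application uses.

```typescript
// Hypothetical token store that some other part of the app keeps fresh.
let currentToken = "token-1";
const getToken = (): string => currentToken;

// Options factory in the shape used above: evaluated at call time,
// so it reads whatever token is current at that moment.
const authOptions = () => ({
  headers: { Authorization: `Bearer ${getToken()}` },
});

const headerBeforeRefresh = authOptions().headers.Authorization;
currentToken = "token-2"; // simulate a refresh between two sends
const headerAfterRefresh = authOptions().headers.Authorization;
// The second call picks up the refreshed token; a static headers object
// built before the refresh would still carry the old one.
```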
Every adapter — built-in or custom — receives an AbortSignal. Built-ins propagate it to fetch; custom adapters must honor it themselves. useChat's stop() aborts the current run by triggering the signal:
const { stop } = useChat({ connection: fetchServerSentEvents("/api/chat") });
stop(); // aborts the active stream
For SubscribeConnectionAdapter, the signal in subscribe() ends the entire subscription (component unmount); the signal in send() ends just the in-flight send.
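For a custom adapter whose underlying source does not understand AbortSignal, one option is to race each read against the signal. A minimal sketch, assuming the source is an ordinary async iterable; abortable is a hypothetical helper, not a library export.

```typescript
// Hypothetical helper: stop consuming `source` as soon as `signal` aborts,
// even if the source is stuck waiting for its next item.
async function* abortable<T>(
  source: AsyncIterable<T>,
  signal?: AbortSignal,
): AsyncGenerator<T> {
  const iterator = source[Symbol.asyncIterator]();
  // Rejects when the signal aborts; never settles if there is no signal.
  const aborted = new Promise<never>((_, reject) => {
    if (!signal) return;
    const activeSignal = signal;
    const fail = () =>
      reject(activeSignal.reason ?? new DOMException("Aborted", "AbortError"));
    if (activeSignal.aborted) fail();
    else activeSignal.addEventListener("abort", fail, { once: true });
  });
  aborted.catch(() => {}); // the race below reports it; avoid an unhandled rejection
  try {
    while (true) {
      const result = await Promise.race([iterator.next(), aborted]);
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Ask the source to clean up, without waiting in case it is blocked.
    iterator.return?.().catch(() => {});
  }
}
```

Inside connect, this would wrap the transport's iterable as yield* abortable(source, abortSignal), so stop() ends the run promptly and the abort error propagates to the client.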
Adapters should throw on transport errors (HTTP non-2xx, parse failures, dropped sockets). The ChatClient catches the throw, emits a RUN_ERROR chunk if none has been emitted yet, and surfaces it via onError / the error state:
const { error } = useChat({
connection: fetchServerSentEvents("/api/chat"),
onError: (err) => console.error("Chat failed:", err),
});
Don't swallow AbortError. Let it propagate so the client knows the abort succeeded.
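In a custom adapter, that usually means any catch block should rethrow. The sketch below shows one way to log real transport failures while still letting every error, aborts included, reach the client. isAbortError and reportErrors are hypothetical helpers, and the name check assumes the abort was triggered without a custom reason.

```typescript
// True for the error produced when an AbortSignal cancels fetch or a stream read.
function isAbortError(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { name?: unknown }).name === "AbortError"
  );
}

// Hypothetical wrapper for a custom adapter's stream: log real transport
// failures, but rethrow everything so the client sees it.
async function* reportErrors<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
  try {
    yield* source;
  } catch (err) {
    if (!isAbortError(err)) console.error("Transport failed:", err);
    throw err;
  }
}
```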