Streaming & Realtime with Surf

Build commands that stream progressive output via SSE and push real-time events to AI agents via WebSocket with session-scoped delivery.

When to Use Streaming

Not every command needs streaming. Use it for operations that take more than a few hundred milliseconds and where the agent benefits from progressive results: AI text generation, large data exports, complex computations, or real-time monitoring.

Request Flow

Agent → POST /surf/execute → SSE stream (chunk 1 → chunk 2 → done)
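
To make the flow above concrete, here is a minimal sketch of what the stream might look like on the wire. The event envelope (`type: 'chunk'` with `data`, `type: 'done'` with `result`) is inferred from the comments in this guide's raw HTTP example; the helper name `toSseFrame` is hypothetical, and the exact framing Surf produces may differ.

```typescript
// Assumed envelope, inferred from the SSE comments in this guide's client example.
type StreamEvent =
  | { type: 'chunk'; data: unknown }
  | { type: 'done'; result: unknown }

// Hypothetical helper: serialize one event as a single SSE frame.
// An SSE frame is a set of "field: value" lines terminated by a blank line.
function toSseFrame(event: StreamEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`
}

// The three steps of the diagram: chunk 1, chunk 2, done.
const frames = [
  toSseFrame({ type: 'chunk', data: { text: 'Hello' } }),
  toSseFrame({ type: 'chunk', data: { text: ' world' } }),
  toSseFrame({ type: 'done', result: { done: true } }),
]
```

Each frame ends with a blank line, which is how an SSE reader knows one event is complete before the next begins.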

Streaming Commands

Mark a command as streaming with stream: true and use ctx.emit() to send progressive chunks:

streaming.ts
import { createSurf } from '@surfjs/core'
import OpenAI from 'openai'

// Reads OPENAI_API_KEY from the environment.
const openai = new OpenAI()
// `db` and `toCsv` below are app-specific helpers, not part of Surf.

const surf = createSurf({
  name: 'AI Writer',
  commands: {
    // Streaming text generation
    generate: {
      description: 'Generate text with streaming output',
      stream: true,
      params: {
        prompt: { type: 'string', required: true },
        maxTokens: { type: 'number', default: 500 },
        model: { type: 'string', default: 'gpt-4' },
      },
      run: async ({ prompt, maxTokens, model }, ctx) => {
        const stream = await openai.chat.completions.create({
          model,
          messages: [{ role: 'user', content: prompt }],
          max_tokens: maxTokens,
          stream: true,
        })

        for await (const chunk of stream) {
          const text = chunk.choices[0]?.delta?.content
          if (text) {
            // ctx.emit() sends an SSE chunk to the client
            ctx.emit!({ text, done: false })
          }
        }

        // Return value is sent as the final "done" event
        return { done: true, model }
      },
    },

    // Streaming data export
    'export': {
      description: 'Export large datasets with progress',
      stream: true,
      auth: 'required',
      params: {
        table: { type: 'string', required: true },
        format: { type: 'string', enum: ['json', 'csv'], default: 'json' },
      },
      run: async ({ table, format }, ctx) => {
        const total = await db.count(table)
        let processed = 0

        // Emit each batch of 100 rows along with a running percentage
        for await (const batch of db.streamBatches(table, 100)) {
          processed += batch.length
          ctx.emit!({
            rows: format === 'csv' ? toCsv(batch) : batch,
            progress: Math.round((processed / total) * 100),
          })
        }

        return { total: processed, format }
      },
    },
  },
})
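
The batching and progress arithmetic in the export command can be checked without a database or a running Surf server. The sketch below pulls it into a synchronous helper; `emitInBatches` and `ProgressChunk` are hypothetical names, the `emit` callback stands in for `ctx.emit`, and an in-memory array replaces `db.streamBatches`.

```typescript
// Shape of a progress chunk, mirroring what the export command emits.
interface ProgressChunk<T> {
  rows: T[]
  progress: number // integer percentage, 0 to 100
}

// Hypothetical helper: split rows into batches and report progress per batch.
// `emit` stands in for ctx.emit; a real command would read batches from the
// database instead of slicing an in-memory array.
function emitInBatches<T>(
  rows: T[],
  batchSize: number,
  emit: (chunk: ProgressChunk<T>) => void,
): number {
  if (batchSize <= 0) throw new RangeError('batchSize must be positive')
  let processed = 0
  for (let i = 0; i < rows.length; i += batchSize) {
    const batch = rows.slice(i, i + batchSize)
    processed += batch.length
    emit({ rows: batch, progress: Math.round((processed / rows.length) * 100) })
  }
  return processed
}

// Collect chunks in an array instead of sending them over SSE.
const chunks: ProgressChunk<number>[] = []
const rows = Array.from({ length: 250 }, (_, i) => i)
const total = emitInBatches(rows, 100, (chunk) => chunks.push(chunk))
```

With 250 rows and a batch size of 100, the callback receives three chunks with progress 40, 80, and 100. An empty input emits nothing, so the percentage is never computed against a total of zero.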

💡 Tip: SSE activates only when both sides opt in: the command definition must have stream: true, and the client request must include stream: true.
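
The opt-in rule can be written as a small decision function. This is an illustration of the rule only, not Surf's internal logic: `CommandDef`, `ExecuteRequest`, and `responseMode` are hypothetical names, and falling back to a single JSON response when either flag is missing is an assumption.

```typescript
// Hypothetical shapes: only the `stream` flags come from this guide.
interface CommandDef {
  stream?: boolean
}
interface ExecuteRequest {
  command: string
  stream?: boolean
}

// SSE is chosen only when both the command and the request opt in.
// Assumption: in every other case the caller gets one JSON response.
function responseMode(def: CommandDef, req: ExecuteRequest): 'sse' | 'json' {
  return def.stream === true && req.stream === true ? 'sse' : 'json'
}

const both = responseMode({ stream: true }, { command: 'generate', stream: true })
const serverOnly = responseMode({ stream: true }, { command: 'generate' })
const clientOnly = responseMode({}, { command: 'status', stream: true })
```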

Client-Side Streaming

The client SDK handles SSE streams automatically:

agent-stream.ts
import { SurfClient } from '@surfjs/client'

const client = await SurfClient.discover('https://ai-writer.com')

// Execute returns the final result after streaming completes
const result = await client.execute('generate', {
  prompt: 'Explain quantum computing in 3 sentences',
})
// result => { done: true, model: 'gpt-4' }

// For raw HTTP SSE access:
const response = await fetch('https://ai-writer.com/surf/execute', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    command: 'generate',
    params: { prompt: 'Hello world' },
    stream: true,
  }),
})

// Parse SSE events
if (!response.body) throw new Error('Response has no body')
const reader = response.body.getReader()
const decoder = new TextDecoder()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  // stream: true keeps multi-byte characters intact when split across reads
  const text = decoder.decode(value, { stream: true })
  // data: { "type": "chunk", "data": { "text": "Hello" } }
  // data: { "type": "done", "result": { "done": true } }
}
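
The read loop above decodes bytes but stops short of turning them into events, and a single network read can end in the middle of a frame. The following is a minimal sketch of an incremental parser that buffers text until a blank line closes a frame. The `StreamEvent` shape is inferred from the comments in the raw HTTP example, `createSseParser` is a hypothetical helper, and the sketch handles only `\n` line endings and `data:` fields, which is less than the full SSE format allows.

```typescript
// Assumed envelope, inferred from the SSE comments in the raw HTTP example.
type StreamEvent =
  | { type: 'chunk'; data: { text?: string } }
  | { type: 'done'; result: unknown }

// Hypothetical incremental parser: feed it decoded text, get back the
// events completed so far. Partial frames stay buffered until finished.
function createSseParser(): (text: string) => StreamEvent[] {
  let buffer = ''
  return (text) => {
    buffer += text
    const events: StreamEvent[] = []
    let end = buffer.indexOf('\n\n')
    // A blank line terminates each SSE frame.
    while (end !== -1) {
      const frame = buffer.slice(0, end)
      buffer = buffer.slice(end + 2)
      const payload = frame
        .split('\n')
        .filter((line) => line.startsWith('data:'))
        .map((line) => line.slice(5).replace(/^ /, ''))
        .join('\n')
      if (payload) events.push(JSON.parse(payload) as StreamEvent)
      end = buffer.indexOf('\n\n')
    }
    return events
  }
}

// Simulate a frame that is split across two network reads.
const reads = [
  'data: {"type":"chunk","data":{"text":"Hel',
  'lo"}}\n\ndata: {"type":"done","result":{"done":true}}\n\n',
]
const feed = createSseParser()
let output = ''
let finished = false
for (const read of reads) {
  for (const event of feed(read)) {
    if (event.type === 'chunk') output += event.data.text ?? ''
    else finished = true
  }
}
```

In the read loop, each decoded `text` value would be passed to `feed`, and the loop could stop as soon as a `done` event arrives.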

WebSocket & Events

For real-time bidirectional communication, Surf supports WebSocket connections. Define events in your config and emit them from anywhere in your code:

realtime-server.ts
import { createSurf } from '@surfjs/core'
import http from 'node:http'
import express from 'express'

// `metrics` and `alerting` below are app-specific modules, not part of Surf.
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

const surf = createSurf({
  name: 'Ops Dashboard',
  // Define events with scoping
  events: {
    'metrics.update': {
      description: 'Real-time metrics push',
      scope: 'session', // Only the subscribing session receives this
      data: {
        cpu: { type: 'number' },
        memory: { type: 'number' },
        timestamp: { type: 'number' },
      },
    },
    'alert.triggered': {
      description: 'System alert',
      scope: 'broadcast', // All connected clients receive this
      data: {
        severity: { type: 'string' },
        message: { type: 'string' },
      },
    },
  },
  commands: {
    status: {
      description: 'Get current system status',
      hints: { idempotent: true, sideEffects: false },
      run: async () => ({
        cpu: await metrics.cpu(),
        memory: await metrics.memory(),
        requests: await metrics.requestsPerMinute(),
        uptime: process.uptime(),
      }),
    },
    'metrics.stream': {
      description: 'Stream live metrics via SSE',
      stream: true,
      params: {
        interval: { type: 'number', default: 1000, description: 'Interval in ms' },
      },
      run: async ({ interval }, ctx) => {
        // Emit 60 samples, one per interval, then finish the stream
        for (let i = 0; i < 60; i++) {
          ctx.emit!({
            cpu: await metrics.cpu(),
            memory: await metrics.memory(),
            timestamp: Date.now(),
          })
          await sleep(interval)
        }
        return { streamed: 60 }
      },
    },
  },
})

const app = express()
app.use(express.json())
app.use(surf.middleware())

const server = http.createServer(app)
surf.wsHandler(server) // Attach WebSocket transport (requires 'ws' package)

server.listen(3000)

// Emit events from your application logic
setInterval(async () => {
  surf.emit('metrics.update', {
    cpu: await metrics.cpu(),
    memory: await metrics.memory(),
    timestamp: Date.now(),
  })
}, 5000)

// Broadcast alerts to all connected clients
alerting.on('alert', (alert) => {
  surf.emit('alert.triggered', {
    severity: alert.severity,
    message: alert.message,
  })
})

Event Scoping — Security Feature

Event scoping is a key security feature that controls who receives each event. Surf supports three scopes:

| Scope | Delivery | Example |
| --- | --- | --- |
| session | Only the triggering session | Cart updates, user metrics |
| global | All subscribers (including server-side) | System announcements |
| broadcast | All connected sessions | Stock updates, live alerts |

⚠️ Security: Events default to session scope, preventing one user from receiving another user's updates. Only use broadcast for non-sensitive data.
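
One way to reason about the three scopes is as a filter over the current subscribers. The sketch below models the delivery rules described above in plain TypeScript. It is not Surf's routing code: `Subscriber`, `recipients`, and the `kind` field are hypothetical, and the split between connected sessions and server-side listeners is an assumption drawn from the "including server-side" wording.

```typescript
type Scope = 'session' | 'global' | 'broadcast'

// Hypothetical subscriber record: a connected client session or a
// server-side listener registered inside the application.
interface Subscriber {
  id: string
  kind: 'session' | 'server'
}

// Return the ids that would receive an event under each scope.
function recipients(
  scope: Scope,
  subscribers: Subscriber[],
  originSessionId?: string,
): string[] {
  const sessions = subscribers.filter((s) => s.kind === 'session')
  if (scope === 'session') {
    // Only the session that triggered the event.
    return sessions.filter((s) => s.id === originSessionId).map((s) => s.id)
  }
  if (scope === 'broadcast') {
    // Every connected session, but no server-side listeners.
    return sessions.map((s) => s.id)
  }
  // global: every subscriber, including server-side listeners.
  return subscribers.map((s) => s.id)
}

const subscribers: Subscriber[] = [
  { id: 'a', kind: 'session' },
  { id: 'b', kind: 'session' },
  { id: 'srv', kind: 'server' },
]
const sessionOnly = recipients('session', subscribers, 'a')
const everySession = recipients('broadcast', subscribers)
const everyone = recipients('global', subscribers)
```

In this model a session-scoped event with no originating session reaches nobody, which is the safe failure mode for user-specific data.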

Agent WebSocket Client

Agents connect via WebSocket to execute commands in real-time and receive event push notifications:

agent-realtime.ts
import { SurfClient } from '@surfjs/client'
 
const client = await SurfClient.discover('https://ops-dashboard.com')
 
// Connect via WebSocket
const ws = await client.connect()
 
// Execute commands over WebSocket
const status = await ws.execute('status', {})
console.log('CPU:', status.cpu)
 
// Listen for events
ws.on('alert.triggered', (data) => {
  console.log(`[ALERT] ${data.severity}: ${data.message}`)
})

ws.on('metrics.update', (data) => {
  console.log(`CPU: ${data.cpu}%, Memory: ${data.memory}%`)
})
 
// Clean up
client.disconnect()
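
If you want the compiler to check event payloads in handlers like the ones above, a thin typed wrapper is one option. This is a sketch under assumptions: the payload types are copied by hand from the event definitions in realtime-server.ts, `onTyped` and `EventSourceLike` are hypothetical names, and `FakeSocket` is a stand-in for the real connection so the example runs on its own. The cast is compile-time only and does not validate data at runtime.

```typescript
// Payload types copied from the event definitions in realtime-server.ts.
interface OpsEvents {
  'metrics.update': { cpu: number; memory: number; timestamp: number }
  'alert.triggered': { severity: string; message: string }
}

// Minimal structural type for anything exposing an `on` method.
interface EventSourceLike {
  on(event: string, handler: (data: unknown) => void): void
}

// Hypothetical wrapper: ties each event name to its payload type.
function onTyped<K extends keyof OpsEvents>(
  source: EventSourceLike,
  event: K,
  handler: (data: OpsEvents[K]) => void,
): void {
  source.on(event, (data) => handler(data as OpsEvents[K]))
}

// In-memory stand-in for a WebSocket connection, used only for this demo.
class FakeSocket implements EventSourceLike {
  private handlers = new Map<string, Array<(data: unknown) => void>>()
  on(event: string, handler: (data: unknown) => void): void {
    const list = this.handlers.get(event) ?? []
    list.push(handler)
    this.handlers.set(event, list)
  }
  push(event: string, data: unknown): void {
    for (const handler of this.handlers.get(event) ?? []) handler(data)
  }
}

const socket = new FakeSocket()
const lines: string[] = []
onTyped(socket, 'alert.triggered', (data) => {
  // `data` is typed as { severity: string; message: string } here.
  lines.push(`[ALERT] ${data.severity}: ${data.message}`)
})
socket.push('alert.triggered', { severity: 'high', message: 'disk full' })
socket.push('metrics.update', { cpu: 12, memory: 40, timestamp: 0 })
```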

Summary

Surf's streaming and event system lets you build real-time AI agent integrations with minimal code. Use SSE streaming with ctx.emit() for progressive output, WebSocket for bidirectional communication, and event scoping for secure, session-aware delivery. The client SDK handles both transparently.

Ask your agent

Copy these prompts into Claude, OpenClaw, or any AI agent

OpenClaw / Claude

Add Surf streaming to my Node.js app. Install @surfjs/core and use createSurf() to define a "generate" command with stream: true that uses ctx.emit() to send chunks. Also define events with session scoping for real-time updates. Mount on Express with surf.middleware() and attach WebSocket with surf.wsHandler(server).

Claude Code / Cursor

My Next.js app has a live feed. Add Surf with createSurf() including: a streaming "generate" command using ctx.emit(), event definitions with session/broadcast scopes, and WebSocket support via surf.wsHandler(). Use @surfjs/core.

Test streaming with @surfjs/client

Using @surfjs/client, connect to http://localhost:3000 and call the "generate" command with { prompt: "explain Surf.js in 3 sentences" }. Then connect via WebSocket and listen for events.

💡 Works with OpenClaw, Claude Code, Cursor, Codex, and any agent that can make HTTP requests.