Bridge Server Architecture & Session Management

Build a server that sits between MeetStream and your AI stack. The bridge receives real-time meeting audio, manages one session per MeetStream connection, routes audio through your processing pipeline, and sends responses (audio, chat, images) back into the meeting.


Architecture

MeetStream Bridge Architecture

The bridge server exposes two WebSocket endpoints; each MeetStream connection opens one of each:

| Endpoint | Direction | Format | Purpose |
| --- | --- | --- | --- |
| `/bridge/audio` | MeetStream → You | Binary frames | Live meeting audio with speaker metadata |
| `/bridge` | MeetStream ↔ You | JSON text | Commands: send audio, chat, images, interrupt |

Each MeetStream connection is identified by a bot_id. The session manager maintains one AI session per bot_id, created on first connection and torn down when both WebSockets disconnect.


Session Lifecycle

MeetStream Bridge Session Lifecycle

Key rules:

  • Either channel can arrive first. The session is created on whichever connects first.
  • Both channels share the same bot_id and the same AI session.
  • Cleanup happens when both WebSockets have disconnected for a given bot_id.

Core Components

1. Session Manager

The session manager is the central coordinator. It maps bot_id to:

  • An AI session (your STT/LLM/TTS pipeline)
  • References to the audio and control WebSockets
  • Per-session locks to prevent race conditions during creation
```python
import asyncio
import json
import logging
from typing import Any, Dict, Optional

from fastapi import WebSocket
from starlette.websockets import WebSocketState

logger = logging.getLogger("bridge")


class SessionManager:
    def __init__(self):
        self.sessions: Dict[str, Any] = {}
        self.audio_ws: Dict[str, WebSocket] = {}
        self.control_ws: Dict[str, WebSocket] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    def _lock_for(self, bot_id: str) -> asyncio.Lock:
        if bot_id not in self._locks:
            self._locks[bot_id] = asyncio.Lock()
        return self._locks[bot_id]

    async def ensure_session(self, bot_id: str):
        """Create an AI session for this bot_id if one doesn't exist."""
        async with self._lock_for(bot_id):
            if bot_id in self.sessions:
                return
            session = await self._create_session(bot_id)
            self.sessions[bot_id] = session
            logger.info(f"[{bot_id}] Session created")

    async def close_session(self, bot_id: str):
        """Tear down the AI session for this bot_id."""
        async with self._lock_for(bot_id):
            session = self.sessions.pop(bot_id, None)
            if session:
                await self._destroy_session(bot_id, session)
                logger.info(f"[{bot_id}] Session closed")

    async def maybe_cleanup(self, bot_id: str):
        """Close the session if both WebSockets have disconnected."""
        if bot_id not in self.audio_ws and bot_id not in self.control_ws:
            await self.close_session(bot_id)

    async def _create_session(self, bot_id: str) -> Any:
        """
        Replace this with your AI pipeline initialization.
        Examples: start an STT stream, create an LLM session,
        initialize a TTS engine, connect to a realtime API, etc.
        """
        raise NotImplementedError("Implement _create_session()")

    async def _destroy_session(self, bot_id: str, session: Any):
        """
        Replace this with your AI pipeline teardown.
        Close connections, flush buffers, release resources.
        """
        raise NotImplementedError("Implement _destroy_session()")
```

2. Audio Decoder

Decodes the binary frame format from MeetStream. See Live Audio Capture & Frame Decoding for the full specification.

```python
from typing import Optional, Tuple

def decode_audio_frame(data: bytes) -> Optional[Tuple[str, str, bytes]]:
    """Decode a MeetStream binary audio frame.

    Returns (speaker_id, speaker_name, pcm_bytes) or None.
    """
    if len(data) < 5 or data[0] != 0x01:
        return None
    sid_len = int.from_bytes(data[1:3], "little")
    speaker_id = data[3 : 3 + sid_len].decode("utf-8")
    off = 3 + sid_len
    sname_len = int.from_bytes(data[off : off + 2], "little")
    off += 2
    speaker_name = data[off : off + sname_len].decode("utf-8")
    off += sname_len
    return speaker_id, speaker_name, data[off:]
```
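
For unit tests or a simulated client, it can help to build frames the decoder accepts. This `encode_audio_frame` helper is not part of MeetStream; it simply mirrors the layout the decoder reads (a `0x01` marker, two little-endian length prefixes, the UTF-8 speaker fields, then raw PCM):

```python
def encode_audio_frame(speaker_id: str, speaker_name: str, pcm_bytes: bytes) -> bytes:
    """Build a binary frame in the layout decode_audio_frame() expects.

    Test/simulation helper only; not a MeetStream API.
    """
    sid = speaker_id.encode("utf-8")
    sname = speaker_name.encode("utf-8")
    return (
        b"\x01"                              # frame-type marker
        + len(sid).to_bytes(2, "little") + sid      # speaker_id (length-prefixed)
        + len(sname).to_bytes(2, "little") + sname  # speaker_name (length-prefixed)
        + pcm_bytes                          # remaining bytes are PCM audio
    )
```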

3. Audio Resampler

Meeting audio arrives at 48 kHz. Most AI models expect a different rate (e.g., 24 kHz for OpenAI Realtime, 16 kHz for Whisper). Resample before feeding your pipeline, and resample back to 48 kHz before sending to the meeting.

```python
import numpy as np

def resample_pcm16(pcm_bytes: bytes, src_hz: int, dst_hz: int) -> bytes:
    """Resample PCM16 LE audio between sample rates."""
    if src_hz == dst_hz:
        return pcm_bytes
    x = np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32)
    n_out = int(len(x) * dst_hz / src_hz)
    t_src = np.linspace(0, 1, len(x), endpoint=False)
    t_dst = np.linspace(0, 1, n_out, endpoint=False)
    y = np.interp(t_dst, t_src, x)
    return np.clip(y, -32768, 32767).astype(np.int16).tobytes()

INCOMING_RATE = 48000  # MeetStream sends 48kHz
MODEL_RATE = 24000     # Your model's expected rate
OUTGOING_RATE = 48000  # MeetStream expects 48kHz back
```
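
As a sanity check on the arithmetic: PCM16 is 2 bytes per sample per channel, so a 20 ms mono chunk is 960 samples (1,920 bytes) at 48 kHz and 480 samples (960 bytes) at 24 kHz. A small helper (`pcm16_bytes` is illustrative, not part of the bridge) makes the relationship explicit:

```python
def pcm16_bytes(duration_ms: int, rate_hz: int, channels: int = 1) -> int:
    """Byte count of PCM16 audio for a given duration and sample rate."""
    samples = rate_hz * duration_ms // 1000
    return samples * 2 * channels  # 2 bytes per 16-bit sample

# 20 ms chunks at the bridge's three rates:
assert pcm16_bytes(20, 48000) == 1920  # from MeetStream
assert pcm16_bytes(20, 24000) == 960   # into a 24 kHz model
assert pcm16_bytes(20, 16000) == 640   # into a 16 kHz model (e.g. Whisper)
```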

4. Speaker Filter

Filter out MeetStream’s own audio to prevent echo/feedback loops. The display name appears in the speaker_name field.

```python
IGNORED_SPEAKERS = {
    "My Bot Name",
    "Meeting Assistant",
}

AGENT_KEYWORDS = ["bot", "agent", "assistant", "ai"]

def should_ignore_speaker(speaker_name: str) -> bool:
    if speaker_name in IGNORED_SPEAKERS:
        return True
    lower = speaker_name.lower()
    return any(kw in lower for kw in AGENT_KEYWORDS)
```

5. Command Sender

Helper functions to build outbound commands for the control channel. See Meeting Control & Command Patterns for the full command reference.

```python
import base64

async def safe_send(ws: WebSocket, payload: dict):
    """Send JSON to a WebSocket, swallowing errors."""
    try:
        if ws.client_state == WebSocketState.CONNECTED:
            await ws.send_text(json.dumps(payload))
    except Exception as e:
        logger.warning(f"WebSocket send failed: {e}")


async def send_audio_to_meeting(ws: WebSocket, bot_id: str, pcm_bytes: bytes, sample_rate: int = 48000):
    await safe_send(ws, {
        "command": "sendaudio",
        "bot_id": bot_id,
        "audiochunk": base64.b64encode(pcm_bytes).decode("utf-8"),
        "sample_rate": sample_rate,
        "encoding": "pcm16",
        "channels": 1,
        "endianness": "little",
    })


async def send_chat_to_meeting(ws: WebSocket, bot_id: str, text: str):
    await safe_send(ws, {
        "command": "sendmsg",
        "bot_id": bot_id,
        "message": text,
        "msg": text,
    })


async def send_interrupt_to_meeting(ws: WebSocket, bot_id: str):
    await safe_send(ws, {
        "command": "interrupt",
        "bot_id": bot_id,
        "action": "clear_audio_queue",
    })
```

WebSocket Endpoints

Audio Endpoint: /bridge/audio

Receives binary audio frames from the meeting. Decodes speaker metadata, filters out MeetStream’s own audio, and routes PCM to your AI pipeline.

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
manager = SessionManager()


@app.websocket("/bridge/audio")
async def audio_endpoint(websocket: WebSocket):
    await websocket.accept()
    bot_id = None

    try:
        while True:
            raw = await websocket.receive()

            # Text frames: handshake or legacy JSON audio
            if "text" in raw and raw["text"]:
                data = json.loads(raw["text"])

                # Handshake
                if data.get("type") == "ready":
                    bot_id = data.get("bot_id")
                    if not bot_id:
                        await websocket.close(code=1003)
                        return
                    manager.audio_ws[bot_id] = websocket
                    await manager.ensure_session(bot_id)
                    await safe_send(websocket, {
                        "type": "ack",
                        "message": f"Audio channel bound to {bot_id}",
                    })
                    continue

                # Legacy JSON audio (for backward compatibility)
                if data.get("type") == "PCMChunk" and bot_id:
                    speaker = data.get("speakerName", "")
                    if should_ignore_speaker(speaker):
                        continue
                    audio_b64 = data.get("audioData")
                    if audio_b64:
                        pcm = base64.b64decode(audio_b64)
                        pcm_resampled = resample_pcm16(pcm, INCOMING_RATE, MODEL_RATE)
                        session = manager.sessions.get(bot_id)
                        if session:
                            await session.send_audio(pcm_resampled)
                continue

            # Binary frames: current binary audio protocol
            if "bytes" in raw and raw["bytes"] and bot_id:
                result = decode_audio_frame(raw["bytes"])
                if result is None:
                    continue
                speaker_id, speaker_name, pcm_bytes = result

                if should_ignore_speaker(speaker_name):
                    continue

                if pcm_bytes:
                    pcm_resampled = resample_pcm16(pcm_bytes, INCOMING_RATE, MODEL_RATE)
                    session = manager.sessions.get(bot_id)
                    if session:
                        await session.send_audio(pcm_resampled)

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"[{bot_id}] Audio endpoint error: {e}")
    finally:
        if bot_id:
            manager.audio_ws.pop(bot_id, None)
            await manager.maybe_cleanup(bot_id)
```

Control Endpoint: /bridge

Receives the MeetStream handshake and inbound commands (usermsg, interrupt). Your AI pipeline sends outbound commands (sendaudio, sendmsg, sendchat, interrupt) over this same connection.

```python
@app.websocket("/bridge")
async def control_endpoint(websocket: WebSocket):
    await websocket.accept()
    bot_id = None

    try:
        # Wait for handshake
        init = json.loads(await websocket.receive_text())
        if init.get("type") != "ready" or not init.get("bot_id"):
            await websocket.close(code=1003)
            return

        bot_id = init["bot_id"]
        manager.control_ws[bot_id] = websocket
        await manager.ensure_session(bot_id)

        await safe_send(websocket, {
            "command": "ack",
            "bot_id": bot_id,
            "message": f"Control channel bound to {bot_id}",
        })

        # Start the event pump that forwards AI output to MeetStream
        asyncio.create_task(event_pump(bot_id))

        # Main loop: receive inbound commands from MeetStream
        while True:
            data = json.loads(await websocket.receive_text())
            command = data.get("command")

            if command == "usermsg":
                text = data.get("message", "")
                if text:
                    session = manager.sessions.get(bot_id)
                    if session:
                        await session.send_text(text)

            elif command == "interrupt":
                session = manager.sessions.get(bot_id)
                if session and hasattr(session, "interrupt"):
                    await session.interrupt()

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"[{bot_id}] Control endpoint error: {e}")
    finally:
        if bot_id:
            manager.control_ws.pop(bot_id, None)
            await manager.maybe_cleanup(bot_id)
```

Event Pump

The event pump is a background task that continuously reads output from your AI pipeline and forwards it to the meeting through the control WebSocket. This is the outbound half of the bridge.

```python
async def event_pump(bot_id: str):
    """Forward AI pipeline output to the MeetStream control channel."""
    session = manager.sessions.get(bot_id)
    if not session:
        return

    try:
        async for event in session:
            ws = manager.control_ws.get(bot_id)
            if not ws or ws.client_state != WebSocketState.CONNECTED:
                continue

            # AI produced audio → send to meeting
            if event.type == "audio":
                pcm_model = event.audio_bytes  # at MODEL_RATE
                pcm_out = resample_pcm16(pcm_model, MODEL_RATE, OUTGOING_RATE)
                await send_audio_to_meeting(ws, bot_id, pcm_out, OUTGOING_RATE)

            # AI speech was interrupted → clear the playback queue
            elif event.type == "audio_interrupted":
                await send_interrupt_to_meeting(ws, bot_id)

            # AI produced a text response → send as chat
            elif event.type == "text_response":
                await send_chat_to_meeting(ws, bot_id, event.text)

    except Exception as e:
        logger.error(f"[{bot_id}] Event pump error: {e}")
```

Note: The event types above (audio, audio_interrupted, text_response) are illustrative. Replace them with whatever events your AI framework emits.
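
For local testing of the pump's iteration pattern, a fake session can stand in for the pipeline. The sketch below is purely illustrative (the `FakeEvent`/`FakeSession` names are placeholders, not a real framework); it yields one event of each illustrative type in the attribute style the pump reads:

```python
import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class FakeEvent:
    type: str
    audio_bytes: Optional[bytes] = None
    text: Optional[str] = None

class FakeSession:
    """Async-iterable stand-in for an AI session (testing only)."""

    def __aiter__(self):
        return self._events()

    async def _events(self):
        # One event of each illustrative type the pump handles
        yield FakeEvent(type="audio", audio_bytes=b"\x00\x00" * 480)
        yield FakeEvent(type="audio_interrupted")
        yield FakeEvent(type="text_response", text="Hello from the fake pipeline")

async def demo():
    seen = []
    async for event in FakeSession():  # same iteration the event pump uses
        seen.append(event.type)
    return seen

assert asyncio.run(demo()) == ["audio", "audio_interrupted", "text_response"]
```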


Audio Flow Summary

Audio Flow


Full Working Skeleton

A minimal but complete bridge server. Replace the YourAISession class with your actual AI pipeline.

```python
import asyncio
import base64
import json
import logging
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional, Tuple

import numpy as np
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bridge")

# ── Configuration ─────────────────────────────────────────────────────────────

INCOMING_RATE = 48000  # MeetStream sends 48kHz
MODEL_RATE = 24000     # Your AI model's expected sample rate
OUTGOING_RATE = 48000  # MeetStream expects 48kHz back

IGNORED_SPEAKERS = {"My Bot Name"}
AGENT_KEYWORDS = ["bot", "agent", "assistant"]


# ── Audio utilities ───────────────────────────────────────────────────────────

def decode_audio_frame(data: bytes) -> Optional[Tuple[str, str, bytes]]:
    if len(data) < 5 or data[0] != 0x01:
        return None
    sid_len = int.from_bytes(data[1:3], "little")
    sid = data[3 : 3 + sid_len].decode("utf-8")
    off = 3 + sid_len
    sname_len = int.from_bytes(data[off : off + 2], "little")
    off += 2
    sname = data[off : off + sname_len].decode("utf-8")
    off += sname_len
    return sid, sname, data[off:]


def resample_pcm16(pcm_bytes: bytes, src_hz: int, dst_hz: int) -> bytes:
    if src_hz == dst_hz:
        return pcm_bytes
    x = np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32)
    n_out = int(len(x) * dst_hz / src_hz)
    t_src = np.linspace(0, 1, len(x), endpoint=False)
    t_dst = np.linspace(0, 1, n_out, endpoint=False)
    y = np.interp(t_dst, t_src, x)
    return np.clip(y, -32768, 32767).astype(np.int16).tobytes()


def should_ignore_speaker(name: str) -> bool:
    if name in IGNORED_SPEAKERS:
        return True
    lower = name.lower()
    return any(kw in lower for kw in AGENT_KEYWORDS)


# ── Safe WebSocket send ───────────────────────────────────────────────────────

async def safe_send(ws: WebSocket, payload: dict):
    try:
        if ws.client_state == WebSocketState.CONNECTED:
            await ws.send_text(json.dumps(payload))
    except Exception as e:
        logger.warning(f"send failed: {e}")


# ── Your AI session (replace this) ───────────────────────────────────────────

class YourAISession:
    """
    Stub. Replace with your actual AI pipeline.

    This could be:
      - An OpenAI Realtime API session
      - A Whisper STT + GPT + TTS pipeline
      - A LiveKit agent session
      - Any other audio-in/audio-out system
    """

    async def send_audio(self, pcm_bytes: bytes):
        """Feed PCM audio into your pipeline."""
        pass

    async def send_text(self, text: str):
        """Feed user text into your pipeline."""
        pass

    async def interrupt(self):
        """Signal the pipeline to stop generating."""
        pass

    async def events(self):
        """Yield output events from your pipeline.

        Expected event shapes (adapt to your framework):
            {"type": "audio", "pcm_bytes": bytes}
            {"type": "audio_interrupted"}
            {"type": "text", "content": str}
        """
        # Stub: yields nothing. Replace with your real event stream.
        return
        yield  # unreachable, but makes this a valid async generator


# ── Session Manager ───────────────────────────────────────────────────────────

class SessionManager:
    def __init__(self):
        self.sessions: Dict[str, YourAISession] = {}
        self.audio_ws: Dict[str, WebSocket] = {}
        self.control_ws: Dict[str, WebSocket] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    def _lock(self, bot_id: str) -> asyncio.Lock:
        if bot_id not in self._locks:
            self._locks[bot_id] = asyncio.Lock()
        return self._locks[bot_id]

    async def ensure_session(self, bot_id: str):
        async with self._lock(bot_id):
            if bot_id in self.sessions:
                return
            self.sessions[bot_id] = YourAISession()
            asyncio.create_task(self._event_pump(bot_id))
            logger.info(f"[{bot_id}] Session created")

    async def close_session(self, bot_id: str):
        async with self._lock(bot_id):
            self.sessions.pop(bot_id, None)  # add your pipeline teardown here
            logger.info(f"[{bot_id}] Session closed")

    async def maybe_cleanup(self, bot_id: str):
        if bot_id not in self.audio_ws and bot_id not in self.control_ws:
            await self.close_session(bot_id)

    async def _event_pump(self, bot_id: str):
        """Background task: forward AI output → MeetStream control WS."""
        session = self.sessions.get(bot_id)
        if not session:
            return

        try:
            async for event in session.events():
                ws = self.control_ws.get(bot_id)
                if not ws or ws.client_state != WebSocketState.CONNECTED:
                    continue

                etype = event.get("type")

                if etype == "audio":
                    pcm_out = resample_pcm16(event["pcm_bytes"], MODEL_RATE, OUTGOING_RATE)
                    await safe_send(ws, {
                        "command": "sendaudio",
                        "bot_id": bot_id,
                        "audiochunk": base64.b64encode(pcm_out).decode("utf-8"),
                        "sample_rate": OUTGOING_RATE,
                        "encoding": "pcm16",
                        "channels": 1,
                        "endianness": "little",
                    })

                elif etype == "audio_interrupted":
                    await safe_send(ws, {
                        "command": "interrupt",
                        "bot_id": bot_id,
                        "action": "clear_audio_queue",
                    })

                elif etype == "text":
                    await safe_send(ws, {
                        "command": "sendmsg",
                        "bot_id": bot_id,
                        "message": event["content"],
                        "msg": event["content"],
                    })

        except Exception as e:
            logger.error(f"[{bot_id}] Event pump error: {e}")


manager = SessionManager()


# ── FastAPI Application ───────────────────────────────────────────────────────

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield

app = FastAPI(title="MeetStream Bridge", lifespan=lifespan)


@app.get("/health")
async def health():
    return {"status": "healthy"}


@app.websocket("/bridge/audio")
async def audio_endpoint(websocket: WebSocket):
    """Receives live meeting audio from MeetStream."""
    await websocket.accept()
    bot_id = None

    try:
        while True:
            raw = await websocket.receive()

            # Text frame: handshake or legacy JSON
            if "text" in raw and raw["text"]:
                data = json.loads(raw["text"])

                if data.get("type") == "ready":
                    bot_id = data.get("bot_id")
                    if not bot_id:
                        await websocket.close(code=1003)
                        return
                    manager.audio_ws[bot_id] = websocket
                    await manager.ensure_session(bot_id)
                    await safe_send(websocket, {
                        "type": "ack",
                        "message": f"Audio channel bound to {bot_id}",
                    })

                elif data.get("type") == "PCMChunk" and bot_id:
                    speaker = data.get("speakerName", "")
                    if should_ignore_speaker(speaker):
                        continue
                    b64 = data.get("audioData")
                    if b64:
                        pcm = base64.b64decode(b64)
                        pcm_r = resample_pcm16(pcm, INCOMING_RATE, MODEL_RATE)
                        session = manager.sessions.get(bot_id)
                        if session:
                            await session.send_audio(pcm_r)
                continue

            # Binary frame: current protocol
            if "bytes" in raw and raw["bytes"] and bot_id:
                result = decode_audio_frame(raw["bytes"])
                if result is None:
                    continue
                _, speaker_name, pcm_bytes = result
                if should_ignore_speaker(speaker_name):
                    continue
                if pcm_bytes:
                    pcm_r = resample_pcm16(pcm_bytes, INCOMING_RATE, MODEL_RATE)
                    session = manager.sessions.get(bot_id)
                    if session:
                        await session.send_audio(pcm_r)

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"[{bot_id}] Audio error: {e}")
    finally:
        if bot_id:
            manager.audio_ws.pop(bot_id, None)
            await manager.maybe_cleanup(bot_id)


@app.websocket("/bridge")
async def control_endpoint(websocket: WebSocket):
    """Two-way command channel with MeetStream."""
    await websocket.accept()
    bot_id = None

    try:
        init = json.loads(await websocket.receive_text())
        if init.get("type") != "ready" or not init.get("bot_id"):
            await websocket.close(code=1003)
            return

        bot_id = init["bot_id"]
        manager.control_ws[bot_id] = websocket
        await manager.ensure_session(bot_id)

        await safe_send(websocket, {
            "command": "ack",
            "bot_id": bot_id,
            "message": f"Control channel bound to {bot_id}",
        })

        while True:
            data = json.loads(await websocket.receive_text())
            command = data.get("command")

            if command == "usermsg":
                text = data.get("message", "")
                if text:
                    session = manager.sessions.get(bot_id)
                    if session:
                        await session.send_text(text)

            elif command == "interrupt":
                session = manager.sessions.get(bot_id)
                if session:
                    await session.interrupt()

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"[{bot_id}] Control error: {e}")
    finally:
        if bot_id:
            manager.control_ws.pop(bot_id, None)
            await manager.maybe_cleanup(bot_id)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=True)
```

Run:

```shell
$ pip install fastapi uvicorn numpy
$ uvicorn server:app --host 0.0.0.0 --port 8000
```

Connecting MeetStream

When creating a MeetStream session, point both WebSocket URLs at your bridge server:

```json
{
  "meeting_url": "https://meet.google.com/abc-defg-hij",
  "bot_name": "My Assistant",
  "live_audio_required": {
    "websocket_url": "wss://your-bridge.com/bridge/audio"
  },
  "socket_connection_url": {
    "websocket_url": "wss://your-bridge.com/bridge"
  }
}
```

Both connections carry the same bot_id in their handshake, so the session manager can link them together.
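
You can exercise both handshakes locally before pointing real MeetStream traffic at the bridge. The sketch below assumes the bridge is running on `localhost:8000` and that the third-party `websockets` package is installed; it sends the `ready` handshake on each endpoint and prints the acks:

```python
import asyncio
import json

def ready_handshake(bot_id: str) -> str:
    """The handshake both bridge endpoints expect as their first text frame."""
    return json.dumps({"type": "ready", "bot_id": bot_id})

async def smoke_test(bot_id: str = "test-bot"):
    # Requires: pip install websockets (and a bridge on localhost:8000)
    import websockets

    async with websockets.connect("ws://localhost:8000/bridge") as control, \
               websockets.connect("ws://localhost:8000/bridge/audio") as audio:
        await control.send(ready_handshake(bot_id))
        await audio.send(ready_handshake(bot_id))
        print(await control.recv())  # control-channel ack
        print(await audio.recv())    # audio-channel ack

# To run against a local bridge: asyncio.run(smoke_test())
```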


Design Decisions

Why two separate WebSocket connections?

Separation of concerns. The audio channel carries high-throughput binary data (tens of PCM frames per second). The control channel carries low-frequency JSON commands. Splitting them avoids head-of-line blocking and lets each be handled independently.

Why does the audio channel support both binary and JSON?

Backward compatibility. Older MeetStream versions send audio as JSON PCMChunk messages with base64-encoded audio. Current versions send binary frames (significantly more efficient). The bridge should accept both.

Why lock on session creation?

Both WebSocket connections race to ensure_session(). Without a lock, two AI sessions could be created for the same bot_id. The async lock ensures exactly one session is created.
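
A toy model (not the bridge's real class) shows the guarantee: with an `await` inside session setup, two unlocked `ensure_session` calls could both pass the membership check, while the locked version creates exactly once.

```python
import asyncio

class ToyManager:
    """Minimal illustration of the creation race; illustrative only."""

    def __init__(self):
        self.sessions = {}
        self.created = 0
        self._lock = asyncio.Lock()

    async def ensure_session(self, bot_id: str):
        async with self._lock:          # without this, both callers can create
            if bot_id in self.sessions:
                return
            await asyncio.sleep(0)      # simulate async session setup (a yield point)
            self.created += 1
            self.sessions[bot_id] = object()

async def main():
    m = ToyManager()
    # Audio and control channels race to create the same session.
    await asyncio.gather(m.ensure_session("bot-1"), m.ensure_session("bot-1"))
    assert m.created == 1  # the lock guarantees exactly one creation

asyncio.run(main())
```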

Why clean up only when both connections close?

A single reconnecting WebSocket shouldn’t destroy the running session. The session stays alive as long as at least one channel is connected.

Why resample instead of sending at the model’s native rate?

MeetStream captures audio at 48 kHz (the standard WebAudio and Zoom SDK rate). Your model may need 16 kHz or 24 kHz. On the return path, MeetStream’s virtual speaker operates at 48 kHz. The bridge handles both conversions so neither MeetStream nor your model needs to know about the other’s sample rate.