Production deployments: Connect to the WebSocket API from a backend server to keep your API key secure. For end-user delivery, stream the generated audio through your own transport layer.
WebSocket Integration
Quick Example (Python)
This minimal example connects to Oris Voice, sends text, and saves the resulting audio as a WAV file.
import asyncio
import json
import struct
import time
import wave
import os
import websockets
from dotenv import load_dotenv
load_dotenv()
# Credentials and model configuration are read from the environment; both
# fall back to an empty string when the variable is not set.
API_KEY = os.environ.get("OJIN_API_KEY", "")
CONFIG_ID = os.environ.get("OJIN_CONFIG_ID", "")

# Realtime WebSocket endpoint; the model configuration is selected through
# the ``config_id`` query parameter.
WS_URL = f"wss://models.ojin.ai/realtime?config_id={CONFIG_ID}"

# Audio format produced by Oris Voice: 24 kHz, 16-bit PCM, mono.
SAMPLE_RATE = 24_000  # samples per second
SAMPLE_WIDTH = 2  # bytes per sample (16-bit)
CHANNELS = 1  # mono
def build_text_message(text, timestamp_ms=None):
    """Build a binary InteractionInput message carrying UTF-8 text.

    The message is a 13-byte header packed in network byte order (``!BQI``)
    followed by the encoded text:

    * 1 byte  -- payload type (0 = TEXT)
    * 8 bytes -- timestamp in milliseconds since the Unix epoch
    * 4 bytes -- always 0 in this example (presumably a length of optional
      parameters; confirm against the protocol reference)

    Args:
        text: The text to synthesize.
        timestamp_ms: Timestamp to embed, in milliseconds. When omitted, the
            current wall-clock time is used.

    Returns:
        The header and the UTF-8 payload concatenated as ``bytes``.
    """
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    header = struct.pack("!BQI", 0, timestamp_ms, 0)  # 0 = TEXT
    return header + text.encode("utf-8")
def parse_response(data):
    """Parse a binary InteractionResponse and extract its audio payloads.

    Layout as read by this function (network byte order):

    * A 37-byte header ``!B16sQIII``. The first field is the "final" flag and
      the last field is the number of payload entries; the four fields in
      between are not used here.
    * For each payload entry, a 5-byte entry header ``!IB`` (payload size,
      payload type) followed by ``size`` bytes of payload data.

    Only non-empty payloads of type 1 (audio) are returned; other entries are
    skipped.

    Args:
        data: The raw binary message.

    Returns:
        A ``(is_final, audio_chunks)`` tuple, where ``is_final`` is a bool and
        ``audio_chunks`` is a list of the audio payloads in message order.

    Raises:
        struct.error: If the message is too short to contain the header or a
            payload entry header.
        ValueError: If a payload entry declares more bytes than remain in the
            message (a truncated or corrupt message).
    """
    header = struct.Struct("!B16sQIII")
    entry = struct.Struct("!IB")

    # unpack_from reads in place, avoiding a temporary slice per header.
    is_final, _, _, _, _, num_payloads = header.unpack_from(data, 0)
    offset = header.size
    audio_chunks = []
    for index in range(num_payloads):
        size, ptype = entry.unpack_from(data, offset)
        offset += entry.size
        end = offset + size
        if end > len(data):
            # Slicing would silently return a short chunk; fail loudly instead.
            raise ValueError(
                f"payload {index} declares {size} bytes but only "
                f"{len(data) - offset} remain"
            )
        if ptype == 1 and size > 0:  # 1 = audio
            audio_chunks.append(data[offset:end])
        offset = end
    return bool(is_final), audio_chunks
def _raise_if_error(message):
    """Raise RuntimeError if a decoded JSON message is an ``errorResponse``."""
    if message.get("type") != "errorResponse":
        return
    payload = message.get("payload")
    detail = payload.get("message") if isinstance(payload, dict) else None
    # Fall back to the whole message so a malformed error is still reported
    # as a RuntimeError rather than a KeyError/TypeError.
    raise RuntimeError(detail or f"Server returned an error: {message}")


async def synthesize(text, output_path="output.wav"):
    """Synthesize ``text`` over the WebSocket API and save it as a WAV file.

    Steps:
      1. Connect to ``WS_URL`` with ``API_KEY`` in the ``Authorization``
         header and wait for the ``sessionReady`` JSON message.
      2. Send the text as a binary message, then an ``endInteraction`` JSON
         message.
      3. Collect audio payloads from binary responses until one is marked
         final.
      4. Write the collected PCM data to ``output_path`` and print its
         duration.

    Args:
        text: The text to synthesize.
        output_path: Destination path of the WAV file.

    Raises:
        RuntimeError: If no API key is configured, or if the server sends an
            ``errorResponse`` message.
    """
    if not API_KEY:
        raise RuntimeError(
            "OJIN_API_KEY is not set; cannot authenticate with the WebSocket API"
        )

    headers = websockets.Headers()
    headers["Authorization"] = API_KEY

    async with websockets.connect(
        WS_URL,
        additional_headers=headers,
        # None disables the connection-open and keepalive-ping timeouts.
        open_timeout=None,
        ping_timeout=None,
    ) as ws:
        # 1. Wait for sessionReady (binary frames are ignored at this stage)
        while True:
            msg = await ws.recv()
            if not isinstance(msg, str):
                continue
            parsed = json.loads(msg)
            if parsed.get("type") == "sessionReady":
                break
            _raise_if_error(parsed)

        # 2. Send text input (binary) + endInteraction (JSON)
        await ws.send(build_text_message(text))
        await ws.send(json.dumps({
            "type": "endInteraction",
            "payload": {"timestamp": int(time.time() * 1000)},
        }))

        # 3. Collect audio chunks until the final response arrives
        audio_data = []
        while True:
            msg = await ws.recv()
            if isinstance(msg, str):
                # JSON messages carry no audio; only errors matter here.
                _raise_if_error(json.loads(msg))
                continue
            is_final, chunks = parse_response(msg)
            audio_data.extend(chunks)
            if is_final:
                break

    # 4. Write WAV file
    pcm = b"".join(audio_data)
    with wave.open(output_path, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(SAMPLE_WIDTH)
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(pcm)

    # Bytes per second of audio = rate * sample width * channels.
    duration = len(pcm) / (SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS)
    print(f"Saved {output_path} ({duration:.2f}s)")
if __name__ == "__main__":
    # Run the example only when executed as a script, so that importing this
    # module does not open a network connection as a side effect.
    asyncio.run(synthesize("Hello, welcome to Ojin text to speech!"))
# Inside the receive loop, play each chunk immediately:
while True:
msg = await ws.recv()
if isinstance(msg, str):
# Handle JSON messages (errors, etc.)
continue
is_final, chunks = parse_response(msg)
for chunk in chunks:
play_audio(chunk) # Feed to your audio output (e.g., pyaudio, sounddevice)
if is_final:
break