Python
Table of Contents
- Install Dependencies
- API Client Setup
- Obtaining a Ticket and Connecting via WebSocket
- Real-Time Speech Translation
- Task Export (Audio / Transcript Download)
- Audio Import
- Loading History (SSE)
- Broadcast Viewer
- Error Handling
- Webhook Handling
- Async Version (asyncio + aiohttp)
Install Dependencies
pip install requests websockets sseclient-py aiohttp
API Client Setup
A wrapper around requests that authenticates uniformly via the X-API-Key header.
import requests
from typing import Optional
class VasApiClient:
"""VAS API Client - authenticates via the X-API-Key header"""
def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
self.api_key = api_key
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": api_key,
})
def _url(self, path: str) -> str:
return f"{self.base_url}{path}"
def _handle_response(self, response: requests.Response) -> dict:
if not response.ok:
error = response.json() if response.content else {}
raise VasApiError(
message=error.get("message", f"HTTP {response.status_code}"),
status_code=response.status_code,
error_code=error.get("error_code"),
)
return response.json()
# === Authentication API ===
def get_ticket(self) -> dict:
"""Obtain a one-time Ticket for the WebSocket connection"""
response = self.session.post(self._url("/auth/ticket"))
return self._handle_response(response)
# === Tasks API ===
def get_tasks(self, status: str = "completed") -> dict:
"""Get the task list"""
response = self.session.get(
self._url("/tasks"),
params={"status": status},
)
return self._handle_response(response)
def delete_task(self, task_id: str) -> dict:
"""Delete a task"""
response = self.session.delete(self._url(f"/tasks/{task_id}"))
return self._handle_response(response)
def pin_task(self, task_id: str, is_pinned: bool) -> dict:
"""Update the pinned status"""
response = self.session.put(
self._url(f"/tasks/{task_id}/pin"),
json={"is_pinned": is_pinned},
)
return self._handle_response(response)
def mark_as_read(self, task_id: str) -> dict:
"""Mark as read"""
response = self.session.put(self._url(f"/tasks/{task_id}/read"))
return self._handle_response(response)
def rename_task(self, task_id: str, name: str) -> dict:
"""Update the task name"""
response = self.session.patch(
self._url(f"/tasks/{task_id}/name"),
json={"name": name},
)
return self._handle_response(response)
# === Imports API ===
def check_quota(self, duration_ms: int) -> dict:
"""Check the upload budget"""
response = self.session.post(
self._url("/imports/check-quota"),
json={"duration_ms": duration_ms},
)
return self._handle_response(response)
def upload_audio(
self,
file_path: str,
transcription_languages: list,
recognition_mode: str = "single",
translation_languages: Optional[list] = None,
summary_template: Optional[str] = None,
terminology: Optional[dict] = None,
callback_url: Optional[str] = None,
) -> dict:
"""Upload an audio file"""
import json
with open(file_path, "rb") as f:
files = {"file": f}
data = {
"transcription_languages": json.dumps(transcription_languages),
"recognition_mode": recognition_mode,
}
if translation_languages:
data["translation_languages"] = json.dumps(translation_languages)
if summary_template:
data["summary_template"] = summary_template
if terminology:
data["terminology"] = json.dumps(terminology)
if callback_url:
data["callback_url"] = callback_url
response = self.session.post(
self._url("/imports"),
files=files,
data=data,
)
return self._handle_response(response)
def get_import_status(self, import_id: str) -> dict:
"""Query the import status"""
response = self.session.get(self._url(f"/imports/{import_id}"))
return self._handle_response(response)
def get_imports(self, per_page: int = 20) -> dict:
"""Get the import list"""
response = self.session.get(
self._url("/imports"),
params={"per_page": per_page},
)
return self._handle_response(response)
# === Broadcasts API ===
def create_broadcast(self, options: dict) -> dict:
"""Create a broadcast"""
response = self.session.post(self._url("/broadcasts"), json=options)
return self._handle_response(response)
def get_broadcast(self, broadcast_id: str) -> dict:
"""Query the broadcast status"""
response = self.session.get(self._url(f"/broadcasts/{broadcast_id}"))
return self._handle_response(response)
def get_broadcasts(self, page: int = 1, per_page: int = 20) -> dict:
"""Get the broadcast list"""
response = self.session.get(
self._url("/broadcasts"),
params={"page": page, "per_page": per_page},
)
return self._handle_response(response)
def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
"""Update the broadcast settings"""
response = self.session.patch(
self._url(f"/broadcasts/{broadcast_id}"),
json=options,
)
return self._handle_response(response)
def revoke_broadcast(self, broadcast_id: str) -> dict:
"""Revoke a broadcast"""
response = self.session.delete(self._url(f"/broadcasts/{broadcast_id}"))
return self._handle_response(response)
# === TTS API ===
def get_tts_voices(self, language: str) -> dict:
"""Get the TTS voice list"""
response = self.session.get(
self._url("/tts/voices"),
params={"language": language},
)
return self._handle_response(response)
def download_tts_sample(self, voice_name: str, output_path: str):
"""Download a TTS voice sample"""
response = self.session.get(self._url(f"/tts/voices/{voice_name}/sample"))
if response.ok:
with open(output_path, "wb") as f:
f.write(response.content)
else:
raise VasApiError(f"Download failed: HTTP {response.status_code}", response.status_code)
# === Speakers API ===
def rename_speaker(self, recording_id: str, speaker_id: str, new_label: str) -> dict:
"""Rename a speaker globally"""
response = self.session.patch(
self._url(f"/recordings/{recording_id}/speakers/rename"),
json={"speaker_id": speaker_id, "new_label": new_label},
)
return self._handle_response(response)
def reassign_speaker(self, recording_id: str, sid: int, target_speaker_id: str) -> dict:
"""Reassign the speaker of a single sentence"""
response = self.session.patch(
self._url(f"/recordings/{recording_id}/speakers/reassign"),
json={"sid": sid, "target_speaker_id": target_speaker_id},
)
return self._handle_response(response)
# === Summary Templates API ===
def get_summary_templates(self) -> dict:
"""Get the summary template list"""
response = self.session.get(self._url("/summary-templates"))
return self._handle_response(response)
# === Audio API ===
def download_audio(self, task_id: str, output_path: str):
"""Download the recording audio of a task"""
response = self.session.get(self._url(f"/sse/audio/{task_id}"))
if response.ok:
with open(output_path, "wb") as f:
f.write(response.content)
else:
raise VasApiError(f"Download failed: HTTP {response.status_code}", response.status_code)
# Usage example
if __name__ == "__main__":
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
# Get the task list
tasks = api.get_tasks()
print(f"There are {len(tasks['tasks'])} tasks")
for task in tasks["tasks"]:
print(f" [{task['task_id'][:8]}] {task['title']} ({task['duration_formatted']})")
Obtaining a Ticket and Connecting via WebSocket
The WebSocket uses a Ticket-based authentication mechanism; the Ticket is passed through Sec-WebSocket-Protocol.
import asyncio
import websockets
import json
async def connect_websocket(api: VasApiClient):
"""Obtain a Ticket and establish a WebSocket connection"""
# Step 1: Obtain a one-time Ticket
result = api.get_ticket()
ticket = result["ticket"]
print(f"Ticket obtained, valid for {result['expires_in']} seconds")
# Step 2: Connect to the WebSocket using the Ticket
ws = await websockets.connect(
"wss://vas-poc.vurbo.ai/ws",
subprotocols=[f"ticket.{ticket}"],
)
print("WebSocket connected successfully")
return ws
# Usage example
async def main():
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
ws = await connect_websocket(api)
# Send ping
await ws.send(json.dumps({
"type": "health",
"data": {"action": "ping"},
}))
# Receive pong
response = await ws.recv()
msg = json.loads(response)
print(f"Received: {msg}")
await ws.close()
asyncio.run(main())
Real-Time Speech Translation
A complete VurboClient class using the websockets library.
import asyncio
import websockets
import json
import base64
from typing import Callable, Optional, List
class VurboClient:
"""VAS WebSocket client - authenticates via the Ticket mechanism"""
def __init__(self, api: VasApiClient):
self.api = api
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.on_result: Optional[Callable] = None
self.on_error: Optional[Callable] = None
self.on_task_complete: Optional[Callable] = None
self.on_session_started: Optional[Callable] = None
self.on_tts_ready: Optional[Callable] = None
self._heartbeat_task: Optional[asyncio.Task] = None
async def connect(self):
"""Connect to the WebSocket using the Ticket mechanism"""
result = self.api.get_ticket()
ticket = result["ticket"]
self.ws = await websockets.connect(
"wss://vas-poc.vurbo.ai/ws",
subprotocols=[f"ticket.{ticket}"],
)
print("Vurbo.ai connected successfully")
# Start the heartbeat
self._heartbeat_task = asyncio.create_task(self._heartbeat())
async def _heartbeat(self):
"""Send a ping every 30 seconds"""
while True:
try:
await asyncio.sleep(30)
if self.ws and self.ws.open:
await self.ws.send(json.dumps({
"type": "health",
"data": {"action": "ping"},
}))
except Exception:
break
async def start_translation(
self,
source_lang: List[str],
target_lang: Optional[List[str]] = None,
recording_type: str = "transcribe",
recognition_mode: str = "single",
summary_template: str = "meeting",
audio_format: str = "pcm",
tts_enabled: bool = False,
tts_language: Optional[str] = None,
tts_voice: Optional[str] = None,
):
"""Start speech translation"""
data = {
"action": "start",
"transcription_languages": source_lang,
"translation_languages": target_lang or [],
"type": recording_type,
"recognition_mode": recognition_mode,
"audio_format": audio_format,
}
if recording_type == "transcribe":
data["summary_template"] = summary_template
if tts_enabled:
data["tts_enabled"] = True
data["tts_language"] = tts_language
data["tts_voice"] = tts_voice
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": data,
}))
async def set_config(
self,
terminology: Optional[dict] = None,
fuzzy_correction: Optional[dict] = None,
translation_dict: Optional[list] = None,
):
"""Configure the terminology list, fuzzy-term correction, and translation dictionary"""
data = {"action": "config"}
if terminology:
data["terminology"] = terminology
if fuzzy_correction:
data["fuzzy_correction"] = fuzzy_correction
if translation_dict:
data["translation_dict"] = translation_dict
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": data,
}))
async def send_audio(self, audio_data: bytes):
"""Send audio data (automatically Base64-encoded)"""
base64_audio = base64.b64encode(audio_data).decode("utf-8")
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "audio", "payload": base64_audio},
}))
async def pause(self):
"""Pause the translation"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "pause"},
}))
async def resume(self):
"""Resume the translation"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "resume"},
}))
async def stop(self):
"""Stop the translation"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "stop"},
}))
async def set_name(self, name: str):
"""Set the recording name"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "set_name", "name": name},
}))
async def switch_language(self, target_languages: List[str]):
"""Switch the translation language"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "switch_language",
"translation_languages": target_languages,
},
}))
async def rename_speaker(self, speaker_id: str, new_label: str):
"""Rename a speaker globally"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "rename_speaker",
"speaker_id": speaker_id,
"new_label": new_label,
},
}))
async def reassign_speaker(self, sid: int, target_speaker_id: str):
"""Reassign the speaker of a single sentence"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "reassign_speaker",
"sid": sid,
"target_speaker_id": target_speaker_id,
},
}))
async def merge_speakers(self, source_speaker_id: str, target_speaker_id: str):
"""Merge speakers"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "merge_speakers",
"source_speaker_id": source_speaker_id,
"target_speaker_id": target_speaker_id,
},
}))
async def tts_play(self, sid: int, length: int = 1):
"""Play TTS"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "tts_play", "sid": sid, "length": length},
}))
async def tts_stop(self):
"""Stop TTS"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "tts_stop"},
}))
async def receive_messages(self):
"""Continuously receive and process messages"""
async for message in self.ws:
msg = json.loads(message)
await self._handle_message(msg)
async def _handle_message(self, msg: dict):
"""Handle an incoming message"""
if msg["type"] == "error":
print(f"Error: {msg['data']['message']}")
if self.on_error:
self.on_error(msg["data"])
return
if msg["type"] == "voice-translation":
data = msg["data"]
action = data.get("action")
if action == "session_started":
if self.on_session_started:
self.on_session_started(data)
elif action == "result":
if self.on_result:
self.on_result(
origin=data.get("origin"),
translations=data.get("translations"),
)
elif action == "task_complete":
if self.on_task_complete:
self.on_task_complete(data.get("task_id"))
elif action == "tts_ready":
if self.on_tts_ready:
self.on_tts_ready(data)
async def disconnect(self):
"""Close the connection"""
if self._heartbeat_task:
self._heartbeat_task.cancel()
if self.ws:
await self.ws.close()
Usage Example: Real-Time Translation
import asyncio
async def main():
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
client = VurboClient(api)
# Set up callbacks
def on_session_started(data):
print(f"Session started: {data['session_id']}")
print(f"Recording ID: {data['recording_id']}")
def on_result(origin, translations):
if origin and origin.get("is_final"):
print(f"[{origin['start_time']}] {origin['text']}")
if translations:
for lang, result in translations.items():
if result.get("is_final"):
print(f" Translation ({lang}): {result['text']}")
def on_task_complete(task_id):
print(f"Task complete: {task_id}")
client.on_session_started = on_session_started
client.on_result = on_result
client.on_task_complete = on_task_complete
# Connect
await client.connect()
# (Optional) configure the terminology list
await client.set_config(
terminology={
"zh-TW": [
{"term": "speaker diarization", "boost": 1.5},
{"term": "real-time transcription", "boost": 1.5},
],
},
)
# Start translation
await client.start_translation(
source_lang=["zh-TW"],
target_lang=["en-US"],
recording_type="transcribe",
recognition_mode="multi_speaker",
summary_template="meeting",
)
# Receive messages
await client.receive_messages()
asyncio.run(main())
Usage Example: Microphone Recording
import asyncio
import pyaudio
async def record_and_translate():
"""Record from the microphone and translate in real time"""
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
client = VurboClient(api)
sentences = []
def on_result(origin, translations):
if origin and origin.get("is_final"):
sentence = {
"sid": origin["sid"],
"text": origin["text"],
"time": origin["start_time"],
}
if translations:
first_lang = list(translations.keys())[0]
sentence["translation"] = translations[first_lang]["text"]
sentences.append(sentence)
print(f"[{sentence['time']}] {sentence['text']}")
if sentence.get("translation"):
print(f" Translation: {sentence['translation']}")
def on_task_complete(task_id):
print(f"\nTask complete: {task_id}")
print(f"{len(sentences)} sentences total")
client.on_result = on_result
client.on_task_complete = on_task_complete
# Connect and start
await client.connect()
await client.start_translation(
source_lang=["zh-TW"],
target_lang=["en-US"],
)
# Start microphone recording
audio = pyaudio.PyAudio()
stream = audio.open(
format=pyaudio.paInt16,
channels=1,
rate=16000,
input=True,
frames_per_buffer=4096,
)
print("Recording started... (press Ctrl+C to stop)")
async def send_audio_loop():
while True:
data = stream.read(4096, exception_on_overflow=False)
await client.send_audio(data)
await asyncio.sleep(0)
try:
await asyncio.gather(
send_audio_loop(),
client.receive_messages(),
)
except KeyboardInterrupt:
print("\nStopping recording...")
await client.stop()
# Wait for the task_complete event
try:
await asyncio.wait_for(client.receive_messages(), timeout=30)
except asyncio.TimeoutError:
pass
finally:
stream.stop_stream()
stream.close()
audio.terminate()
await client.disconnect()
asyncio.run(record_and_translate())
Task Export (Audio / Transcript Download)
Download the audio or transcript of a completed task. Transcripts support five formats — txt, srt, sbv, vtt, csv — and the content includes the original text and all translation languages.
import re
from urllib.parse import unquote
from pathlib import Path
def _parse_filename(content_disposition: str, fallback: str) -> str:
"""Extract the RFC 5987-encoded UTF-8 filename from Content-Disposition."""
match = re.search(r"filename\*=UTF-8''([^;]+)", content_disposition or "")
return unquote(match.group(1)) if match else fallback
def download_audio(api: VasApiClient, task_id: str, output_dir: str = ".") -> str:
"""Download the task audio. Returns the saved file path."""
response = api.session.get(
f"{api.base_url}/api/v1/tasks/{task_id}/audio/export",
stream=True,
timeout=120,
)
response.raise_for_status()
filename = _parse_filename(
response.headers.get("Content-Disposition", ""),
fallback=f"audio-{task_id}.m4a",
)
output_path = Path(output_dir) / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return str(output_path)
def download_transcript(
api: VasApiClient,
task_id: str,
format: str = "srt",
output_dir: str = ".",
) -> str:
"""Download the transcript. format supports txt / srt / sbv / vtt / csv."""
if format not in {"txt", "srt", "sbv", "vtt", "csv"}:
raise ValueError(f"Unsupported format: {format}")
response = api.session.get(
f"{api.base_url}/api/v1/tasks/{task_id}/transcript/export",
params={"format": format},
stream=True,
timeout=60,
)
response.raise_for_status()
filename = _parse_filename(
response.headers.get("Content-Disposition", ""),
fallback=f"transcript-{task_id}.{format}",
)
output_path = Path(output_dir) / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return str(output_path)
# Usage example
audio_path = download_audio(client, task_id="550e8400-e29b-41d4-a716-446655440000")
print(f"Audio downloaded: {audio_path}")
srt_path = download_transcript(
client,
task_id="550e8400-e29b-41d4-a716-446655440000",
format="srt",
)
print(f"Subtitles downloaded: {srt_path}")
# The CSV version can be passed directly to pandas for analysis
# import pandas as pd
# csv_path = download_transcript(client, task_id, format="csv")
# df = pd.read_csv(csv_path)
Note: Export requires
processing_status = completed; if it is not ready, a422 recording_transcript_not_readyis returned.
Audio Import
Upload an audio file and poll for the processing status.
import time
def import_audio(
api: VasApiClient,
file_path: str,
transcription_languages: list = None,
translation_languages: list = None,
recognition_mode: str = "multi_speaker",
summary_template: str = "meeting",
) -> dict:
"""Upload an audio file and wait for processing to complete"""
transcription_languages = transcription_languages or ["zh-TW"]
translation_languages = translation_languages or ["en-US"]
# Step 1: Upload the audio file
print(f"Uploading audio: {file_path}")
result = api.upload_audio(
file_path=file_path,
transcription_languages=transcription_languages,
translation_languages=translation_languages,
recognition_mode=recognition_mode,
summary_template=summary_template,
)
import_id = result["data"]["import_id"]
print(f"Import ID: {import_id}")
# Step 2: Poll the status
while True:
status_result = api.get_import_status(import_id)
data = status_result["data"]
status = data["status"]
stage = data["stage"]
progress = data["progress"]
print(f"Status: {status} | Stage: {stage} | Progress: {progress}%")
if status == "completed":
task_id = data["task_id"]
print(f"Processing complete! Task ID: {task_id}")
return {"import_id": import_id, "task_id": task_id}
if status == "failed":
raise VasApiError(
message=f"Processing failed: {data['error_message']}",
status_code=500,
error_code=data["error_code"],
)
time.sleep(5) # Query once every 5 seconds
# Usage example
if __name__ == "__main__":
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
# Check the budget first
quota = api.check_quota(3600000) # 1-hour audio file
if quota["data"]["allowed"]:
print(f"Sufficient budget, ${quota['data']['remaining_budget']} USD remaining")
result = import_audio(
api,
file_path="meeting.mp3",
transcription_languages=["zh-TW"],
translation_languages=["en-US", "ja-JP"],
recognition_mode="multi_speaker",
summary_template="meeting",
)
print(f"You can use the Task ID to load history: {result['task_id']}")
else:
print("Insufficient budget")
Loading History (SSE)
Use sseclient-py to handle the SSE stream; authentication is done via the X-API-Key header.
import requests
import sseclient
import json
def load_history(task_id: str, api_key: str) -> dict:
"""Load the history"""
url = f"https://vas-poc.vurbo.ai/api/v1/sse/history/transcribe/{task_id}"
response = requests.get(
url,
headers={"X-API-Key": api_key},
stream=True,
)
client = sseclient.SSEClient(response)
metadata = None
sentences = []
summary = None
for event in client.events():
if event.event == "init_metadata":
metadata = json.loads(event.data)
print(f"Task: {metadata['title']}")
elif event.event == "init_sentence":
sentence = json.loads(event.data)
sentences.append(sentence)
elif event.event == "init_summary":
summary = json.loads(event.data)["text"]
elif event.event == "init_done":
break
return {
"metadata": metadata,
"sentences": sentences,
"summary": summary,
}
def retranslate(task_id: str, target_lang: str, api_key: str) -> list:
"""Retranslate the full text"""
url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/{task_id}?targetLang={target_lang}"
response = requests.get(
url,
headers={"X-API-Key": api_key},
stream=True,
)
client = sseclient.SSEClient(response)
results = []
for event in client.events():
if event.event == "translation":
data = json.loads(event.data)
results.append(data)
print(f"Sentence {data['sid']}: {data['text']}")
elif event.event == "done":
break
return results
def retranslate_summary(task_id: str, target_lang: str, api_key: str) -> str:
"""Retranslate the summary"""
url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/summary/{task_id}?targetLang={target_lang}"
response = requests.get(
url,
headers={"X-API-Key": api_key},
stream=True,
)
client = sseclient.SSEClient(response)
final_text = ""
for event in client.events():
if event.event == "summary_translation":
data = json.loads(event.data)
if data.get("is_final"):
final_text = data["text"]
elif event.event == "done":
break
return final_text
# Usage example
if __name__ == "__main__":
api_key = "vas_YOUR_API_KEY_HERE_32_CHARACTERS"
task_id = "550e8400-e29b-41d4-a716-446655440000"
# Load history
history = load_history(task_id, api_key)
print(f"\n{len(history['sentences'])} sentences total")
for s in history["sentences"]:
print(f" [{s['start_time']}] {s['origin']}")
if s.get("translations"):
for lang, text in s["translations"].items():
print(f" [{lang}] {text}")
if history["summary"]:
print(f"\nSummary: {history['summary'][:100]}...")
# Retranslate into Japanese
print("\nRetranslating into Japanese...")
translations = retranslate(task_id, "ja-JP", api_key)
print(f"Translation complete, {len(translations)} sentences total")
Broadcast Viewer
Viewers receive real-time subtitles and translations via SSE.
import requests
import sseclient
import json
class BroadcastViewer:
"""Broadcast viewer"""
def __init__(self, token: str):
self.token = token
self.base_url = "https://vas-poc.vurbo.ai"
def get_broadcast_info(self) -> dict:
"""Get the broadcast's public info (no authentication required)"""
response = requests.get(
f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}"
)
return response.json()
def verify_password(self, password: str) -> dict:
"""Verify the password"""
response = requests.post(
f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}/verify",
json={"password": password},
)
return response.json()
def connect(
self,
lang: str = None,
viewer_access_token: str = None,
tts_voice: str = None,
on_sentence=None,
on_translation=None,
on_tts_ready=None,
on_announcement=None,
on_standby=None,
on_live=None,
on_ended=None,
):
"""Connect via SSE to receive real-time subtitles"""
params = {}
if lang:
params["lang"] = lang
if viewer_access_token:
params["viewer_access_token"] = viewer_access_token
if tts_voice:
params["tts_voice"] = tts_voice
url = f"{self.base_url}/broadcast/{self.token}/text"
response = requests.get(url, params=params, stream=True)
client = sseclient.SSEClient(response)
for event in client.events():
data = json.loads(event.data) if event.data else {}
if event.event == "sentence" and on_sentence:
on_sentence(data)
elif event.event == "translation" and on_translation:
on_translation(data)
elif event.event == "tts_ready" and on_tts_ready:
on_tts_ready(data)
elif event.event == "announcement" and on_announcement:
on_announcement(data)
elif event.event == "standby" and on_standby:
on_standby(data)
elif event.event == "live" and on_live:
on_live()
elif event.event == "ended":
if on_ended:
on_ended()
break
# Usage example
if __name__ == "__main__":
viewer = BroadcastViewer("a3f9")
# Get the broadcast info
info = viewer.get_broadcast_info()
print(f"Broadcast name: {info['data']['name']}")
print(f"Password required: {info['data']['requires_password']}")
print(f"Available languages: {info['data']['translation_languages']}")
# If password verification is required
viewer_access_token = None
if info["data"]["requires_password"]:
verify_result = viewer.verify_password("mySecret123")
viewer_access_token = verify_result["data"]["viewer_access_token"]
print("Password verified successfully")
# Receive real-time subtitles
def on_sentence(data):
print(f"[Original] {data['text']}")
def on_translation(data):
print(f"[Translation] {data['text']}")
def on_announcement(data):
print(f"[Announcement] {data['message']}")
def on_ended():
print("Broadcast ended")
print("Receiving subtitles...")
viewer.connect(
lang="en-US",
viewer_access_token=viewer_access_token,
on_sentence=on_sentence,
on_translation=on_translation,
on_announcement=on_announcement,
on_ended=on_ended,
)
Webhook Handling
Receive and verify VAS Webhook events.
import hmac
import hashlib
import json
from flask import Flask, request, jsonify
app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret_here"
def verify_signature(timestamp, body, signature):
"""Verify the Webhook signature"""
signature_payload = f"{timestamp}.{body}"
expected = "sha256=" + hmac.new(
WEBHOOK_SECRET.encode(),
signature_payload.encode(),
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(signature, expected)
@app.route("/webhooks/vas", methods=["POST"])
def handle_webhook():
timestamp = request.headers.get("X-VAS-Timestamp", "")
signature = request.headers.get("X-VAS-Signature", "")
raw_body = request.get_data(as_text=True)
if not verify_signature(timestamp, raw_body, signature):
return jsonify({"error": "Invalid signature"}), 401
payload = request.get_json()
event = payload["event"]
data = payload["data"]
if event == "recording.completed":
print(f"Recording completed: task_id={data['task_id']}")
elif event == "import.completed":
print(f"Import completed: import_id={data['import_id']}")
elif event in ("recording.failed", "import.failed"):
print(f"Processing failed: {data.get('error') or data.get('error_message')}")
return jsonify({"received": True}), 200
For the complete Webhook guide, see Webhook Callback Guide.
Error Handling
class VasApiError(Exception):
"""VAS API error"""
def __init__(self, message: str, status_code: int = None, error_code: str = None):
super().__init__(message)
self.status_code = status_code
self.error_code = error_code
def __str__(self):
parts = [self.args[0]]
if self.error_code:
parts.insert(0, f"[{self.error_code}]")
if self.status_code:
parts.insert(0, f"HTTP {self.status_code}")
return " ".join(parts)
def handle_api_error(error: VasApiError):
"""Unified error handling"""
if error.error_code in ("auth_missing_api_key", "auth_invalid_api_key", "auth_key_expired"):
print(f"Authentication failed: {error}")
print("Please check that your API Key is correct")
elif error.error_code == "auth_budget_exceeded":
print(f"Budget exceeded: {error}")
print("Please wait for next month's budget reset or adjust the budget")
elif error.error_code in ("recording_not_found", "import_not_found"):
print(f"Resource not found: {error}")
elif error.error_code == "broadcast_session_not_found":
print(f"Broadcast not found: {error}")
elif error.error_code == "broadcast_cannot_revoke":
print(f"Only broadcasts in the pending state can be revoked: {error}")
elif error.error_code == "validation_failed":
print(f"Parameter validation failed: {error}")
elif error.error_code == "import_file_too_large":
print(f"File too large: {error}")
print("Please compress or split the file (500MB limit)")
elif error.error_code == "import_invalid_format":
print(f"Unsupported format: {error}")
print("Supported formats: mp3, wav, m4a")
elif error.error_code == "ticket_invalid":
print(f"Ticket invalid: {error}")
print("Please obtain a new Ticket")
else:
print(f"API error: {error}")
# Usage example
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
try:
tasks = api.get_tasks()
except VasApiError as e:
handle_api_error(e)
except requests.exceptions.ConnectionError:
print("Network connection failed; please check your network settings")
except requests.exceptions.Timeout:
print("Request timed out; please try again later")
Async Version (asyncio + aiohttp)
A complete asynchronous API Client, suitable for high-performance scenarios or those that require concurrent processing.
import asyncio
import aiohttp
import json
from typing import Optional
class AsyncVasApiClient:
"""Asynchronous VAS API Client"""
def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
self.api_key = api_key
self.base_url = base_url
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
headers={"X-API-Key": self.api_key},
)
return self._session
async def close(self):
if self._session and not self._session.closed:
await self._session.close()
async def _request(self, method: str, path: str, **kwargs) -> dict:
session = await self._get_session()
url = f"{self.base_url}{path}"
async with session.request(method, url, **kwargs) as response:
if not response.ok:
error = await response.json()
raise VasApiError(
message=error.get("message", f"HTTP {response.status}"),
status_code=response.status,
error_code=error.get("error_code"),
)
return await response.json()
# === Authentication ===
async def get_ticket(self) -> dict:
return await self._request("POST", "/auth/ticket")
# === Tasks ===
async def get_tasks(self) -> dict:
return await self._request("GET", "/tasks")
async def delete_task(self, task_id: str) -> dict:
return await self._request("DELETE", f"/tasks/{task_id}")
async def pin_task(self, task_id: str, is_pinned: bool) -> dict:
return await self._request("PUT", f"/tasks/{task_id}/pin", json={"is_pinned": is_pinned})
async def mark_as_read(self, task_id: str) -> dict:
return await self._request("PUT", f"/tasks/{task_id}/read")
async def rename_task(self, task_id: str, name: str) -> dict:
return await self._request("PATCH", f"/tasks/{task_id}/name", json={"name": name})
# === Imports ===
async def check_quota(self, duration_ms: int) -> dict:
return await self._request("POST", "/imports/check-quota", json={"duration_ms": duration_ms})
async def upload_audio(self, file_path: str, options: dict) -> dict:
session = await self._get_session()
url = f"{self.base_url}/imports"
form = aiohttp.FormData()
form.add_field("file", open(file_path, "rb"), filename=file_path.split("/")[-1])
form.add_field("transcription_languages", json.dumps(options.get("transcription_languages", ["zh-TW"])))
form.add_field("recognition_mode", options.get("recognition_mode", "single"))
if "translation_languages" in options:
form.add_field("translation_languages", json.dumps(options["translation_languages"]))
if "summary_template" in options:
form.add_field("summary_template", options["summary_template"])
async with session.post(url, data=form) as response:
if not response.ok:
error = await response.json()
raise VasApiError(
message=error.get("message", f"HTTP {response.status}"),
status_code=response.status,
error_code=error.get("error_code"),
)
return await response.json()
async def get_import_status(self, import_id: str) -> dict:
return await self._request("GET", f"/imports/{import_id}")
# === Broadcasts ===
async def create_broadcast(self, options: dict) -> dict:
return await self._request("POST", "/broadcasts", json=options)
async def get_broadcast(self, broadcast_id: str) -> dict:
return await self._request("GET", f"/broadcasts/{broadcast_id}")
async def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
return await self._request("PATCH", f"/broadcasts/{broadcast_id}", json=options)
async def revoke_broadcast(self, broadcast_id: str) -> dict:
return await self._request("DELETE", f"/broadcasts/{broadcast_id}")
# === TTS ===
async def get_tts_voices(self, language: str) -> dict:
return await self._request("GET", "/tts/voices", params={"language": language})
# === Summary Templates ===
async def get_summary_templates(self) -> dict:
return await self._request("GET", "/summary-templates")
# === SSE History (asynchronous) ===
async def load_history(self, task_id: str) -> dict:
"""Load the history asynchronously"""
session = await self._get_session()
url = f"{self.base_url}/sse/history/transcribe/{task_id}"
metadata = None
sentences = []
summary = None
async with session.get(url) as response:
buffer = ""
current_event = ""
async for chunk in response.content.iter_any():
buffer += chunk.decode("utf-8")
lines = buffer.split("\n")
buffer = lines.pop()
for line in lines:
if line.startswith("event: "):
current_event = line[7:]
elif line.startswith("data: "):
data = json.loads(line[6:])
if current_event == "init_metadata":
metadata = data
elif current_event == "init_sentence":
sentences.append(data)
elif current_event == "init_summary":
summary = data.get("text")
elif current_event == "init_done":
return {
"metadata": metadata,
"sentences": sentences,
"summary": summary,
}
return {"metadata": metadata, "sentences": sentences, "summary": summary}
async def retranslate(self, task_id: str, target_lang: str) -> list:
"""Retranslate asynchronously"""
session = await self._get_session()
url = f"{self.base_url}/sse/retranslate/{task_id}"
results = []
async with session.get(url, params={"targetLang": target_lang}) as response:
buffer = ""
current_event = ""
async for chunk in response.content.iter_any():
buffer += chunk.decode("utf-8")
lines = buffer.split("\n")
buffer = lines.pop()
for line in lines:
if line.startswith("event: "):
current_event = line[7:]
elif line.startswith("data: "):
data = json.loads(line[6:])
if current_event == "translation":
results.append(data)
elif current_event == "done":
return results
return results
# Usage example: concurrent operations
async def main():
api = AsyncVasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
try:
# Fetch multiple resources concurrently
tasks_result, templates_result, tts_languages = await asyncio.gather(
api.get_tasks(),
api.get_summary_templates(),
api.get_tts_languages(),
)
print(f"Task count: {len(tasks_result['tasks'])}")
print(f"Summary templates: {[t['name'] for t in templates_result['data']]}")
print(f"TTS language count: {tts_languages['data']['total']}")
# Load history
if tasks_result["tasks"]:
task_id = tasks_result["tasks"][0]["task_id"]
history = await api.load_history(task_id)
print(f"\nHistory: {len(history['sentences'])} sentences")
except VasApiError as e:
handle_api_error(e)
finally:
await api.close()
asyncio.run(main())
Async Upload and Polling
async def async_import_audio(api: AsyncVasApiClient, file_path: str, options: dict) -> dict:
"""Upload an audio file asynchronously and wait for processing to complete"""
# Upload
result = await api.upload_audio(file_path, options)
import_id = result["data"]["import_id"]
print(f"Import ID: {import_id}")
# Poll
while True:
status_result = await api.get_import_status(import_id)
data = status_result["data"]
print(f"Status: {data['status']} | Stage: {data['stage']} | Progress: {data['progress']}%")
if data["status"] == "completed":
return {"import_id": import_id, "task_id": data["task_id"]}
if data["status"] == "failed":
raise VasApiError(
message=f"Processing failed: {data['error_message']}",
status_code=500,
error_code=data["error_code"],
)
await asyncio.sleep(5)
Version: V1.5.7 Last Updated: 2026-05-20