Examples

Python

Table of Contents

  1. Install Dependencies
  2. API Client Setup
  3. Obtaining a Ticket and Connecting via WebSocket
  4. Real-Time Speech Translation
  5. Task Export (Audio / Transcript Download)
  6. Audio Import
  7. Loading History (SSE)
  8. Broadcast Viewer
  9. Error Handling
  10. Webhook Handling
  11. Async Version (asyncio + aiohttp)

Install Dependencies

pip install requests websockets sseclient-py aiohttp

API Client Setup

A wrapper around requests that authenticates uniformly via the X-API-Key header.

import requests
from typing import Optional

class VasApiClient:
    """VAS API Client - authenticates via the X-API-Key header"""

    def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
        })

    def _url(self, path: str) -> str:
        return f"{self.base_url}{path}"

    def _handle_response(self, response: requests.Response) -> dict:
        if not response.ok:
            error = response.json() if response.content else {}
            raise VasApiError(
                message=error.get("message", f"HTTP {response.status_code}"),
                status_code=response.status_code,
                error_code=error.get("error_code"),
            )
        return response.json()

    # === Authentication API ===

    def get_ticket(self) -> dict:
        """Obtain a one-time Ticket for the WebSocket connection"""
        response = self.session.post(self._url("/auth/ticket"))
        return self._handle_response(response)

    # === Tasks API ===

    def get_tasks(self, status: str = "completed") -> dict:
        """Get the task list"""
        response = self.session.get(
            self._url("/tasks"),
            params={"status": status},
        )
        return self._handle_response(response)

    def delete_task(self, task_id: str) -> dict:
        """Delete a task"""
        response = self.session.delete(self._url(f"/tasks/{task_id}"))
        return self._handle_response(response)

    def pin_task(self, task_id: str, is_pinned: bool) -> dict:
        """Update the pinned status"""
        response = self.session.put(
            self._url(f"/tasks/{task_id}/pin"),
            json={"is_pinned": is_pinned},
        )
        return self._handle_response(response)

    def mark_as_read(self, task_id: str) -> dict:
        """Mark as read"""
        response = self.session.put(self._url(f"/tasks/{task_id}/read"))
        return self._handle_response(response)

    def rename_task(self, task_id: str, name: str) -> dict:
        """Update the task name"""
        response = self.session.patch(
            self._url(f"/tasks/{task_id}/name"),
            json={"name": name},
        )
        return self._handle_response(response)

    # === Imports API ===

    def check_quota(self, duration_ms: int) -> dict:
        """Check the upload budget"""
        response = self.session.post(
            self._url("/imports/check-quota"),
            json={"duration_ms": duration_ms},
        )
        return self._handle_response(response)

    def upload_audio(
        self,
        file_path: str,
        transcription_languages: list,
        recognition_mode: str = "single",
        translation_languages: Optional[list] = None,
        summary_template: Optional[str] = None,
        terminology: Optional[dict] = None,
        callback_url: Optional[str] = None,
    ) -> dict:
        """Upload an audio file"""
        import json

        with open(file_path, "rb") as f:
            files = {"file": f}
            data = {
                "transcription_languages": json.dumps(transcription_languages),
                "recognition_mode": recognition_mode,
            }

            if translation_languages:
                data["translation_languages"] = json.dumps(translation_languages)
            if summary_template:
                data["summary_template"] = summary_template
            if terminology:
                data["terminology"] = json.dumps(terminology)
            if callback_url:
                data["callback_url"] = callback_url

            response = self.session.post(
                self._url("/imports"),
                files=files,
                data=data,
            )

        return self._handle_response(response)

    def get_import_status(self, import_id: str) -> dict:
        """Query the import status"""
        response = self.session.get(self._url(f"/imports/{import_id}"))
        return self._handle_response(response)

    def get_imports(self, per_page: int = 20) -> dict:
        """Get the import list"""
        response = self.session.get(
            self._url("/imports"),
            params={"per_page": per_page},
        )
        return self._handle_response(response)

    # === Broadcasts API ===

    def create_broadcast(self, options: dict) -> dict:
        """Create a broadcast"""
        response = self.session.post(self._url("/broadcasts"), json=options)
        return self._handle_response(response)

    def get_broadcast(self, broadcast_id: str) -> dict:
        """Query the broadcast status"""
        response = self.session.get(self._url(f"/broadcasts/{broadcast_id}"))
        return self._handle_response(response)

    def get_broadcasts(self, page: int = 1, per_page: int = 20) -> dict:
        """Get the broadcast list"""
        response = self.session.get(
            self._url("/broadcasts"),
            params={"page": page, "per_page": per_page},
        )
        return self._handle_response(response)

    def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
        """Update the broadcast settings"""
        response = self.session.patch(
            self._url(f"/broadcasts/{broadcast_id}"),
            json=options,
        )
        return self._handle_response(response)

    def revoke_broadcast(self, broadcast_id: str) -> dict:
        """Revoke a broadcast"""
        response = self.session.delete(self._url(f"/broadcasts/{broadcast_id}"))
        return self._handle_response(response)

    # === TTS API ===

    def get_tts_voices(self, language: str) -> dict:
        """Get the TTS voice list"""
        response = self.session.get(
            self._url("/tts/voices"),
            params={"language": language},
        )
        return self._handle_response(response)

    def download_tts_sample(self, voice_name: str, output_path: str):
        """Download a TTS voice sample"""
        response = self.session.get(self._url(f"/tts/voices/{voice_name}/sample"))
        if response.ok:
            with open(output_path, "wb") as f:
                f.write(response.content)
        else:
            raise VasApiError(f"Download failed: HTTP {response.status_code}", response.status_code)

    # === Speakers API ===

    def rename_speaker(self, recording_id: str, speaker_id: str, new_label: str) -> dict:
        """Rename a speaker globally"""
        response = self.session.patch(
            self._url(f"/recordings/{recording_id}/speakers/rename"),
            json={"speaker_id": speaker_id, "new_label": new_label},
        )
        return self._handle_response(response)

    def reassign_speaker(self, recording_id: str, sid: int, target_speaker_id: str) -> dict:
        """Reassign the speaker of a single sentence"""
        response = self.session.patch(
            self._url(f"/recordings/{recording_id}/speakers/reassign"),
            json={"sid": sid, "target_speaker_id": target_speaker_id},
        )
        return self._handle_response(response)

    # === Summary Templates API ===

    def get_summary_templates(self) -> dict:
        """Get the summary template list"""
        response = self.session.get(self._url("/summary-templates"))
        return self._handle_response(response)

    # === Audio API ===

    def download_audio(self, task_id: str, output_path: str):
        """Download the recording audio of a task"""
        response = self.session.get(self._url(f"/sse/audio/{task_id}"))
        if response.ok:
            with open(output_path, "wb") as f:
                f.write(response.content)
        else:
            raise VasApiError(f"Download failed: HTTP {response.status_code}", response.status_code)

# Usage example
if __name__ == "__main__":
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

    # Get the task list
    tasks = api.get_tasks()
    print(f"There are {len(tasks['tasks'])} tasks")

    for task in tasks["tasks"]:
        print(f"  [{task['task_id'][:8]}] {task['title']} ({task['duration_formatted']})")

Obtaining a Ticket and Connecting via WebSocket

The WebSocket uses a Ticket-based authentication mechanism; the Ticket is passed through Sec-WebSocket-Protocol.

import asyncio
import websockets
import json

async def connect_websocket(api: VasApiClient):
    """Obtain a Ticket and establish a WebSocket connection"""
    # Step 1: Obtain a one-time Ticket
    result = api.get_ticket()
    ticket = result["ticket"]
    print(f"Ticket obtained, valid for {result['expires_in']} seconds")

    # Step 2: Connect to the WebSocket using the Ticket
    ws = await websockets.connect(
        "wss://vas-poc.vurbo.ai/ws",
        subprotocols=[f"ticket.{ticket}"],
    )
    print("WebSocket connected successfully")

    return ws

# Usage example
async def main():
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
    ws = await connect_websocket(api)

    # Send ping
    await ws.send(json.dumps({
        "type": "health",
        "data": {"action": "ping"},
    }))

    # Receive pong
    response = await ws.recv()
    msg = json.loads(response)
    print(f"Received: {msg}")

    await ws.close()

asyncio.run(main())

Real-Time Speech Translation

A complete VurboClient class using the websockets library.

import asyncio
import websockets
import json
import base64
from typing import Callable, Optional, List

class VurboClient:
    """VAS WebSocket client - authenticates via the Ticket mechanism"""

    def __init__(self, api: VasApiClient):
        self.api = api
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.on_result: Optional[Callable] = None
        self.on_error: Optional[Callable] = None
        self.on_task_complete: Optional[Callable] = None
        self.on_session_started: Optional[Callable] = None
        self.on_tts_ready: Optional[Callable] = None
        self._heartbeat_task: Optional[asyncio.Task] = None

    async def connect(self):
        """Connect to the WebSocket using the Ticket mechanism"""
        result = self.api.get_ticket()
        ticket = result["ticket"]

        self.ws = await websockets.connect(
            "wss://vas-poc.vurbo.ai/ws",
            subprotocols=[f"ticket.{ticket}"],
        )
        print("Vurbo.ai connected successfully")

        # Start the heartbeat
        self._heartbeat_task = asyncio.create_task(self._heartbeat())

    async def _heartbeat(self):
        """Send a ping every 30 seconds"""
        while True:
            try:
                await asyncio.sleep(30)
                if self.ws and self.ws.open:
                    await self.ws.send(json.dumps({
                        "type": "health",
                        "data": {"action": "ping"},
                    }))
            except Exception:
                break

    async def start_translation(
        self,
        source_lang: List[str],
        target_lang: Optional[List[str]] = None,
        recording_type: str = "transcribe",
        recognition_mode: str = "single",
        summary_template: str = "meeting",
        audio_format: str = "pcm",
        tts_enabled: bool = False,
        tts_language: Optional[str] = None,
        tts_voice: Optional[str] = None,
    ):
        """Start speech translation"""
        data = {
            "action": "start",
            "transcription_languages": source_lang,
            "translation_languages": target_lang or [],
            "type": recording_type,
            "recognition_mode": recognition_mode,
            "audio_format": audio_format,
        }

        if recording_type == "transcribe":
            data["summary_template"] = summary_template

        if tts_enabled:
            data["tts_enabled"] = True
            data["tts_language"] = tts_language
            data["tts_voice"] = tts_voice

        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": data,
        }))

    async def set_config(
        self,
        terminology: Optional[dict] = None,
        fuzzy_correction: Optional[dict] = None,
        translation_dict: Optional[list] = None,
    ):
        """Configure the terminology list, fuzzy-term correction, and translation dictionary"""
        data = {"action": "config"}
        if terminology:
            data["terminology"] = terminology
        if fuzzy_correction:
            data["fuzzy_correction"] = fuzzy_correction
        if translation_dict:
            data["translation_dict"] = translation_dict

        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": data,
        }))

    async def send_audio(self, audio_data: bytes):
        """Send audio data (automatically Base64-encoded)"""
        base64_audio = base64.b64encode(audio_data).decode("utf-8")
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "audio", "payload": base64_audio},
        }))

    async def pause(self):
        """Pause the translation"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "pause"},
        }))

    async def resume(self):
        """Resume the translation"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "resume"},
        }))

    async def stop(self):
        """Stop the translation"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "stop"},
        }))

    async def set_name(self, name: str):
        """Set the recording name"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "set_name", "name": name},
        }))

    async def switch_language(self, target_languages: List[str]):
        """Switch the translation language"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "switch_language",
                "translation_languages": target_languages,
            },
        }))

    async def rename_speaker(self, speaker_id: str, new_label: str):
        """Rename a speaker globally"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "rename_speaker",
                "speaker_id": speaker_id,
                "new_label": new_label,
            },
        }))

    async def reassign_speaker(self, sid: int, target_speaker_id: str):
        """Reassign the speaker of a single sentence"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "reassign_speaker",
                "sid": sid,
                "target_speaker_id": target_speaker_id,
            },
        }))

    async def merge_speakers(self, source_speaker_id: str, target_speaker_id: str):
        """Merge speakers"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "merge_speakers",
                "source_speaker_id": source_speaker_id,
                "target_speaker_id": target_speaker_id,
            },
        }))

    async def tts_play(self, sid: int, length: int = 1):
        """Play TTS"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "tts_play", "sid": sid, "length": length},
        }))

    async def tts_stop(self):
        """Stop TTS"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "tts_stop"},
        }))

    async def receive_messages(self):
        """Continuously receive and process messages"""
        async for message in self.ws:
            msg = json.loads(message)
            await self._handle_message(msg)

    async def _handle_message(self, msg: dict):
        """Handle an incoming message"""
        if msg["type"] == "error":
            print(f"Error: {msg['data']['message']}")
            if self.on_error:
                self.on_error(msg["data"])
            return

        if msg["type"] == "voice-translation":
            data = msg["data"]
            action = data.get("action")

            if action == "session_started":
                if self.on_session_started:
                    self.on_session_started(data)

            elif action == "result":
                if self.on_result:
                    self.on_result(
                        origin=data.get("origin"),
                        translations=data.get("translations"),
                    )

            elif action == "task_complete":
                if self.on_task_complete:
                    self.on_task_complete(data.get("task_id"))

            elif action == "tts_ready":
                if self.on_tts_ready:
                    self.on_tts_ready(data)

    async def disconnect(self):
        """Close the connection"""
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
        if self.ws:
            await self.ws.close()

Usage Example: Real-Time Translation

import asyncio

async def main():
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
    client = VurboClient(api)

    # Set up callbacks
    def on_session_started(data):
        print(f"Session started: {data['session_id']}")
        print(f"Recording ID: {data['recording_id']}")

    def on_result(origin, translations):
        if origin and origin.get("is_final"):
            print(f"[{origin['start_time']}] {origin['text']}")
        if translations:
            for lang, result in translations.items():
                if result.get("is_final"):
                    print(f"  Translation ({lang}): {result['text']}")

    def on_task_complete(task_id):
        print(f"Task complete: {task_id}")

    client.on_session_started = on_session_started
    client.on_result = on_result
    client.on_task_complete = on_task_complete

    # Connect
    await client.connect()

    # (Optional) configure the terminology list
    await client.set_config(
        terminology={
            "zh-TW": [
                {"term": "speaker diarization", "boost": 1.5},
                {"term": "real-time transcription", "boost": 1.5},
            ],
        },
    )

    # Start translation
    await client.start_translation(
        source_lang=["zh-TW"],
        target_lang=["en-US"],
        recording_type="transcribe",
        recognition_mode="multi_speaker",
        summary_template="meeting",
    )

    # Receive messages
    await client.receive_messages()

asyncio.run(main())

Usage Example: Microphone Recording

import asyncio
import pyaudio

async def record_and_translate():
    """Record from the microphone and translate in real time"""
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
    client = VurboClient(api)

    sentences = []

    def on_result(origin, translations):
        if origin and origin.get("is_final"):
            sentence = {
                "sid": origin["sid"],
                "text": origin["text"],
                "time": origin["start_time"],
            }
            if translations:
                first_lang = list(translations.keys())[0]
                sentence["translation"] = translations[first_lang]["text"]
            sentences.append(sentence)
            print(f"[{sentence['time']}] {sentence['text']}")
            if sentence.get("translation"):
                print(f"  Translation: {sentence['translation']}")

    def on_task_complete(task_id):
        print(f"\nTask complete: {task_id}")
        print(f"{len(sentences)} sentences total")

    client.on_result = on_result
    client.on_task_complete = on_task_complete

    # Connect and start
    await client.connect()
    await client.start_translation(
        source_lang=["zh-TW"],
        target_lang=["en-US"],
    )

    # Start microphone recording
    audio = pyaudio.PyAudio()
    stream = audio.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=16000,
        input=True,
        frames_per_buffer=4096,
    )

    print("Recording started... (press Ctrl+C to stop)")

    async def send_audio_loop():
        while True:
            data = stream.read(4096, exception_on_overflow=False)
            await client.send_audio(data)
            await asyncio.sleep(0)

    try:
        await asyncio.gather(
            send_audio_loop(),
            client.receive_messages(),
        )
    except KeyboardInterrupt:
        print("\nStopping recording...")
        await client.stop()

        # Wait for the task_complete event
        try:
            await asyncio.wait_for(client.receive_messages(), timeout=30)
        except asyncio.TimeoutError:
            pass
    finally:
        stream.stop_stream()
        stream.close()
        audio.terminate()
        await client.disconnect()

asyncio.run(record_and_translate())

Task Export (Audio / Transcript Download)

Download the audio or transcript of a completed task. Transcripts support five formats — txt, srt, sbv, vtt, csv — and the content includes the original text and all translation languages.

import re
from urllib.parse import unquote
from pathlib import Path

def _parse_filename(content_disposition: str, fallback: str) -> str:
    """Extract the RFC 5987-encoded UTF-8 filename from Content-Disposition."""
    match = re.search(r"filename\*=UTF-8''([^;]+)", content_disposition or "")
    return unquote(match.group(1)) if match else fallback

def download_audio(api: VasApiClient, task_id: str, output_dir: str = ".") -> str:
    """Download the task audio. Returns the saved file path."""
    response = api.session.get(
        f"{api.base_url}/api/v1/tasks/{task_id}/audio/export",
        stream=True,
        timeout=120,
    )
    response.raise_for_status()

    filename = _parse_filename(
        response.headers.get("Content-Disposition", ""),
        fallback=f"audio-{task_id}.m4a",
    )
    output_path = Path(output_dir) / filename
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    return str(output_path)

def download_transcript(
    api: VasApiClient,
    task_id: str,
    format: str = "srt",
    output_dir: str = ".",
) -> str:
    """Download the transcript. format supports txt / srt / sbv / vtt / csv."""
    if format not in {"txt", "srt", "sbv", "vtt", "csv"}:
        raise ValueError(f"Unsupported format: {format}")

    response = api.session.get(
        f"{api.base_url}/api/v1/tasks/{task_id}/transcript/export",
        params={"format": format},
        stream=True,
        timeout=60,
    )
    response.raise_for_status()

    filename = _parse_filename(
        response.headers.get("Content-Disposition", ""),
        fallback=f"transcript-{task_id}.{format}",
    )
    output_path = Path(output_dir) / filename
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    return str(output_path)

# Usage example
audio_path = download_audio(client, task_id="550e8400-e29b-41d4-a716-446655440000")
print(f"Audio downloaded: {audio_path}")

srt_path = download_transcript(
    client,
    task_id="550e8400-e29b-41d4-a716-446655440000",
    format="srt",
)
print(f"Subtitles downloaded: {srt_path}")

# The CSV version can be passed directly to pandas for analysis
# import pandas as pd
# csv_path = download_transcript(client, task_id, format="csv")
# df = pd.read_csv(csv_path)

Note: Export requires processing_status = completed; if it is not ready, a 422 recording_transcript_not_ready is returned.


Audio Import

Upload an audio file and poll for the processing status.

import time

def import_audio(
    api: VasApiClient,
    file_path: str,
    transcription_languages: list = None,
    translation_languages: list = None,
    recognition_mode: str = "multi_speaker",
    summary_template: str = "meeting",
) -> dict:
    """Upload an audio file and wait for processing to complete"""

    transcription_languages = transcription_languages or ["zh-TW"]
    translation_languages = translation_languages or ["en-US"]

    # Step 1: Upload the audio file
    print(f"Uploading audio: {file_path}")
    result = api.upload_audio(
        file_path=file_path,
        transcription_languages=transcription_languages,
        translation_languages=translation_languages,
        recognition_mode=recognition_mode,
        summary_template=summary_template,
    )

    import_id = result["data"]["import_id"]
    print(f"Import ID: {import_id}")

    # Step 2: Poll the status
    while True:
        status_result = api.get_import_status(import_id)
        data = status_result["data"]

        status = data["status"]
        stage = data["stage"]
        progress = data["progress"]

        print(f"Status: {status} | Stage: {stage} | Progress: {progress}%")

        if status == "completed":
            task_id = data["task_id"]
            print(f"Processing complete! Task ID: {task_id}")
            return {"import_id": import_id, "task_id": task_id}

        if status == "failed":
            raise VasApiError(
                message=f"Processing failed: {data['error_message']}",
                status_code=500,
                error_code=data["error_code"],
            )

        time.sleep(5)  # Query once every 5 seconds

# Usage example
if __name__ == "__main__":
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

    # Check the budget first
    quota = api.check_quota(3600000)  # 1-hour audio file
    if quota["data"]["allowed"]:
        print(f"Sufficient budget, ${quota['data']['remaining_budget']} USD remaining")

        result = import_audio(
            api,
            file_path="meeting.mp3",
            transcription_languages=["zh-TW"],
            translation_languages=["en-US", "ja-JP"],
            recognition_mode="multi_speaker",
            summary_template="meeting",
        )
        print(f"You can use the Task ID to load history: {result['task_id']}")
    else:
        print("Insufficient budget")

Loading History (SSE)

Use sseclient-py to handle the SSE stream; authentication is done via the X-API-Key header.

import requests
import sseclient
import json

def load_history(task_id: str, api_key: str) -> dict:
    """Load the history"""
    url = f"https://vas-poc.vurbo.ai/api/v1/sse/history/transcribe/{task_id}"

    response = requests.get(
        url,
        headers={"X-API-Key": api_key},
        stream=True,
    )
    client = sseclient.SSEClient(response)

    metadata = None
    sentences = []
    summary = None

    for event in client.events():
        if event.event == "init_metadata":
            metadata = json.loads(event.data)
            print(f"Task: {metadata['title']}")

        elif event.event == "init_sentence":
            sentence = json.loads(event.data)
            sentences.append(sentence)

        elif event.event == "init_summary":
            summary = json.loads(event.data)["text"]

        elif event.event == "init_done":
            break

    return {
        "metadata": metadata,
        "sentences": sentences,
        "summary": summary,
    }

def retranslate(task_id: str, target_lang: str, api_key: str) -> list:
    """Retranslate the full text"""
    url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/{task_id}?targetLang={target_lang}"

    response = requests.get(
        url,
        headers={"X-API-Key": api_key},
        stream=True,
    )
    client = sseclient.SSEClient(response)

    results = []

    for event in client.events():
        if event.event == "translation":
            data = json.loads(event.data)
            results.append(data)
            print(f"Sentence {data['sid']}: {data['text']}")

        elif event.event == "done":
            break

    return results

def retranslate_summary(task_id: str, target_lang: str, api_key: str) -> str:
    """Retranslate the summary"""
    url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/summary/{task_id}?targetLang={target_lang}"

    response = requests.get(
        url,
        headers={"X-API-Key": api_key},
        stream=True,
    )
    client = sseclient.SSEClient(response)

    final_text = ""

    for event in client.events():
        if event.event == "summary_translation":
            data = json.loads(event.data)
            if data.get("is_final"):
                final_text = data["text"]

        elif event.event == "done":
            break

    return final_text

# Usage example
if __name__ == "__main__":
    api_key = "vas_YOUR_API_KEY_HERE_32_CHARACTERS"
    task_id = "550e8400-e29b-41d4-a716-446655440000"

    # Load history
    history = load_history(task_id, api_key)
    print(f"\n{len(history['sentences'])} sentences total")

    for s in history["sentences"]:
        print(f"  [{s['start_time']}] {s['origin']}")
        if s.get("translations"):
            for lang, text in s["translations"].items():
                print(f"    [{lang}] {text}")

    if history["summary"]:
        print(f"\nSummary: {history['summary'][:100]}...")

    # Retranslate into Japanese
    print("\nRetranslating into Japanese...")
    translations = retranslate(task_id, "ja-JP", api_key)
    print(f"Translation complete, {len(translations)} sentences total")

Broadcast Viewer

Viewers receive real-time subtitles and translations via SSE.

import requests
import sseclient
import json

class BroadcastViewer:
    """Broadcast viewer"""

    def __init__(self, token: str):
        self.token = token
        self.base_url = "https://vas-poc.vurbo.ai"

    def get_broadcast_info(self) -> dict:
        """Get the broadcast's public info (no authentication required)"""
        response = requests.get(
            f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}"
        )
        return response.json()

    def verify_password(self, password: str) -> dict:
        """Verify the password"""
        response = requests.post(
            f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}/verify",
            json={"password": password},
        )
        return response.json()

    def connect(
        self,
        lang: str = None,
        viewer_access_token: str = None,
        tts_voice: str = None,
        on_sentence=None,
        on_translation=None,
        on_tts_ready=None,
        on_announcement=None,
        on_standby=None,
        on_live=None,
        on_ended=None,
    ):
        """Connect via SSE to receive real-time subtitles"""
        params = {}
        if lang:
            params["lang"] = lang
        if viewer_access_token:
            params["viewer_access_token"] = viewer_access_token
        if tts_voice:
            params["tts_voice"] = tts_voice

        url = f"{self.base_url}/broadcast/{self.token}/text"
        response = requests.get(url, params=params, stream=True)
        client = sseclient.SSEClient(response)

        for event in client.events():
            data = json.loads(event.data) if event.data else {}

            if event.event == "sentence" and on_sentence:
                on_sentence(data)
            elif event.event == "translation" and on_translation:
                on_translation(data)
            elif event.event == "tts_ready" and on_tts_ready:
                on_tts_ready(data)
            elif event.event == "announcement" and on_announcement:
                on_announcement(data)
            elif event.event == "standby" and on_standby:
                on_standby(data)
            elif event.event == "live" and on_live:
                on_live()
            elif event.event == "ended":
                if on_ended:
                    on_ended()
                break

# Usage example
if __name__ == "__main__":
    viewer = BroadcastViewer("a3f9")

    # Get the broadcast info
    info = viewer.get_broadcast_info()
    print(f"Broadcast name: {info['data']['name']}")
    print(f"Password required: {info['data']['requires_password']}")
    print(f"Available languages: {info['data']['translation_languages']}")

    # If password verification is required
    viewer_access_token = None
    if info["data"]["requires_password"]:
        verify_result = viewer.verify_password("mySecret123")
        viewer_access_token = verify_result["data"]["viewer_access_token"]
        print("Password verified successfully")

    # Receive real-time subtitles
    def on_sentence(data):
        print(f"[Original] {data['text']}")

    def on_translation(data):
        print(f"[Translation] {data['text']}")

    def on_announcement(data):
        print(f"[Announcement] {data['message']}")

    def on_ended():
        print("Broadcast ended")

    print("Receiving subtitles...")
    viewer.connect(
        lang="en-US",
        viewer_access_token=viewer_access_token,
        on_sentence=on_sentence,
        on_translation=on_translation,
        on_announcement=on_announcement,
        on_ended=on_ended,
    )

Webhook Handling

Receive and verify VAS Webhook events.

import hmac
import hashlib
import json
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret_here"

def verify_signature(timestamp, body, signature):
    """Verify the Webhook signature"""
    signature_payload = f"{timestamp}.{body}"
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(),
        signature_payload.encode(),
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

@app.route("/webhooks/vas", methods=["POST"])
def handle_webhook():
    timestamp = request.headers.get("X-VAS-Timestamp", "")
    signature = request.headers.get("X-VAS-Signature", "")
    raw_body = request.get_data(as_text=True)

    if not verify_signature(timestamp, raw_body, signature):
        return jsonify({"error": "Invalid signature"}), 401

    payload = request.get_json()
    event = payload["event"]
    data = payload["data"]

    if event == "recording.completed":
        print(f"Recording completed: task_id={data['task_id']}")
    elif event == "import.completed":
        print(f"Import completed: import_id={data['import_id']}")
    elif event in ("recording.failed", "import.failed"):
        print(f"Processing failed: {data.get('error') or data.get('error_message')}")

    return jsonify({"received": True}), 200

For the complete Webhook guide, see Webhook Callback Guide.


Error Handling

class VasApiError(Exception):
    """VAS API error"""

    def __init__(self, message: str, status_code: int = None, error_code: str = None):
        super().__init__(message)
        self.status_code = status_code
        self.error_code = error_code

    def __str__(self):
        parts = [self.args[0]]
        if self.error_code:
            parts.insert(0, f"[{self.error_code}]")
        if self.status_code:
            parts.insert(0, f"HTTP {self.status_code}")
        return " ".join(parts)

def handle_api_error(error: VasApiError):
    """Unified error handling"""
    if error.error_code in ("auth_missing_api_key", "auth_invalid_api_key", "auth_key_expired"):
        print(f"Authentication failed: {error}")
        print("Please check that your API Key is correct")

    elif error.error_code == "auth_budget_exceeded":
        print(f"Budget exceeded: {error}")
        print("Please wait for next month's budget reset or adjust the budget")

    elif error.error_code in ("recording_not_found", "import_not_found"):
        print(f"Resource not found: {error}")

    elif error.error_code == "broadcast_session_not_found":
        print(f"Broadcast not found: {error}")

    elif error.error_code == "broadcast_cannot_revoke":
        print(f"Only broadcasts in the pending state can be revoked: {error}")

    elif error.error_code == "validation_failed":
        print(f"Parameter validation failed: {error}")

    elif error.error_code == "import_file_too_large":
        print(f"File too large: {error}")
        print("Please compress or split the file (500MB limit)")

    elif error.error_code == "import_invalid_format":
        print(f"Unsupported format: {error}")
        print("Supported formats: mp3, wav, m4a")

    elif error.error_code == "ticket_invalid":
        print(f"Ticket invalid: {error}")
        print("Please obtain a new Ticket")

    else:
        print(f"API error: {error}")

# Usage example
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

try:
    tasks = api.get_tasks()
except VasApiError as e:
    handle_api_error(e)
except requests.exceptions.ConnectionError:
    print("Network connection failed; please check your network settings")
except requests.exceptions.Timeout:
    print("Request timed out; please try again later")

Async Version (asyncio + aiohttp)

A complete asynchronous API Client, suitable for high-performance scenarios or those that require concurrent processing.

import asyncio
import aiohttp
import json
from typing import Optional

class AsyncVasApiClient:
    """Asynchronous VAS API Client"""

    def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={"X-API-Key": self.api_key},
            )
        return self._session

    async def close(self):
        if self._session and not self._session.closed:
            await self._session.close()

    async def _request(self, method: str, path: str, **kwargs) -> dict:
        session = await self._get_session()
        url = f"{self.base_url}{path}"

        async with session.request(method, url, **kwargs) as response:
            if not response.ok:
                error = await response.json()
                raise VasApiError(
                    message=error.get("message", f"HTTP {response.status}"),
                    status_code=response.status,
                    error_code=error.get("error_code"),
                )
            return await response.json()

    # === Authentication ===

    async def get_ticket(self) -> dict:
        return await self._request("POST", "/auth/ticket")

    # === Tasks ===

    async def get_tasks(self) -> dict:
        return await self._request("GET", "/tasks")

    async def delete_task(self, task_id: str) -> dict:
        return await self._request("DELETE", f"/tasks/{task_id}")

    async def pin_task(self, task_id: str, is_pinned: bool) -> dict:
        return await self._request("PUT", f"/tasks/{task_id}/pin", json={"is_pinned": is_pinned})

    async def mark_as_read(self, task_id: str) -> dict:
        return await self._request("PUT", f"/tasks/{task_id}/read")

    async def rename_task(self, task_id: str, name: str) -> dict:
        return await self._request("PATCH", f"/tasks/{task_id}/name", json={"name": name})

    # === Imports ===

    async def check_quota(self, duration_ms: int) -> dict:
        return await self._request("POST", "/imports/check-quota", json={"duration_ms": duration_ms})

    async def upload_audio(self, file_path: str, options: dict) -> dict:
        session = await self._get_session()
        url = f"{self.base_url}/imports"

        form = aiohttp.FormData()
        form.add_field("file", open(file_path, "rb"), filename=file_path.split("/")[-1])
        form.add_field("transcription_languages", json.dumps(options.get("transcription_languages", ["zh-TW"])))
        form.add_field("recognition_mode", options.get("recognition_mode", "single"))

        if "translation_languages" in options:
            form.add_field("translation_languages", json.dumps(options["translation_languages"]))
        if "summary_template" in options:
            form.add_field("summary_template", options["summary_template"])

        async with session.post(url, data=form) as response:
            if not response.ok:
                error = await response.json()
                raise VasApiError(
                    message=error.get("message", f"HTTP {response.status}"),
                    status_code=response.status,
                    error_code=error.get("error_code"),
                )
            return await response.json()

    async def get_import_status(self, import_id: str) -> dict:
        return await self._request("GET", f"/imports/{import_id}")

    # === Broadcasts ===

    async def create_broadcast(self, options: dict) -> dict:
        return await self._request("POST", "/broadcasts", json=options)

    async def get_broadcast(self, broadcast_id: str) -> dict:
        return await self._request("GET", f"/broadcasts/{broadcast_id}")

    async def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
        return await self._request("PATCH", f"/broadcasts/{broadcast_id}", json=options)

    async def revoke_broadcast(self, broadcast_id: str) -> dict:
        return await self._request("DELETE", f"/broadcasts/{broadcast_id}")

    # === TTS ===

    async def get_tts_voices(self, language: str) -> dict:
        return await self._request("GET", "/tts/voices", params={"language": language})

    # === Summary Templates ===

    async def get_summary_templates(self) -> dict:
        return await self._request("GET", "/summary-templates")

    # === SSE History (asynchronous) ===

    async def load_history(self, task_id: str) -> dict:
        """Load the history asynchronously"""
        session = await self._get_session()
        url = f"{self.base_url}/sse/history/transcribe/{task_id}"

        metadata = None
        sentences = []
        summary = None

        async with session.get(url) as response:
            buffer = ""
            current_event = ""

            async for chunk in response.content.iter_any():
                buffer += chunk.decode("utf-8")
                lines = buffer.split("\n")
                buffer = lines.pop()

                for line in lines:
                    if line.startswith("event: "):
                        current_event = line[7:]
                    elif line.startswith("data: "):
                        data = json.loads(line[6:])

                        if current_event == "init_metadata":
                            metadata = data
                        elif current_event == "init_sentence":
                            sentences.append(data)
                        elif current_event == "init_summary":
                            summary = data.get("text")
                        elif current_event == "init_done":
                            return {
                                "metadata": metadata,
                                "sentences": sentences,
                                "summary": summary,
                            }

        return {"metadata": metadata, "sentences": sentences, "summary": summary}

    async def retranslate(self, task_id: str, target_lang: str) -> list:
        """Retranslate asynchronously"""
        session = await self._get_session()
        url = f"{self.base_url}/sse/retranslate/{task_id}"

        results = []

        async with session.get(url, params={"targetLang": target_lang}) as response:
            buffer = ""
            current_event = ""

            async for chunk in response.content.iter_any():
                buffer += chunk.decode("utf-8")
                lines = buffer.split("\n")
                buffer = lines.pop()

                for line in lines:
                    if line.startswith("event: "):
                        current_event = line[7:]
                    elif line.startswith("data: "):
                        data = json.loads(line[6:])
                        if current_event == "translation":
                            results.append(data)
                        elif current_event == "done":
                            return results

        return results

# Usage example: concurrent operations
async def main():
    api = AsyncVasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

    try:
        # Fetch multiple resources concurrently
        tasks_result, templates_result, tts_languages = await asyncio.gather(
            api.get_tasks(),
            api.get_summary_templates(),
            api.get_tts_languages(),
        )

        print(f"Task count: {len(tasks_result['tasks'])}")
        print(f"Summary templates: {[t['name'] for t in templates_result['data']]}")
        print(f"TTS language count: {tts_languages['data']['total']}")

        # Load history
        if tasks_result["tasks"]:
            task_id = tasks_result["tasks"][0]["task_id"]
            history = await api.load_history(task_id)
            print(f"\nHistory: {len(history['sentences'])} sentences")

    except VasApiError as e:
        handle_api_error(e)
    finally:
        await api.close()

asyncio.run(main())

Async Upload and Polling

async def async_import_audio(api: AsyncVasApiClient, file_path: str, options: dict) -> dict:
    """Upload an audio file asynchronously and wait for processing to complete"""

    # Upload
    result = await api.upload_audio(file_path, options)
    import_id = result["data"]["import_id"]
    print(f"Import ID: {import_id}")

    # Poll
    while True:
        status_result = await api.get_import_status(import_id)
        data = status_result["data"]

        print(f"Status: {data['status']} | Stage: {data['stage']} | Progress: {data['progress']}%")

        if data["status"] == "completed":
            return {"import_id": import_id, "task_id": data["task_id"]}

        if data["status"] == "failed":
            raise VasApiError(
                message=f"Processing failed: {data['error_message']}",
                status_code=500,
                error_code=data["error_code"],
            )

        await asyncio.sleep(5)

Version: V1.5.7 Last Updated: 2026-05-20

Copyright © 2026