範例

Python

目錄

  1. 安裝依賴
  2. API Client 設定
  3. Ticket 取得與 WebSocket 連線
  4. 即時語音翻譯
  5. 任務匯出(音檔/逐字稿下載)
  6. 音檔匯入
  7. 歷史紀錄載入(SSE)
  8. 廣播觀眾端
  9. 錯誤處理
  10. Webhook 處理
  11. 非同步版本(asyncio + aiohttp)

安裝依賴

pip install requests websockets sseclient-py aiohttp

API Client 設定

使用 requests 封裝,統一透過 X-API-Key header 認證。

import requests
from typing import Optional

class VasApiClient:
    """VAS API Client - 使用 X-API-Key header 認證"""

    def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
        })

    def _url(self, path: str) -> str:
        return f"{self.base_url}{path}"

    def _handle_response(self, response: requests.Response) -> dict:
        if not response.ok:
            error = response.json() if response.content else {}
            raise VasApiError(
                message=error.get("message", f"HTTP {response.status_code}"),
                status_code=response.status_code,
                error_code=error.get("error_code"),
            )
        return response.json()

    # === 認證 API ===

    def get_ticket(self) -> dict:
        """取得 WebSocket 連線用的一次性 Ticket"""
        response = self.session.post(self._url("/auth/ticket"))
        return self._handle_response(response)

    # === Tasks API ===

    def get_tasks(self, status: str = "completed") -> dict:
        """取得任務列表"""
        response = self.session.get(
            self._url("/tasks"),
            params={"status": status},
        )
        return self._handle_response(response)

    def delete_task(self, task_id: str) -> dict:
        """刪除任務"""
        response = self.session.delete(self._url(f"/tasks/{task_id}"))
        return self._handle_response(response)

    def pin_task(self, task_id: str, is_pinned: bool) -> dict:
        """更新釘選狀態"""
        response = self.session.put(
            self._url(f"/tasks/{task_id}/pin"),
            json={"is_pinned": is_pinned},
        )
        return self._handle_response(response)

    def mark_as_read(self, task_id: str) -> dict:
        """標記已讀"""
        response = self.session.put(self._url(f"/tasks/{task_id}/read"))
        return self._handle_response(response)

    def rename_task(self, task_id: str, name: str) -> dict:
        """更新任務名稱"""
        response = self.session.patch(
            self._url(f"/tasks/{task_id}/name"),
            json={"name": name},
        )
        return self._handle_response(response)

    # === Imports API ===

    def check_quota(self, duration_ms: int) -> dict:
        """檢查上傳預算"""
        response = self.session.post(
            self._url("/imports/check-quota"),
            json={"duration_ms": duration_ms},
        )
        return self._handle_response(response)

    def upload_audio(
        self,
        file_path: str,
        transcription_languages: list,
        recognition_mode: str = "single",
        translation_languages: Optional[list] = None,
        summary_template: Optional[str] = None,
        terminology: Optional[dict] = None,
        callback_url: Optional[str] = None,
    ) -> dict:
        """上傳音檔"""
        import json

        with open(file_path, "rb") as f:
            files = {"file": f}
            data = {
                "transcription_languages": json.dumps(transcription_languages),
                "recognition_mode": recognition_mode,
            }

            if translation_languages:
                data["translation_languages"] = json.dumps(translation_languages)
            if summary_template:
                data["summary_template"] = summary_template
            if terminology:
                data["terminology"] = json.dumps(terminology)
            if callback_url:
                data["callback_url"] = callback_url

            response = self.session.post(
                self._url("/imports"),
                files=files,
                data=data,
            )

        return self._handle_response(response)

    def get_import_status(self, import_id: str) -> dict:
        """查詢匯入狀態"""
        response = self.session.get(self._url(f"/imports/{import_id}"))
        return self._handle_response(response)

    def get_imports(self, per_page: int = 20) -> dict:
        """取得匯入列表"""
        response = self.session.get(
            self._url("/imports"),
            params={"per_page": per_page},
        )
        return self._handle_response(response)

    # === Broadcasts API ===

    def create_broadcast(self, options: dict) -> dict:
        """建立廣播"""
        response = self.session.post(self._url("/broadcasts"), json=options)
        return self._handle_response(response)

    def get_broadcast(self, broadcast_id: str) -> dict:
        """查詢廣播狀態"""
        response = self.session.get(self._url(f"/broadcasts/{broadcast_id}"))
        return self._handle_response(response)

    def get_broadcasts(self, page: int = 1, per_page: int = 20) -> dict:
        """取得廣播列表"""
        response = self.session.get(
            self._url("/broadcasts"),
            params={"page": page, "per_page": per_page},
        )
        return self._handle_response(response)

    def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
        """更新廣播設定"""
        response = self.session.patch(
            self._url(f"/broadcasts/{broadcast_id}"),
            json=options,
        )
        return self._handle_response(response)

    def revoke_broadcast(self, broadcast_id: str) -> dict:
        """撤銷廣播"""
        response = self.session.delete(self._url(f"/broadcasts/{broadcast_id}"))
        return self._handle_response(response)

    # === TTS API ===

    def get_tts_voices(self, language: str) -> dict:
        """取得 TTS 語音列表"""
        response = self.session.get(
            self._url("/tts/voices"),
            params={"language": language},
        )
        return self._handle_response(response)

    def download_tts_sample(self, voice_name: str, output_path: str):
        """下載 TTS 語音示範"""
        response = self.session.get(self._url(f"/tts/voices/{voice_name}/sample"))
        if response.ok:
            with open(output_path, "wb") as f:
                f.write(response.content)
        else:
            raise VasApiError(f"下載失敗: HTTP {response.status_code}", response.status_code)

    # === Speakers API ===

    def rename_speaker(self, recording_id: str, speaker_id: str, new_label: str) -> dict:
        """全域重命名說話者"""
        response = self.session.patch(
            self._url(f"/recordings/{recording_id}/speakers/rename"),
            json={"speaker_id": speaker_id, "new_label": new_label},
        )
        return self._handle_response(response)

    def reassign_speaker(self, recording_id: str, sid: int, target_speaker_id: str) -> dict:
        """重新指派單句語者"""
        response = self.session.patch(
            self._url(f"/recordings/{recording_id}/speakers/reassign"),
            json={"sid": sid, "target_speaker_id": target_speaker_id},
        )
        return self._handle_response(response)

    # === Summary Templates API ===

    def get_summary_templates(self) -> dict:
        """取得摘要模板列表"""
        response = self.session.get(self._url("/summary-templates"))
        return self._handle_response(response)

    # === Audio API ===

    def download_audio(self, task_id: str, output_path: str):
        """下載任務錄音音訊"""
        response = self.session.get(self._url(f"/sse/audio/{task_id}"))
        if response.ok:
            with open(output_path, "wb") as f:
                f.write(response.content)
        else:
            raise VasApiError(f"下載失敗: HTTP {response.status_code}", response.status_code)

# 使用範例
if __name__ == "__main__":
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

    # 取得任務列表
    tasks = api.get_tasks()
    print(f"共有 {len(tasks['tasks'])} 個任務")

    for task in tasks["tasks"]:
        print(f"  [{task['task_id'][:8]}] {task['title']} ({task['duration_formatted']})")

Ticket 取得與 WebSocket 連線

WebSocket 使用 Ticket 機制認證,Ticket 透過 Sec-WebSocket-Protocol 傳遞。

import asyncio
import websockets
import json

async def connect_websocket(api: VasApiClient):
    """取得 Ticket 並建立 WebSocket 連線"""
    # 步驟 1:取得一次性 Ticket
    result = api.get_ticket()
    ticket = result["ticket"]
    print(f"取得 Ticket,有效期 {result['expires_in']} 秒")

    # 步驟 2:使用 Ticket 連線 WebSocket
    ws = await websockets.connect(
        "wss://vas-poc.vurbo.ai/ws",
        subprotocols=[f"ticket.{ticket}"],
    )
    print("WebSocket 連線成功")

    return ws

# 使用範例
async def main():
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
    ws = await connect_websocket(api)

    # 發送 ping
    await ws.send(json.dumps({
        "type": "health",
        "data": {"action": "ping"},
    }))

    # 接收 pong
    response = await ws.recv()
    msg = json.loads(response)
    print(f"收到: {msg}")

    await ws.close()

asyncio.run(main())

即時語音翻譯

完整的 VurboClient 類別,使用 websockets 庫。

import asyncio
import websockets
import json
import base64
from typing import Callable, Optional, List

class VurboClient:
    """VAS WebSocket 客戶端 - 使用 Ticket 機制認證"""

    def __init__(self, api: VasApiClient):
        self.api = api
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.on_result: Optional[Callable] = None
        self.on_error: Optional[Callable] = None
        self.on_task_complete: Optional[Callable] = None
        self.on_session_started: Optional[Callable] = None
        self.on_tts_ready: Optional[Callable] = None
        self._heartbeat_task: Optional[asyncio.Task] = None

    async def connect(self):
        """使用 Ticket 機制連線 WebSocket"""
        result = self.api.get_ticket()
        ticket = result["ticket"]

        self.ws = await websockets.connect(
            "wss://vas-poc.vurbo.ai/ws",
            subprotocols=[f"ticket.{ticket}"],
        )
        print("Vurbo.ai 連線成功")

        # 啟動心跳
        self._heartbeat_task = asyncio.create_task(self._heartbeat())

    async def _heartbeat(self):
        """每 30 秒發送一次 ping"""
        while True:
            try:
                await asyncio.sleep(30)
                if self.ws and self.ws.open:
                    await self.ws.send(json.dumps({
                        "type": "health",
                        "data": {"action": "ping"},
                    }))
            except Exception:
                break

    async def start_translation(
        self,
        source_lang: List[str],
        target_lang: Optional[List[str]] = None,
        recording_type: str = "transcribe",
        recognition_mode: str = "single",
        summary_template: str = "meeting",
        audio_format: str = "pcm",
        tts_enabled: bool = False,
        tts_language: Optional[str] = None,
        tts_voice: Optional[str] = None,
    ):
        """開始語音翻譯"""
        data = {
            "action": "start",
            "transcription_languages": source_lang,
            "translation_languages": target_lang or [],
            "type": recording_type,
            "recognition_mode": recognition_mode,
            "audio_format": audio_format,
        }

        if recording_type == "transcribe":
            data["summary_template"] = summary_template

        if tts_enabled:
            data["tts_enabled"] = True
            data["tts_language"] = tts_language
            data["tts_voice"] = tts_voice

        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": data,
        }))

    async def set_config(
        self,
        terminology: Optional[dict] = None,
        fuzzy_correction: Optional[dict] = None,
        translation_dict: Optional[list] = None,
    ):
        """設定術語庫、模糊詞校正、翻譯字典"""
        data = {"action": "config"}
        if terminology:
            data["terminology"] = terminology
        if fuzzy_correction:
            data["fuzzy_correction"] = fuzzy_correction
        if translation_dict:
            data["translation_dict"] = translation_dict

        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": data,
        }))

    async def send_audio(self, audio_data: bytes):
        """傳送音訊資料(自動 Base64 編碼)"""
        base64_audio = base64.b64encode(audio_data).decode("utf-8")
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "audio", "payload": base64_audio},
        }))

    async def pause(self):
        """暫停翻譯"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "pause"},
        }))

    async def resume(self):
        """恢復翻譯"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "resume"},
        }))

    async def stop(self):
        """停止翻譯"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "stop"},
        }))

    async def set_name(self, name: str):
        """設定錄音名稱"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "set_name", "name": name},
        }))

    async def switch_language(self, target_languages: List[str]):
        """切換翻譯語言"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "switch_language",
                "translation_languages": target_languages,
            },
        }))

    async def rename_speaker(self, speaker_id: str, new_label: str):
        """全域重命名說話者"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "rename_speaker",
                "speaker_id": speaker_id,
                "new_label": new_label,
            },
        }))

    async def reassign_speaker(self, sid: int, target_speaker_id: str):
        """重新指派單句語者"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "reassign_speaker",
                "sid": sid,
                "target_speaker_id": target_speaker_id,
            },
        }))

    async def merge_speakers(self, source_speaker_id: str, target_speaker_id: str):
        """合併語者"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {
                "action": "merge_speakers",
                "source_speaker_id": source_speaker_id,
                "target_speaker_id": target_speaker_id,
            },
        }))

    async def tts_play(self, sid: int, length: int = 1):
        """播放 TTS"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "tts_play", "sid": sid, "length": length},
        }))

    async def tts_stop(self):
        """停止 TTS"""
        await self.ws.send(json.dumps({
            "type": "voice-translation",
            "data": {"action": "tts_stop"},
        }))

    async def receive_messages(self):
        """持續接收並處理訊息"""
        async for message in self.ws:
            msg = json.loads(message)
            await self._handle_message(msg)

    async def _handle_message(self, msg: dict):
        """處理收到的訊息"""
        if msg["type"] == "error":
            print(f"錯誤: {msg['data']['message']}")
            if self.on_error:
                self.on_error(msg["data"])
            return

        if msg["type"] == "voice-translation":
            data = msg["data"]
            action = data.get("action")

            if action == "session_started":
                if self.on_session_started:
                    self.on_session_started(data)

            elif action == "result":
                if self.on_result:
                    self.on_result(
                        origin=data.get("origin"),
                        translations=data.get("translations"),
                    )

            elif action == "task_complete":
                if self.on_task_complete:
                    self.on_task_complete(data.get("task_id"))

            elif action == "tts_ready":
                if self.on_tts_ready:
                    self.on_tts_ready(data)

    async def disconnect(self):
        """中斷連線"""
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
        if self.ws:
            await self.ws.close()

使用範例:即時翻譯

import asyncio

async def main():
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
    client = VurboClient(api)

    # 設定回調
    def on_session_started(data):
        print(f"Session 已啟動: {data['session_id']}")
        print(f"錄音 ID: {data['recording_id']}")

    def on_result(origin, translations):
        if origin and origin.get("is_final"):
            print(f"[{origin['start_time']}] {origin['text']}")
        if translations:
            for lang, result in translations.items():
                if result.get("is_final"):
                    print(f"  翻譯 ({lang}): {result['text']}")

    def on_task_complete(task_id):
        print(f"任務完成: {task_id}")

    client.on_session_started = on_session_started
    client.on_result = on_result
    client.on_task_complete = on_task_complete

    # 連線
    await client.connect()

    # (可選)設定術語庫
    await client.set_config(
        terminology={
            "zh-TW": [
                {"term": "語者分離", "boost": 1.5},
                {"term": "即時轉錄", "boost": 1.5},
            ],
        },
    )

    # 開始翻譯
    await client.start_translation(
        source_lang=["zh-TW"],
        target_lang=["en-US"],
        recording_type="transcribe",
        recognition_mode="multi_speaker",
        summary_template="meeting",
    )

    # 接收訊息
    await client.receive_messages()

asyncio.run(main())

使用範例:麥克風錄音

import asyncio
import pyaudio

async def record_and_translate():
    """使用麥克風錄音並即時翻譯"""
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
    client = VurboClient(api)

    sentences = []

    def on_result(origin, translations):
        if origin and origin.get("is_final"):
            sentence = {
                "sid": origin["sid"],
                "text": origin["text"],
                "time": origin["start_time"],
            }
            if translations:
                first_lang = list(translations.keys())[0]
                sentence["translation"] = translations[first_lang]["text"]
            sentences.append(sentence)
            print(f"[{sentence['time']}] {sentence['text']}")
            if sentence.get("translation"):
                print(f"  翻譯: {sentence['translation']}")

    def on_task_complete(task_id):
        print(f"\n任務完成: {task_id}")
        print(f"共 {len(sentences)} 句")

    client.on_result = on_result
    client.on_task_complete = on_task_complete

    # 連線並開始
    await client.connect()
    await client.start_translation(
        source_lang=["zh-TW"],
        target_lang=["en-US"],
    )

    # 開始麥克風錄音
    audio = pyaudio.PyAudio()
    stream = audio.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=16000,
        input=True,
        frames_per_buffer=4096,
    )

    print("開始錄音...(按 Ctrl+C 停止)")

    async def send_audio_loop():
        while True:
            data = stream.read(4096, exception_on_overflow=False)
            await client.send_audio(data)
            await asyncio.sleep(0)

    try:
        await asyncio.gather(
            send_audio_loop(),
            client.receive_messages(),
        )
    except KeyboardInterrupt:
        print("\n停止錄音...")
        await client.stop()

        # 等待 task_complete 事件
        try:
            await asyncio.wait_for(client.receive_messages(), timeout=30)
        except asyncio.TimeoutError:
            pass
    finally:
        stream.stop_stream()
        stream.close()
        audio.terminate()
        await client.disconnect()

asyncio.run(record_and_translate())

任務匯出(音檔/逐字稿下載)

下載已完成任務的音檔或逐字稿。逐字稿支援 txtsrtsbvvttcsv 五種格式,內容包含原文與所有翻譯語言。

import re
from urllib.parse import unquote
from pathlib import Path

def _parse_filename(content_disposition: str, fallback: str) -> str:
    """從 Content-Disposition 取得 RFC 5987 編碼的 UTF-8 檔名。"""
    match = re.search(r"filename\*=UTF-8''([^;]+)", content_disposition or "")
    return unquote(match.group(1)) if match else fallback

def download_audio(api: VasApiClient, task_id: str, output_dir: str = ".") -> str:
    """下載任務音檔。回傳儲存的檔案路徑。"""
    response = api.session.get(
        f"{api.base_url}/api/v1/tasks/{task_id}/audio/export",
        stream=True,
        timeout=120,
    )
    response.raise_for_status()

    filename = _parse_filename(
        response.headers.get("Content-Disposition", ""),
        fallback=f"audio-{task_id}.m4a",
    )
    output_path = Path(output_dir) / filename
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    return str(output_path)

def download_transcript(
    api: VasApiClient,
    task_id: str,
    format: str = "srt",
    output_dir: str = ".",
) -> str:
    """下載逐字稿。format 支援 txt / srt / sbv / vtt / csv。"""
    if format not in {"txt", "srt", "sbv", "vtt", "csv"}:
        raise ValueError(f"Unsupported format: {format}")

    response = api.session.get(
        f"{api.base_url}/api/v1/tasks/{task_id}/transcript/export",
        params={"format": format},
        stream=True,
        timeout=60,
    )
    response.raise_for_status()

    filename = _parse_filename(
        response.headers.get("Content-Disposition", ""),
        fallback=f"transcript-{task_id}.{format}",
    )
    output_path = Path(output_dir) / filename
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    return str(output_path)

# 使用範例
audio_path = download_audio(client, task_id="550e8400-e29b-41d4-a716-446655440000")
print(f"音檔已下載:{audio_path}")

srt_path = download_transcript(
    client,
    task_id="550e8400-e29b-41d4-a716-446655440000",
    format="srt",
)
print(f"字幕已下載:{srt_path}")

# CSV 版本可直接交給 pandas 分析
# import pandas as pd
# csv_path = download_transcript(client, task_id, format="csv")
# df = pd.read_csv(csv_path)

注意:匯出需 processing_status = completed;未就緒會回 422 recording_transcript_not_ready


音檔匯入

上傳音檔並輪詢處理狀態。

import time

def import_audio(
    api: VasApiClient,
    file_path: str,
    transcription_languages: list = None,
    translation_languages: list = None,
    recognition_mode: str = "multi_speaker",
    summary_template: str = "meeting",
) -> dict:
    """上傳音檔並等待處理完成"""

    transcription_languages = transcription_languages or ["zh-TW"]
    translation_languages = translation_languages or ["en-US"]

    # 步驟 1:上傳音檔
    print(f"上傳音檔: {file_path}")
    result = api.upload_audio(
        file_path=file_path,
        transcription_languages=transcription_languages,
        translation_languages=translation_languages,
        recognition_mode=recognition_mode,
        summary_template=summary_template,
    )

    import_id = result["data"]["import_id"]
    print(f"匯入 ID: {import_id}")

    # 步驟 2:輪詢狀態
    while True:
        status_result = api.get_import_status(import_id)
        data = status_result["data"]

        status = data["status"]
        stage = data["stage"]
        progress = data["progress"]

        print(f"狀態: {status} | 階段: {stage} | 進度: {progress}%")

        if status == "completed":
            task_id = data["task_id"]
            print(f"處理完成!Task ID: {task_id}")
            return {"import_id": import_id, "task_id": task_id}

        if status == "failed":
            raise VasApiError(
                message=f"處理失敗: {data['error_message']}",
                status_code=500,
                error_code=data["error_code"],
            )

        time.sleep(5)  # 每 5 秒查詢一次

# 使用範例
if __name__ == "__main__":
    api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

    # 先檢查預算
    quota = api.check_quota(3600000)  # 1 小時音檔
    if quota["data"]["allowed"]:
        print(f"預算充足,剩餘 ${quota['data']['remaining_budget']} USD")

        result = import_audio(
            api,
            file_path="meeting.mp3",
            transcription_languages=["zh-TW"],
            translation_languages=["en-US", "ja-JP"],
            recognition_mode="multi_speaker",
            summary_template="meeting",
        )
        print(f"可使用 Task ID 載入歷史紀錄: {result['task_id']}")
    else:
        print("預算不足")

歷史紀錄載入(SSE)

使用 sseclient-py 處理 SSE 串流,認證透過 X-API-Key header。

import requests
import sseclient
import json

def load_history(task_id: str, api_key: str) -> dict:
    """載入歷史紀錄"""
    url = f"https://vas-poc.vurbo.ai/api/v1/sse/history/transcribe/{task_id}"

    response = requests.get(
        url,
        headers={"X-API-Key": api_key},
        stream=True,
    )
    client = sseclient.SSEClient(response)

    metadata = None
    sentences = []
    summary = None

    for event in client.events():
        if event.event == "init_metadata":
            metadata = json.loads(event.data)
            print(f"任務: {metadata['title']}")

        elif event.event == "init_sentence":
            sentence = json.loads(event.data)
            sentences.append(sentence)

        elif event.event == "init_summary":
            summary = json.loads(event.data)["text"]

        elif event.event == "init_done":
            break

    return {
        "metadata": metadata,
        "sentences": sentences,
        "summary": summary,
    }

def retranslate(task_id: str, target_lang: str, api_key: str) -> list:
    """重新翻譯全文"""
    url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/{task_id}?targetLang={target_lang}"

    response = requests.get(
        url,
        headers={"X-API-Key": api_key},
        stream=True,
    )
    client = sseclient.SSEClient(response)

    results = []

    for event in client.events():
        if event.event == "translation":
            data = json.loads(event.data)
            results.append(data)
            print(f"句子 {data['sid']}: {data['text']}")

        elif event.event == "done":
            break

    return results

def retranslate_summary(task_id: str, target_lang: str, api_key: str) -> str:
    """重新翻譯摘要"""
    url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/summary/{task_id}?targetLang={target_lang}"

    response = requests.get(
        url,
        headers={"X-API-Key": api_key},
        stream=True,
    )
    client = sseclient.SSEClient(response)

    final_text = ""

    for event in client.events():
        if event.event == "summary_translation":
            data = json.loads(event.data)
            if data.get("is_final"):
                final_text = data["text"]

        elif event.event == "done":
            break

    return final_text

# 使用範例
if __name__ == "__main__":
    api_key = "vas_YOUR_API_KEY_HERE_32_CHARACTERS"
    task_id = "550e8400-e29b-41d4-a716-446655440000"

    # 載入歷史
    history = load_history(task_id, api_key)
    print(f"\n共有 {len(history['sentences'])} 句")

    for s in history["sentences"]:
        print(f"  [{s['start_time']}] {s['origin']}")
        if s.get("translations"):
            for lang, text in s["translations"].items():
                print(f"    [{lang}] {text}")

    if history["summary"]:
        print(f"\n摘要: {history['summary'][:100]}...")

    # 重新翻譯為日文
    print("\n重新翻譯為日文...")
    translations = retranslate(task_id, "ja-JP", api_key)
    print(f"翻譯完成,共 {len(translations)} 句")

廣播觀眾端

觀眾透過 SSE 接收即時字幕和翻譯。

import requests
import sseclient
import json

class BroadcastViewer:
    """廣播觀眾端"""

    def __init__(self, token: str):
        self.token = token
        self.base_url = "https://vas-poc.vurbo.ai"

    def get_broadcast_info(self) -> dict:
        """取得廣播公開資訊(不需認證)"""
        response = requests.get(
            f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}"
        )
        return response.json()

    def verify_password(self, password: str) -> dict:
        """密碼驗證"""
        response = requests.post(
            f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}/verify",
            json={"password": password},
        )
        return response.json()

    def connect(
        self,
        lang: str = None,
        viewer_access_token: str = None,
        tts_voice: str = None,
        on_sentence=None,
        on_translation=None,
        on_tts_ready=None,
        on_announcement=None,
        on_standby=None,
        on_live=None,
        on_ended=None,
    ):
        """連線 SSE 接收即時字幕"""
        params = {}
        if lang:
            params["lang"] = lang
        if viewer_access_token:
            params["viewer_access_token"] = viewer_access_token
        if tts_voice:
            params["tts_voice"] = tts_voice

        url = f"{self.base_url}/broadcast/{self.token}/text"
        response = requests.get(url, params=params, stream=True)
        client = sseclient.SSEClient(response)

        for event in client.events():
            data = json.loads(event.data) if event.data else {}

            if event.event == "sentence" and on_sentence:
                on_sentence(data)
            elif event.event == "translation" and on_translation:
                on_translation(data)
            elif event.event == "tts_ready" and on_tts_ready:
                on_tts_ready(data)
            elif event.event == "announcement" and on_announcement:
                on_announcement(data)
            elif event.event == "standby" and on_standby:
                on_standby(data)
            elif event.event == "live" and on_live:
                on_live()
            elif event.event == "ended":
                if on_ended:
                    on_ended()
                break

# 使用範例
if __name__ == "__main__":
    viewer = BroadcastViewer("a3f9")

    # 取得廣播資訊
    info = viewer.get_broadcast_info()
    print(f"廣播名稱: {info['data']['name']}")
    print(f"需要密碼: {info['data']['requires_password']}")
    print(f"可用語言: {info['data']['translation_languages']}")

    # 若需要密碼驗證
    viewer_access_token = None
    if info["data"]["requires_password"]:
        verify_result = viewer.verify_password("mySecret123")
        viewer_access_token = verify_result["data"]["viewer_access_token"]
        print("密碼驗證成功")

    # 接收即時字幕
    def on_sentence(data):
        print(f"[原文] {data['text']}")

    def on_translation(data):
        print(f"[翻譯] {data['text']}")

    def on_announcement(data):
        print(f"[公告] {data['message']}")

    def on_ended():
        print("廣播已結束")

    print("開始接收字幕...")
    viewer.connect(
        lang="en-US",
        viewer_access_token=viewer_access_token,
        on_sentence=on_sentence,
        on_translation=on_translation,
        on_announcement=on_announcement,
        on_ended=on_ended,
    )

Webhook 處理

接收並驗證 VAS Webhook 事件。

import hmac
import hashlib
import json
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret_here"

def verify_signature(timestamp, body, signature):
    """驗證 Webhook 簽名"""
    signature_payload = f"{timestamp}.{body}"
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(),
        signature_payload.encode(),
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

@app.route("/webhooks/vas", methods=["POST"])
def handle_webhook():
    timestamp = request.headers.get("X-VAS-Timestamp", "")
    signature = request.headers.get("X-VAS-Signature", "")
    raw_body = request.get_data(as_text=True)

    if not verify_signature(timestamp, raw_body, signature):
        return jsonify({"error": "Invalid signature"}), 401

    payload = request.get_json()
    event = payload["event"]
    data = payload["data"]

    if event == "recording.completed":
        print(f"錄音完成: task_id={data['task_id']}")
    elif event == "import.completed":
        print(f"匯入完成: import_id={data['import_id']}")
    elif event in ("recording.failed", "import.failed"):
        print(f"處理失敗: {data.get('error') or data.get('error_message')}")

    return jsonify({"received": True}), 200

完整的 Webhook 指南請參考 Webhook 回呼指南


錯誤處理

class VasApiError(Exception):
    """VAS API 錯誤"""

    def __init__(self, message: str, status_code: int = None, error_code: str = None):
        super().__init__(message)
        self.status_code = status_code
        self.error_code = error_code

    def __str__(self):
        parts = [self.args[0]]
        if self.error_code:
            parts.insert(0, f"[{self.error_code}]")
        if self.status_code:
            parts.insert(0, f"HTTP {self.status_code}")
        return " ".join(parts)

def handle_api_error(error: VasApiError):
    """統一錯誤處理"""
    if error.error_code in ("auth_missing_api_key", "auth_invalid_api_key", "auth_key_expired"):
        print(f"認證失敗: {error}")
        print("請檢查 API Key 是否正確")

    elif error.error_code == "auth_budget_exceeded":
        print(f"預算已超額: {error}")
        print("請等待下月預算重置或調整預算")

    elif error.error_code in ("recording_not_found", "import_not_found"):
        print(f"資源不存在: {error}")

    elif error.error_code == "broadcast_session_not_found":
        print(f"廣播不存在: {error}")

    elif error.error_code == "broadcast_cannot_revoke":
        print(f"僅 pending 狀態的廣播可撤銷: {error}")

    elif error.error_code == "validation_failed":
        print(f"參數驗證失敗: {error}")

    elif error.error_code == "import_file_too_large":
        print(f"檔案過大: {error}")
        print("請壓縮或分割檔案(上限 500MB)")

    elif error.error_code == "import_invalid_format":
        print(f"不支援的格式: {error}")
        print("支援格式: mp3, wav, m4a")

    elif error.error_code == "ticket_invalid":
        print(f"Ticket 無效: {error}")
        print("請重新取得 Ticket")

    else:
        print(f"API 錯誤: {error}")

# 使用範例
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

try:
    tasks = api.get_tasks()
except VasApiError as e:
    handle_api_error(e)
except requests.exceptions.ConnectionError:
    print("網路連線失敗,請檢查網路設定")
except requests.exceptions.Timeout:
    print("請求逾時,請稍後重試")

非同步版本(asyncio + aiohttp)

完整的非同步 API Client,適用於高效能或需要並行處理的場景。

import asyncio
import aiohttp
import json
from typing import Optional

class AsyncVasApiClient:
    """非同步 VAS API Client"""

    def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={"X-API-Key": self.api_key},
            )
        return self._session

    async def close(self):
        if self._session and not self._session.closed:
            await self._session.close()

    async def _request(self, method: str, path: str, **kwargs) -> dict:
        session = await self._get_session()
        url = f"{self.base_url}{path}"

        async with session.request(method, url, **kwargs) as response:
            if not response.ok:
                error = await response.json()
                raise VasApiError(
                    message=error.get("message", f"HTTP {response.status}"),
                    status_code=response.status,
                    error_code=error.get("error_code"),
                )
            return await response.json()

    # === 認證 ===

    async def get_ticket(self) -> dict:
        return await self._request("POST", "/auth/ticket")

    # === Tasks ===

    async def get_tasks(self) -> dict:
        return await self._request("GET", "/tasks")

    async def delete_task(self, task_id: str) -> dict:
        return await self._request("DELETE", f"/tasks/{task_id}")

    async def pin_task(self, task_id: str, is_pinned: bool) -> dict:
        return await self._request("PUT", f"/tasks/{task_id}/pin", json={"is_pinned": is_pinned})

    async def mark_as_read(self, task_id: str) -> dict:
        return await self._request("PUT", f"/tasks/{task_id}/read")

    async def rename_task(self, task_id: str, name: str) -> dict:
        return await self._request("PATCH", f"/tasks/{task_id}/name", json={"name": name})

    # === Imports ===

    async def check_quota(self, duration_ms: int) -> dict:
        return await self._request("POST", "/imports/check-quota", json={"duration_ms": duration_ms})

    async def upload_audio(self, file_path: str, options: dict) -> dict:
        session = await self._get_session()
        url = f"{self.base_url}/imports"

        form = aiohttp.FormData()
        form.add_field("file", open(file_path, "rb"), filename=file_path.split("/")[-1])
        form.add_field("transcription_languages", json.dumps(options.get("transcription_languages", ["zh-TW"])))
        form.add_field("recognition_mode", options.get("recognition_mode", "single"))

        if "translation_languages" in options:
            form.add_field("translation_languages", json.dumps(options["translation_languages"]))
        if "summary_template" in options:
            form.add_field("summary_template", options["summary_template"])

        async with session.post(url, data=form) as response:
            if not response.ok:
                error = await response.json()
                raise VasApiError(
                    message=error.get("message", f"HTTP {response.status}"),
                    status_code=response.status,
                    error_code=error.get("error_code"),
                )
            return await response.json()

    async def get_import_status(self, import_id: str) -> dict:
        return await self._request("GET", f"/imports/{import_id}")

    # === Broadcasts ===

    async def create_broadcast(self, options: dict) -> dict:
        return await self._request("POST", "/broadcasts", json=options)

    async def get_broadcast(self, broadcast_id: str) -> dict:
        return await self._request("GET", f"/broadcasts/{broadcast_id}")

    async def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
        return await self._request("PATCH", f"/broadcasts/{broadcast_id}", json=options)

    async def revoke_broadcast(self, broadcast_id: str) -> dict:
        return await self._request("DELETE", f"/broadcasts/{broadcast_id}")

    # === TTS ===

    async def get_tts_voices(self, language: str) -> dict:
        return await self._request("GET", "/tts/voices", params={"language": language})

    # === Summary Templates ===

    async def get_summary_templates(self) -> dict:
        return await self._request("GET", "/summary-templates")

    # === SSE 歷史紀錄(非同步) ===

    async def load_history(self, task_id: str) -> dict:
        """非同步載入歷史紀錄"""
        session = await self._get_session()
        url = f"{self.base_url}/sse/history/transcribe/{task_id}"

        metadata = None
        sentences = []
        summary = None

        async with session.get(url) as response:
            buffer = ""
            current_event = ""

            async for chunk in response.content.iter_any():
                buffer += chunk.decode("utf-8")
                lines = buffer.split("\n")
                buffer = lines.pop()

                for line in lines:
                    if line.startswith("event: "):
                        current_event = line[7:]
                    elif line.startswith("data: "):
                        data = json.loads(line[6:])

                        if current_event == "init_metadata":
                            metadata = data
                        elif current_event == "init_sentence":
                            sentences.append(data)
                        elif current_event == "init_summary":
                            summary = data.get("text")
                        elif current_event == "init_done":
                            return {
                                "metadata": metadata,
                                "sentences": sentences,
                                "summary": summary,
                            }

        return {"metadata": metadata, "sentences": sentences, "summary": summary}

    async def retranslate(self, task_id: str, target_lang: str) -> list:
        """非同步重新翻譯"""
        session = await self._get_session()
        url = f"{self.base_url}/sse/retranslate/{task_id}"

        results = []

        async with session.get(url, params={"targetLang": target_lang}) as response:
            buffer = ""
            current_event = ""

            async for chunk in response.content.iter_any():
                buffer += chunk.decode("utf-8")
                lines = buffer.split("\n")
                buffer = lines.pop()

                for line in lines:
                    if line.startswith("event: "):
                        current_event = line[7:]
                    elif line.startswith("data: "):
                        data = json.loads(line[6:])
                        if current_event == "translation":
                            results.append(data)
                        elif current_event == "done":
                            return results

        return results

# 使用範例:並行操作
async def main():
    api = AsyncVasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")

    try:
        # 並行取得多項資料
        tasks_result, templates_result, tts_languages = await asyncio.gather(
            api.get_tasks(),
            api.get_summary_templates(),
            api.get_tts_languages(),
        )

        print(f"任務數: {len(tasks_result['tasks'])}")
        print(f"摘要模板: {[t['name'] for t in templates_result['data']]}")
        print(f"TTS 語言數: {tts_languages['data']['total']}")

        # 載入歷史紀錄
        if tasks_result["tasks"]:
            task_id = tasks_result["tasks"][0]["task_id"]
            history = await api.load_history(task_id)
            print(f"\n歷史紀錄: {len(history['sentences'])} 句")

    except VasApiError as e:
        handle_api_error(e)
    finally:
        await api.close()

asyncio.run(main())

非同步上傳並輪詢

async def async_import_audio(api: AsyncVasApiClient, file_path: str, options: dict) -> dict:
    """非同步上傳音檔並等待處理完成"""

    # 上傳
    result = await api.upload_audio(file_path, options)
    import_id = result["data"]["import_id"]
    print(f"匯入 ID: {import_id}")

    # 輪詢
    while True:
        status_result = await api.get_import_status(import_id)
        data = status_result["data"]

        print(f"狀態: {data['status']} | 階段: {data['stage']} | 進度: {data['progress']}%")

        if data["status"] == "completed":
            return {"import_id": import_id, "task_id": data["task_id"]}

        if data["status"] == "failed":
            raise VasApiError(
                message=f"處理失敗: {data['error_message']}",
                status_code=500,
                error_code=data["error_code"],
            )

        await asyncio.sleep(5)

版本:V1.5.7 最後更新:2026-05-20

Copyright © 2026