範例
Python
目錄
- 安裝依賴
- API Client 設定
- Ticket 取得與 WebSocket 連線
- 即時語音翻譯
- 任務匯出(音檔/逐字稿下載)
- 音檔匯入
- 歷史紀錄載入(SSE)
- 廣播觀眾端
- 錯誤處理
- Webhook 處理
- 非同步版本(asyncio + aiohttp)
安裝依賴
pip install requests websockets sseclient-py aiohttp
API Client 設定
使用 requests 封裝,統一透過 X-API-Key header 認證。
import requests
from typing import Optional
class VasApiClient:
"""VAS API Client - 使用 X-API-Key header 認證"""
def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
self.api_key = api_key
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": api_key,
})
def _url(self, path: str) -> str:
return f"{self.base_url}{path}"
def _handle_response(self, response: requests.Response) -> dict:
if not response.ok:
error = response.json() if response.content else {}
raise VasApiError(
message=error.get("message", f"HTTP {response.status_code}"),
status_code=response.status_code,
error_code=error.get("error_code"),
)
return response.json()
# === 認證 API ===
def get_ticket(self) -> dict:
"""取得 WebSocket 連線用的一次性 Ticket"""
response = self.session.post(self._url("/auth/ticket"))
return self._handle_response(response)
# === Tasks API ===
def get_tasks(self, status: str = "completed") -> dict:
"""取得任務列表"""
response = self.session.get(
self._url("/tasks"),
params={"status": status},
)
return self._handle_response(response)
def delete_task(self, task_id: str) -> dict:
"""刪除任務"""
response = self.session.delete(self._url(f"/tasks/{task_id}"))
return self._handle_response(response)
def pin_task(self, task_id: str, is_pinned: bool) -> dict:
"""更新釘選狀態"""
response = self.session.put(
self._url(f"/tasks/{task_id}/pin"),
json={"is_pinned": is_pinned},
)
return self._handle_response(response)
def mark_as_read(self, task_id: str) -> dict:
"""標記已讀"""
response = self.session.put(self._url(f"/tasks/{task_id}/read"))
return self._handle_response(response)
def rename_task(self, task_id: str, name: str) -> dict:
"""更新任務名稱"""
response = self.session.patch(
self._url(f"/tasks/{task_id}/name"),
json={"name": name},
)
return self._handle_response(response)
# === Imports API ===
def check_quota(self, duration_ms: int) -> dict:
"""檢查上傳預算"""
response = self.session.post(
self._url("/imports/check-quota"),
json={"duration_ms": duration_ms},
)
return self._handle_response(response)
def upload_audio(
self,
file_path: str,
transcription_languages: list,
recognition_mode: str = "single",
translation_languages: Optional[list] = None,
summary_template: Optional[str] = None,
terminology: Optional[dict] = None,
callback_url: Optional[str] = None,
) -> dict:
"""上傳音檔"""
import json
with open(file_path, "rb") as f:
files = {"file": f}
data = {
"transcription_languages": json.dumps(transcription_languages),
"recognition_mode": recognition_mode,
}
if translation_languages:
data["translation_languages"] = json.dumps(translation_languages)
if summary_template:
data["summary_template"] = summary_template
if terminology:
data["terminology"] = json.dumps(terminology)
if callback_url:
data["callback_url"] = callback_url
response = self.session.post(
self._url("/imports"),
files=files,
data=data,
)
return self._handle_response(response)
def get_import_status(self, import_id: str) -> dict:
"""查詢匯入狀態"""
response = self.session.get(self._url(f"/imports/{import_id}"))
return self._handle_response(response)
def get_imports(self, per_page: int = 20) -> dict:
"""取得匯入列表"""
response = self.session.get(
self._url("/imports"),
params={"per_page": per_page},
)
return self._handle_response(response)
# === Broadcasts API ===
def create_broadcast(self, options: dict) -> dict:
"""建立廣播"""
response = self.session.post(self._url("/broadcasts"), json=options)
return self._handle_response(response)
def get_broadcast(self, broadcast_id: str) -> dict:
"""查詢廣播狀態"""
response = self.session.get(self._url(f"/broadcasts/{broadcast_id}"))
return self._handle_response(response)
def get_broadcasts(self, page: int = 1, per_page: int = 20) -> dict:
"""取得廣播列表"""
response = self.session.get(
self._url("/broadcasts"),
params={"page": page, "per_page": per_page},
)
return self._handle_response(response)
def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
"""更新廣播設定"""
response = self.session.patch(
self._url(f"/broadcasts/{broadcast_id}"),
json=options,
)
return self._handle_response(response)
def revoke_broadcast(self, broadcast_id: str) -> dict:
"""撤銷廣播"""
response = self.session.delete(self._url(f"/broadcasts/{broadcast_id}"))
return self._handle_response(response)
# === TTS API ===
def get_tts_voices(self, language: str) -> dict:
"""取得 TTS 語音列表"""
response = self.session.get(
self._url("/tts/voices"),
params={"language": language},
)
return self._handle_response(response)
def download_tts_sample(self, voice_name: str, output_path: str):
"""下載 TTS 語音示範"""
response = self.session.get(self._url(f"/tts/voices/{voice_name}/sample"))
if response.ok:
with open(output_path, "wb") as f:
f.write(response.content)
else:
raise VasApiError(f"下載失敗: HTTP {response.status_code}", response.status_code)
# === Speakers API ===
def rename_speaker(self, recording_id: str, speaker_id: str, new_label: str) -> dict:
"""全域重命名說話者"""
response = self.session.patch(
self._url(f"/recordings/{recording_id}/speakers/rename"),
json={"speaker_id": speaker_id, "new_label": new_label},
)
return self._handle_response(response)
def reassign_speaker(self, recording_id: str, sid: int, target_speaker_id: str) -> dict:
"""重新指派單句語者"""
response = self.session.patch(
self._url(f"/recordings/{recording_id}/speakers/reassign"),
json={"sid": sid, "target_speaker_id": target_speaker_id},
)
return self._handle_response(response)
# === Summary Templates API ===
def get_summary_templates(self) -> dict:
"""取得摘要模板列表"""
response = self.session.get(self._url("/summary-templates"))
return self._handle_response(response)
# === Audio API ===
def download_audio(self, task_id: str, output_path: str):
"""下載任務錄音音訊"""
response = self.session.get(self._url(f"/sse/audio/{task_id}"))
if response.ok:
with open(output_path, "wb") as f:
f.write(response.content)
else:
raise VasApiError(f"下載失敗: HTTP {response.status_code}", response.status_code)
# 使用範例
if __name__ == "__main__":
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
# 取得任務列表
tasks = api.get_tasks()
print(f"共有 {len(tasks['tasks'])} 個任務")
for task in tasks["tasks"]:
print(f" [{task['task_id'][:8]}] {task['title']} ({task['duration_formatted']})")
Ticket 取得與 WebSocket 連線
WebSocket 使用 Ticket 機制認證,Ticket 透過 Sec-WebSocket-Protocol 傳遞。
import asyncio
import websockets
import json
async def connect_websocket(api: VasApiClient):
"""取得 Ticket 並建立 WebSocket 連線"""
# 步驟 1:取得一次性 Ticket
result = api.get_ticket()
ticket = result["ticket"]
print(f"取得 Ticket,有效期 {result['expires_in']} 秒")
# 步驟 2:使用 Ticket 連線 WebSocket
ws = await websockets.connect(
"wss://vas-poc.vurbo.ai/ws",
subprotocols=[f"ticket.{ticket}"],
)
print("WebSocket 連線成功")
return ws
# 使用範例
async def main():
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
ws = await connect_websocket(api)
# 發送 ping
await ws.send(json.dumps({
"type": "health",
"data": {"action": "ping"},
}))
# 接收 pong
response = await ws.recv()
msg = json.loads(response)
print(f"收到: {msg}")
await ws.close()
asyncio.run(main())
即時語音翻譯
完整的 VurboClient 類別,使用 websockets 庫。
import asyncio
import websockets
import json
import base64
from typing import Callable, Optional, List
class VurboClient:
"""VAS WebSocket 客戶端 - 使用 Ticket 機制認證"""
def __init__(self, api: VasApiClient):
self.api = api
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.on_result: Optional[Callable] = None
self.on_error: Optional[Callable] = None
self.on_task_complete: Optional[Callable] = None
self.on_session_started: Optional[Callable] = None
self.on_tts_ready: Optional[Callable] = None
self._heartbeat_task: Optional[asyncio.Task] = None
async def connect(self):
"""使用 Ticket 機制連線 WebSocket"""
result = self.api.get_ticket()
ticket = result["ticket"]
self.ws = await websockets.connect(
"wss://vas-poc.vurbo.ai/ws",
subprotocols=[f"ticket.{ticket}"],
)
print("Vurbo.ai 連線成功")
# 啟動心跳
self._heartbeat_task = asyncio.create_task(self._heartbeat())
async def _heartbeat(self):
"""每 30 秒發送一次 ping"""
while True:
try:
await asyncio.sleep(30)
if self.ws and self.ws.open:
await self.ws.send(json.dumps({
"type": "health",
"data": {"action": "ping"},
}))
except Exception:
break
async def start_translation(
self,
source_lang: List[str],
target_lang: Optional[List[str]] = None,
recording_type: str = "transcribe",
recognition_mode: str = "single",
summary_template: str = "meeting",
audio_format: str = "pcm",
tts_enabled: bool = False,
tts_language: Optional[str] = None,
tts_voice: Optional[str] = None,
):
"""開始語音翻譯"""
data = {
"action": "start",
"transcription_languages": source_lang,
"translation_languages": target_lang or [],
"type": recording_type,
"recognition_mode": recognition_mode,
"audio_format": audio_format,
}
if recording_type == "transcribe":
data["summary_template"] = summary_template
if tts_enabled:
data["tts_enabled"] = True
data["tts_language"] = tts_language
data["tts_voice"] = tts_voice
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": data,
}))
async def set_config(
self,
terminology: Optional[dict] = None,
fuzzy_correction: Optional[dict] = None,
translation_dict: Optional[list] = None,
):
"""設定術語庫、模糊詞校正、翻譯字典"""
data = {"action": "config"}
if terminology:
data["terminology"] = terminology
if fuzzy_correction:
data["fuzzy_correction"] = fuzzy_correction
if translation_dict:
data["translation_dict"] = translation_dict
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": data,
}))
async def send_audio(self, audio_data: bytes):
"""傳送音訊資料(自動 Base64 編碼)"""
base64_audio = base64.b64encode(audio_data).decode("utf-8")
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "audio", "payload": base64_audio},
}))
async def pause(self):
"""暫停翻譯"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "pause"},
}))
async def resume(self):
"""恢復翻譯"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "resume"},
}))
async def stop(self):
"""停止翻譯"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "stop"},
}))
async def set_name(self, name: str):
"""設定錄音名稱"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "set_name", "name": name},
}))
async def switch_language(self, target_languages: List[str]):
"""切換翻譯語言"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "switch_language",
"translation_languages": target_languages,
},
}))
async def rename_speaker(self, speaker_id: str, new_label: str):
"""全域重命名說話者"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "rename_speaker",
"speaker_id": speaker_id,
"new_label": new_label,
},
}))
async def reassign_speaker(self, sid: int, target_speaker_id: str):
"""重新指派單句語者"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "reassign_speaker",
"sid": sid,
"target_speaker_id": target_speaker_id,
},
}))
async def merge_speakers(self, source_speaker_id: str, target_speaker_id: str):
"""合併語者"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {
"action": "merge_speakers",
"source_speaker_id": source_speaker_id,
"target_speaker_id": target_speaker_id,
},
}))
async def tts_play(self, sid: int, length: int = 1):
"""播放 TTS"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "tts_play", "sid": sid, "length": length},
}))
async def tts_stop(self):
"""停止 TTS"""
await self.ws.send(json.dumps({
"type": "voice-translation",
"data": {"action": "tts_stop"},
}))
async def receive_messages(self):
"""持續接收並處理訊息"""
async for message in self.ws:
msg = json.loads(message)
await self._handle_message(msg)
async def _handle_message(self, msg: dict):
"""處理收到的訊息"""
if msg["type"] == "error":
print(f"錯誤: {msg['data']['message']}")
if self.on_error:
self.on_error(msg["data"])
return
if msg["type"] == "voice-translation":
data = msg["data"]
action = data.get("action")
if action == "session_started":
if self.on_session_started:
self.on_session_started(data)
elif action == "result":
if self.on_result:
self.on_result(
origin=data.get("origin"),
translations=data.get("translations"),
)
elif action == "task_complete":
if self.on_task_complete:
self.on_task_complete(data.get("task_id"))
elif action == "tts_ready":
if self.on_tts_ready:
self.on_tts_ready(data)
async def disconnect(self):
"""中斷連線"""
if self._heartbeat_task:
self._heartbeat_task.cancel()
if self.ws:
await self.ws.close()
使用範例:即時翻譯
import asyncio
async def main():
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
client = VurboClient(api)
# 設定回調
def on_session_started(data):
print(f"Session 已啟動: {data['session_id']}")
print(f"錄音 ID: {data['recording_id']}")
def on_result(origin, translations):
if origin and origin.get("is_final"):
print(f"[{origin['start_time']}] {origin['text']}")
if translations:
for lang, result in translations.items():
if result.get("is_final"):
print(f" 翻譯 ({lang}): {result['text']}")
def on_task_complete(task_id):
print(f"任務完成: {task_id}")
client.on_session_started = on_session_started
client.on_result = on_result
client.on_task_complete = on_task_complete
# 連線
await client.connect()
# (可選)設定術語庫
await client.set_config(
terminology={
"zh-TW": [
{"term": "語者分離", "boost": 1.5},
{"term": "即時轉錄", "boost": 1.5},
],
},
)
# 開始翻譯
await client.start_translation(
source_lang=["zh-TW"],
target_lang=["en-US"],
recording_type="transcribe",
recognition_mode="multi_speaker",
summary_template="meeting",
)
# 接收訊息
await client.receive_messages()
asyncio.run(main())
使用範例:麥克風錄音
import asyncio
import pyaudio
async def record_and_translate():
"""使用麥克風錄音並即時翻譯"""
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
client = VurboClient(api)
sentences = []
def on_result(origin, translations):
if origin and origin.get("is_final"):
sentence = {
"sid": origin["sid"],
"text": origin["text"],
"time": origin["start_time"],
}
if translations:
first_lang = list(translations.keys())[0]
sentence["translation"] = translations[first_lang]["text"]
sentences.append(sentence)
print(f"[{sentence['time']}] {sentence['text']}")
if sentence.get("translation"):
print(f" 翻譯: {sentence['translation']}")
def on_task_complete(task_id):
print(f"\n任務完成: {task_id}")
print(f"共 {len(sentences)} 句")
client.on_result = on_result
client.on_task_complete = on_task_complete
# 連線並開始
await client.connect()
await client.start_translation(
source_lang=["zh-TW"],
target_lang=["en-US"],
)
# 開始麥克風錄音
audio = pyaudio.PyAudio()
stream = audio.open(
format=pyaudio.paInt16,
channels=1,
rate=16000,
input=True,
frames_per_buffer=4096,
)
print("開始錄音...(按 Ctrl+C 停止)")
async def send_audio_loop():
while True:
data = stream.read(4096, exception_on_overflow=False)
await client.send_audio(data)
await asyncio.sleep(0)
try:
await asyncio.gather(
send_audio_loop(),
client.receive_messages(),
)
except KeyboardInterrupt:
print("\n停止錄音...")
await client.stop()
# 等待 task_complete 事件
try:
await asyncio.wait_for(client.receive_messages(), timeout=30)
except asyncio.TimeoutError:
pass
finally:
stream.stop_stream()
stream.close()
audio.terminate()
await client.disconnect()
asyncio.run(record_and_translate())
任務匯出(音檔/逐字稿下載)
下載已完成任務的音檔或逐字稿。逐字稿支援 txt、srt、sbv、vtt、csv 五種格式,內容包含原文與所有翻譯語言。
import re
from urllib.parse import unquote
from pathlib import Path
def _parse_filename(content_disposition: str, fallback: str) -> str:
"""從 Content-Disposition 取得 RFC 5987 編碼的 UTF-8 檔名。"""
match = re.search(r"filename\*=UTF-8''([^;]+)", content_disposition or "")
return unquote(match.group(1)) if match else fallback
def download_audio(api: VasApiClient, task_id: str, output_dir: str = ".") -> str:
"""下載任務音檔。回傳儲存的檔案路徑。"""
response = api.session.get(
f"{api.base_url}/api/v1/tasks/{task_id}/audio/export",
stream=True,
timeout=120,
)
response.raise_for_status()
filename = _parse_filename(
response.headers.get("Content-Disposition", ""),
fallback=f"audio-{task_id}.m4a",
)
output_path = Path(output_dir) / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return str(output_path)
def download_transcript(
api: VasApiClient,
task_id: str,
format: str = "srt",
output_dir: str = ".",
) -> str:
"""下載逐字稿。format 支援 txt / srt / sbv / vtt / csv。"""
if format not in {"txt", "srt", "sbv", "vtt", "csv"}:
raise ValueError(f"Unsupported format: {format}")
response = api.session.get(
f"{api.base_url}/api/v1/tasks/{task_id}/transcript/export",
params={"format": format},
stream=True,
timeout=60,
)
response.raise_for_status()
filename = _parse_filename(
response.headers.get("Content-Disposition", ""),
fallback=f"transcript-{task_id}.{format}",
)
output_path = Path(output_dir) / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return str(output_path)
# 使用範例
audio_path = download_audio(client, task_id="550e8400-e29b-41d4-a716-446655440000")
print(f"音檔已下載:{audio_path}")
srt_path = download_transcript(
client,
task_id="550e8400-e29b-41d4-a716-446655440000",
format="srt",
)
print(f"字幕已下載:{srt_path}")
# CSV 版本可直接交給 pandas 分析
# import pandas as pd
# csv_path = download_transcript(client, task_id, format="csv")
# df = pd.read_csv(csv_path)
注意:匯出需
processing_status = completed;未就緒會回422 recording_transcript_not_ready。
音檔匯入
上傳音檔並輪詢處理狀態。
import time
def import_audio(
api: VasApiClient,
file_path: str,
transcription_languages: list = None,
translation_languages: list = None,
recognition_mode: str = "multi_speaker",
summary_template: str = "meeting",
) -> dict:
"""上傳音檔並等待處理完成"""
transcription_languages = transcription_languages or ["zh-TW"]
translation_languages = translation_languages or ["en-US"]
# 步驟 1:上傳音檔
print(f"上傳音檔: {file_path}")
result = api.upload_audio(
file_path=file_path,
transcription_languages=transcription_languages,
translation_languages=translation_languages,
recognition_mode=recognition_mode,
summary_template=summary_template,
)
import_id = result["data"]["import_id"]
print(f"匯入 ID: {import_id}")
# 步驟 2:輪詢狀態
while True:
status_result = api.get_import_status(import_id)
data = status_result["data"]
status = data["status"]
stage = data["stage"]
progress = data["progress"]
print(f"狀態: {status} | 階段: {stage} | 進度: {progress}%")
if status == "completed":
task_id = data["task_id"]
print(f"處理完成!Task ID: {task_id}")
return {"import_id": import_id, "task_id": task_id}
if status == "failed":
raise VasApiError(
message=f"處理失敗: {data['error_message']}",
status_code=500,
error_code=data["error_code"],
)
time.sleep(5) # 每 5 秒查詢一次
# 使用範例
if __name__ == "__main__":
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
# 先檢查預算
quota = api.check_quota(3600000) # 1 小時音檔
if quota["data"]["allowed"]:
print(f"預算充足,剩餘 ${quota['data']['remaining_budget']} USD")
result = import_audio(
api,
file_path="meeting.mp3",
transcription_languages=["zh-TW"],
translation_languages=["en-US", "ja-JP"],
recognition_mode="multi_speaker",
summary_template="meeting",
)
print(f"可使用 Task ID 載入歷史紀錄: {result['task_id']}")
else:
print("預算不足")
歷史紀錄載入(SSE)
使用 sseclient-py 處理 SSE 串流,認證透過 X-API-Key header。
import requests
import sseclient
import json
def load_history(task_id: str, api_key: str) -> dict:
"""載入歷史紀錄"""
url = f"https://vas-poc.vurbo.ai/api/v1/sse/history/transcribe/{task_id}"
response = requests.get(
url,
headers={"X-API-Key": api_key},
stream=True,
)
client = sseclient.SSEClient(response)
metadata = None
sentences = []
summary = None
for event in client.events():
if event.event == "init_metadata":
metadata = json.loads(event.data)
print(f"任務: {metadata['title']}")
elif event.event == "init_sentence":
sentence = json.loads(event.data)
sentences.append(sentence)
elif event.event == "init_summary":
summary = json.loads(event.data)["text"]
elif event.event == "init_done":
break
return {
"metadata": metadata,
"sentences": sentences,
"summary": summary,
}
def retranslate(task_id: str, target_lang: str, api_key: str) -> list:
"""重新翻譯全文"""
url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/{task_id}?targetLang={target_lang}"
response = requests.get(
url,
headers={"X-API-Key": api_key},
stream=True,
)
client = sseclient.SSEClient(response)
results = []
for event in client.events():
if event.event == "translation":
data = json.loads(event.data)
results.append(data)
print(f"句子 {data['sid']}: {data['text']}")
elif event.event == "done":
break
return results
def retranslate_summary(task_id: str, target_lang: str, api_key: str) -> str:
"""重新翻譯摘要"""
url = f"https://vas-poc.vurbo.ai/api/v1/sse/retranslate/summary/{task_id}?targetLang={target_lang}"
response = requests.get(
url,
headers={"X-API-Key": api_key},
stream=True,
)
client = sseclient.SSEClient(response)
final_text = ""
for event in client.events():
if event.event == "summary_translation":
data = json.loads(event.data)
if data.get("is_final"):
final_text = data["text"]
elif event.event == "done":
break
return final_text
# 使用範例
if __name__ == "__main__":
api_key = "vas_YOUR_API_KEY_HERE_32_CHARACTERS"
task_id = "550e8400-e29b-41d4-a716-446655440000"
# 載入歷史
history = load_history(task_id, api_key)
print(f"\n共有 {len(history['sentences'])} 句")
for s in history["sentences"]:
print(f" [{s['start_time']}] {s['origin']}")
if s.get("translations"):
for lang, text in s["translations"].items():
print(f" [{lang}] {text}")
if history["summary"]:
print(f"\n摘要: {history['summary'][:100]}...")
# 重新翻譯為日文
print("\n重新翻譯為日文...")
translations = retranslate(task_id, "ja-JP", api_key)
print(f"翻譯完成,共 {len(translations)} 句")
廣播觀眾端
觀眾透過 SSE 接收即時字幕和翻譯。
import requests
import sseclient
import json
class BroadcastViewer:
"""廣播觀眾端"""
def __init__(self, token: str):
self.token = token
self.base_url = "https://vas-poc.vurbo.ai"
def get_broadcast_info(self) -> dict:
"""取得廣播公開資訊(不需認證)"""
response = requests.get(
f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}"
)
return response.json()
def verify_password(self, password: str) -> dict:
"""密碼驗證"""
response = requests.post(
f"{self.base_url}/api/v1/viewer/broadcasts/{self.token}/verify",
json={"password": password},
)
return response.json()
def connect(
self,
lang: str = None,
viewer_access_token: str = None,
tts_voice: str = None,
on_sentence=None,
on_translation=None,
on_tts_ready=None,
on_announcement=None,
on_standby=None,
on_live=None,
on_ended=None,
):
"""連線 SSE 接收即時字幕"""
params = {}
if lang:
params["lang"] = lang
if viewer_access_token:
params["viewer_access_token"] = viewer_access_token
if tts_voice:
params["tts_voice"] = tts_voice
url = f"{self.base_url}/broadcast/{self.token}/text"
response = requests.get(url, params=params, stream=True)
client = sseclient.SSEClient(response)
for event in client.events():
data = json.loads(event.data) if event.data else {}
if event.event == "sentence" and on_sentence:
on_sentence(data)
elif event.event == "translation" and on_translation:
on_translation(data)
elif event.event == "tts_ready" and on_tts_ready:
on_tts_ready(data)
elif event.event == "announcement" and on_announcement:
on_announcement(data)
elif event.event == "standby" and on_standby:
on_standby(data)
elif event.event == "live" and on_live:
on_live()
elif event.event == "ended":
if on_ended:
on_ended()
break
# 使用範例
if __name__ == "__main__":
viewer = BroadcastViewer("a3f9")
# 取得廣播資訊
info = viewer.get_broadcast_info()
print(f"廣播名稱: {info['data']['name']}")
print(f"需要密碼: {info['data']['requires_password']}")
print(f"可用語言: {info['data']['translation_languages']}")
# 若需要密碼驗證
viewer_access_token = None
if info["data"]["requires_password"]:
verify_result = viewer.verify_password("mySecret123")
viewer_access_token = verify_result["data"]["viewer_access_token"]
print("密碼驗證成功")
# 接收即時字幕
def on_sentence(data):
print(f"[原文] {data['text']}")
def on_translation(data):
print(f"[翻譯] {data['text']}")
def on_announcement(data):
print(f"[公告] {data['message']}")
def on_ended():
print("廣播已結束")
print("開始接收字幕...")
viewer.connect(
lang="en-US",
viewer_access_token=viewer_access_token,
on_sentence=on_sentence,
on_translation=on_translation,
on_announcement=on_announcement,
on_ended=on_ended,
)
Webhook 處理
接收並驗證 VAS Webhook 事件。
import hmac
import hashlib
import json
from flask import Flask, request, jsonify
app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret_here"
def verify_signature(timestamp, body, signature):
"""驗證 Webhook 簽名"""
signature_payload = f"{timestamp}.{body}"
expected = "sha256=" + hmac.new(
WEBHOOK_SECRET.encode(),
signature_payload.encode(),
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(signature, expected)
@app.route("/webhooks/vas", methods=["POST"])
def handle_webhook():
timestamp = request.headers.get("X-VAS-Timestamp", "")
signature = request.headers.get("X-VAS-Signature", "")
raw_body = request.get_data(as_text=True)
if not verify_signature(timestamp, raw_body, signature):
return jsonify({"error": "Invalid signature"}), 401
payload = request.get_json()
event = payload["event"]
data = payload["data"]
if event == "recording.completed":
print(f"錄音完成: task_id={data['task_id']}")
elif event == "import.completed":
print(f"匯入完成: import_id={data['import_id']}")
elif event in ("recording.failed", "import.failed"):
print(f"處理失敗: {data.get('error') or data.get('error_message')}")
return jsonify({"received": True}), 200
完整的 Webhook 指南請參考 Webhook 回呼指南。
錯誤處理
class VasApiError(Exception):
"""VAS API 錯誤"""
def __init__(self, message: str, status_code: int = None, error_code: str = None):
super().__init__(message)
self.status_code = status_code
self.error_code = error_code
def __str__(self):
parts = [self.args[0]]
if self.error_code:
parts.insert(0, f"[{self.error_code}]")
if self.status_code:
parts.insert(0, f"HTTP {self.status_code}")
return " ".join(parts)
def handle_api_error(error: VasApiError):
"""統一錯誤處理"""
if error.error_code in ("auth_missing_api_key", "auth_invalid_api_key", "auth_key_expired"):
print(f"認證失敗: {error}")
print("請檢查 API Key 是否正確")
elif error.error_code == "auth_budget_exceeded":
print(f"預算已超額: {error}")
print("請等待下月預算重置或調整預算")
elif error.error_code in ("recording_not_found", "import_not_found"):
print(f"資源不存在: {error}")
elif error.error_code == "broadcast_session_not_found":
print(f"廣播不存在: {error}")
elif error.error_code == "broadcast_cannot_revoke":
print(f"僅 pending 狀態的廣播可撤銷: {error}")
elif error.error_code == "validation_failed":
print(f"參數驗證失敗: {error}")
elif error.error_code == "import_file_too_large":
print(f"檔案過大: {error}")
print("請壓縮或分割檔案(上限 500MB)")
elif error.error_code == "import_invalid_format":
print(f"不支援的格式: {error}")
print("支援格式: mp3, wav, m4a")
elif error.error_code == "ticket_invalid":
print(f"Ticket 無效: {error}")
print("請重新取得 Ticket")
else:
print(f"API 錯誤: {error}")
# 使用範例
api = VasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
try:
tasks = api.get_tasks()
except VasApiError as e:
handle_api_error(e)
except requests.exceptions.ConnectionError:
print("網路連線失敗,請檢查網路設定")
except requests.exceptions.Timeout:
print("請求逾時,請稍後重試")
非同步版本(asyncio + aiohttp)
完整的非同步 API Client,適用於高效能或需要並行處理的場景。
import asyncio
import aiohttp
import json
from typing import Optional
class AsyncVasApiClient:
"""非同步 VAS API Client"""
def __init__(self, api_key: str, base_url: str = "https://vas-poc.vurbo.ai/api/v1"):
self.api_key = api_key
self.base_url = base_url
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
headers={"X-API-Key": self.api_key},
)
return self._session
async def close(self):
if self._session and not self._session.closed:
await self._session.close()
async def _request(self, method: str, path: str, **kwargs) -> dict:
session = await self._get_session()
url = f"{self.base_url}{path}"
async with session.request(method, url, **kwargs) as response:
if not response.ok:
error = await response.json()
raise VasApiError(
message=error.get("message", f"HTTP {response.status}"),
status_code=response.status,
error_code=error.get("error_code"),
)
return await response.json()
# === 認證 ===
async def get_ticket(self) -> dict:
return await self._request("POST", "/auth/ticket")
# === Tasks ===
async def get_tasks(self) -> dict:
return await self._request("GET", "/tasks")
async def delete_task(self, task_id: str) -> dict:
return await self._request("DELETE", f"/tasks/{task_id}")
async def pin_task(self, task_id: str, is_pinned: bool) -> dict:
return await self._request("PUT", f"/tasks/{task_id}/pin", json={"is_pinned": is_pinned})
async def mark_as_read(self, task_id: str) -> dict:
return await self._request("PUT", f"/tasks/{task_id}/read")
async def rename_task(self, task_id: str, name: str) -> dict:
return await self._request("PATCH", f"/tasks/{task_id}/name", json={"name": name})
# === Imports ===
async def check_quota(self, duration_ms: int) -> dict:
return await self._request("POST", "/imports/check-quota", json={"duration_ms": duration_ms})
async def upload_audio(self, file_path: str, options: dict) -> dict:
session = await self._get_session()
url = f"{self.base_url}/imports"
form = aiohttp.FormData()
form.add_field("file", open(file_path, "rb"), filename=file_path.split("/")[-1])
form.add_field("transcription_languages", json.dumps(options.get("transcription_languages", ["zh-TW"])))
form.add_field("recognition_mode", options.get("recognition_mode", "single"))
if "translation_languages" in options:
form.add_field("translation_languages", json.dumps(options["translation_languages"]))
if "summary_template" in options:
form.add_field("summary_template", options["summary_template"])
async with session.post(url, data=form) as response:
if not response.ok:
error = await response.json()
raise VasApiError(
message=error.get("message", f"HTTP {response.status}"),
status_code=response.status,
error_code=error.get("error_code"),
)
return await response.json()
async def get_import_status(self, import_id: str) -> dict:
return await self._request("GET", f"/imports/{import_id}")
# === Broadcasts ===
async def create_broadcast(self, options: dict) -> dict:
return await self._request("POST", "/broadcasts", json=options)
async def get_broadcast(self, broadcast_id: str) -> dict:
return await self._request("GET", f"/broadcasts/{broadcast_id}")
async def update_broadcast(self, broadcast_id: str, options: dict) -> dict:
return await self._request("PATCH", f"/broadcasts/{broadcast_id}", json=options)
async def revoke_broadcast(self, broadcast_id: str) -> dict:
return await self._request("DELETE", f"/broadcasts/{broadcast_id}")
# === TTS ===
async def get_tts_voices(self, language: str) -> dict:
return await self._request("GET", "/tts/voices", params={"language": language})
# === Summary Templates ===
async def get_summary_templates(self) -> dict:
return await self._request("GET", "/summary-templates")
# === SSE 歷史紀錄(非同步) ===
async def load_history(self, task_id: str) -> dict:
"""非同步載入歷史紀錄"""
session = await self._get_session()
url = f"{self.base_url}/sse/history/transcribe/{task_id}"
metadata = None
sentences = []
summary = None
async with session.get(url) as response:
buffer = ""
current_event = ""
async for chunk in response.content.iter_any():
buffer += chunk.decode("utf-8")
lines = buffer.split("\n")
buffer = lines.pop()
for line in lines:
if line.startswith("event: "):
current_event = line[7:]
elif line.startswith("data: "):
data = json.loads(line[6:])
if current_event == "init_metadata":
metadata = data
elif current_event == "init_sentence":
sentences.append(data)
elif current_event == "init_summary":
summary = data.get("text")
elif current_event == "init_done":
return {
"metadata": metadata,
"sentences": sentences,
"summary": summary,
}
return {"metadata": metadata, "sentences": sentences, "summary": summary}
async def retranslate(self, task_id: str, target_lang: str) -> list:
"""非同步重新翻譯"""
session = await self._get_session()
url = f"{self.base_url}/sse/retranslate/{task_id}"
results = []
async with session.get(url, params={"targetLang": target_lang}) as response:
buffer = ""
current_event = ""
async for chunk in response.content.iter_any():
buffer += chunk.decode("utf-8")
lines = buffer.split("\n")
buffer = lines.pop()
for line in lines:
if line.startswith("event: "):
current_event = line[7:]
elif line.startswith("data: "):
data = json.loads(line[6:])
if current_event == "translation":
results.append(data)
elif current_event == "done":
return results
return results
# 使用範例:並行操作
async def main():
api = AsyncVasApiClient("vas_YOUR_API_KEY_HERE_32_CHARACTERS")
try:
# 並行取得多項資料
tasks_result, templates_result, tts_languages = await asyncio.gather(
api.get_tasks(),
api.get_summary_templates(),
api.get_tts_languages(),
)
print(f"任務數: {len(tasks_result['tasks'])}")
print(f"摘要模板: {[t['name'] for t in templates_result['data']]}")
print(f"TTS 語言數: {tts_languages['data']['total']}")
# 載入歷史紀錄
if tasks_result["tasks"]:
task_id = tasks_result["tasks"][0]["task_id"]
history = await api.load_history(task_id)
print(f"\n歷史紀錄: {len(history['sentences'])} 句")
except VasApiError as e:
handle_api_error(e)
finally:
await api.close()
asyncio.run(main())
非同步上傳並輪詢
async def async_import_audio(api: AsyncVasApiClient, file_path: str, options: dict) -> dict:
"""非同步上傳音檔並等待處理完成"""
# 上傳
result = await api.upload_audio(file_path, options)
import_id = result["data"]["import_id"]
print(f"匯入 ID: {import_id}")
# 輪詢
while True:
status_result = await api.get_import_status(import_id)
data = status_result["data"]
print(f"狀態: {data['status']} | 階段: {data['stage']} | 進度: {data['progress']}%")
if data["status"] == "completed":
return {"import_id": import_id, "task_id": data["task_id"]}
if data["status"] == "failed":
raise VasApiError(
message=f"處理失敗: {data['error_message']}",
status_code=500,
error_code=data["error_code"],
)
await asyncio.sleep(5)
版本:V1.5.7 最後更新:2026-05-20