Enhance video generation API: add polling URL, video URLs, and error handling; implement polling status endpoint with tests

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
2026-04-27 19:05:20 +02:00
parent 4edadd7623
commit 3b807c0f75
4 changed files with 99 additions and 15 deletions
+14 -2
View File
@@ -50,6 +50,9 @@ async def chat_completion(
"max_tokens": max_tokens,
}
async with httpx.AsyncClient(timeout=60) as client:
resp = client.build_request(
"POST", f"{base_url}/chat/completions", headers=_headers(), json=payload
)
response = await client.send(resp)
response.raise_for_status()
return response.json()
@@ -90,7 +93,7 @@ async def generate_video(
payload["duration_seconds"] = duration_seconds
async with httpx.AsyncClient(timeout=120) as client:
resp = client.build_request(
"POST", f"{base_url}/video/generations", headers=_headers(), json=payload
"POST", f"{base_url}/videos", headers=_headers(), json=payload
)
response = await client.send(resp)
response.raise_for_status()
@@ -116,8 +119,17 @@ async def generate_video_from_image(
payload["duration_seconds"] = duration_seconds
async with httpx.AsyncClient(timeout=120) as client:
resp = client.build_request(
"POST", f"{base_url}/video/generations/from-image", headers=_headers(), json=payload
"POST", f"{base_url}/videos", headers=_headers(), json=payload
)
response = await client.send(resp)
response.raise_for_status()
return response.json()
async def poll_video_status(polling_url: str) -> dict[str, Any]:
"""Check the status of a video generation job via its polling_url."""
async with httpx.AsyncClient(timeout=15) as client:
resp = client.build_request("GET", polling_url, headers=_headers())
response = await client.send(resp)
response.raise_for_status()
return response.json()