Merge pull request #23737 from BerriAI/litellm_create-character-endpoint-fixes

[Feat] Add create character endpoints and other new videos Endpoints
This commit is contained in:
Sameer Kankute
2026-03-16 19:53:35 +05:30
committed by GitHub
22 changed files with 3093 additions and 26 deletions
@@ -0,0 +1,128 @@
---
slug: video_characters_api
title: "New Video Characters, Edit and Extension API support"
date: 2026-03-16T10:00:00
authors:
- name: Sameer Kankute
title: SWE @ LiteLLM
url: https://www.linkedin.com/in/sameer-kankute/
image_url: https://pbs.twimg.com/profile_images/2001352686994907136/ONgNuSk5_400x400.jpg
- name: Krrish Dholakia
title: "CEO, LiteLLM"
url: https://www.linkedin.com/in/krish-d/
image_url: https://pbs.twimg.com/profile_images/1298587542745358340/DZv3Oj-h_400x400.jpg
- name: Ishaan Jaff
title: "CTO, LiteLLM"
url: https://www.linkedin.com/in/reffajnaahsi/
image_url: https://pbs.twimg.com/profile_images/1613813310264340481/lz54oEiB_400x400.jpg
description: "LiteLLM now supports creating, retrieving, and managing reusable video characters across multiple video generations."
tags: [videos, characters, proxy, routing]
hide_table_of_contents: false
---
LiteLLM now supoports videos character, edit and extension apis.
## What's New
Four new endpoints for video character operations:
- **Create character** - Upload a video to create a reusable asset
- **Get character** - Retrieve character metadata
- **Edit video** - Modify generated videos
- **Extend video** - Continue clips with character consistency
**Available from:** LiteLLM v1.83.0+
## Quick Example
```python
import litellm
# Create character from video
character = litellm.avideo_create_character(
name="Luna",
video=open("luna.mp4", "rb"),
custom_llm_provider="openai",
model="sora-2"
)
print(f"Character: {character.id}")
# Use in generation
video = litellm.avideo(
model="sora-2",
prompt="Luna dances through a magical forest.",
characters=[{"id": character.id}],
seconds="8"
)
# Get character info
fetched = litellm.avideo_get_character(
character_id=character.id,
custom_llm_provider="openai"
)
# Edit with character preserved
edited = litellm.avideo_edit(
video_id=video.id,
prompt="Add warm golden lighting"
)
# Extend sequence
extended = litellm.avideo_extension(
video_id=video.id,
prompt="Luna waves goodbye",
seconds="5"
)
```
## Via Proxy
```bash
# Create character
curl -X POST "http://localhost:4000/v1/videos/characters" \
-H "Authorization: Bearer sk-litellm-key" \
-F "video=@luna.mp4" \
-F "name=Luna"
# Get character
curl -X GET "http://localhost:4000/v1/videos/characters/char_abc123def456" \
-H "Authorization: Bearer sk-litellm-key"
# Edit video
curl -X POST "http://localhost:4000/v1/videos/edits" \
-H "Authorization: Bearer sk-litellm-key" \
-H "Content-Type: application/json" \
-d '{
"video": {"id": "video_xyz789"},
"prompt": "Add warm golden lighting and enhance colors"
}'
# Extend video
curl -X POST "http://localhost:4000/v1/videos/extensions" \
-H "Authorization: Bearer sk-litellm-key" \
-H "Content-Type: application/json" \
-d '{
"video": {"id": "video_xyz789"},
"prompt": "Luna waves goodbye and walks into the sunset",
"seconds": "5"
}'
```
## Managed Character IDs
LiteLLM automatically encodes provider and model metadata into character IDs:
**What happens:**
```
Upload character "Luna" with model "sora-2" on OpenAI
LiteLLM creates: char_abc123def456 (contains provider + model_id)
When you reference it later, LiteLLM decodes automatically
Router knows exactly which deployment to use
```
**Behind the scenes:**
- Character ID format: `character_<base64_encoded_metadata>`
- Metadata includes: provider, model_id, original_character_id
- Transparent to you - just use the ID, LiteLLM handles routing
@@ -135,6 +135,81 @@ curl --location --request POST 'http://localhost:4000/v1/videos/video_id/remix'
}'
```
### Character, Edit, and Extension Routes
OpenAI video routes supported by LiteLLM proxy:
- `POST /v1/videos/characters`
- `GET /v1/videos/characters/{character_id}`
- `POST /v1/videos/edits`
- `POST /v1/videos/extensions`
#### `target_model_names` support on character creation
`POST /v1/videos/characters` supports `target_model_names` for model-based routing (same behavior as video create).
```bash
curl --location 'http://localhost:4000/v1/videos/characters' \
--header 'Authorization: Bearer sk-1234' \
-F 'name=hero' \
-F 'target_model_names=gpt-4' \
-F 'video=@/path/to/character.mp4'
```
When `target_model_names` is used, LiteLLM returns an encoded character ID:
```json
{
"id": "character_...",
"object": "character",
"created_at": 1712697600,
"name": "hero"
}
```
Use that encoded ID directly on get:
```bash
curl --location 'http://localhost:4000/v1/videos/characters/character_...' \
--header 'Authorization: Bearer sk-1234'
```
#### Encoded and non-encoded video IDs for edit/extension
Both routes accept either plain or encoded `video.id`:
- `POST /v1/videos/edits`
- `POST /v1/videos/extensions`
```bash
curl --location 'http://localhost:4000/v1/videos/edits' \
--header 'Authorization: Bearer sk-1234' \
--header 'Content-Type: application/json' \
--data '{
"prompt": "Make this brighter",
"video": { "id": "video_..." }
}'
```
```bash
curl --location 'http://localhost:4000/v1/videos/extensions' \
--header 'Authorization: Bearer sk-1234' \
--header 'Content-Type: application/json' \
--data '{
"prompt": "Continue this scene",
"seconds": "4",
"video": { "id": "video_..." }
}'
```
#### `custom_llm_provider` input sources
For these routes, `custom_llm_provider` may be supplied via:
- header: `custom-llm-provider`
- query: `?custom_llm_provider=...`
- body: `custom_llm_provider` (and `extra_body.custom_llm_provider` where supported)
Test OpenAI video generation request
```bash
+76
View File
@@ -290,6 +290,82 @@ curl --location 'http://localhost:4000/v1/videos' \
--header 'custom-llm-provider: azure'
```
### Character, Edit, and Extension Endpoints
LiteLLM proxy also supports these OpenAI-compatible video routes:
- `POST /v1/videos/characters`
- `GET /v1/videos/characters/{character_id}`
- `POST /v1/videos/edits`
- `POST /v1/videos/extensions`
#### Routing Behavior (`target_model_names`, encoded IDs, and provider overrides)
- `POST /v1/videos/characters` supports `target_model_names` like `POST /v1/videos`.
- When `target_model_names` is provided on character creation, LiteLLM encodes the returned `character_id` with routing metadata.
- `GET /v1/videos/characters/{character_id}` accepts encoded character IDs directly. LiteLLM decodes the ID internally and routes with the correct model/provider metadata.
- `POST /v1/videos/edits` and `POST /v1/videos/extensions` support both:
- plain `video.id`
- encoded `video.id` values returned by LiteLLM
- `custom_llm_provider` can be supplied using the same patterns as other proxy endpoints:
- header: `custom-llm-provider`
- query: `?custom_llm_provider=...`
- body: `custom_llm_provider` (or `extra_body.custom_llm_provider` where applicable)
#### Character create with `target_model_names`
```bash
curl --location 'http://localhost:4000/v1/videos/characters' \
--header 'Authorization: Bearer sk-1234' \
-F 'name=hero' \
-F 'target_model_names=gpt-4' \
-F 'video=@/path/to/character.mp4'
```
Example response (encoded `id`):
```json
{
"id": "character_...",
"object": "character",
"created_at": 1712697600,
"name": "hero"
}
```
#### Get character using encoded `character_id`
```bash
curl --location 'http://localhost:4000/v1/videos/characters/character_...' \
--header 'Authorization: Bearer sk-1234'
```
#### Video edit with encoded `video.id`
```bash
curl --location 'http://localhost:4000/v1/videos/edits' \
--header 'Authorization: Bearer sk-1234' \
--header 'Content-Type: application/json' \
--data '{
"prompt": "Make this brighter",
"video": { "id": "video_..." }
}'
```
#### Video extension with provider override from `extra_body`
```bash
curl --location 'http://localhost:4000/v1/videos/extensions' \
--header 'Authorization: Bearer sk-1234' \
--header 'Content-Type: application/json' \
--data '{
"prompt": "Continue this scene",
"seconds": "4",
"video": { "id": "video_..." },
"extra_body": { "custom_llm_provider": "openai" }
}'
```
Test Azure video generation request
```bash
@@ -11,6 +11,7 @@ from litellm.types.videos.main import VideoCreateOptionalRequestParams
if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as _LiteLLMLoggingObj
from litellm.types.videos.main import CharacterObject as _CharacterObject
from litellm.types.videos.main import VideoObject as _VideoObject
from ..chat.transformation import BaseLLMException as _BaseLLMException
@@ -18,10 +19,12 @@ if TYPE_CHECKING:
LiteLLMLoggingObj = _LiteLLMLoggingObj
BaseLLMException = _BaseLLMException
VideoObject = _VideoObject
CharacterObject = _CharacterObject
else:
LiteLLMLoggingObj = Any
BaseLLMException = Any
VideoObject = Any
CharacterObject = Any
class BaseVideoConfig(ABC):
@@ -265,6 +268,118 @@ class BaseVideoConfig(ABC):
) -> VideoObject:
pass
def transform_video_create_character_request(
self,
name: str,
video: Any,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
) -> Tuple[str, list]:
"""
Transform the video create character request into a URL and files list (multipart).
Returns:
Tuple[str, list]: (url, files_list) for the multipart POST request
"""
raise NotImplementedError(
"video create character is not supported for this provider"
)
def transform_video_create_character_response(
self,
raw_response: httpx.Response,
logging_obj: LiteLLMLoggingObj,
) -> CharacterObject:
raise NotImplementedError(
"video create character is not supported for this provider"
)
def transform_video_get_character_request(
self,
character_id: str,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
) -> Tuple[str, Dict]:
"""
Transform the video get character request into a URL and params.
Returns:
Tuple[str, Dict]: (url, params) for the GET request
"""
raise NotImplementedError(
"video get character is not supported for this provider"
)
def transform_video_get_character_response(
self,
raw_response: httpx.Response,
logging_obj: LiteLLMLoggingObj,
) -> CharacterObject:
raise NotImplementedError(
"video get character is not supported for this provider"
)
def transform_video_edit_request(
self,
prompt: str,
video_id: str,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
extra_body: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Dict]:
"""
Transform the video edit request into a URL and JSON data.
Returns:
Tuple[str, Dict]: (url, data) for the POST request
"""
raise NotImplementedError(
"video edit is not supported for this provider"
)
def transform_video_edit_response(
self,
raw_response: httpx.Response,
logging_obj: LiteLLMLoggingObj,
custom_llm_provider: Optional[str] = None,
) -> VideoObject:
raise NotImplementedError(
"video edit is not supported for this provider"
)
def transform_video_extension_request(
self,
prompt: str,
video_id: str,
seconds: str,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
extra_body: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Dict]:
"""
Transform the video extension request into a URL and JSON data.
Returns:
Tuple[str, Dict]: (url, data) for the POST request
"""
raise NotImplementedError(
"video extension is not supported for this provider"
)
def transform_video_extension_response(
self,
raw_response: httpx.Response,
logging_obj: LiteLLMLoggingObj,
custom_llm_provider: Optional[str] = None,
) -> VideoObject:
raise NotImplementedError(
"video extension is not supported for this provider"
)
def get_error_class(
self, error_message: str, status_code: int, headers: Union[dict, httpx.Headers]
) -> BaseLLMException:
@@ -6113,6 +6113,614 @@ class BaseLLMHTTPHandler:
provider_config=video_remix_provider_config,
)
def video_create_character_handler(
self,
name: str,
video: Any,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
_is_async: bool = False,
client=None,
api_key: Optional[str] = None,
):
if _is_async:
return self.async_video_create_character_handler(
name=name,
video=video,
video_provider_config=video_provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=logging_obj,
extra_headers=extra_headers,
timeout=timeout,
client=client,
api_key=api_key,
)
if client is None or not isinstance(client, HTTPHandler):
sync_httpx_client = _get_httpx_client(
params={"ssl_verify": litellm_params.get("ssl_verify", None)}
)
else:
sync_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, files_list = video_provider_config.transform_video_create_character_request(
name=name,
video=video,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
)
logging_obj.pre_call(
input=name,
api_key="",
additional_args={
"complete_input_dict": {"name": name},
"api_base": url,
"headers": headers,
},
)
try:
response = sync_httpx_client.post(
url=url,
headers=headers,
files=files_list,
timeout=timeout,
)
response.raise_for_status()
return video_provider_config.transform_video_create_character_response(
raw_response=response,
logging_obj=logging_obj,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
async def async_video_create_character_handler(
self,
name: str,
video: Any,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
client=None,
api_key: Optional[str] = None,
):
if client is None or not isinstance(client, AsyncHTTPHandler):
async_httpx_client = get_async_httpx_client(
llm_provider=litellm.LlmProviders(custom_llm_provider),
params={"ssl_verify": litellm_params.get("ssl_verify", None)},
)
else:
async_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, files_list = video_provider_config.transform_video_create_character_request(
name=name,
video=video,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
)
logging_obj.pre_call(
input=name,
api_key="",
additional_args={
"complete_input_dict": {"name": name},
"api_base": url,
"headers": headers,
},
)
try:
response = await async_httpx_client.post(
url=url,
headers=headers,
files=files_list,
timeout=timeout,
)
response.raise_for_status()
return video_provider_config.transform_video_create_character_response(
raw_response=response,
logging_obj=logging_obj,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
def video_get_character_handler(
self,
character_id: str,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
_is_async: bool = False,
client=None,
api_key: Optional[str] = None,
):
if _is_async:
return self.async_video_get_character_handler(
character_id=character_id,
video_provider_config=video_provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=logging_obj,
extra_headers=extra_headers,
timeout=timeout,
client=client,
api_key=api_key,
)
if client is None or not isinstance(client, HTTPHandler):
sync_httpx_client = _get_httpx_client(
params={"ssl_verify": litellm_params.get("ssl_verify", None)}
)
else:
sync_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, params = video_provider_config.transform_video_get_character_request(
character_id=character_id,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
)
logging_obj.pre_call(
input=character_id,
api_key="",
additional_args={"api_base": url, "headers": headers},
)
try:
response = sync_httpx_client.get(
url=url,
headers=headers,
params=params
)
response.raise_for_status()
return video_provider_config.transform_video_get_character_response(
raw_response=response,
logging_obj=logging_obj,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
async def async_video_get_character_handler(
self,
character_id: str,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
client=None,
api_key: Optional[str] = None,
):
if client is None or not isinstance(client, AsyncHTTPHandler):
async_httpx_client = get_async_httpx_client(
llm_provider=litellm.LlmProviders(custom_llm_provider),
params={"ssl_verify": litellm_params.get("ssl_verify", None)},
)
else:
async_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, params = video_provider_config.transform_video_get_character_request(
character_id=character_id,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
)
logging_obj.pre_call(
input=character_id,
api_key="",
additional_args={"api_base": url, "headers": headers},
)
try:
response = await async_httpx_client.get(
url=url,
headers=headers,
params=params
)
response.raise_for_status()
return video_provider_config.transform_video_get_character_response(
raw_response=response,
logging_obj=logging_obj,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
def video_edit_handler(
self,
prompt: str,
video_id: str,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
_is_async: bool = False,
client=None,
api_key: Optional[str] = None,
):
if _is_async:
return self.async_video_edit_handler(
prompt=prompt,
video_id=video_id,
video_provider_config=video_provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=logging_obj,
extra_headers=extra_headers,
extra_body=extra_body,
timeout=timeout,
client=client,
api_key=api_key,
)
if client is None or not isinstance(client, HTTPHandler):
sync_httpx_client = _get_httpx_client(
params={"ssl_verify": litellm_params.get("ssl_verify", None)}
)
else:
sync_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, data = video_provider_config.transform_video_edit_request(
prompt=prompt,
video_id=video_id,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
extra_body=extra_body,
)
logging_obj.pre_call(
input=prompt,
api_key="",
additional_args={
"complete_input_dict": data,
"api_base": url,
"headers": headers,
"video_id": video_id,
},
)
try:
response = sync_httpx_client.post(
url=url,
headers=headers,
json=data,
timeout=timeout,
)
response.raise_for_status()
return video_provider_config.transform_video_edit_response(
raw_response=response,
logging_obj=logging_obj,
custom_llm_provider=custom_llm_provider,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
async def async_video_edit_handler(
self,
prompt: str,
video_id: str,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
client=None,
api_key: Optional[str] = None,
):
if client is None or not isinstance(client, AsyncHTTPHandler):
async_httpx_client = get_async_httpx_client(
llm_provider=litellm.LlmProviders(custom_llm_provider),
params={"ssl_verify": litellm_params.get("ssl_verify", None)},
)
else:
async_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, data = video_provider_config.transform_video_edit_request(
prompt=prompt,
video_id=video_id,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
extra_body=extra_body,
)
logging_obj.pre_call(
input=prompt,
api_key="",
additional_args={
"complete_input_dict": data,
"api_base": url,
"headers": headers,
"video_id": video_id,
},
)
try:
response = await async_httpx_client.post(
url=url,
headers=headers,
json=data,
timeout=timeout,
)
response.raise_for_status()
return video_provider_config.transform_video_edit_response(
raw_response=response,
logging_obj=logging_obj,
custom_llm_provider=custom_llm_provider,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
def video_extension_handler(
self,
prompt: str,
video_id: str,
seconds: str,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
_is_async: bool = False,
client=None,
api_key: Optional[str] = None,
):
if _is_async:
return self.async_video_extension_handler(
prompt=prompt,
video_id=video_id,
seconds=seconds,
video_provider_config=video_provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=logging_obj,
extra_headers=extra_headers,
extra_body=extra_body,
timeout=timeout,
client=client,
api_key=api_key,
)
if client is None or not isinstance(client, HTTPHandler):
sync_httpx_client = _get_httpx_client(
params={"ssl_verify": litellm_params.get("ssl_verify", None)}
)
else:
sync_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, data = video_provider_config.transform_video_extension_request(
prompt=prompt,
video_id=video_id,
seconds=seconds,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
extra_body=extra_body,
)
logging_obj.pre_call(
input=prompt,
api_key="",
additional_args={
"complete_input_dict": data,
"api_base": url,
"headers": headers,
"video_id": video_id,
},
)
try:
response = sync_httpx_client.post(
url=url,
headers=headers,
json=data,
timeout=timeout,
)
response.raise_for_status()
return video_provider_config.transform_video_extension_response(
raw_response=response,
logging_obj=logging_obj,
custom_llm_provider=custom_llm_provider,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
async def async_video_extension_handler(
self,
prompt: str,
video_id: str,
seconds: str,
video_provider_config: BaseVideoConfig,
custom_llm_provider: str,
litellm_params,
logging_obj,
extra_headers: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
client=None,
api_key: Optional[str] = None,
):
if client is None or not isinstance(client, AsyncHTTPHandler):
async_httpx_client = get_async_httpx_client(
llm_provider=litellm.LlmProviders(custom_llm_provider),
params={"ssl_verify": litellm_params.get("ssl_verify", None)},
)
else:
async_httpx_client = client
headers = video_provider_config.validate_environment(
api_key=api_key or litellm_params.get("api_key", None),
headers=extra_headers or {},
model="",
)
if extra_headers:
headers.update(extra_headers)
api_base = video_provider_config.get_complete_url(
model="",
api_base=litellm_params.get("api_base", None),
litellm_params=dict(litellm_params),
)
url, data = video_provider_config.transform_video_extension_request(
prompt=prompt,
video_id=video_id,
seconds=seconds,
api_base=api_base,
litellm_params=litellm_params,
headers=headers,
extra_body=extra_body,
)
logging_obj.pre_call(
input=prompt,
api_key="",
additional_args={
"complete_input_dict": data,
"api_base": url,
"headers": headers,
"video_id": video_id,
},
)
try:
response = await async_httpx_client.post(
url=url,
headers=headers,
json=data,
timeout=timeout,
)
response.raise_for_status()
return video_provider_config.transform_video_extension_response(
raw_response=response,
logging_obj=logging_obj,
custom_llm_provider=custom_llm_provider,
)
except Exception as e:
raise self._handle_error(e=e, provider_config=video_provider_config)
def video_list_handler(
self,
after: Optional[str],
+36 -11
View File
@@ -1,29 +1,30 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
import base64
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
import httpx
from httpx._types import RequestFiles
from litellm.types.videos.main import VideoCreateOptionalRequestParams, VideoObject
from litellm.types.router import GenericLiteLLMParams
from litellm.secret_managers.main import get_secret_str
from litellm.types.videos.utils import (
encode_video_id_with_provider,
extract_original_video_id,
)
from litellm.images.utils import ImageEditRequestUtils
import litellm
from litellm.constants import DEFAULT_GOOGLE_VIDEO_DURATION_SECONDS
from litellm.images.utils import ImageEditRequestUtils
from litellm.llms.base_llm.videos.transformation import BaseVideoConfig
from litellm.secret_managers.main import get_secret_str
from litellm.types.llms.gemini import (
GeminiLongRunningOperationResponse,
GeminiVideoGenerationInstance,
GeminiVideoGenerationParameters,
GeminiVideoGenerationRequest,
)
from litellm.constants import DEFAULT_GOOGLE_VIDEO_DURATION_SECONDS
from litellm.llms.base_llm.videos.transformation import BaseVideoConfig
from litellm.types.router import GenericLiteLLMParams
from litellm.types.videos.main import VideoCreateOptionalRequestParams, VideoObject
from litellm.types.videos.utils import (
encode_video_id_with_provider,
extract_original_video_id,
)
if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as _LiteLLMLoggingObj
from ...base_llm.chat.transformation import BaseLLMException as _BaseLLMException
LiteLLMLoggingObj = _LiteLLMLoggingObj
@@ -524,6 +525,30 @@ class GeminiVideoConfig(BaseVideoConfig):
"""Video delete is not supported."""
raise NotImplementedError("Video delete is not supported by Google Veo.")
def transform_video_create_character_request(self, name, video, api_base, litellm_params, headers):
raise NotImplementedError("video create character is not supported for Gemini")
def transform_video_create_character_response(self, raw_response, logging_obj):
raise NotImplementedError("video create character is not supported for Gemini")
def transform_video_get_character_request(self, character_id, api_base, litellm_params, headers):
raise NotImplementedError("video get character is not supported for Gemini")
def transform_video_get_character_response(self, raw_response, logging_obj):
raise NotImplementedError("video get character is not supported for Gemini")
def transform_video_edit_request(self, prompt, video_id, api_base, litellm_params, headers, extra_body=None):
raise NotImplementedError("video edit is not supported for Gemini")
def transform_video_edit_response(self, raw_response, logging_obj, custom_llm_provider=None):
raise NotImplementedError("video edit is not supported for Gemini")
def transform_video_extension_request(self, prompt, video_id, seconds, api_base, litellm_params, headers, extra_body=None):
raise NotImplementedError("video extension is not supported for Gemini")
def transform_video_extension_response(self, raw_response, logging_obj, custom_llm_provider=None):
raise NotImplementedError("video extension is not supported for Gemini")
def get_error_class(
self, error_message: str, status_code: int, headers: Union[dict, httpx.Headers]
) -> BaseLLMException:
+185 -2
View File
@@ -1,4 +1,5 @@
from io import BufferedReader
import mimetypes
from io import BufferedReader, BytesIO
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
import httpx
@@ -10,9 +11,14 @@ from litellm.llms.openai.image_edit.transformation import ImageEditRequestUtils
from litellm.secret_managers.main import get_secret_str
from litellm.types.llms.openai import CreateVideoRequest
from litellm.types.router import GenericLiteLLMParams
from litellm.types.videos.main import VideoCreateOptionalRequestParams, VideoObject
from litellm.types.videos.main import (
CharacterObject,
VideoCreateOptionalRequestParams,
VideoObject,
)
from litellm.types.videos.utils import (
encode_video_id_with_provider,
extract_original_character_id,
extract_original_video_id,
)
@@ -46,6 +52,7 @@ class OpenAIVideoConfig(BaseVideoConfig):
"input_reference",
"seconds",
"size",
"characters",
"user",
"extra_headers",
]
@@ -121,6 +128,7 @@ class OpenAIVideoConfig(BaseVideoConfig):
model=model, prompt=prompt, **video_create_optional_request_params
)
request_dict = cast(Dict, video_create_request)
request_dict = self._decode_character_ids_in_create_video_request(request_dict)
# Handle input_reference parameter if provided
_input_reference = video_create_optional_request_params.get("input_reference")
@@ -138,6 +146,35 @@ class OpenAIVideoConfig(BaseVideoConfig):
)
return data_without_files, files_list, api_base
def _decode_character_ids_in_create_video_request(self, request_dict: Dict) -> Dict:
"""
Decode LiteLLM-managed encoded character ids for provider requests.
OpenAI expects character ids like `char_...`. If a caller sends
`character_<base64-encoded-provider-payload>`, convert it back to the
original provider id before forwarding upstream.
"""
raw_characters = request_dict.get("characters")
if not isinstance(raw_characters, list):
return request_dict
decoded_characters: List[Any] = []
for character in raw_characters:
if not isinstance(character, dict):
decoded_characters.append(character)
continue
character_id = character.get("id")
if isinstance(character_id, str):
decoded_character = dict(character)
decoded_character["id"] = extract_original_character_id(character_id)
decoded_characters.append(decoded_character)
else:
decoded_characters.append(character)
request_dict["characters"] = decoded_characters
return request_dict
def transform_video_create_response(
self,
model: str,
@@ -430,6 +467,106 @@ class OpenAIVideoConfig(BaseVideoConfig):
headers=headers,
)
def transform_video_create_character_request(
self,
name: str,
video: Any,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
) -> Tuple[str, list]:
url = f"{api_base.rstrip('/')}/characters"
files_list: List[Tuple[str, Any]] = [("name", (None, name))]
self._add_video_to_files(files_list, video, "video")
return url, files_list
def transform_video_create_character_response(
self,
raw_response: httpx.Response,
logging_obj: Any,
) -> CharacterObject:
return CharacterObject(**raw_response.json())
def transform_video_get_character_request(
self,
character_id: str,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
) -> Tuple[str, Dict]:
url = f"{api_base.rstrip('/')}/characters/{character_id}"
return url, {}
def transform_video_get_character_response(
self,
raw_response: httpx.Response,
logging_obj: Any,
) -> CharacterObject:
return CharacterObject(**raw_response.json())
def transform_video_edit_request(
self,
prompt: str,
video_id: str,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
extra_body: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Dict]:
original_video_id = extract_original_video_id(video_id)
url = f"{api_base.rstrip('/')}/edits"
data: Dict[str, Any] = {"prompt": prompt, "video": {"id": original_video_id}}
if extra_body:
data.update(extra_body)
return url, data
def transform_video_edit_response(
self,
raw_response: httpx.Response,
logging_obj: Any,
custom_llm_provider: Optional[str] = None,
) -> VideoObject:
video_obj = VideoObject(**raw_response.json())
if custom_llm_provider and video_obj.id:
video_obj.id = encode_video_id_with_provider(
video_obj.id, custom_llm_provider, None
)
return video_obj
def transform_video_extension_request(
self,
prompt: str,
video_id: str,
seconds: str,
api_base: str,
litellm_params: GenericLiteLLMParams,
headers: dict,
extra_body: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Dict]:
original_video_id = extract_original_video_id(video_id)
url = f"{api_base.rstrip('/')}/extensions"
data: Dict[str, Any] = {
"prompt": prompt,
"seconds": seconds,
"video": {"id": original_video_id},
}
if extra_body:
data.update(extra_body)
return url, data
def transform_video_extension_response(
self,
raw_response: httpx.Response,
logging_obj: Any,
custom_llm_provider: Optional[str] = None,
) -> VideoObject:
video_obj = VideoObject(**raw_response.json())
if custom_llm_provider and video_obj.id:
video_obj.id = encode_video_id_with_provider(
video_obj.id, custom_llm_provider, None
)
return video_obj
def _add_image_to_files(
self,
files_list: List[Tuple[str, Any]],
@@ -445,3 +582,49 @@ class OpenAIVideoConfig(BaseVideoConfig):
files_list.append(
(field_name, ("input_reference.png", image, image_content_type))
)
def _add_video_to_files(
self,
files_list: List[Tuple[str, Any]],
video: Any,
field_name: str,
) -> None:
"""
Add a video to files with proper video MIME type detection.
This path is used by POST /videos/characters and must send video/mp4,
not image/* content types.
"""
filename = getattr(video, "name", None) or "input_video.mp4"
content_type = self._get_video_content_type(video=video, filename=filename)
files_list.append((field_name, (filename, video, content_type)))
def _get_video_content_type(self, video: Any, filename: str) -> str:
guessed_content_type, _ = mimetypes.guess_type(filename)
if guessed_content_type and guessed_content_type.startswith("video/"):
return guessed_content_type
# Fast-path detection for common MP4 signatures when filename is missing/incorrect.
try:
header_bytes = b""
if isinstance(video, BytesIO):
current_pos = video.tell()
video.seek(0)
header_bytes = video.read(64)
video.seek(current_pos)
elif isinstance(video, BufferedReader):
current_pos = video.tell()
video.seek(0)
header_bytes = video.read(64)
video.seek(current_pos)
elif isinstance(video, bytes):
header_bytes = video[:64]
# MP4 typically includes ftyp in first box.
if b"ftyp" in header_bytes:
return "video/mp4"
except Exception:
pass
# OpenAI create-character currently supports mp4.
return "video/mp4"
@@ -592,6 +592,30 @@ class RunwayMLVideoConfig(BaseVideoConfig):
return video_obj
def transform_video_create_character_request(self, name, video, api_base, litellm_params, headers):
raise NotImplementedError("video create character is not supported for RunwayML")
def transform_video_create_character_response(self, raw_response, logging_obj):
raise NotImplementedError("video create character is not supported for RunwayML")
def transform_video_get_character_request(self, character_id, api_base, litellm_params, headers):
raise NotImplementedError("video get character is not supported for RunwayML")
def transform_video_get_character_response(self, raw_response, logging_obj):
raise NotImplementedError("video get character is not supported for RunwayML")
def transform_video_edit_request(self, prompt, video_id, api_base, litellm_params, headers, extra_body=None):
raise NotImplementedError("video edit is not supported for RunwayML")
def transform_video_edit_response(self, raw_response, logging_obj, custom_llm_provider=None):
raise NotImplementedError("video edit is not supported for RunwayML")
def transform_video_extension_request(self, prompt, video_id, seconds, api_base, litellm_params, headers, extra_body=None):
raise NotImplementedError("video extension is not supported for RunwayML")
def transform_video_extension_response(self, raw_response, logging_obj, custom_llm_provider=None):
raise NotImplementedError("video extension is not supported for RunwayML")
def get_error_class(
self, error_message: str, status_code: int, headers: Union[dict, httpx.Headers]
) -> BaseLLMException:
@@ -624,6 +624,30 @@ class VertexAIVideoConfig(BaseVideoConfig, VertexBase):
"""Video delete is not supported."""
raise NotImplementedError("Video delete is not supported by Vertex AI Veo.")
def transform_video_create_character_request(self, name, video, api_base, litellm_params, headers):
raise NotImplementedError("video create character is not supported for Vertex AI")
def transform_video_create_character_response(self, raw_response, logging_obj):
raise NotImplementedError("video create character is not supported for Vertex AI")
def transform_video_get_character_request(self, character_id, api_base, litellm_params, headers):
raise NotImplementedError("video get character is not supported for Vertex AI")
def transform_video_get_character_response(self, raw_response, logging_obj):
raise NotImplementedError("video get character is not supported for Vertex AI")
def transform_video_edit_request(self, prompt, video_id, api_base, litellm_params, headers, extra_body=None):
raise NotImplementedError("video edit is not supported for Vertex AI")
def transform_video_edit_response(self, raw_response, logging_obj, custom_llm_provider=None):
raise NotImplementedError("video edit is not supported for Vertex AI")
def transform_video_extension_request(self, prompt, video_id, seconds, api_base, litellm_params, headers, extra_body=None):
raise NotImplementedError("video extension is not supported for Vertex AI")
def transform_video_extension_response(self, raw_response, logging_obj, custom_llm_provider=None):
raise NotImplementedError("video extension is not supported for Vertex AI")
def get_error_class(
self, error_message: str, status_code: int, headers: Union[dict, httpx.Headers]
) -> BaseLLMException:
@@ -599,6 +599,10 @@ class ProxyBaseLLMRequestProcessing:
"avideo_status",
"avideo_content",
"avideo_remix",
"avideo_create_character",
"avideo_get_character",
"avideo_edit",
"avideo_extension",
"acreate_container",
"alist_containers",
"aingest",
@@ -850,6 +854,10 @@ class ProxyBaseLLMRequestProcessing:
"avideo_status",
"avideo_content",
"avideo_remix",
"avideo_create_character",
"avideo_get_character",
"avideo_edit",
"avideo_extension",
"acreate_container",
"alist_containers",
"aingest",
+18 -1
View File
@@ -54,6 +54,10 @@ ROUTE_ENDPOINT_MAPPING = {
"avideo_status": "/videos/{video_id}",
"avideo_content": "/videos/{video_id}/content",
"avideo_remix": "/videos/{video_id}/remix",
"avideo_create_character": "/videos/characters",
"avideo_get_character": "/videos/characters/{character_id}",
"avideo_edit": "/videos/edits",
"avideo_extension": "/videos/extensions",
"acreate_realtime_client_secret": "/realtime/client_secrets",
"arealtime_calls": "/realtime/calls",
"acreate_container": "/containers",
@@ -201,6 +205,10 @@ async def route_request( # noqa: PLR0915 - Complex routing function, refactorin
"avideo_status",
"avideo_content",
"avideo_remix",
"avideo_create_character",
"avideo_get_character",
"avideo_edit",
"avideo_extension",
"acreate_container",
"alist_containers",
"aretrieve_container",
@@ -370,6 +378,10 @@ async def route_request( # noqa: PLR0915 - Complex routing function, refactorin
"avideo_status",
"avideo_content",
"avideo_remix",
"avideo_create_character",
"avideo_get_character",
"avideo_edit",
"avideo_extension",
"avector_store_file_list",
"avector_store_file_retrieve",
"avector_store_file_content",
@@ -449,8 +461,13 @@ async def route_request( # noqa: PLR0915 - Complex routing function, refactorin
"avideo_status",
"avideo_content",
"avideo_remix",
"avideo_create_character",
"avideo_get_character",
"avideo_edit",
"avideo_extension",
]:
# Video endpoints: If model is provided (e.g., from decoded video_id), try router first
# Video endpoints: If model is provided (e.g., from decoded video_id or target_model_names),
# try router first to allow for multi-deployment load balancing
try:
return getattr(llm_router, f"{route_type}")(**data)
except Exception:
+431 -2
View File
@@ -3,7 +3,7 @@
from typing import Any, Dict, Optional
import orjson
from fastapi import APIRouter, Depends, File, Request, Response, UploadFile
from fastapi import APIRouter, Depends, File, Form, Request, Response, UploadFile
from fastapi.responses import ORJSONResponse
from litellm.proxy._types import *
@@ -16,7 +16,15 @@ from litellm.proxy.common_utils.openai_endpoint_utils import (
get_custom_llm_provider_from_request_query,
)
from litellm.proxy.image_endpoints.endpoints import batch_to_bytesio
from litellm.types.videos.utils import decode_video_id_with_provider
from litellm.proxy.video_endpoints.utils import (
encode_character_id_in_response,
extract_model_from_target_model_names,
get_custom_provider_from_data,
)
from litellm.types.videos.utils import (
decode_character_id_with_provider,
decode_video_id_with_provider,
)
router = APIRouter()
@@ -504,3 +512,424 @@ async def video_remix(
proxy_logging_obj=proxy_logging_obj,
version=version,
)
@router.post(
"/v1/videos/characters",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
@router.post(
"/videos/characters",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
async def video_create_character(
request: Request,
fastapi_response: Response,
video: UploadFile = File(...),
name: str = Form(...),
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Create a character from an uploaded video file.
Follows the OpenAI Videos API spec:
https://platform.openai.com/docs/api-reference/videos/create-character
Example:
```bash
curl -X POST "http://localhost:4000/v1/videos/characters" \
-H "Authorization: Bearer sk-1234" \
-F "video=@character_video.mp4" \
-F "name=my_character"
```
"""
from litellm.proxy.proxy_server import (
general_settings,
llm_router,
proxy_config,
proxy_logging_obj,
select_data_generator,
user_api_base,
user_max_tokens,
user_model,
user_request_timeout,
user_temperature,
version,
)
data = await _read_request_body(request=request)
video_file = await batch_to_bytesio([video])
if video_file:
data["video"] = video_file[0]
target_model_name = extract_model_from_target_model_names(
data.get("target_model_names")
)
if target_model_name and not data.get("model"):
data["model"] = target_model_name
custom_llm_provider = (
get_custom_llm_provider_from_request_headers(request=request)
or get_custom_llm_provider_from_request_query(request=request)
or get_custom_provider_from_data(data=data)
or "openai"
)
data["custom_llm_provider"] = custom_llm_provider
processor = ProxyBaseLLMRequestProcessing(data=data)
try:
response = await processor.base_process_llm_request(
request=request,
fastapi_response=fastapi_response,
user_api_key_dict=user_api_key_dict,
route_type="avideo_create_character",
proxy_logging_obj=proxy_logging_obj,
llm_router=llm_router,
general_settings=general_settings,
proxy_config=proxy_config,
select_data_generator=select_data_generator,
model=None,
user_model=user_model,
user_temperature=user_temperature,
user_request_timeout=user_request_timeout,
user_max_tokens=user_max_tokens,
user_api_base=user_api_base,
version=version,
)
if target_model_name:
hidden_params = getattr(response, "_hidden_params", {}) or {}
provider_for_encoding = (
hidden_params.get("custom_llm_provider")
or custom_llm_provider
or "openai"
)
model_id_for_encoding = hidden_params.get("model_id") or data.get("model")
response = encode_character_id_in_response(
response=response,
custom_llm_provider=provider_for_encoding,
model_id=model_id_for_encoding,
)
return response
except Exception as e:
raise await processor._handle_llm_api_exception(
e=e,
user_api_key_dict=user_api_key_dict,
proxy_logging_obj=proxy_logging_obj,
version=version,
)
@router.get(
"/v1/videos/characters/{character_id}",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
@router.get(
"/videos/characters/{character_id}",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
async def video_get_character(
character_id: str,
request: Request,
fastapi_response: Response,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Retrieve a character by ID.
Follows the OpenAI Videos API spec:
https://platform.openai.com/docs/api-reference/videos/get-character
Example:
```bash
curl -X GET "http://localhost:4000/v1/videos/characters/char_123" \
-H "Authorization: Bearer sk-1234"
```
"""
from litellm.proxy.proxy_server import (
general_settings,
llm_router,
proxy_config,
proxy_logging_obj,
select_data_generator,
user_api_base,
user_max_tokens,
user_model,
user_request_timeout,
user_temperature,
version,
)
original_requested_character_id = character_id
data: Dict[str, Any] = {"character_id": character_id}
decoded = decode_character_id_with_provider(character_id)
provider_from_id = decoded.get("custom_llm_provider")
model_id_from_decoded = decoded.get("model_id")
decoded_character_id = decoded.get("character_id")
if decoded_character_id:
data["character_id"] = decoded_character_id
custom_llm_provider = (
get_custom_llm_provider_from_request_headers(request=request)
or get_custom_llm_provider_from_request_query(request=request)
or await get_custom_llm_provider_from_request_body(request=request)
or provider_from_id
or "openai"
)
data["custom_llm_provider"] = custom_llm_provider
if model_id_from_decoded and llm_router:
resolved_model = llm_router.resolve_model_name_from_model_id(
model_id_from_decoded
)
if resolved_model:
data["model"] = resolved_model
processor = ProxyBaseLLMRequestProcessing(data=data)
try:
response = await processor.base_process_llm_request(
request=request,
fastapi_response=fastapi_response,
user_api_key_dict=user_api_key_dict,
route_type="avideo_get_character",
proxy_logging_obj=proxy_logging_obj,
llm_router=llm_router,
general_settings=general_settings,
proxy_config=proxy_config,
select_data_generator=select_data_generator,
model=None,
user_model=user_model,
user_temperature=user_temperature,
user_request_timeout=user_request_timeout,
user_max_tokens=user_max_tokens,
user_api_base=user_api_base,
version=version,
)
if original_requested_character_id.startswith("character_"):
provider_for_encoding = provider_from_id or custom_llm_provider or "openai"
model_id_for_encoding = model_id_from_decoded
response = encode_character_id_in_response(
response=response,
custom_llm_provider=provider_for_encoding,
model_id=model_id_for_encoding,
)
return response
except Exception as e:
raise await processor._handle_llm_api_exception(
e=e,
user_api_key_dict=user_api_key_dict,
proxy_logging_obj=proxy_logging_obj,
version=version,
)
@router.post(
"/v1/videos/edits",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
@router.post(
"/videos/edits",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
async def video_edit(
request: Request,
fastapi_response: Response,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Create a video edit job.
Follows the OpenAI Videos API spec:
https://platform.openai.com/docs/api-reference/videos/create-edit
Example:
```bash
curl -X POST "http://localhost:4000/v1/videos/edits" \
-H "Authorization: Bearer sk-1234" \
-H "Content-Type: application/json" \
-d '{"prompt": "Make it brighter", "video": {"id": "video_123"}}'
```
"""
from litellm.proxy.proxy_server import (
general_settings,
llm_router,
proxy_config,
proxy_logging_obj,
select_data_generator,
user_api_base,
user_max_tokens,
user_model,
user_request_timeout,
user_temperature,
version,
)
body = await request.body()
data = orjson.loads(body)
# Extract video_id from nested video object
video_ref = data.pop("video", {})
video_id = video_ref.get("id", "") if isinstance(video_ref, dict) else ""
data["video_id"] = video_id
decoded = decode_video_id_with_provider(video_id)
provider_from_id = decoded.get("custom_llm_provider")
model_id_from_decoded = decoded.get("model_id")
custom_llm_provider = (
get_custom_llm_provider_from_request_headers(request=request)
or get_custom_llm_provider_from_request_query(request=request)
or get_custom_provider_from_data(data=data)
or provider_from_id
or "openai"
)
data["custom_llm_provider"] = custom_llm_provider
if model_id_from_decoded and llm_router:
resolved_model = llm_router.resolve_model_name_from_model_id(
model_id_from_decoded
)
if resolved_model:
data["model"] = resolved_model
processor = ProxyBaseLLMRequestProcessing(data=data)
try:
return await processor.base_process_llm_request(
request=request,
fastapi_response=fastapi_response,
user_api_key_dict=user_api_key_dict,
route_type="avideo_edit",
proxy_logging_obj=proxy_logging_obj,
llm_router=llm_router,
general_settings=general_settings,
proxy_config=proxy_config,
select_data_generator=select_data_generator,
model=None,
user_model=user_model,
user_temperature=user_temperature,
user_request_timeout=user_request_timeout,
user_max_tokens=user_max_tokens,
user_api_base=user_api_base,
version=version,
)
except Exception as e:
raise await processor._handle_llm_api_exception(
e=e,
user_api_key_dict=user_api_key_dict,
proxy_logging_obj=proxy_logging_obj,
version=version,
)
@router.post(
"/v1/videos/extensions",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
@router.post(
"/videos/extensions",
dependencies=[Depends(user_api_key_auth)],
response_class=ORJSONResponse,
tags=["videos"],
)
async def video_extension(
request: Request,
fastapi_response: Response,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Create a video extension.
Follows the OpenAI Videos API spec:
https://platform.openai.com/docs/api-reference/videos/create-extension
Example:
```bash
curl -X POST "http://localhost:4000/v1/videos/extensions" \
-H "Authorization: Bearer sk-1234" \
-H "Content-Type: application/json" \
-d '{"prompt": "Continue the scene", "seconds": "5", "video": {"id": "video_123"}}'
```
"""
from litellm.proxy.proxy_server import (
general_settings,
llm_router,
proxy_config,
proxy_logging_obj,
select_data_generator,
user_api_base,
user_max_tokens,
user_model,
user_request_timeout,
user_temperature,
version,
)
body = await request.body()
data = orjson.loads(body)
# Extract video_id from nested video object
video_ref = data.pop("video", {})
video_id = video_ref.get("id", "") if isinstance(video_ref, dict) else ""
data["video_id"] = video_id
decoded = decode_video_id_with_provider(video_id)
provider_from_id = decoded.get("custom_llm_provider")
model_id_from_decoded = decoded.get("model_id")
custom_llm_provider = (
get_custom_llm_provider_from_request_headers(request=request)
or get_custom_llm_provider_from_request_query(request=request)
or get_custom_provider_from_data(data=data)
or provider_from_id
or "openai"
)
data["custom_llm_provider"] = custom_llm_provider
if model_id_from_decoded and llm_router:
resolved_model = llm_router.resolve_model_name_from_model_id(
model_id_from_decoded
)
if resolved_model:
data["model"] = resolved_model
processor = ProxyBaseLLMRequestProcessing(data=data)
try:
return await processor.base_process_llm_request(
request=request,
fastapi_response=fastapi_response,
user_api_key_dict=user_api_key_dict,
route_type="avideo_extension",
proxy_logging_obj=proxy_logging_obj,
llm_router=llm_router,
general_settings=general_settings,
proxy_config=proxy_config,
select_data_generator=select_data_generator,
model=None,
user_model=user_model,
user_temperature=user_temperature,
user_request_timeout=user_request_timeout,
user_max_tokens=user_max_tokens,
user_api_base=user_api_base,
version=version,
)
except Exception as e:
raise await processor._handle_llm_api_exception(
e=e,
user_api_key_dict=user_api_key_dict,
proxy_logging_obj=proxy_logging_obj,
version=version,
)
+56
View File
@@ -0,0 +1,56 @@
from typing import Any, Dict, Optional
import orjson
from litellm.types.videos.utils import encode_character_id_with_provider
def extract_model_from_target_model_names(target_model_names: Any) -> Optional[str]:
if isinstance(target_model_names, str):
target_model_names = [m.strip() for m in target_model_names.split(",") if m.strip()]
elif not isinstance(target_model_names, list):
return None
return target_model_names[0] if target_model_names else None
def get_custom_provider_from_data(data: Dict[str, Any]) -> Optional[str]:
custom_llm_provider = data.get("custom_llm_provider")
if custom_llm_provider:
return custom_llm_provider
extra_body = data.get("extra_body")
if isinstance(extra_body, str):
try:
parsed_extra_body = orjson.loads(extra_body)
if isinstance(parsed_extra_body, dict):
extra_body = parsed_extra_body
except Exception:
extra_body = None
if isinstance(extra_body, dict):
extra_body_custom_llm_provider = extra_body.get("custom_llm_provider")
if isinstance(extra_body_custom_llm_provider, str):
return extra_body_custom_llm_provider
return None
def encode_character_id_in_response(
response: Any, custom_llm_provider: str, model_id: Optional[str]
) -> Any:
if isinstance(response, dict) and response.get("id"):
response["id"] = encode_character_id_with_provider(
character_id=response["id"],
provider=custom_llm_provider,
model_id=model_id,
)
return response
character_id = getattr(response, "id", None)
if isinstance(character_id, str) and character_id:
response.id = encode_character_id_with_provider(
character_id=character_id,
provider=custom_llm_provider,
model_id=model_id,
)
return response
+40
View File
@@ -1076,12 +1076,20 @@ class Router:
"""Initialize video endpoints."""
from litellm.videos import (
avideo_content,
avideo_create_character,
avideo_edit,
avideo_extension,
avideo_generation,
avideo_get_character,
avideo_list,
avideo_remix,
avideo_status,
video_content,
video_create_character,
video_edit,
video_extension,
video_generation,
video_get_character,
video_list,
video_remix,
video_status,
@@ -1111,6 +1119,26 @@ class Router:
avideo_remix, call_type="avideo_remix"
)
self.video_remix = self.factory_function(video_remix, call_type="video_remix")
self.avideo_create_character = self.factory_function(
avideo_create_character, call_type="avideo_create_character"
)
self.video_create_character = self.factory_function(
video_create_character, call_type="video_create_character"
)
self.avideo_get_character = self.factory_function(
avideo_get_character, call_type="avideo_get_character"
)
self.video_get_character = self.factory_function(
video_get_character, call_type="video_get_character"
)
self.avideo_edit = self.factory_function(avideo_edit, call_type="avideo_edit")
self.video_edit = self.factory_function(video_edit, call_type="video_edit")
self.avideo_extension = self.factory_function(
avideo_extension, call_type="avideo_extension"
)
self.video_extension = self.factory_function(
video_extension, call_type="video_extension"
)
def _initialize_container_endpoints(self):
"""Initialize container endpoints."""
@@ -4828,6 +4856,14 @@ class Router:
"video_content",
"avideo_remix",
"video_remix",
"avideo_create_character",
"video_create_character",
"avideo_get_character",
"video_get_character",
"avideo_edit",
"video_edit",
"avideo_extension",
"video_extension",
"acreate_container",
"create_container",
"alist_containers",
@@ -4995,6 +5031,10 @@ class Router:
"avideo_status",
"avideo_content",
"avideo_remix",
"avideo_create_character",
"avideo_get_character",
"avideo_edit",
"avideo_extension",
"acreate_skill",
"alist_skills",
"aget_skill",
+2
View File
@@ -2187,6 +2187,7 @@ class CreateVideoRequest(TypedDict, total=False):
model: Optional[str] - The video generation model to use (defaults to sora-2)
seconds: Optional[str] - Clip duration in seconds (defaults to 4 seconds)
size: Optional[str] - Output resolution formatted as width x height (defaults to 720x1280)
characters: Optional[List[Dict[str, str]]] - Character references to include in generation
user: Optional[str] - A unique identifier representing your end-user
extra_headers: Optional[Dict[str, str]] - Additional headers
extra_body: Optional[Dict[str, str]] - Additional body parameters
@@ -2198,6 +2199,7 @@ class CreateVideoRequest(TypedDict, total=False):
model: Optional[str]
seconds: Optional[str]
size: Optional[str]
characters: Optional[List[Dict[str, str]]]
user: Optional[str]
extra_headers: Optional[Dict[str, str]]
extra_body: Optional[Dict[str, str]]
+28
View File
@@ -358,6 +358,14 @@ class CallTypes(str, Enum):
avideo_retrieve_job = "avideo_retrieve_job"
video_delete = "video_delete"
avideo_delete = "avideo_delete"
video_create_character = "video_create_character"
avideo_create_character = "avideo_create_character"
video_get_character = "video_get_character"
avideo_get_character = "avideo_get_character"
video_edit = "video_edit"
avideo_edit = "avideo_edit"
video_extension = "video_extension"
avideo_extension = "avideo_extension"
vector_store_file_create = "vector_store_file_create"
avector_store_file_create = "avector_store_file_create"
vector_store_file_list = "vector_store_file_list"
@@ -700,6 +708,26 @@ API_ROUTE_TO_CALL_TYPES = {
],
"/videos/{video_id}/remix": [CallTypes.avideo_remix, CallTypes.video_remix],
"/v1/videos/{video_id}/remix": [CallTypes.avideo_remix, CallTypes.video_remix],
"/videos/characters": [
CallTypes.avideo_create_character,
CallTypes.video_create_character,
],
"/v1/videos/characters": [
CallTypes.avideo_create_character,
CallTypes.video_create_character,
],
"/videos/characters/{character_id}": [
CallTypes.avideo_get_character,
CallTypes.video_get_character,
],
"/v1/videos/characters/{character_id}": [
CallTypes.avideo_get_character,
CallTypes.video_get_character,
],
"/videos/edits": [CallTypes.avideo_edit, CallTypes.video_edit],
"/v1/videos/edits": [CallTypes.avideo_edit, CallTypes.video_edit],
"/videos/extensions": [CallTypes.avideo_extension, CallTypes.video_extension],
"/v1/videos/extensions": [CallTypes.avideo_extension, CallTypes.video_extension],
# Vector Stores
"/vector_stores": [CallTypes.avector_store_create, CallTypes.vector_store_create],
"/v1/vector_stores": [
+42 -2
View File
@@ -1,10 +1,9 @@
from typing import Any, Dict, List, Literal, Optional
from openai.types.audio.transcription_create_params import FileTypes # type: ignore
from pydantic import BaseModel
from typing_extensions import TypedDict
from litellm.types.utils import FileTypes
class VideoObject(BaseModel):
"""Represents a generated video object."""
@@ -83,6 +82,7 @@ class VideoCreateOptionalRequestParams(TypedDict, total=False):
model: Optional[str]
seconds: Optional[str]
size: Optional[str]
characters: Optional[List[Dict[str, str]]]
user: Optional[str]
extra_headers: Optional[Dict[str, str]]
extra_body: Optional[Dict[str, str]]
@@ -104,3 +104,43 @@ class DecodedVideoId(TypedDict, total=False):
custom_llm_provider: Optional[str]
model_id: Optional[str]
video_id: str
class CharacterObject(BaseModel):
"""Represents a character created from a video."""
id: str
object: Literal["character"] = "character"
created_at: int
name: str
_hidden_params: Dict[str, Any] = {}
def __contains__(self, key):
return hasattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
def __getitem__(self, key):
return getattr(self, key)
def json(self, **kwargs): # type: ignore
try:
return self.model_dump(**kwargs)
except Exception:
return self.dict()
class VideoEditRequestParams(TypedDict, total=False):
"""TypedDict for video edit request parameters."""
prompt: str
video: Dict[str, str] # {"id": "video_123"}
class VideoExtensionRequestParams(TypedDict, total=False):
"""TypedDict for video extension request parameters."""
prompt: str
seconds: str
video: Dict[str, str] # {"id": "video_123"}
+104
View File
@@ -12,6 +12,26 @@ from litellm.types.utils import SpecialEnums
from litellm.types.videos.main import DecodedVideoId
VIDEO_ID_PREFIX = "video_"
CHARACTER_ID_PREFIX = "character_"
CHARACTER_ID_TEMPLATE = "litellm:custom_llm_provider:{};model_id:{};character_id:{}"
class DecodedCharacterId(dict):
"""Structure representing a decoded character ID."""
custom_llm_provider: Optional[str]
model_id: Optional[str]
character_id: str
def _add_base64_padding(value: str) -> str:
"""
Add missing base64 padding when IDs are copied without trailing '=' chars.
"""
missing_padding = len(value) % 4
if missing_padding:
value += "=" * (4 - missing_padding)
return value
def encode_video_id_with_provider(
@@ -59,6 +79,7 @@ def decode_video_id_with_provider(encoded_video_id: str) -> DecodedVideoId:
try:
cleaned_id = encoded_video_id.replace(VIDEO_ID_PREFIX, "")
cleaned_id = _add_base64_padding(cleaned_id)
decoded_id = base64.b64decode(cleaned_id.encode("utf-8")).decode("utf-8")
if ";" not in decoded_id:
@@ -103,3 +124,86 @@ def extract_original_video_id(encoded_video_id: str) -> str:
"""Extract original video ID without encoding."""
decoded = decode_video_id_with_provider(encoded_video_id)
return decoded.get("video_id", encoded_video_id)
def encode_character_id_with_provider(
character_id: str, provider: str, model_id: Optional[str] = None
) -> str:
"""Encode provider and model_id into character_id using base64."""
if not provider or not character_id:
return character_id
decoded = decode_character_id_with_provider(character_id)
if decoded.get("custom_llm_provider") is not None:
return character_id
assembled_id = CHARACTER_ID_TEMPLATE.format(provider, model_id or "", character_id)
base64_encoded_id: str = base64.b64encode(assembled_id.encode("utf-8")).decode(
"utf-8"
)
return f"{CHARACTER_ID_PREFIX}{base64_encoded_id}"
def decode_character_id_with_provider(encoded_character_id: str) -> DecodedCharacterId:
"""Decode provider and model_id from encoded character_id."""
if not encoded_character_id:
return DecodedCharacterId(
custom_llm_provider=None,
model_id=None,
character_id=encoded_character_id,
)
if not encoded_character_id.startswith(CHARACTER_ID_PREFIX):
return DecodedCharacterId(
custom_llm_provider=None,
model_id=None,
character_id=encoded_character_id,
)
try:
cleaned_id = encoded_character_id.replace(CHARACTER_ID_PREFIX, "")
cleaned_id = _add_base64_padding(cleaned_id)
decoded_id = base64.b64decode(cleaned_id.encode("utf-8")).decode("utf-8")
if ";" not in decoded_id:
return DecodedCharacterId(
custom_llm_provider=None,
model_id=None,
character_id=encoded_character_id,
)
parts = decoded_id.split(";")
custom_llm_provider = None
model_id = None
decoded_character_id = encoded_character_id
if len(parts) >= 3:
custom_llm_provider_part = parts[0]
model_id_part = parts[1]
character_id_part = parts[2]
custom_llm_provider = custom_llm_provider_part.replace(
"litellm:custom_llm_provider:", ""
)
model_id = model_id_part.replace("model_id:", "")
decoded_character_id = character_id_part.replace("character_id:", "")
return DecodedCharacterId(
custom_llm_provider=custom_llm_provider,
model_id=model_id,
character_id=decoded_character_id,
)
except Exception as e:
verbose_logger.debug(f"Error decoding character_id '{encoded_character_id}': {e}")
return DecodedCharacterId(
custom_llm_provider=None,
model_id=None,
character_id=encoded_character_id,
)
def extract_original_character_id(encoded_character_id: str) -> str:
"""Extract original character ID without encoding."""
decoded = decode_character_id_with_provider(encoded_character_id)
return decoded.get("character_id", encoded_character_id)
+23 -7
View File
@@ -1,16 +1,24 @@
"""Video generation and management functions for LiteLLM."""
from .main import (
avideo_generation,
video_generation,
avideo_list,
video_list,
avideo_status,
video_status,
avideo_content,
video_content,
avideo_create_character,
avideo_edit,
avideo_extension,
avideo_generation,
avideo_get_character,
avideo_list,
avideo_remix,
avideo_status,
video_content,
video_create_character,
video_edit,
video_extension,
video_generation,
video_get_character,
video_list,
video_remix,
video_status,
)
__all__ = [
@@ -24,4 +32,12 @@ __all__ = [
"video_content",
"avideo_remix",
"video_remix",
"avideo_create_character",
"video_create_character",
"avideo_get_character",
"video_get_character",
"avideo_edit",
"video_edit",
"avideo_extension",
"video_extension",
]
+524 -1
View File
@@ -14,7 +14,11 @@ from litellm.llms.custom_httpx.llm_http_handler import BaseLLMHTTPHandler
from litellm.main import base_llm_http_handler
from litellm.types.router import GenericLiteLLMParams
from litellm.types.utils import CallTypes, FileTypes
from litellm.types.videos.main import VideoCreateOptionalRequestParams, VideoObject
from litellm.types.videos.main import (
CharacterObject,
VideoCreateOptionalRequestParams,
VideoObject,
)
from litellm.types.videos.utils import decode_video_id_with_provider
from litellm.utils import ProviderConfigManager, client
from litellm.videos.utils import VideoGenerationRequestUtils
@@ -1093,3 +1097,522 @@ def video_status( # noqa: PLR0915
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
async def avideo_create_character(
name: str,
video: Any,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> CharacterObject:
"""
Asynchronously create a character from an uploaded video file.
Maps to POST /v1/videos/characters
"""
local_vars = locals()
try:
loop = asyncio.get_event_loop()
kwargs["async_call"] = True
if custom_llm_provider is None:
custom_llm_provider = "openai"
func = partial(
video_create_character,
name=name,
video=video,
timeout=timeout,
custom_llm_provider=custom_llm_provider,
extra_headers=extra_headers,
extra_query=extra_query,
extra_body=extra_body,
**kwargs,
)
ctx = contextvars.copy_context()
func_with_context = partial(ctx.run, func)
init_response = await loop.run_in_executor(None, func_with_context)
if asyncio.iscoroutine(init_response):
response = await init_response
else:
response = init_response
return response
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
def video_create_character(
name: str,
video: Any,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Union[CharacterObject, Coroutine[Any, Any, CharacterObject]]:
"""
Create a character from an uploaded video file.
Maps to POST /v1/videos/characters
"""
local_vars = locals()
try:
litellm_logging_obj: LiteLLMLoggingObj = kwargs.pop("litellm_logging_obj") # type: ignore
litellm_call_id: Optional[str] = kwargs.get("litellm_call_id", None)
_is_async = kwargs.pop("async_call", False) is True
mock_response = kwargs.get("mock_response", None)
if mock_response is not None:
if isinstance(mock_response, str):
mock_response = json.loads(mock_response)
return CharacterObject(**mock_response)
if custom_llm_provider is None:
custom_llm_provider = "openai"
litellm_params = GenericLiteLLMParams(**kwargs)
provider_config: Optional[BaseVideoConfig] = ProviderConfigManager.get_provider_video_config(
model=None,
provider=litellm.LlmProviders(custom_llm_provider),
)
if provider_config is None:
raise ValueError(f"video create character is not supported for {custom_llm_provider}")
local_vars.update(kwargs)
request_params: Dict = {"name": name}
litellm_logging_obj.update_environment_variables(
model="",
user=kwargs.get("user"),
optional_params=dict(request_params),
litellm_params={"litellm_call_id": litellm_call_id, **request_params},
custom_llm_provider=custom_llm_provider,
)
litellm_logging_obj.call_type = CallTypes.video_create_character.value
return base_llm_http_handler.video_create_character_handler(
name=name,
video=video,
video_provider_config=provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=litellm_logging_obj,
extra_headers=extra_headers,
timeout=timeout or DEFAULT_REQUEST_TIMEOUT,
_is_async=_is_async,
client=kwargs.get("client"),
)
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
async def avideo_get_character(
character_id: str,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> CharacterObject:
"""
Asynchronously retrieve a character by ID.
Maps to GET /v1/videos/characters/{character_id}
"""
local_vars = locals()
try:
loop = asyncio.get_event_loop()
kwargs["async_call"] = True
func = partial(
video_get_character,
character_id=character_id,
timeout=timeout,
custom_llm_provider=custom_llm_provider,
extra_headers=extra_headers,
extra_query=extra_query,
extra_body=extra_body,
**kwargs,
)
ctx = contextvars.copy_context()
func_with_context = partial(ctx.run, func)
init_response = await loop.run_in_executor(None, func_with_context)
if asyncio.iscoroutine(init_response):
response = await init_response
else:
response = init_response
return response
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
def video_get_character(
character_id: str,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Union[CharacterObject, Coroutine[Any, Any, CharacterObject]]:
"""
Retrieve a character by ID.
Maps to GET /v1/videos/characters/{character_id}
"""
local_vars = locals()
try:
litellm_logging_obj: LiteLLMLoggingObj = kwargs.pop("litellm_logging_obj") # type: ignore
litellm_call_id: Optional[str] = kwargs.get("litellm_call_id", None)
_is_async = kwargs.pop("async_call", False) is True
mock_response = kwargs.get("mock_response", None)
if mock_response is not None:
if isinstance(mock_response, str):
mock_response = json.loads(mock_response)
return CharacterObject(**mock_response)
if custom_llm_provider is None:
custom_llm_provider = "openai"
litellm_params = GenericLiteLLMParams(**kwargs)
provider_config: Optional[BaseVideoConfig] = ProviderConfigManager.get_provider_video_config(
model=None,
provider=litellm.LlmProviders(custom_llm_provider),
)
if provider_config is None:
raise ValueError(f"video get character is not supported for {custom_llm_provider}")
local_vars.update(kwargs)
request_params: Dict = {"character_id": character_id}
litellm_logging_obj.update_environment_variables(
model="",
user=kwargs.get("user"),
optional_params=dict(request_params),
litellm_params={"litellm_call_id": litellm_call_id, **request_params},
custom_llm_provider=custom_llm_provider,
)
litellm_logging_obj.call_type = CallTypes.video_get_character.value
return base_llm_http_handler.video_get_character_handler(
character_id=character_id,
video_provider_config=provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=litellm_logging_obj,
extra_headers=extra_headers,
timeout=timeout or DEFAULT_REQUEST_TIMEOUT,
_is_async=_is_async,
client=kwargs.get("client"),
)
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
async def avideo_edit(
video_id: str,
prompt: str,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> VideoObject:
"""
Asynchronously create a video edit job.
Maps to POST /v1/videos/edits
"""
local_vars = locals()
try:
loop = asyncio.get_event_loop()
kwargs["async_call"] = True
func = partial(
video_edit,
video_id=video_id,
prompt=prompt,
timeout=timeout,
custom_llm_provider=custom_llm_provider,
extra_headers=extra_headers,
extra_query=extra_query,
extra_body=extra_body,
**kwargs,
)
ctx = contextvars.copy_context()
func_with_context = partial(ctx.run, func)
init_response = await loop.run_in_executor(None, func_with_context)
if asyncio.iscoroutine(init_response):
response = await init_response
else:
response = init_response
return response
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
def video_edit(
video_id: str,
prompt: str,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Union[VideoObject, Coroutine[Any, Any, VideoObject]]:
"""
Create a video edit job.
Maps to POST /v1/videos/edits
"""
local_vars = locals()
try:
litellm_logging_obj: LiteLLMLoggingObj = kwargs.pop("litellm_logging_obj") # type: ignore
litellm_call_id: Optional[str] = kwargs.get("litellm_call_id", None)
_is_async = kwargs.pop("async_call", False) is True
mock_response = kwargs.get("mock_response", None)
if mock_response is not None:
if isinstance(mock_response, str):
mock_response = json.loads(mock_response)
return VideoObject(**mock_response)
if custom_llm_provider is None:
decoded = decode_video_id_with_provider(video_id)
custom_llm_provider = decoded.get("custom_llm_provider") or "openai"
litellm_params = GenericLiteLLMParams(**kwargs)
provider_config: Optional[BaseVideoConfig] = ProviderConfigManager.get_provider_video_config(
model=None,
provider=litellm.LlmProviders(custom_llm_provider),
)
if provider_config is None:
raise ValueError(f"video edit is not supported for {custom_llm_provider}")
local_vars.update(kwargs)
request_params: Dict = {"video_id": video_id, "prompt": prompt}
litellm_logging_obj.update_environment_variables(
model="",
user=kwargs.get("user"),
optional_params=dict(request_params),
litellm_params={"litellm_call_id": litellm_call_id, **request_params},
custom_llm_provider=custom_llm_provider,
)
litellm_logging_obj.call_type = CallTypes.video_edit.value
return base_llm_http_handler.video_edit_handler(
prompt=prompt,
video_id=video_id,
video_provider_config=provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=litellm_logging_obj,
extra_headers=extra_headers,
extra_body=extra_body,
timeout=timeout or DEFAULT_REQUEST_TIMEOUT,
_is_async=_is_async,
client=kwargs.get("client"),
)
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
async def avideo_extension(
video_id: str,
prompt: str,
seconds: str,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> VideoObject:
"""
Asynchronously create a video extension.
Maps to POST /v1/videos/extensions
"""
local_vars = locals()
try:
loop = asyncio.get_event_loop()
kwargs["async_call"] = True
func = partial(
video_extension,
video_id=video_id,
prompt=prompt,
seconds=seconds,
timeout=timeout,
custom_llm_provider=custom_llm_provider,
extra_headers=extra_headers,
extra_query=extra_query,
extra_body=extra_body,
**kwargs,
)
ctx = contextvars.copy_context()
func_with_context = partial(ctx.run, func)
init_response = await loop.run_in_executor(None, func_with_context)
if asyncio.iscoroutine(init_response):
response = await init_response
else:
response = init_response
return response
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@client
def video_extension(
video_id: str,
prompt: str,
seconds: str,
timeout=600,
custom_llm_provider=None,
extra_headers: Optional[Dict[str, Any]] = None,
extra_query: Optional[Dict[str, Any]] = None,
extra_body: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Union[VideoObject, Coroutine[Any, Any, VideoObject]]:
"""
Create a video extension.
Maps to POST /v1/videos/extensions
"""
local_vars = locals()
try:
litellm_logging_obj: LiteLLMLoggingObj = kwargs.pop("litellm_logging_obj") # type: ignore
litellm_call_id: Optional[str] = kwargs.get("litellm_call_id", None)
_is_async = kwargs.pop("async_call", False) is True
mock_response = kwargs.get("mock_response", None)
if mock_response is not None:
if isinstance(mock_response, str):
mock_response = json.loads(mock_response)
return VideoObject(**mock_response)
if custom_llm_provider is None:
decoded = decode_video_id_with_provider(video_id)
custom_llm_provider = decoded.get("custom_llm_provider") or "openai"
litellm_params = GenericLiteLLMParams(**kwargs)
provider_config: Optional[BaseVideoConfig] = ProviderConfigManager.get_provider_video_config(
model=None,
provider=litellm.LlmProviders(custom_llm_provider),
)
if provider_config is None:
raise ValueError(f"video extension is not supported for {custom_llm_provider}")
local_vars.update(kwargs)
request_params: Dict = {"video_id": video_id, "prompt": prompt, "seconds": seconds}
litellm_logging_obj.update_environment_variables(
model="",
user=kwargs.get("user"),
optional_params=dict(request_params),
litellm_params={"litellm_call_id": litellm_call_id, **request_params},
custom_llm_provider=custom_llm_provider,
)
litellm_logging_obj.call_type = CallTypes.video_extension.value
return base_llm_http_handler.video_extension_handler(
prompt=prompt,
video_id=video_id,
seconds=seconds,
video_provider_config=provider_config,
custom_llm_provider=custom_llm_provider,
litellm_params=litellm_params,
logging_obj=litellm_logging_obj,
extra_headers=extra_headers,
extra_body=extra_body,
timeout=timeout or DEFAULT_REQUEST_TIMEOUT,
_is_async=_is_async,
client=kwargs.get("client"),
)
except Exception as e:
raise litellm.exception_type(
model="",
custom_llm_provider=custom_llm_provider,
original_exception=e,
completion_kwargs=local_vars,
extra_kwargs=kwargs,
)
@@ -564,6 +564,10 @@ async def test_ensure_initialize_azure_sdk_client_always_used(call_type):
call_type == CallTypes.avideo_content
or call_type == CallTypes.avideo_list
or call_type == CallTypes.avideo_remix
or call_type == CallTypes.avideo_create_character
or call_type == CallTypes.avideo_get_character
or call_type == CallTypes.avideo_edit
or call_type == CallTypes.avideo_extension
):
# Skip video call types as they don't use Azure SDK client initialization
pytest.skip(f"Skipping {call_type.value} because Azure video calls don't use initialize_azure_sdk_client")
+542
View File
@@ -1,4 +1,5 @@
import asyncio
import io
import json
import os
import sys
@@ -174,6 +175,34 @@ class TestVideoGeneration:
assert files == []
assert returned_api_base == "https://api.openai.com/v1/videos"
def test_video_generation_request_decodes_encoded_character_ids(self):
"""Encoded character IDs should be decoded before upstream create-video call."""
from litellm.types.videos.utils import encode_character_id_with_provider
config = OpenAIVideoConfig()
encoded_character_id = encode_character_id_with_provider(
character_id="char_123",
provider="openai",
model_id="sora-2",
)
data, files, returned_api_base = config.transform_video_create_request(
model="sora-2",
prompt="Test video prompt",
api_base="https://api.openai.com/v1/videos",
video_create_optional_request_params={
"seconds": "8",
"size": "720x1280",
"characters": [{"id": encoded_character_id}],
},
litellm_params=MagicMock(),
headers={},
)
assert data["characters"] == [{"id": "char_123"}]
assert files == []
assert returned_api_base == "https://api.openai.com/v1/videos"
def test_video_generation_response_transformation(self):
"""Test video generation response transformation."""
config = OpenAIVideoConfig()
@@ -1623,3 +1652,516 @@ def test_video_remix_handler_prefers_explicit_api_key():
if __name__ == "__main__":
pytest.main([__file__])
# ===== Tests for new video endpoints (characters, edits, extensions) =====
class TestVideoCreateCharacter:
"""Tests for video_create_character / avideo_create_character."""
def test_video_create_character_transform_request(self):
"""Verify multipart form construction for POST /videos/characters."""
config = OpenAIVideoConfig()
fake_video = b"fake_video_bytes"
url, files_list = config.transform_video_create_character_request(
name="hero",
video=fake_video,
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
)
assert url == "https://api.openai.com/v1/videos/characters"
# Should have (name field) + (video file field) = 2 entries
assert len(files_list) == 2
field_names = [f[0] for f in files_list]
assert "name" in field_names
assert "video" in field_names
def test_video_create_character_sets_video_mimetype(self):
"""Ensure character video upload is sent as video/mp4."""
config = OpenAIVideoConfig()
fake_video = io.BytesIO(b"....ftyp....video-bytes")
fake_video.name = "character.mp4"
_, files_list = config.transform_video_create_character_request(
name="hero",
video=fake_video,
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
)
video_parts = [f for f in files_list if f[0] == "video"]
assert len(video_parts) == 1
video_tuple = video_parts[0][1]
assert video_tuple[0] == "character.mp4"
assert video_tuple[2] == "video/mp4"
def test_video_create_character_transform_response(self):
"""Verify CharacterObject is returned from response."""
from litellm.types.videos.main import CharacterObject
config = OpenAIVideoConfig()
mock_response = MagicMock()
mock_response.json.return_value = {
"id": "char_abc123",
"object": "character",
"created_at": 1712697600,
"name": "hero",
}
result = config.transform_video_create_character_response(
raw_response=mock_response,
logging_obj=MagicMock(),
)
assert isinstance(result, CharacterObject)
assert result.id == "char_abc123"
assert result.name == "hero"
def test_video_create_character_mock_response(self):
"""video_create_character returns CharacterObject on mock_response."""
from litellm.types.videos.main import CharacterObject
from litellm.videos.main import video_create_character
response = video_create_character(
name="hero",
video=b"fake",
mock_response={
"id": "char_abc",
"object": "character",
"created_at": 1712697600,
"name": "hero",
},
)
assert isinstance(response, CharacterObject)
assert response.id == "char_abc"
class TestVideoGetCharacter:
"""Tests for video_get_character / avideo_get_character."""
def test_video_get_character_transform_request(self):
"""Verify URL construction for GET /videos/characters/{character_id}."""
config = OpenAIVideoConfig()
url, params = config.transform_video_get_character_request(
character_id="char_xyz",
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
)
assert url == "https://api.openai.com/v1/videos/characters/char_xyz"
assert params == {}
def test_video_get_character_transform_response(self):
"""Verify CharacterObject is returned from GET response."""
from litellm.types.videos.main import CharacterObject
config = OpenAIVideoConfig()
mock_response = MagicMock()
mock_response.json.return_value = {
"id": "char_xyz",
"object": "character",
"created_at": 1712697600,
"name": "villain",
}
result = config.transform_video_get_character_response(
raw_response=mock_response,
logging_obj=MagicMock(),
)
assert isinstance(result, CharacterObject)
assert result.id == "char_xyz"
assert result.name == "villain"
def test_video_get_character_mock_response(self):
"""video_get_character returns CharacterObject on mock_response."""
from litellm.types.videos.main import CharacterObject
from litellm.videos.main import video_get_character
response = video_get_character(
character_id="char_xyz",
mock_response={
"id": "char_xyz",
"object": "character",
"created_at": 1712697600,
"name": "villain",
},
)
assert isinstance(response, CharacterObject)
assert response.id == "char_xyz"
class TestVideoEdit:
"""Tests for video_edit / avideo_edit."""
def test_video_edit_transform_request(self):
"""Verify JSON body with video.id for POST /videos/edits."""
config = OpenAIVideoConfig()
url, data = config.transform_video_edit_request(
prompt="make it brighter",
video_id="video_abc123",
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
)
assert url == "https://api.openai.com/v1/videos/edits"
assert data["prompt"] == "make it brighter"
assert data["video"]["id"] == "video_abc123"
def test_video_edit_transform_request_with_extra_body(self):
"""Extra body params are merged into request data."""
config = OpenAIVideoConfig()
url, data = config.transform_video_edit_request(
prompt="darken it",
video_id="video_abc123",
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
extra_body={"resolution": "1080p"},
)
assert data["resolution"] == "1080p"
def test_video_edit_mock_response(self):
"""video_edit returns VideoObject on mock_response."""
from litellm.videos.main import video_edit
response = video_edit(
video_id="video_abc123",
prompt="make it brighter",
mock_response={
"id": "video_edit_001",
"object": "video",
"status": "queued",
"created_at": 1712697600,
},
)
assert isinstance(response, VideoObject)
assert response.id == "video_edit_001"
def test_video_edit_strips_encoded_provider_from_video_id(self):
"""Provider-encoded video IDs are decoded before sending to API."""
from litellm.types.videos.utils import encode_video_id_with_provider
config = OpenAIVideoConfig()
encoded_id = encode_video_id_with_provider("raw_video_id", "openai", None)
url, data = config.transform_video_edit_request(
prompt="test",
video_id=encoded_id,
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
)
# The video.id in the request body should be the raw ID, not the encoded one
assert data["video"]["id"] == "raw_video_id"
class TestVideoExtension:
"""Tests for video_extension / avideo_extension."""
def test_video_extension_transform_request(self):
"""Verify JSON body with video.id + seconds for POST /videos/extensions."""
config = OpenAIVideoConfig()
url, data = config.transform_video_extension_request(
prompt="continue the scene",
video_id="video_abc123",
seconds="5",
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
)
assert url == "https://api.openai.com/v1/videos/extensions"
assert data["prompt"] == "continue the scene"
assert data["seconds"] == "5"
assert data["video"]["id"] == "video_abc123"
def test_video_extension_transform_request_with_extra_body(self):
"""Extra body params are merged into request data."""
config = OpenAIVideoConfig()
url, data = config.transform_video_extension_request(
prompt="extend",
video_id="video_abc123",
seconds="10",
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
extra_body={"model": "sora-2"},
)
assert data["model"] == "sora-2"
def test_video_extension_mock_response(self):
"""video_extension returns VideoObject on mock_response."""
from litellm.videos.main import video_extension
response = video_extension(
video_id="video_abc123",
prompt="continue the scene",
seconds="5",
mock_response={
"id": "video_ext_001",
"object": "video",
"status": "queued",
"created_at": 1712697600,
},
)
assert isinstance(response, VideoObject)
assert response.id == "video_ext_001"
def test_video_extension_strips_encoded_provider_from_video_id(self):
"""Provider-encoded video IDs are decoded before sending to API."""
from litellm.types.videos.utils import encode_video_id_with_provider
config = OpenAIVideoConfig()
encoded_id = encode_video_id_with_provider("raw_video_id", "openai", None)
url, data = config.transform_video_extension_request(
prompt="extend",
video_id=encoded_id,
seconds="5",
api_base="https://api.openai.com/v1/videos",
litellm_params=MagicMock(),
headers={},
)
assert data["video"]["id"] == "raw_video_id"
@pytest.fixture
def video_proxy_test_client():
from fastapi import FastAPI
from fastapi.testclient import TestClient
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.video_endpoints.endpoints import router as video_router
app = FastAPI()
app.include_router(video_router)
app.dependency_overrides[user_api_key_auth] = lambda: MagicMock()
return TestClient(app)
def test_character_id_encode_decode_roundtrip():
from litellm.types.videos.utils import (
decode_character_id_with_provider,
encode_character_id_with_provider,
)
encoded = encode_character_id_with_provider(
character_id="char_raw_123",
provider="vertex_ai",
model_id="veo-2.0-generate-001",
)
decoded = decode_character_id_with_provider(encoded)
assert decoded["character_id"] == "char_raw_123"
assert decoded["custom_llm_provider"] == "vertex_ai"
assert decoded["model_id"] == "veo-2.0-generate-001"
def test_character_id_decode_handles_missing_base64_padding():
from litellm.types.videos.utils import (
decode_character_id_with_provider,
encode_character_id_with_provider,
)
encoded = encode_character_id_with_provider(
character_id="id",
provider="openai",
model_id="gpt-4o",
)
encoded_without_padding = encoded.rstrip("=")
decoded = decode_character_id_with_provider(encoded_without_padding)
assert decoded["character_id"] == "id"
assert decoded["custom_llm_provider"] == "openai"
assert decoded["model_id"] == "gpt-4o"
def test_video_create_character_target_model_names_returns_encoded_id(video_proxy_test_client):
from litellm.proxy.common_request_processing import ProxyBaseLLMRequestProcessing
from litellm.types.videos.utils import decode_character_id_with_provider
captured_data = {}
async def _mock_base_process(self, **kwargs):
captured_data.update(self.data)
return {
"id": "char_upstream_123",
"object": "character",
"created_at": 1712697600,
"name": "hero",
}
with patch.object(
ProxyBaseLLMRequestProcessing,
"base_process_llm_request",
new=_mock_base_process,
):
response = video_proxy_test_client.post(
"/v1/videos/characters",
headers={"Authorization": "Bearer sk-1234"},
files={"video": ("character.mp4", b"fake-video", "video/mp4")},
data={
"name": "hero",
"target_model_names": "vertex-ai-sora-2",
"extra_body": json.dumps({"custom_llm_provider": "vertex_ai"}),
},
)
assert response.status_code == 200, response.text
response_json = response.json()
decoded = decode_character_id_with_provider(response_json["id"])
assert decoded["character_id"] == "char_upstream_123"
assert decoded["custom_llm_provider"] == "vertex_ai"
assert decoded["model_id"] == "vertex-ai-sora-2"
assert captured_data["model"] == "vertex-ai-sora-2"
assert captured_data["custom_llm_provider"] == "vertex_ai"
def test_video_get_character_accepts_encoded_character_id(video_proxy_test_client):
from litellm.proxy.common_request_processing import ProxyBaseLLMRequestProcessing
from litellm.types.videos.utils import (
decode_character_id_with_provider,
encode_character_id_with_provider,
)
captured_data = {}
async def _mock_base_process(self, **kwargs):
captured_data.update(self.data)
return {
"id": "char_upstream_123",
"object": "character",
"created_at": 1712697600,
"name": "hero",
}
encoded_character_id = encode_character_id_with_provider(
character_id="char_upstream_123",
provider="vertex_ai",
model_id="veo-2.0-generate-001",
)
mock_router = MagicMock()
mock_router.resolve_model_name_from_model_id.return_value = "vertex-ai-sora-2"
with patch("litellm.proxy.proxy_server.llm_router", mock_router):
with patch.object(
ProxyBaseLLMRequestProcessing,
"base_process_llm_request",
new=_mock_base_process,
):
response = video_proxy_test_client.get(
f"/v1/videos/characters/{encoded_character_id}",
headers={"Authorization": "Bearer sk-1234"},
)
assert response.status_code == 200, response.text
assert captured_data["character_id"] == "char_upstream_123"
assert captured_data["custom_llm_provider"] == "vertex_ai"
assert captured_data["model"] == "vertex-ai-sora-2"
response_decoded = decode_character_id_with_provider(response.json()["id"])
assert response_decoded["character_id"] == "char_upstream_123"
assert response_decoded["custom_llm_provider"] == "vertex_ai"
assert response_decoded["model_id"] == "veo-2.0-generate-001"
@pytest.mark.parametrize("endpoint", ["/v1/videos/edits", "/v1/videos/extensions"])
def test_edit_and_extension_support_custom_provider_from_extra_body(
video_proxy_test_client, endpoint
):
from litellm.proxy.common_request_processing import ProxyBaseLLMRequestProcessing
captured_data = {}
async def _mock_base_process(self, **kwargs):
captured_data.update(self.data)
return {
"id": "video_resp_123",
"object": "video",
"status": "queued",
"created_at": 1712697600,
}
payload = {
"prompt": "test",
"video": {"id": "video_raw_123"},
"extra_body": {"custom_llm_provider": "vertex_ai"},
}
if endpoint.endswith("extensions"):
payload["seconds"] = "4"
with patch.object(
ProxyBaseLLMRequestProcessing,
"base_process_llm_request",
new=_mock_base_process,
):
response = video_proxy_test_client.post(
endpoint,
headers={"Authorization": "Bearer sk-1234"},
json=payload,
)
assert response.status_code == 200, response.text
assert captured_data["custom_llm_provider"] == "vertex_ai"
@pytest.mark.parametrize("endpoint", ["/v1/videos/edits", "/v1/videos/extensions"])
def test_edit_and_extension_route_with_encoded_video_ids(
video_proxy_test_client, endpoint
):
from litellm.proxy.common_request_processing import ProxyBaseLLMRequestProcessing
from litellm.types.videos.utils import encode_video_id_with_provider
captured_data = {}
async def _mock_base_process(self, **kwargs):
captured_data.update(self.data)
return {
"id": "video_resp_123",
"object": "video",
"status": "queued",
"created_at": 1712697600,
}
encoded_video_id = encode_video_id_with_provider(
video_id="video_raw_123",
provider="vertex_ai",
model_id="veo-2.0-generate-001",
)
payload = {"prompt": "test", "video": {"id": encoded_video_id}}
if endpoint.endswith("extensions"):
payload["seconds"] = "4"
mock_router = MagicMock()
mock_router.resolve_model_name_from_model_id.return_value = "vertex-ai-sora-2"
with patch("litellm.proxy.proxy_server.llm_router", mock_router):
with patch.object(
ProxyBaseLLMRequestProcessing,
"base_process_llm_request",
new=_mock_base_process,
):
response = video_proxy_test_client.post(
endpoint,
headers={"Authorization": "Bearer sk-1234"},
json=payload,
)
assert response.status_code == 200, response.text
assert captured_data["video_id"] == encoded_video_id
assert captured_data["custom_llm_provider"] == "vertex_ai"
assert captured_data["model"] == "vertex-ai-sora-2"