curl --request GET \
--url https://ai.kaiho.cc/v1/tasks/{task_id}查询异步任务的生成状态和进度
curl --request GET \
--url https://ai.kaiho.cc/v1/tasks/{task_id}创建任务
task_id排队中 (queued)
处理中 (processing)
完成 (completed) / 失败 (failed)
curl https://ai.kaiho.cc/v1/tasks/task_abc123 \
-H "Authorization: Bearer YOUR_API_KEY"
{
"id": "task_abc123",
"type": "video_generation",
"status": "queued",
"progress": 0,
"queue_position": 5,
"estimated_time": 180,
"created_at": "2024-01-15T10:30:00Z"
}
{
"id": "task_abc123",
"type": "video_generation",
"status": "processing",
"progress": 45,
"current_step": "rendering",
"estimated_time_remaining": 90,
"created_at": "2024-01-15T10:30:00Z",
"started_at": "2024-01-15T10:31:00Z"
}
{
"id": "task_abc123",
"type": "video_generation",
"status": "completed",
"progress": 100,
"result": {
"video_url": "https://storage.kaiho.cc/videos/video-123.mp4",
"thumbnail_url": "https://storage.kaiho.cc/videos/thumb-123.jpg",
"duration": 10,
"resolution": "1920x1080",
"size_bytes": 15728640
},
"created_at": "2024-01-15T10:30:00Z",
"started_at": "2024-01-15T10:31:00Z",
"completed_at": "2024-01-15T10:33:30Z",
"processing_time": 150
}
{
"id": "task_abc123",
"type": "video_generation",
"status": "failed",
"progress": 65,
"error": {
"code": "content_violation",
"message": "生成的内容违反了我们的内容政策",
"details": "检测到不适当的内容"
},
"created_at": "2024-01-15T10:30:00Z",
"started_at": "2024-01-15T10:31:00Z",
"failed_at": "2024-01-15T10:32:45Z"
}
| 状态 | 说明 | 可用字段 |
|---|---|---|
queued | 任务已创建,等待处理 | queue_position, estimated_time |
processing | 任务正在处理中 | progress, current_step, estimated_time_remaining |
completed | 任务成功完成 | result, processing_time |
failed | 任务处理失败 | error |
cancelled | 任务被取消 | - |
合理的轮询间隔
import time
def poll_task(task_id):
while True:
status = get_task_status(task_id)
if status['status'] in ['completed', 'failed']:
return status
# 根据进度调整轮询间隔
if status['progress'] < 20:
time.sleep(10) # 初期较慢,10秒轮询一次
else:
time.sleep(5) # 后期较快,5秒轮询一次
使用指数退避
import time
def poll_with_backoff(task_id, max_attempts=100):
interval = 2
max_interval = 60
for attempt in range(max_attempts):
status = get_task_status(task_id)
if status['status'] in ['completed', 'failed']:
return status
time.sleep(interval)
interval = min(interval * 1.5, max_interval) # 指数增长,最大60秒
使用预估时间
def smart_poll(task_id):
status = get_task_status(task_id)
# 根据预估剩余时间智能等待
if 'estimated_time_remaining' in status:
wait_time = max(5, status['estimated_time_remaining'] / 5)
time.sleep(wait_time)
else:
time.sleep(10)
# 创建任务时指定 webhook
response = requests.post(
"https://ai.kaiho.cc/v1/videos/generations",
headers={"Authorization": "Bearer YOUR_API_KEY"},
json={
"model": "sora-2",
"prompt": "视频描述...",
"webhook_url": "https://your-server.com/webhook",
"webhook_events": ["completed", "failed"] # 指定要接收的事件
}
)
{
"event": "task.completed",
"task_id": "task_abc123",
"type": "video_generation",
"status": "completed",
"result": {
"video_url": "https://storage.kaiho.cc/videos/video-123.mp4",
"duration": 10
},
"completed_at": "2024-01-15T10:33:30Z"
}
curl https://ai.kaiho.cc/v1/tasks \
-H "Authorization: Bearer YOUR_API_KEY" \
-G \
-d "ids=task_abc123,task_def456,task_ghi789"
import requests
response = requests.delete(
f"https://ai.kaiho.cc/v1/tasks/task_abc123",
headers={"Authorization": "Bearer YOUR_API_KEY"}
)
print(response.json())
# {"id": "task_abc123", "status": "cancelled"}
response = requests.get(
"https://ai.kaiho.cc/v1/tasks",
headers={"Authorization": "Bearer YOUR_API_KEY"},
params={
"type": "video_generation", # 筛选任务类型
"status": "completed", # 筛选任务状态
"limit": 50, # 每页数量
"offset": 0 # 偏移量
}
)
tasks = response.json()
| 错误码 | 说明 | 处理建议 |
|---|---|---|
task_not_found | 任务不存在 | 检查 task_id 是否正确 |
content_violation | 内容违规 | 修改提示词,避免违规内容 |
insufficient_credits | 余额不足 | 充值账户 |
processing_timeout | 处理超时 | 简化任务或联系支持 |
system_error | 系统错误 | 稍后重试或联系支持 |
import requests
import time
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ai.kaiho.cc/v1"
def create_video_task(prompt):
"""创建视频生成任务"""
response = requests.post(
f"{BASE_URL}/videos/generations",
headers={"Authorization": f"Bearer {API_KEY}"},
json={
"model": "sora-2",
"prompt": prompt,
"duration": 10,
"resolution": "1920x1080"
}
)
return response.json()
def get_task_status(task_id):
"""获取任务状态"""
response = requests.get(
f"{BASE_URL}/tasks/{task_id}",
headers={"Authorization": f"Bearer {API_KEY}"}
)
return response.json()
def wait_for_completion(task_id, timeout=600):
"""等待任务完成"""
start_time = time.time()
while True:
if time.time() - start_time > timeout:
raise TimeoutError(f"任务超时: {task_id}")
status = get_task_status(task_id)
print(f"状态: {status['status']}, 进度: {status.get('progress', 0)}%")
if status['status'] == 'completed':
return status['result']
elif status['status'] == 'failed':
raise Exception(f"任务失败: {status['error']}")
# 智能等待
time.sleep(5)
# 使用示例
task = create_video_task("一只猫在弹钢琴")
print(f"任务已创建: {task['id']}")
result = wait_for_completion(task['id'])
print(f"视频已生成: {result['video_url']}")