示例
消息批次示例
消息批次 API 的使用示例
消息批次 API 支持与消息 API 相同的功能集。虽然本页重点介绍如何使用消息批次 API,但有关消息 API 功能集的示例,请参阅消息 API 示例。
创建消息批次
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
client = anthropic.Anthropic()
message_batch = client.messages.batches.create(
requests=[
Request(
custom_id="my-first-request",
params=MessageCreateParamsNonStreaming(
model="claude-opus-4-20250514",
max_tokens=1024,
messages=[{
"role": "user",
"content": "Hello, world",
}]
)
),
Request(
custom_id="my-second-request",
params=MessageCreateParamsNonStreaming(
model="claude-opus-4-20250514",
max_tokens=1024,
messages=[{
"role": "user",
"content": "Hi again, friend",
}]
)
)
]
)
print(message_batch)
JSON
{
"id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
"type": "message_batch",
"processing_status": "in_progress",
"request_counts": {
"processing": 2,
"succeeded": 0,
"errored": 0,
"canceled": 0,
"expired": 0
},
"ended_at": null,
"created_at": "2024-09-24T18:37:24.100435Z",
"expires_at": "2024-09-25T18:37:24.100435Z",
"cancel_initiated_at": null,
"results_url": null
}
轮询消息批次完成情况
要轮询消息批次,您需要其 id
,该 ID 在创建请求的响应中提供,或通过列出批次获取。示例 id
:msgbatch_013Zva2CMHLNnXjNJJKqJ2EF
。
import anthropic
client = anthropic.Anthropic()
message_batch = None
while True:
message_batch = client.messages.batches.retrieve(
MESSAGE_BATCH_ID
)
if message_batch.processing_status == "ended":
break
print(f"Batch {MESSAGE_BATCH_ID} is still processing...")
time.sleep(60)
print(message_batch)
列出工作区中的所有消息批次
import anthropic
client = anthropic.Anthropic()
# Automatically fetches more pages as needed.
for message_batch in client.messages.batches.list(
limit=20
):
print(message_batch)
Output
{
"id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
"type": "message_batch",
...
}
{
"id": "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",
"type": "message_batch",
...
}
获取消息批次结果
一旦您的消息批次状态为 ended
,您将能够查看批次的 results_url
并以 .jsonl
文件形式获取结果。
import anthropic
client = anthropic.Anthropic()
# Stream results file in memory-efficient chunks, processing one at a time
for result in client.messages.batches.results(
MESSAGE_BATCH_ID,
):
print(result)
Output
{
"id": "my-second-request",
"result": {
"type": "succeeded",
"message": {
"id": "msg_018gCsTGsXkYJVqYPxTgDHBU",
"type": "message",
...
}
}
}
{
"custom_id": "my-first-request",
"result": {
"type": "succeeded",
"message": {
"id": "msg_01XFDUDYJgAACzvnptvVoYEL",
"type": "message",
...
}
}
}
取消消息批次
在取消后,批次的 processing_status
将立即变为 canceling
。您可以使用相同的轮询批次完成情况技术来轮询取消何时完成,因为被取消的批次最终也会变为 ended
状态,并且可能包含结果。
import anthropic
client = anthropic.Anthropic()
message_batch = client.messages.batches.cancel(
MESSAGE_BATCH_ID,
)
print(message_batch)
JSON
{
"id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
"type": "message_batch",
"processing_status": "canceling",
"request_counts": {
"processing": 2,
"succeeded": 0,
"errored": 0,
"canceled": 0,
"expired": 0
},
"ended_at": null,
"created_at": "2024-09-24T18:37:24.100435Z",
"expires_at": "2024-09-25T18:37:24.100435Z",
"cancel_initiated_at": "2024-09-24T18:39:03.114875Z",
"results_url": null
}
On this page