範例
訊息批次範例
訊息批次 API 的使用範例
訊息批次 API 支援與訊息 API 相同的功能集。雖然本頁面專注於如何使用訊息批次 API,但有關訊息 API 功能集的範例,請參閱訊息 API 範例。
建立訊息批次
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
client = anthropic.Anthropic()
message_batch = client.messages.batches.create(
requests=[
Request(
custom_id="my-first-request",
params=MessageCreateParamsNonStreaming(
model="claude-opus-4-20250514",
max_tokens=1024,
messages=[{
"role": "user",
"content": "Hello, world",
}]
)
),
Request(
custom_id="my-second-request",
params=MessageCreateParamsNonStreaming(
model="claude-opus-4-20250514",
max_tokens=1024,
messages=[{
"role": "user",
"content": "Hi again, friend",
}]
)
)
]
)
print(message_batch)
JSON
{
"id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
"type": "message_batch",
"processing_status": "in_progress",
"request_counts": {
"processing": 2,
"succeeded": 0,
"errored": 0,
"canceled": 0,
"expired": 0
},
"ended_at": null,
"created_at": "2024-09-24T18:37:24.100435Z",
"expires_at": "2024-09-25T18:37:24.100435Z",
"cancel_initiated_at": null,
"results_url": null
}
輪詢訊息批次完成情況
要輪詢訊息批次,您需要其 id
,這在建立請求的回應中或通過列出批次時提供。範例 id
:msgbatch_013Zva2CMHLNnXjNJJKqJ2EF
。
import anthropic
client = anthropic.Anthropic()
message_batch = None
while True:
message_batch = client.messages.batches.retrieve(
MESSAGE_BATCH_ID
)
if message_batch.processing_status == "ended":
break
print(f"Batch {MESSAGE_BATCH_ID} is still processing...")
time.sleep(60)
print(message_batch)
列出工作區中的所有訊息批次
import anthropic
client = anthropic.Anthropic()
# Automatically fetches more pages as needed.
for message_batch in client.messages.batches.list(
limit=20
):
print(message_batch)
Output
{
"id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
"type": "message_batch",
...
}
{
"id": "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",
"type": "message_batch",
...
}
檢索訊息批次結果
一旦您的訊息批次狀態為 ended
,您將能夠查看批次的 results_url
並以 .jsonl
文件形式檢索結果。
import anthropic
client = anthropic.Anthropic()
# Stream results file in memory-efficient chunks, processing one at a time
for result in client.messages.batches.results(
MESSAGE_BATCH_ID,
):
print(result)
Output
{
"id": "my-second-request",
"result": {
"type": "succeeded",
"message": {
"id": "msg_018gCsTGsXkYJVqYPxTgDHBU",
"type": "message",
...
}
}
}
{
"custom_id": "my-first-request",
"result": {
"type": "succeeded",
"message": {
"id": "msg_01XFDUDYJgAACzvnptvVoYEL",
"type": "message",
...
}
}
}
取消訊息批次
在取消後立即,批次的 processing_status
將為 canceling
。您可以使用相同的輪詢批次完成情況技術來輪詢取消何時完成,因為被取消的批次最終也會變為 ended
狀態,並可能包含結果。
import anthropic
client = anthropic.Anthropic()
message_batch = client.messages.batches.cancel(
MESSAGE_BATCH_ID,
)
print(message_batch)
JSON
{
"id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
"type": "message_batch",
"processing_status": "canceling",
"request_counts": {
"processing": 2,
"succeeded": 0,
"errored": 0,
"canceled": 0,
"expired": 0
},
"ended_at": null,
"created_at": "2024-09-24T18:37:24.100435Z",
"expires_at": "2024-09-25T18:37:24.100435Z",
"cancel_initiated_at": "2024-09-24T18:39:03.114875Z",
"results_url": null
}
Was this page helpful?
On this page