消息批次 API 支持与消息 API 相同的功能集。虽然本页重点介绍如何使用消息批次 API,但有关消息 API 功能集的示例,请参阅消息 API 示例

创建消息批次

import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic()

message_batch = client.messages.batches.create(
    requests=[
        Request(
            custom_id="my-first-request",
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-20250514",
                max_tokens=1024,
                messages=[{
                    "role": "user",
                    "content": "Hello, world",
                }]
            )
        ),
        Request(
            custom_id="my-second-request",
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-20250514",
                max_tokens=1024,
                messages=[{
                    "role": "user",
                    "content": "Hi again, friend",
                }]
            )
        )
    ]
)
print(message_batch)
JSON
{
  "id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
  "type": "message_batch",
  "processing_status": "in_progress",
  "request_counts": {
    "processing": 2,
    "succeeded": 0,
    "errored": 0,
    "canceled": 0,
    "expired": 0
  },
  "ended_at": null,
  "created_at": "2024-09-24T18:37:24.100435Z",
  "expires_at": "2024-09-25T18:37:24.100435Z",
  "cancel_initiated_at": null,
  "results_url": null
}

轮询消息批次完成情况

要轮询消息批次,您需要其 id,该 ID 在创建请求的响应中提供,或通过列出批次获取。示例 idmsgbatch_013Zva2CMHLNnXjNJJKqJ2EF

import anthropic

client = anthropic.Anthropic()

message_batch = None
while True:
    message_batch = client.messages.batches.retrieve(
        MESSAGE_BATCH_ID
    )
    if message_batch.processing_status == "ended":
        break
              
    print(f"Batch {MESSAGE_BATCH_ID} is still processing...")
    time.sleep(60)
print(message_batch)

列出工作区中的所有消息批次

import anthropic

client = anthropic.Anthropic()

# Automatically fetches more pages as needed.
for message_batch in client.messages.batches.list(
    limit=20
):
    print(message_batch)
Output
{
  "id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
  "type": "message_batch",
  ...
}
{
  "id": "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",
  "type": "message_batch",
  ...
}

获取消息批次结果

一旦您的消息批次状态为 ended,您将能够查看批次的 results_url 并以 .jsonl 文件形式获取结果。

import anthropic

client = anthropic.Anthropic()

# Stream results file in memory-efficient chunks, processing one at a time
for result in client.messages.batches.results(
    MESSAGE_BATCH_ID,
):
    print(result)
Output
{
  "id": "my-second-request",
  "result": {
    "type": "succeeded",
    "message": {
      "id": "msg_018gCsTGsXkYJVqYPxTgDHBU",
      "type": "message",
      ...
    }
  }
}
{
  "custom_id": "my-first-request",
  "result": {
    "type": "succeeded",
    "message": {
      "id": "msg_01XFDUDYJgAACzvnptvVoYEL",
      "type": "message",
      ...
    }
  }
}

取消消息批次

在取消后,批次的 processing_status 将立即变为 canceling。您可以使用相同的轮询批次完成情况技术来轮询取消何时完成,因为被取消的批次最终也会变为 ended 状态,并且可能包含结果。

import anthropic

client = anthropic.Anthropic()

message_batch = client.messages.batches.cancel(
    MESSAGE_BATCH_ID,
)
print(message_batch)
JSON
{
  "id": "msgbatch_013Zva2CMHLNnXjNJJKqJ2EF",
  "type": "message_batch",
  "processing_status": "canceling",
  "request_counts": {
    "processing": 2,
    "succeeded": 0,
    "errored": 0,
    "canceled": 0,
    "expired": 0
  },
  "ended_at": null,
  "created_at": "2024-09-24T18:37:24.100435Z",
  "expires_at": "2024-09-25T18:37:24.100435Z",
  "cancel_initiated_at": "2024-09-24T18:39:03.114875Z",
  "results_url": null
}