EventProcessorClientBuilder 类
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClientBuilder
- com.
实现
public class EventProcessorClientBuilder
implements TokenCredentialTrait<EventProcessorClientBuilder>, AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>, ConnectionStringTrait<EventProcessorClientBuilder>, AzureSasCredentialTrait<EventProcessorClientBuilder>, AmqpTrait<EventProcessorClientBuilder>, ConfigurationTrait<EventProcessorClientBuilder>
此类提供 Fluent Builder API 来帮助配置和实例化 EventProcessorClient。 调用 buildEventProcessorClient() 构造 的新实例 EventProcessorClient。
若要创建 的 EventProcessorClient实例, 需要以下字段:
CheckpointStore - CheckpointStore 的实现,用于存储检查点和分区所有权信息,以实现负载均衡和检查点处理事件。
processEvent(Consumer<EventContext> processEvent) 或 processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime) - 处理从事件中心接收的事件的回调。
processError(Consumer<ErrorContext> processError) - 处理运行 EventProcessorClient 时可能发生的错误的回调。
针对Azure 事件中心执行操作的凭据。 可以使用以下方法之一设置它们:
- connectionString(String connectionString) 与特定事件中心的连接字符串。
- connectionString(String connectionString, String eventHubName) 包含事件中心 命名空间 连接字符串和事件中心名称。
- credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential) 具有完全限定的命名空间、事件中心名称和一组授权使用事件中心的凭据。
- credential(TokenCredential credential)、 credential(AzureSasCredential credential)或 credential(AzureNamedKeyCredential credential) 以及 fullyQualifiedNamespace(String fullyQualifiedNamespace) 和 eventHubName(String eventHubName)。 使用事件中心的完全限定命名空间、事件中心名称和授权凭据。
本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,建议在生产环境中使用 托管标识 进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。
示例:构造 EventProcessorClient
以下代码示例演示如何创建处理器客户端。 建议将处理器客户端用于生产方案,因为它可以在多个正在运行的实例之间进行负载均衡,可以执行检查点,并在网络中断等暂时性故障时重新连接。 以下示例使用内存CheckpointStore中,但 azure-messaging-eventhubs-checkpointstore-blob 提供由Azure Blob 存储支持的检查点存储。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
字段摘要
修饰符和类型 | 字段和描述 |
---|---|
static final Duration |
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
默认负载均衡更新间隔。 |
static final Duration |
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
默认所有权过期。 |
构造函数摘要
构造函数 | 说明 |
---|---|
EventProcessorClientBuilder() |
创建 EventProcessorClientBuilder 的新实例。 |
方法摘要
方法继承自 java.lang.Object
字段详细信息
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
public static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
默认负载均衡更新间隔。 均衡间隔应考虑到客户端与存储帐户之间的延迟。
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
public static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
默认所有权过期。
构造函数详细信息
EventProcessorClientBuilder
public EventProcessorClientBuilder()
创建 EventProcessorClientBuilder 的新实例。
方法详细信息
buildEventProcessorClient
public EventProcessorClient buildEventProcessorClient()
这将创建一个配置了此生成器中设置的选项的新 EventProcessorClient 。 对此方法的每次调用都将返回 的新实例 EventProcessorClient。
由此 EventProcessorClient 处理的所有分区都将从 earliest() 相应分区中的可用事件开始处理。
Returns:
checkpointStore
public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)
CheckpointStoreEventProcessorClient设置 将用于存储分区所有权和检查点信息的 。
用户可以(可选)提供自己的实现, CheckpointStore 这将存储所有权和检查点信息。
Parameters:
Returns:
clientOptions
public EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)
设置处理器客户端的客户端选项。 客户端选项上设置的应用程序 ID 将用于跟踪。 当前未使用设置的 ClientOptions
标头,但可以在更高版本中使用,以添加到 AMQP 消息。
Parameters:
Returns:
configuration
public EventProcessorClientBuilder configuration(Configuration configuration)
设置在构造服务客户端期间使用的配置存储。 如果未指定,则使用默认配置存储来配置 EventHubAsyncClient。 用于 NONE 在构造过程中绕过配置设置。
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString)
为事件中心实例设置给定连接字符串的凭据信息。
如果连接字符串是从事件中心命名空间复制的,则它可能不会包含所需事件中心的名称,这是必需的。 在这种情况下,可以通过将“EntityPath=EVENT_HUB_NAME”添加到连接字符串的末尾来手动添加名称。 例如,“EntityPath=telemetry-hub”。
如果已直接在事件中心本身上定义共享访问策略,则从该事件中心复制连接字符串将导致包含该名称的连接字符串。
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)
将给定连接字符串的凭据信息设置为事件中心命名空间,并将名称设置为特定事件中心实例。
Parameters:
Returns:
consumerGroup
public EventProcessorClientBuilder consumerGroup(String consumerGroup)
设置使用者组名称, EventProcessorClient 从中应使用事件。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)
设置要连接到的事件中心实例的凭据信息,以及如何对其授权。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureSasCredential credential)
设置要连接到的事件中心实例的凭据信息,以及如何对其授权。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(TokenCredential credential)
TokenCredential设置用于授权发送到服务的请求的 。 有关正确使用TokenCredential该类型的更多详细信息,请参阅适用于 Java 的 Azure SDK 标识和身份验证文档。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
设置要连接到的事件中心实例的凭据信息,以及如何对其授权。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
设置要连接到的事件中心实例的凭据信息,以及如何对其授权。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
设置要连接到的事件中心实例的凭据信息,以及如何对其授权。
Parameters:
Returns:
customEndpointAddress
public EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)
在连接到事件中心服务时设置自定义终结点地址。 当网络不允许连接到标准Azure 事件中心终结点地址,但允许通过中介进行连接时,这非常有用。 例如:https://my.custom.endpoint.com:55300。
如果未指定端口,则使用 的默认 transportType(AmqpTransportType transport) 端口。
Parameters:
Returns:
eventHubName
public EventProcessorClientBuilder eventHubName(String eventHubName)
设置要将客户端连接到的事件中心的名称。
Parameters:
Returns:
fullyQualifiedNamespace
public EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
设置事件中心命名空间的完全限定名称。
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Map
设置包含要用于每个分区的事件位置的映射(如果 中 CheckpointStore不存在分区的检查点)。 此映射已从分区 ID 中键出。
构造 时,只应使用 的initialPartitionEventPosition
一个EventProcessorClient重载。
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Function
如果 中不存在 CheckpointStore该分区的检查点,则设置每个分区的默认起始位置。
构造 时,只应使用 的initialPartitionEventPosition
一个EventProcessorClient重载。
Parameters:
Returns:
loadBalancingStrategy
public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
EventProcessorClient将LoadBalancingStrategy用于声明分区所有权。 默认情况下, BALANCED 将使用方法。
Parameters:
Returns:
loadBalancingUpdateInterval
public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
负载均衡更新周期之间的时间间隔。 这通常也是续订分区所有权的时间间隔。 默认情况下,此间隔设置为 10 秒。
Parameters:
Returns:
partitionOwnershipExpirationInterval
public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
如果拥有的处理器实例未续订分区所有权,则分区的所有权将在该持续时间之后过期。 这是此处理器实例在接管以前由非活动处理器拥有的分区的所有权之前等待的持续时间。 默认情况下,此持续时间设置为一分钟。
Parameters:
Returns:
prefetchCount
public EventProcessorClientBuilder prefetchCount(int prefetchCount)
设置接收方用于控制每个使用者在本地主动接收和排队的事件数,而不考虑接收操作当前是否处于活动状态。
Parameters:
Returns:
processError
public EventProcessorClientBuilder processError(Consumer
处理事件时发生错误时调用的函数。 输入包含发生错误的分区信息。
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
为此 接收 EventProcessorClient的每个事件调用的函数。 输入包含分区上下文和事件数据。
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
为此 接收 EventProcessorClient的每个事件调用的函数。 输入包含分区上下文和事件数据。 如果设置了最大等待时间,则接收将等待该持续时间接收事件;如果未收到任何事件,则使用 null 事件数据调用使用者。
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
为此 接收 EventProcessorClient的每个事件调用的函数。 输入包含分区上下文和事件数据。 如果设置了最大等待时间,则接收将等待该持续时间接收事件;如果未收到任何事件,则使用 null 事件数据调用使用者。
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
为此 接收 EventProcessorClient的每个事件调用的函数。 输入包含分区上下文和事件数据。 如果设置了最大等待时间,则接收将等待该持续时间接收事件;如果未收到任何事件,则使用 null 事件数据调用使用者。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.checkpointStore(new SampleCheckpointStore())
.processEventBatch(eventBatchContext -> {
eventBatchContext.getEvents().forEach(eventData -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventBatchContext.getPartitionContext().getPartitionId(),
eventData.getSequenceNumber());
});
}, 50, Duration.ofSeconds(30))
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Parameters:
Returns:
processPartitionClose
public EventProcessorClientBuilder processPartitionClose(Consumer
分区的处理停止时调用的函数。 输入包含分区信息以及停止此分区的事件处理的原因。
Parameters:
Returns:
processPartitionInitialization
public EventProcessorClientBuilder processPartitionInitialization(Consumer
在开始处理分区之前调用的函数。 输入包含分区信息以及用于处理事件的默认起始位置,如果 检查点在 中 CheckpointStore不可用时,将使用这些事件。 如果首选不同的起始位置,用户可以更新此位置。
Parameters:
Returns:
proxyOptions
public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)
设置要用于 的 EventHubAsyncClient代理配置。 配置代理后, AMQP_WEB_SOCKETS 必须用于传输类型。
Parameters:
Returns:
retry
@Deprecated
public EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)
已放弃
设置 的 EventHubAsyncClient重试策略。 如果未指定,则使用默认重试选项。
Parameters:
Returns:
retryOptions
public EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)
设置 的 EventHubAsyncClient重试策略。 如果未指定,则使用默认重试选项。
Parameters:
Returns:
trackLastEnqueuedEventProperties
public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
设置事件处理程序是否应请求有关其关联分区上最后一个排队事件的信息,并在收到事件时跟踪该信息。
跟踪有关分区上一个排队事件的信息时,从事件中心服务接收的每个事件都将携带有关分区的元数据,否则不会。 这会导致少量额外的网络带宽消耗,在考虑使用事件中心客户端定期请求分区属性时,这通常是一个有利的权衡。
Parameters:
true
如果生成的事件将跟踪该分区的最后一个排队信息,则为 ; false
否则。
Returns:
transportType
public EventProcessorClientBuilder transportType(AmqpTransportType transport)
设置与Azure 事件中心的所有通信所依据的传输类型。 默认值为 AMQP。
Parameters:
Returns: