使用 Dead-Letter 队列处理消息传输失败

排队消息可能无法传递。 这些失败的消息将记录在死信队列中。 传递失败的原因可能是网络故障、已删除队列、完整队列、身份验证失败或无法按时交付的原因。

如果接收应用程序不及时从队列中读取消息,排队消息可能会长时间保留在队列中。 此行为可能不适合处理时间敏感的消息。 时间敏感型消息在队列绑定中设置了生存时间(TTL)属性,该属性指示消息在过期之前可以在队列中停留的时长。 过期消息将发送到一个称为死信队列的特殊队列中。 也可以出于其他原因将这些消息放入死信队列,如超出队列配额或由于身份验证失败。

通常,应用程序会编写补偿逻辑来读取死信队列中的消息和失败原因。 补偿逻辑取决于失败的原因。 例如,在身份验证失败的情况下,可以更正随消息一起附加的证书并重新发送消息。 如果由于达到目标队列配额而传递失败,可以重新尝试传递,希望解决配额问题。

大多数队列系统都有一个系统级的死信队列,用于存储来自该系统的所有失败消息。 消息队列(MSMQ)提供两个系统范围的死信队列:一个事务系统范围的死信队列,用于存储无法传递到事务队列的消息和非事务系统范围的死信队列,用于存储未能传递到非事务队列的消息。 如果两个客户端向两个不同的服务发送消息,因此 WCF 中的不同队列共享相同的 MSMQ 服务进行发送,则系统死信队列中可能会出现混杂消息。 这并不总是最佳状态。 在某些情况下(例如,安全性),你可能不希望一个客户端从死信队列中读取另一个客户端的消息。 共享死信队列还要求客户端浏览该队列来查找这些客户端所发送的消息,但根据死信队列中的消息数量,这样做的开销可能会极其大。 因此,在 WCFNetMsmqBindingMsmqIntegrationBinding, 和 Windows Vista 上的 MSMQ 中,提供自定义死信队列(有时称为特定于应用程序的死信队列)。

自定义死信队列在共享同一 MSMQ 服务的客户端之间提供隔离,以发送消息。

在 Windows Server 2003 和 Windows XP 上,Windows Communication Foundation (WCF) 为所有排队客户端应用程序提供系统范围的死信队列。 在 Windows Vista 上,WCF 为每个排队客户端应用程序都提供了一个死信队列。

指定死信队列的用途

死信队列位于发送应用程序的队列管理器中。 它存储已过期或传输失败的消息。

该绑定具有下列死信队列属性:

从死信队列中读取消息

从死信队列中读取消息的应用程序类似于从应用程序队列中读取消息的 WCF 服务,但有以下细微差异:

  • 若要从系统事务死信队列中读取消息,统一资源标识符(URI)必须采用以下形式:net.msmq://localhost/system$;DeadXact。

  • 若要从系统非事务性死信队列中读取消息,URI 必须采用以下形式:net.msmq://localhost/system$;DeadLetter。

  • 若要从自定义死信队列中读取消息,URI 必须采用 form:net.msmq://localhost/private/<,其中 custom-dlq-name> 是自定义死信队列的名称。

有关如何解决队列的详细信息,请参阅 服务终结点和队列寻址

WCF 堆栈在接收方上将消息中的地址与服务正在监听的地址进行匹配。 如果地址匹配,则会发送消息;如果不匹配,则不会发送消息。 在从死信队列中进行读取时,这种操作可能会引发问题,原因是死信队列中的消息通常将寻址到该服务而非死信队列服务。 因此,从死信队列中读取的服务必须安装一个地址筛选器,该筛选器 ServiceBehavior 指示堆栈匹配队列中的所有消息,而不考虑消息的收件人。 具体而言,您必须将一个带有 ServiceBehavior 参数的 Any 添加到从死信队列中读取消息的服务中。

死信队列中的病毒消息处理

病毒消息处理在死信队列中可用,但有一些条件。 因为在从系统的死信队列中进行读取时,您无法从系统队列创建子队列,所以不能将 ReceiveErrorHandling 设置为 Move。 请注意,如果您正在从自定义死信队列中读取,则可以拥有子队列,因此 Move 对于病毒消息为有效处理。

ReceiveErrorHandling 设置为 Reject 时,从自定义死信队列中读取时,毒物消息会被放入系统死信队列中。 如果从系统死信队列中读取消息,则该消息会被丢弃(不再处理)。 在 MSMQ 的系统死信队列中执行拒绝将丢弃(清除)该消息。

示例:

以下示例演示如何创建死信队列以及如何使用它来处理过期的消息。 该示例基于如何:与 WCF 终结点交换排队消息中示例。 下面的示例演示如何将客户端代码写入将死信队列用于每个应用程序的订单处理服务。 该示例还显示如何处理死信队列中的消息。

下面是为每个应用程序指定死信队列的客户端的代码。

using System;
using System.ServiceModel.Channels;
using System.Configuration;
//using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            // Get MSMQ queue name from appsettings in configuration.
            string deadLetterQueueName = ConfigurationManager.AppSettings["deadLetterQueueName"];

            // Create the transacted MSMQ queue for storing dead message if necessary.
            if (!System.Messaging.MessageQueue.Exists(deadLetterQueueName))
                System.Messaging.MessageQueue.Create(deadLetterQueueName, true);

            OrderProcessorClient client = new OrderProcessorClient("OrderProcessorEndpoint");
        try
            {	

                // Create the purchase order.
                PurchaseOrder po = new PurchaseOrder();
                po.CustomerId = "somecustomer.com";
                po.PONumber = Guid.NewGuid().ToString();

                PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
                lineItem1.ProductId = "Blue Widget";
                lineItem1.Quantity = 54;
                lineItem1.UnitCost = 29.99F;

                PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
                lineItem2.ProductId = "Red Widget";
                lineItem2.Quantity = 890;
                lineItem2.UnitCost = 45.89F;

                po.orderLineItems = new PurchaseOrderLineItem[2];
                po.orderLineItems[0] = lineItem1;
                po.orderLineItems[1] = lineItem2;

                //Create a transaction scope.
                using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
                {
                    // Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po);
                    // Complete the transaction.
                    scope.Complete();
                }

                client.Close();
            }
            catch(TimeoutException timeout)
            {
        Console.WriteLine(timeout.Message);
                client.Abort();
        }
            catch(CommunicationException conexcp)
            {
        Console.WriteLine(conexcp.Message);
                client.Abort();
        }

            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}
Imports System.ServiceModel.Channels
Imports System.Configuration
'using System.Messaging;
Imports System.ServiceModel
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples

    'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    'Client implementation code.
    Friend Class Client
        Shared Sub Main()
            ' Get MSMQ queue name from appsettings in configuration.
            Dim deadLetterQueueName As String = ConfigurationManager.AppSettings("deadLetterQueueName")

            ' Create the transacted MSMQ queue for storing dead message if necessary.
            If (Not System.Messaging.MessageQueue.Exists(deadLetterQueueName)) Then
                System.Messaging.MessageQueue.Create(deadLetterQueueName, True)
            End If


            Dim client As New OrderProcessorClient("OrderProcessorEndpoint")
            Try


                ' Create the purchase order.
                Dim po As New PurchaseOrder()
                po.CustomerId = "somecustomer.com"
                po.PONumber = Guid.NewGuid().ToString()

                Dim lineItem1 As New PurchaseOrderLineItem()
                lineItem1.ProductId = "Blue Widget"
                lineItem1.Quantity = 54
                lineItem1.UnitCost = 29.99F

                Dim lineItem2 As New PurchaseOrderLineItem()
                lineItem2.ProductId = "Red Widget"
                lineItem2.Quantity = 890
                lineItem2.UnitCost = 45.89F

                po.orderLineItems = New PurchaseOrderLineItem(1) {}
                po.orderLineItems(0) = lineItem1
                po.orderLineItems(1) = lineItem2

                'Create a transaction scope.
                Using scope As New TransactionScope(TransactionScopeOption.Required)
                    ' Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po)
                    ' Complete the transaction.
                    scope.Complete()
                End Using


                client.Close()
            Catch timeout As TimeoutException
                Console.WriteLine(timeout.Message)
                client.Abort()
            Catch conexcp As CommunicationException
                Console.WriteLine(conexcp.Message)
                client.Abort()
            End Try

            Console.WriteLine()
            Console.WriteLine("Press <ENTER> to terminate client.")
            Console.ReadLine()
        End Sub
    End Class
End Namespace

下面是客户端配置文件的代码。

下面是处理死信队列消息的服务的代码。

using System;
using System.ServiceModel.Description;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract.
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void SubmitPurchaseOrder(PurchaseOrder po);
    }

    // Service class that implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, AddressFilterMode = AddressFilterMode.Any)]
    public class PurchaseOrderDLQService : IOrderProcessor
    {
        OrderProcessorClient orderProcessorService;
        public PurchaseOrderDLQService()
        {
            orderProcessorService = new OrderProcessorClient("OrderProcessorEndpoint");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SimpleSubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine($"Submitting purchase order did not succeed ");
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine($"Message Delivery Status: {mqProp.DeliveryStatus} ");
            Console.WriteLine($"Message Delivery Failure: {mqProp.DeliveryFailure}");
            Console.WriteLine();
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine($"Submitting purchase order did not succeed ");
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine($"Message Delivery Status: {mqProp.DeliveryStatus} ");
            Console.WriteLine($"Message Delivery Failure: {mqProp.DeliveryFailure}");
            Console.WriteLine();

            // Resend the message if timed out.
            if (mqProp.DeliveryFailure == DeliveryFailure.ReachQueueTimeout ||
                mqProp.DeliveryFailure == DeliveryFailure.ReceiveTimeout)
            {
                // Re-send.
                Console.WriteLine("Purchase order Time To Live expired");
                Console.WriteLine("Trying to resend the message");

                // Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po);
                Console.WriteLine("Purchase order resent");
            }
        }

        // Host the service within this EXE console application.
        public static void Main()
        {
            // Create a ServiceHost for the PurchaseOrderDLQService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(PurchaseOrderDLQService)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close();
            }
        }
    }
}

Imports System.ServiceModel.Description
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.ServiceModel.Channels
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples
    ' Define a service contract. 
    <ServiceContract(Namespace:="http://Microsoft.ServiceModel.Samples")> _
    Public Interface IOrderProcessor
        <OperationContract(IsOneWay:=True)> _
        Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder)
    End Interface

    ' Service class that implements the service contract.
    ' Added code to write output to the console window
    <ServiceBehavior(InstanceContextMode:=InstanceContextMode.Single, ConcurrencyMode:=ConcurrencyMode.Single, AddressFilterMode:=AddressFilterMode.Any)> _
    Public Class PurchaseOrderDLQService
        Implements IOrderProcessor
        Private orderProcessorService As OrderProcessorClient
        Public Sub New()
            orderProcessorService = New OrderProcessorClient("OrderProcessorEndpoint")
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SimpleSubmitPurchaseOrder(ByVal po As PurchaseOrder)
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder) Implements IOrderProcessor.SubmitPurchaseOrder
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()

            ' Resend the message if timed out.
            If mqProp.DeliveryFailure = DeliveryFailure.ReachQueueTimeout OrElse mqProp.DeliveryFailure = DeliveryFailure.ReceiveTimeout Then
                ' Re-send.
                Console.WriteLine("Purchase order Time To Live expired")
                Console.WriteLine("Trying to resend the message")

                ' Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po)
                Console.WriteLine("Purchase order resent")
            End If
        End Sub

        ' Host the service within this EXE console application.
        Public Shared Sub Main()
            ' Create a ServiceHost for the PurchaseOrderDLQService type.
            Using serviceHost As New ServiceHost(GetType(PurchaseOrderDLQService))
                ' Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open()

                ' The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.")
                Console.WriteLine("Press <ENTER> to terminate service.")
                Console.WriteLine()
                Console.ReadLine()

                ' Close the ServiceHostBase to shutdown the service.
                serviceHost.Close()
            End Using
        End Sub
    End Class
End Namespace

下面是死信队列服务配置文件的代码。

另请参阅