在Rebus的某消息类型的串行处理串行、消息、类型、Rebus

由网友(冷兔男i)分享简介:我们有一个会谈到第三方Web服务一个Rebus的消息处理程序。由于我们无法直接控制的原因,这WCF服务频频抛出一个异常,因为它遇到了自己的数据库数据库死锁。画谜将尝试处理此消息五次,在大多数情况下是指那些五次之一将是幸运的,并没有得到一个僵局。但它经常发生,一个消息并僵局后得到的僵局,并在我们的错误队列中结束了。 W...

我们有一个会谈到第三方Web服务一个Rebus的消息处理程序。由于我们无法直接控制的原因,这WCF服务频频抛出一个异常,因为它遇到了自己的数据库数据库死锁。画谜将尝试处理此消息五次,在大多数情况下是指那些五次之一将是幸运的,并没有得到一个僵局。但它经常发生,一个消息并僵局后得到的僵局,并在我们的错误队列中结束了。

We have a Rebus message handler that talks to a third party webservice. Due to reasons beyond our immediate control, this WCF service frequently throws an exception because it encountered a database deadlock in its own database. Rebus will then try to process this message five times, which in most cases means that one of those five times will be lucky and not get a deadlock. But it frequently happens that a message does get deadlock after deadlock and ends up in our error queue.

除了固定的死锁的来源,这将是一个长期的目标,我能想到的两个选项:

Besides fixing the source of the deadlocks, which would be a longterm goal, I can think of two options:

请只用这种特殊的信息类型尝试,直到成功为止。 preferably我将能够设置一个超时,所以如果有五个死锁然后再次尝试在5分钟内,而不是更通过不断努力呛的过程了。我已经做了Thread.Sleep(随机)至S $ P $垫的消息有点,但它仍然会五次尝试之后放弃。

Keep trying with only this particular message type until it succeeds. Preferably I would be able to set a timeout, so "if five deadlocks then try again in 5 minutes" rather than choke the process up even more by trying continuously. I already do a Thread.Sleep(random) to spread the messages somewhat, but it will still give up after five tries.

发送此特定消息类型以不同的队列具有处理消息,所以,出现这种情况串联而非并联只有一名工人。我们目前的配置使用8工作者线程,但这只是使死锁的情况更糟,因为Web服务现在被称为并发的消息得到对方的方式。

Send this particular message type to a different queue that has only one worker that processes the message, so that this happens serially rather than in parallel. Our current configuration uses 8 worker threads, but this just makes the deadlock situation worse as the webservice now gets called concurrently and the messages get in each other's way.

选项#2有我的preference,但我不知道这是可能的。我们在配置的接收端的目前看起来是这样的:

Option #2 has my preference, but I'm not sure if this is possible. Our configuration on the receiving side currently looks like this:

var adapter = new Rebus.Ninject.NinjectContainerAdapter(this.Kernel);

var bus = Rebus.Configuration.Configure.With(adapter)
    .Logging(x => x.Log4Net())
   .Transport(t => t.UseMsmqAndGetInputQueueNameFromAppConfig())
   .MessageOwnership(d => d.FromRebusConfigurationSection())
   .CreateBus().Start();

和的.config文件的接收端的:

And the .config for the receiving side:

<rebus inputQueue="app.msg.input" errorQueue="app.msg.error" workers="8">
  <endpoints>
  </endpoints>
</rebus>

从我可以告诉从配置,它只是可以设置一个输入队列'听'到。我真的不能找到一种方法,通过流利的映射API要么做到这一点。这似乎只需要一输入和错误队列,以及:

From what I can tell from the config, it's only possible to set one input queue to 'listen' to. I can't really find a way to do this via the fluent mapping API either. That seems to take only one input- and error queue as well:

.Transport(t =>t.UseMsmq("input", "error"))

基本上,我正在寻找的是沿着线的东西:

Basically, what I'm looking for is something along the lines of:

<rebus workers="8">
  <input name="app.msg.input" error="app.msg.error" />
  <input name="another.input.queue" error="app.msg.error" />
</rebus>

如何处理我的要求有什么建议?

Any tips on how to handle my requirements?

推荐答案

我建议你使用一个传奇和Rebus的'超时服务,以实现适合您的需要重试策略。这样一来,在你的Rebus的web服务的门面,你可以做这样的事情:

I suggest you make use of a saga and Rebus' timeout service to implement a retry strategy that fits your needs. This way, in your Rebus-enabled web service facade, you could do something like this:

public void Handle(TryMakeWebServiceCall message)
{
    try
    {
        var result = client.MakeWebServiceCall(whatever);

        bus.Reply(new ResponseWithTheResult{ ... });
    }
    catch(Exception e)
    {
        Data.FailedAttempts++;

        if (Data.FailedAttempts < 10)
        {
            bus.Defer(TimeSpan.FromSeconds(1), message);
            return;
        }

        // oh no! we failed 10 times... this is probably where we'd
        // go and do something like this:
        emailService.NotifyAdministrator("Something went wrong!");
    }
}

其中,数据是由神奇地提供给您和通话之间持续的传奇数据。

where Data is the saga data that is made magically available to you and persisted between calls.

有关灵感如何创建一个传奇,检查出的wiki页面上的协调的东西出现这种情况随着时间的推移在这里你可以看到如何服务可能有一些状态的一例本地存储所提供的处理消息的(失败的尝试,在你的情况下,即数)。

For inspiration on how to create a saga, check out the wiki page on coordinating stuff that happens over time where you can see an example on how a service might have some state (i.e. number of failed attempts in your case) stored locally that is made available between handling messages.

在时机成熟时,使 bus.Defer 的工作,你有两个选择:1)使用外部的超时服务(我通常都在每个服务器上安装一个),或2),只要用自己作为超时服务。

When the time comes to make bus.Defer work, you have two options: 1) use an external timeout service (which I usually have installed one of on each server), or 2) just use "yourself" as a timeout service.

在配置时,你去

Configure.With(...)
    .(...)
    .Timeouts(t => // configure it here)

在这里您可以 StoreInMemory StoreInSqlServer StoreInMongoDb StoreInRavenDb UseExternalTimeoutManager

如果您选择(1),你需要检查出的Rebus code和建设的 Rebus.Timeout 的自己 - 它基本上是一个可配置的,Topshelf功能的控制台,有一个Rebus的端点内的应用程序

If you choose (1), you need to check out the Rebus code and build Rebus.Timeout yourself - it's basically just a configurable, Topshelf-enabled console application that has a Rebus endpoint inside.

请让我知道如果你需要更多的帮助,使这项工作 - bus.Defer 是你的系统变得真棒,并且将能够克服所有的小毛刺的这让所有的人'下去:)

Please let me know if you need more help making this work - bus.Defer is where your system becomes awesome, and will be capable of overcoming all of the little glitches that make all others' go down :)

阅读全文

相关推荐

最新文章