使用无功扩展处理以正确的顺序多个响应多个、顺序、正确

由网友(咿呀咿呀哟)分享简介:状况我有一个,其中一个请求产生了两个响应系统。请求和响应都有相应的观测值:的IObservable< RequestSent> _要求;的IObservable< MainResponseReceived> _mainResponses;的IObservable< SecondResp...

状况

我有一个,其中一个请求产生了两个响应系统。请求和响应都有相应的观测值:

 的IObservable< RequestSent> _要求;
的IObservable< MainResponseReceived> _mainResponses;
的IObservable< SecondResponseReceived> _secondaryResponses;
 

可以保证 RequestSent 早于 MainResponseReceived 事件发生和 SecondaryResponseReceived ,但反应过来以随机顺序。

我有什么

本来我想处理程序,说明如何处理答复,所以我压缩的观测值:

  _requests
    .SelectMany(异步请求=>
    {
        VAR主要= _mainResponses.FirstAsync(M => m.Id == request.Id);
        变种次级= _secondaryResponses.FirstAsync(S => s.Id == request.Id);

        VAR zippedResponse = main.Zip(二级,(M,S)=>新建MainAndSecondaryResponseReceived {
            请求=请求,
            主要= M,
            二级= S
        });
        返回等待zippedResponse.FirstAsync(); ;
    })
    .Subscribe(OnMainAndSecondaryResponseReceived);
 

我需要什么

为什么电压变动调无功

现在我还需要办理 MainResponseReceived 无需等待SecondaryResponseRecieved,它必须保证,那OnMainResponseRecieved前 OnMainAndSecondaryResponseReceived 叫

如何定义两个订阅,请?

测试案例1:

RequestSent 发生 MainResponseReceived 时 - > OnMainResponseReceived叫 SecondaryResponseReceive D出现 - > OnMainAndSecondaryResponseReceived叫

测试案例2:

RequestSent 发生 SecondaryResponseReceived 发生 MainResponseReceived发生 - > OnMainResponseReceived被称为 - > OnMainAndSecondaryResponseReceived叫 解决方案

我觉得你是pretty的多少在正确的轨道。我将停止摆弄周围所有的异步的东西 - 那只是使事情复杂化

试试这个查询:

  VAR的查询=
    _要求
        .SelectMany(要求=>
            _mainResponses.Where(米=> m.Id == request.Id)。取(1)
                。做(米=> OnMainResponseReceived(米))
                。拉链(
                    _secondaryResponses.Where(S => s.Id == request.Id)。取(1),
                    (M,S)=>新MainAndSecondaryResponseReceived()
                    {
                        请求=请求,
                        主要= M,
                        二级= S
                    }));

VAR订阅=
    query.Subscribe(X => OnMainAndSecondaryResponseReceived(x)的);
 

。做(...)中的重要缺失的部分在code。它确保了 OnMainResponseReceived OnMainAndSecondaryResponseReceived 被称为无论如果主要或次要响应进来第一。

我测试用:

 受试对象; RequestSent> _requestsSubject =新的受试对象; RequestSent>();
受试对象; MainResponseReceived> _mainResponsesSubject =新的受试对象; MainResponseReceived>();
受试对象; SecondResponseReceived> _secondaryResponsesSubject =新的受试对象; SecondResponseReceived>();

的IObservable< RequestSent> _requests = _requestsSubject.AsObservable();
的IObservable< MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
的IObservable< SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();

_requestsSubject.OnNext(新RequestSent(){n = 42});
_mainResponsesSubject.OnNext(新MainResponseReceived(){n = 42});
_secondaryResponsesSubject.OnNext(新SecondResponseReceived(){n = 42});

_requestsSubject.OnNext(新RequestSent(){n = 99});
_mainResponsesSubject.OnNext(新MainResponseReceived(){n = 99});
_secondaryResponsesSubject.OnNext(新SecondResponseReceived(){n = 99});
 

Situation

I have a system where one request produces two responses. The request and responses have corresponding observables:

IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;

it is guaranteed that RequestSent event occurs earlier than MainResponseReceived and SecondaryResponseReceived but the responses come in random order.

What I have

Originally I wanted handler that handles both responses, so I zipped the observables:

_requests
    .SelectMany(async request =>
    {
        var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
        var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);

        var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
            Request = request,
            Main = m, 
            Secondary = s
        });
        return await zippedResponse.FirstAsync(); ;
    })
    .Subscribe(OnMainAndSecondaryResponseReceived);

What I need

Now I need to handle also MainResponseReceived without waiting for SecondaryResponseRecieved and it must be guaranteed, that the OnMainResponseRecieved completes before OnMainAndSecondaryResponseReceived is called

How to define the two subscriptions, please?

Test case 1:

RequestSent occurs MainResponseReceived occurs -> OnMainResponseReceived is called SecondaryResponseReceived occurs -> OnMainAndSecondaryResponseReceived is called

Test case 2:

RequestSent occurs SecondaryResponseReceived occurs MainResponseReceived occurs -> OnMainResponseReceived is called -> OnMainAndSecondaryResponseReceived is called

解决方案

I think you're pretty much on the right track. I would stop mucking around with all the Async stuff - that's just making things complicated.

Try this query:

var query =
    _requests
        .SelectMany(request =>
            _mainResponses.Where(m => m.Id == request.Id).Take(1)
                .Do(m => OnMainResponseReceived(m))
                .Zip(
                    _secondaryResponses.Where(s => s.Id == request.Id).Take(1),
                    (m, s) => new MainAndSecondaryResponseReceived()
                    {
                        Request = request,
                        Main = m, 
                        Secondary = s
                    }));

var subscription =
    query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));

The .Do(...) is the important missing part in your code. It ensures that OnMainResponseReceived is called before OnMainAndSecondaryResponseReceived no matter if the main or secondary response comes in first.

I tested this with:

Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();

IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();

_requestsSubject.OnNext(new RequestSent() { Id = 42 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });

_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });