坚持与接收观测的SelectManySelectMany

由网友(世人皆醒ヅ我独醉)分享简介:我的目标是通过FTP下载文件,并以某种方式处理它们是异步的。我打开的文件列表中的IObservable和使用的SelectMany组合子处理。里面有一些操作,以阻止下载文件:尝试下载文件的重试次数和返回元组,或在出现故障时,返回元组,并把它包装成观测。样品可观察retriable延迟后,我已经有并略作修改它。问题是...

我的目标是通过FTP下载文件,并以某种方式处理它们是异步的。 我打开的文件列表中的IObservable和使用的SelectMany组合子处理。里面有一些操作,以阻止下载文件:尝试下载文件的重试次数和返回元组,或在出现故障时,返回元组,并把它包装成观测。样品可观察retriable延迟后,我已经有并略作修改它。 问题是,下载几个文件后,我的code随机停止。有时它到达订阅方法OnNext回调。我从来没有发现code深远的onComplete回调。没有异常被抛出无论是。

My goal is to download files via ftp and somehow process them asynchronously. I turn list of files to IObservable and process it using SelectMany combinator. Inside there is some manipulations to download blocked files: try to download file with number of retries and return Tuple, or in case of failure return Tuple and wrap it into Observable. Sample of "Observable retriable after delay" I have taken there and slightly modified it. Problem is that my code randomly stops after downloading couple of files. Sometimes it reaches the "OnNext" callback in "Subscribe" method. I never have detected code reaching "OnComplete" callback. No exception were thrown either.

        files.ToObservable().SelectMany(f =>
        {
            var source = Observable.Defer(() => Observable.Start(() =>
            {
                ftpConnection.DownloadFile(avroPath, f.Name);
                return Tuple.Create(true, f.Name);
            }));
            int attempt = 0;
            return Observable.Defer(() => ((++attempt == 1)
                ? source
                : source.DelaySubscription(TimeSpan.FromSeconds(1))))
                .Retry(4)
                .Catch(Observable.Return(Tuple.Create(false, f.Name)));
        }).Subscribe(
            res =>
            {
                Console.Write("Damn, its only rarely gets there, however some files were downloaded succesfully");
                if (res.Item1) Process(res.Item2);
                else LogOrQueueOrWhatever(res.Item2);
            },
            (Exception ex) =>
            {
                Console.Write("Never was thrown");
            },
            () =>
            {
                Console.Write("Never entered this section");
                ProcessLogs();
                ScheduleNExtDownloadRoutine();
            });

我会很感激,如果有人将呈现更地道的方式来惹的观测量组合子。

I will be grateful if someone will show the more idiomatic way to mess with combinators on Observables.

推荐答案

由于布兰登提到,没有同步/确定可观察的行为后,阻止。所以,我对付它替换订阅与ForEachAsync打电话,转变可观察到的任务和阻止来电与任务的等待的方法:

As Brandon mentioned, there was no synchronization/blocking after defining observable's behavior. So I deal with it by replacing "Subscribe" call with "ForEachAsync", transforming that Observable to Task and blocking caller with Tasks's "Wait" method:

    files.ToObservable().SelectMany(f =>
    {
        var source = Observable.Defer(() => Observable.Start(() =>
        {
            ftpConnection.DownloadFile(avroPath, f.Name);
            return Tuple.Create(true, f.Name);
        }));
        int attempt = 0;
        return Observable.Defer(() => ((++attempt == 1)
            ? source
            : source.DelaySubscription(TimeSpan.FromSeconds(1))))
            .Retry(4)
            .Catch(Observable.Return(Tuple.Create(false, f.Name)));
    }).ForEachAsync(res =>
    {
        if (res.Item1) Process(res.Item2);
        else LogOrQueueOrWhatever(res.Item2);
    }).Wait();

    ProcessLogs();
    ScheduleNExtDownloadRoutine();
阅读全文

相关推荐

最新文章