由网友(し蔓珠唦譁の霺笶︶)分享简介:我注意到RxMerge运算符接受一个可选的maxConcurrent参数。这可用于通过并发订阅有限数量的子序列来限制最大并发性。当推送新的子序列的速度慢于订阅的子序列的完成速度时,它工作得很好,但当推送新的子序列的速度快于此速度时,它就会变得有问题。发生的情况是,子序列被缓冲在大小不断增加的内部缓冲区中,并且当前订阅的...
我注意到RxMerge
运算符接受一个可选的maxConcurrent
参数。这可用于通过并发订阅有限数量的子序列来限制最大并发性。当推送新的子序列的速度慢于订阅的子序列的完成速度时,它工作得很好,但当推送新的子序列的速度快于此速度时,它就会变得有问题。发生的情况是,子序列被缓冲在大小不断增加的内部缓冲区中,并且当前订阅的子序列也变得越来越陈旧。以下是此问题的演示:
await Observable
.Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
.Select(_ => Observable
.Return(DateTime.Now)
.Do(d => Console.WriteLine(
$"Then: {d:HH:mm:ss.fff}, " +
$"Now: {DateTime.Now:HH:mm:ss.fff}, " +
$"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
.Delay(TimeSpan.FromMilliseconds(1000)))
.Merge(maxConcurrent: 1)
.Take(10);
每隔10毫秒推送一个新的子序列,每个子序列在1000毫秒后完成。子序列以最大并发数1合并(按顺序)。
输出:
Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes
(Try it on Fiddle)
内存使用量稳步增长,每个子序列的创建和订阅之间的时间间隔也越来越大。
我想要的是一个自定义的Merge
变体,它有一个大小有限的内部缓冲区。当缓冲区已满时,任何传入的子序列都应该导致丢弃当前最旧的缓冲子序列。下面是所需行为的大理石图,配置了最大并发=1和缓冲区容量=1:
Source: +----A------B------C------|
A: +-------a----a---|
B: not-subscribed
C: +-----c----|
Result: +------------a----a---------c----|
子序列A在发出后立即被订阅。
然后发出B并将其存储在缓冲区中,因为A尚未完成。
然后发出C并替换缓冲区中的B。因此,B子序列被删除,并且从未被订阅。
完成子序列A之后,立即订阅缓冲子序列C。
最终结果包含A和C子序列发出的合并值。
如何实现具有此特定行为的自定义Rx运算符?下面是我尝试实现的操作符的存根:
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency,
int boundedCapacity)
{
return source.Merge(maximumConcurrency);
// TODO: enforce the boundedCapacity policy somehow
}
推荐答案
我想出了一个功能解决方案,但由于复杂性,我不确定它是否可行。但我想我已经做好了所有的准备。
首先,如果采用函数式方法,这是一个相对简单的状态机问题:状态需要知道当前有多少可观测对象正在执行和缓冲区队列。可能影响状态的两个事件是新的可观察对象进入缓冲区队列(导致缓冲区队列上的入队),或当前正在执行的可观察对象终止(导致缓冲区队列上的出队)。 由于状态机基本上意味着Scan
,并且Scan
只能处理一种类型,因此我们必须将两个事件强制为一种类型,下面我称之为Message
。然后状态机知道所有情况,并可以执行Merge(n)
重载的工作。
最后一个技巧是环回:因为完成的可观测对象是Scan
的‘下游’,所以我们需要将该可观测对象的终止‘环回’到Scan
中。为此,我总是参考[This Answer][1]中的Drain
函数。
public static class X
{
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency,
int boundedCapacity)
{
return Observable.Defer(() =>
{
var capacityQueue = new Subject<Unit>();
var toReturn = source.Publish(_source => _source
.Select(o => Message.Enqueue(o))
.Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
.Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
{
var buffer = state.buffer;
var bufferCount = state.bufferCount;
var executionCount = state.executionCount;
if (message.IsEnqueue)
{
if (executionCount < maximumConcurrency)
return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);
buffer = buffer.Enqueue(message.Object);
if (bufferCount == boundedCapacity)
buffer = buffer.Dequeue();
else
bufferCount++;
return (bufferCount, buffer, executionCount, null);
}
else
{
if (bufferCount == 0)
return (0, buffer, executionCount - 1, null);
else
return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
}
})
.Where(t => t.item != null)
.Select(t => t.item)
.Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
.TakeUntil(_source.IgnoreElements().Materialize())
.Merge()
);
return toReturn;
});
}
public class Message
{
public static Message<T> Enqueue<T>(T t)
{
return Message<T>.Enqueue(t);
}
public static Message<T> Dequeue<T>(T t)
{
return Message<T>.Dequeue(t);
}
}
public class Message<T>
{
private readonly T _t;
private readonly bool _isEnqueue;
private Message(bool isEnqueue, T t)
{
_t = t;
_isEnqueue = isEnqueue;
}
public static Message<T> Enqueue(T t)
{
return new Message<T>(true, t);
}
public static Message<T> Dequeue(T t)
{
return new Message<T>(false, t);
}
public bool IsEnqueue => _isEnqueue;
public T Object => _t;
}
}
我编写了一些测试代码(基于原始问题)来验证,如果您想利用它的话。测试正在通过:
// T: 0123456789012345678901234567890123
// T10: 0 1 2 3
// Source: +----A------B------C------|
// A: +-------a----a---|
// B: +----------b----b---|
// C: +--------c----|
// ExpectedResult: +------------a----a---------c----|
var ts = new TestScheduler();
var A = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1, 1);
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual"); // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
(测试代码无异常通过)
相关推荐
最新文章