C# - Rx produce and consume on different threads
I have tried to simplify my issue with the sample code here. I have a producer thread constantly pumping in data, and I am trying to batch it with a time delay between batches so that the UI has time to render it. But the result is not as expected: the producer and consumer seem to be on the same thread.
I don't want the batch buffer to sleep on the thread that is producing. I tried SubscribeOn, but it did not do much. What am I doing wrong here, and how do I get it to print different thread IDs on the producer and consumer threads?
static void Main(string[] args)
{
    var stream = new ReplaySubject<int>();

    // Producer: pushes a new value every 500 ms from a task thread.
    Task.Factory.StartNew(() =>
    {
        int seed = 1;
        while (true)
        {
            Console.WriteLine("Thread {0} producing {1}", Thread.CurrentThread.ManagedThreadId, seed);
            stream.OnNext(seed);
            seed++;
            Thread.Sleep(TimeSpan.FromMilliseconds(500));
        }
    });

    // Consumer: batches of 5, with a sleep intended to space the batches out.
    stream.Buffer(5).Do(x =>
    {
        Console.WriteLine("Thread {0} sleeping to create time gap between batches", Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(TimeSpan.FromSeconds(2));
    })
    .SubscribeOn(NewThreadScheduler.Default).Subscribe(items =>
    {
        foreach (var item in items)
        {
            Console.WriteLine("Thread {0} consuming {1}", Thread.CurrentThread.ManagedThreadId, item);
        }
    });

    Console.Read();
}
Understanding the difference between ObserveOn and SubscribeOn is key here. See "ObserveOn and SubscribeOn - where the work is being done" for an in-depth explanation of these.
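To make the distinction concrete: SubscribeOn only controls which thread the subscribe call itself runs on, while a subject's OnNext notifications still run on whichever thread calls OnNext. ObserveOn is the operator that moves notifications onto another scheduler. The following is a minimal sketch, assuming the System.Reactive NuGet package; the class name ObserveOnSketch and the method Run are hypothetical names chosen for illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical helper, for illustration only.
static class ObserveOnSketch
{
    // Pushes one value through ObserveOn and reports the thread that
    // produced it and the thread that consumed it.
    public static (int Producer, int Consumer) Run()
    {
        var stream = new Subject<int>();
        var done = new ManualResetEventSlim();
        int consumerThread = -1;

        // ObserveOn hands each notification to the given scheduler,
        // so the Subscribe callback runs off the producer's thread.
        using var subscription = stream
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe(x =>
            {
                consumerThread = Thread.CurrentThread.ManagedThreadId;
                done.Set();
            });

        int producerThread = Thread.CurrentThread.ManagedThreadId;
        stream.OnNext(1);   // runs on the calling (producer) thread
        done.Wait();        // wait until the consumer has seen the value

        return (producerThread, consumerThread);
    }
}
```

Calling ObserveOnSketch.Run() from Main and printing the two values should show two different thread IDs; the actual numbers vary from run to run.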
Also, you absolutely don't want to use Thread.Sleep in your Rx. Or anywhere. Ever. Do is evil, and Thread.Sleep is totally evil. Buffer has several overloads that you want to use instead. These include a time-based overload and an overload that accepts a count limit and a time limit, returning a buffer when either of these is reached. Time-based buffering will introduce the necessary concurrency between producer and consumer; that is, it will deliver the buffer to its subscriber on a separate thread from the producer.
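As a rough sketch of the overload that takes both a time limit and a count limit, the example below batches a timer-driven producer without any Thread.Sleep. It assumes the System.Reactive NuGet package; the class name TimedBufferSketch, the method Run, and the specific intervals are hypothetical choices for illustration, and Observable.Interval stands in for the question's producer loop.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
static class TimedBufferSketch
{
    // Produces the values 1..10, one every 100 ms, and collects the
    // batches that Buffer emits.
    public static IList<IList<int>> Run()
    {
        var producer = Observable
            .Interval(TimeSpan.FromMilliseconds(100))
            .Take(10)
            .Select(i => (int)i + 1);

        return producer
            // Emit a batch every 400 ms, or as soon as 5 items have
            // arrived, whichever happens first.
            .Buffer(TimeSpan.FromMilliseconds(400), 5)
            .ToList()   // gather all batches into one list
            .Wait();    // block until the producer completes (demo only)
    }
}
```

The exact batch boundaries depend on timer jitter, and a batch can be empty if the time limit elapses with no new items, so a real subscriber may want to filter empty batches out with Where.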
Also see these questions and answers, which have good discussions on keeping consumers responsive (in the context of WPF here, but the points are generally applicable):
- Process lots of small tasks and keep the UI responsive
- Buffer data from a database cursor while keeping the UI responsive
The last question above specifically uses the time-based Buffer overload. As I said, using Buffer or ObserveOn in your call chain will allow you to add concurrency between producer and consumer. You still need to take care that the processing of a buffer is fast enough that you don't get a queue building up on the buffer subscriber.
If queues do build up, you'll need to think about means of applying backpressure, dropping updates and/or conflating the updates. These are big topics, too broad for an in-depth discussion here, but basically you either:
- Drop events. Many ways to tackle this in Rx have been discussed. One current approach is "Ignore incoming stream updates if last callback hasn't finished yet"; see also "With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running". There are many other discussions of this.
- Signal the producer out of band to tell it to slow down or send conflated updates, or
- Introduce an operator that does in-stream conflation, like a smarter Buffer that could compress events to, for example, only include the latest price on a stock item. You can author operators that are sensitive to the time that OnNext invocations take to process, for example.
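The simplest built-in form of conflation in Rx.NET is probably the Sample operator, which forwards only the most recent value in each sampling period and drops the rest. This is a much cruder technique than the custom operators described in the list above, and it is shown here only as a starting point. The sketch assumes the System.Reactive NuGet package; the class name SampleSketch, the method Run, and the timings are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
static class SampleSketch
{
    // A fast source emits 50 values roughly 20 ms apart; Sample passes
    // on at most one value (the latest) per 200 ms window.
    public static IList<long> Run()
    {
        var fastSource = Observable
            .Interval(TimeSpan.FromMilliseconds(20))
            .Take(50);

        return fastSource
            .Sample(TimeSpan.FromMilliseconds(200)) // keep latest per window
            .ToList()   // gather what survived conflation
            .Wait();    // block until the source completes (demo only)
    }
}
```

The consumer then sees only a handful of the 50 values, each one the newest available when its window closed. Which values survive depends on timing, so they differ between runs.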
See if proper buffering helps first, then think about throttling or conflating events at the source (a UI can only show so much information anyway), and then consider smarter conflation, as this can get quite complex. https://github.com/adaptiveconsulting/reactivetrader is a good example of a project using some advanced conflation techniques.