C# - Rx produce and consume on different threads


I have tried to simplify my issue with the sample code here. I have a producer thread pumping in data, and I am trying to batch it with a time delay between batches so that the UI has time to render it. The result is not as expected: the producer and consumer seem to be on the same thread.

I don't want the batch buffer to sleep on the thread that is producing. I tried SubscribeOn, but it did not help much. What am I doing wrong here, and how do I get it to print different thread IDs on the producer and consumer threads?

    static void Main(string[] args)
    {
        var stream = new ReplaySubject<int>();

        Task.Factory.StartNew(() =>
        {
            int seed = 1;
            while (true)
            {
                Console.WriteLine("Thread {0} Producing {1}",
                    Thread.CurrentThread.ManagedThreadId, seed);

                stream.OnNext(seed);
                seed++;

                Thread.Sleep(TimeSpan.FromMilliseconds(500));
            }
        });

        stream.Buffer(5).Do(x =>
        {
            Console.WriteLine("Thread {0} sleeping to create time gap between batches",
                Thread.CurrentThread.ManagedThreadId);

            Thread.Sleep(TimeSpan.FromSeconds(2));
        })
        .SubscribeOn(NewThreadScheduler.Default).Subscribe(items =>
        {
            foreach (var item in items)
            {
                Console.WriteLine("Thread {0} Consuming {1}",
                    Thread.CurrentThread.ManagedThreadId, item);
            }
        });
        Console.Read();
    }

Understanding the difference between ObserveOn and SubscribeOn is key here. See "ObserveOn and SubscribeOn - where the work is being done" for an in-depth explanation of these.
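To make the distinction concrete: SubscribeOn only controls the thread on which the subscription itself is set up, whereas ObserveOn controls the thread on which notifications are delivered downstream. With a subject, OnNext runs on whichever thread the producer calls it from, so SubscribeOn does not move the consumer. The following is a minimal sketch rather than a definitive fix; it assumes the System.Reactive package, and it swaps the question's ReplaySubject for a plain Subject and uses a finite loop purely for illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // Assumption: a plain Subject stands in for the question's ReplaySubject.
        var stream = new Subject<int>();

        // ObserveOn hands each notification over to the given scheduler,
        // so the subscriber runs on a different thread from the producer.
        using var subscription = stream
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe(item => Console.WriteLine("Thread {0} Consuming {1}",
                Environment.CurrentManagedThreadId, item));

        // Producer: pushes values from a thread-pool thread.
        Task.Run(() =>
        {
            for (int seed = 1; seed <= 5; seed++)
            {
                Console.WriteLine("Thread {0} Producing {1}",
                    Environment.CurrentManagedThreadId, seed);
                stream.OnNext(seed);
            }
            stream.OnCompleted();
        }).Wait();

        // Keep the process alive until the consumer has drained its queue.
        Console.ReadLine();
    }
}
```

ObserveOn queues notifications for the consumer, so a slow consumer no longer blocks the producer, but the queue can grow without bound if the consumer cannot keep up.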

Also, you absolutely don't want to use Thread.Sleep in your Rx. Or anywhere. Ever. Do is evil, and Thread.Sleep is totally evil. Buffer has several overloads you want to use instead - these include a time-based overload and an overload that accepts a count limit and a time limit, returning a buffer when either of these is reached. Time-based buffering will introduce the necessary concurrency between producer and consumer - that is, it will deliver the buffer to its subscriber on a separate thread from the producer.
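As a rough illustration of the time-based overload, the sketch below replaces Buffer(5) plus the sleeping Do with Buffer(TimeSpan). It assumes the System.Reactive package; the two-second window, the finite loop and the use of Task.Delay in the producer are illustrative choices, not part of the original question.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var stream = new Subject<int>();

        // Buffer(TimeSpan) closes a buffer every two seconds on a timer owned by
        // the default scheduler, so the timer-driven batches are delivered off
        // the producer's thread. Buffer(TimeSpan, int) is the overload that also
        // closes a buffer when a count limit is reached.
        using var subscription = stream
            .Buffer(TimeSpan.FromSeconds(2))
            .Where(batch => batch.Count > 0) // time-based buffers can be empty
            .Subscribe(batch => Console.WriteLine("Thread {0} Consuming batch [{1}]",
                Environment.CurrentManagedThreadId, string.Join(", ", batch)));

        // Producer: Task.Delay paces the loop without blocking a thread.
        await Task.Run(async () =>
        {
            for (int seed = 1; seed <= 10; seed++)
            {
                Console.WriteLine("Thread {0} Producing {1}",
                    Environment.CurrentManagedThreadId, seed);
                stream.OnNext(seed);
                await Task.Delay(TimeSpan.FromMilliseconds(500));
            }
            // Any remaining partial buffer is flushed when the stream completes.
            stream.OnCompleted();
        });
    }
}
```

Because the producer awaits Task.Delay, its own thread ID may change between iterations; the point to look for is that the batches arrive without the producer ever having to wait for them.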

Also see these questions and answers, which have discussions on keeping consumers responsive (in the context of WPF here, but the points are generally applicable).

The last question above uses the time-based Buffer overload. As I said, using Buffer or ObserveOn in your call chain will allow you to add concurrency between producer and consumer. You still need to take care that the processing of each buffer is fast enough that you don't get a queue building up on the buffer subscriber.

If queues do build up, you'll need to think about means of applying backpressure, dropping updates and/or conflating updates. These are big topics, too broad for an in-depth discussion here.

See if proper buffering helps first, then think about throttling/conflating events at the source (a UI can only show so much information anyway) - then consider smarter conflation, which can get quite complex. https://github.com/adaptiveconsulting/reactivetrader is an example of a project using advanced conflation techniques.
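One simple form of dropping updates, offered here only as a sketch, is the built-in Sample operator, which forwards just the most recent value in each interval and discards the rest. This is not the technique ReactiveTrader uses; the one-second interval and the finite producer loop are assumptions chosen for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var stream = new Subject<int>();

        // Sample emits at most one value per second: the latest one seen in
        // that interval. Intermediate values are dropped, so a slow consumer
        // never accumulates a backlog.
        using var subscription = stream
            .Sample(TimeSpan.FromSeconds(1))
            .Subscribe(latest => Console.WriteLine("Thread {0} Showing latest value {1}",
                Environment.CurrentManagedThreadId, latest));

        // Producer: ten updates per second, far more than the consumer shows.
        for (int seed = 1; seed <= 30; seed++)
        {
            stream.OnNext(seed);
            await Task.Delay(TimeSpan.FromMilliseconds(100));
        }
        stream.OnCompleted();
    }
}
```

Sampling is only appropriate when each update supersedes the previous one, such as a price or a progress value; if every item matters, buffering or a smarter conflation scheme is the better fit.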

