c# - Combining latest from an observable of observables -


suppose have set of uris monitoring availability. each uri either "up" or "down", , new uris monitor may added system @ time:

public enum connectionstatus {     up,     down }  public class websitestatus {     public string uri     {         get;         set;     }      public connectionstatus status     {         get;         set;     } }  public class program {     static void main(string[] args)     {         var statusstream = new subject<websitestatus>();         test(statusstream);          console.writeline("done");         console.readkey();     }      private static void test(iobservable<websitestatus> statusstream)     {     } } 

now suppose in test() want reactively ascertain:

  • whether uris down (as bool)
  • which uris down (as ienumerable<string>)

so test end creating observable iobservable<tuple<bool, ienumerable<string>>> bool indicates whether uris down , ienumerable<string> contains uris are.

how go this? initial thinking need group uri, combine latest each group list perform select against. however, did not work out due way combinelatest works.

edit: matthew's answer looked rxx , found implemented combinelatest overload in fashion have expected in rx out of box, except needed change publishes when there single source stream being combined (by default waiting minimum of 2 source streams). also, can't justify pulling in 2mb of binaries sake of 1 method, have copy/pasted project. doing so, able solve follows:

private static void test(iobservable<websitestatus> statusstream) {     statusstream         .groupby(x => x.uri)         .combinelatest()         .select(             x =>             {                 var down = x.where(y => y.status == connectionstatus.down);                 var downcount = down.count();                 var downuris = down.select(y => y.uri).tolist();                  return new                 {                     alldown = x.count == downcount,                     downuris = downuris                 };             })         .subscribe(x =>         {             console.writeline("    sources down ({0}): {1}", x.alldown ? "that's of them" : "some still up", x.downuris.aggregate("", (y, z) => y += (z + " | ")));         }); } 

the neatest way use rxx extension in this answer. alternative below, keeps list of sites down/up.

var downstream = statusstream     .aggregate<websitestatus, ienumerable<string>>(new string[0], (down, newstatus) =>     {         if (newstatus.isup)             return down.where(uri => uri != newstatus.uri);         else if (!down.contains(newstatus.uri))             return down.concat(new string[] { newstatus.uri });         else             return down;     });  var upstream = statusstream     .aggregate<websitestatus, ienumerable<string>>(new string[0], (up, newstatus) =>     {         if (!newstatus.isup)             return up.where(uri => uri != newstatus.uri);         else if (!up.contains(newstatus.uri))             return down.concat(new string[] { newstatus.uri });         else             return up;     });  var alldown = upstream.select(up => !up.any()); 

Comments

Popular posts from this blog

python - Subclassed QStyledItemDelegate ignores Stylesheet -

java - HttpClient 3.1 Connection pooling vs HttpClient 4.3.2 -

SQL: Divide the sum of values in one table with the count of rows in another -