c# - Combining latest from an observable of observables -
suppose have set of uris monitoring availability. each uri either "up" or "down", , new uris monitor may added system @ time:
public enum connectionstatus { up, down } public class websitestatus { public string uri { get; set; } public connectionstatus status { get; set; } } public class program { static void main(string[] args) { var statusstream = new subject<websitestatus>(); test(statusstream); console.writeline("done"); console.readkey(); } private static void test(iobservable<websitestatus> statusstream) { } }
now suppose in test()
want reactively ascertain:
- whether uris down (as
bool
) - which uris down (as
ienumerable<string>
)
so test
end creating observable iobservable<tuple<bool, ienumerable<string>>>
bool
indicates whether uris down , ienumerable<string>
contains uris are.
how go this? initial thinking need group uri, combine latest each group list perform select
against. however, did not work out due way combinelatest
works.
edit: matthew's answer looked rxx , found implemented combinelatest
overload in fashion have expected in rx out of box, except needed change publishes when there single source stream being combined (by default waiting minimum of 2 source streams). also, can't justify pulling in 2mb of binaries sake of 1 method, have copy/pasted project. doing so, able solve follows:
private static void test(iobservable<websitestatus> statusstream) { statusstream .groupby(x => x.uri) .combinelatest() .select( x => { var down = x.where(y => y.status == connectionstatus.down); var downcount = down.count(); var downuris = down.select(y => y.uri).tolist(); return new { alldown = x.count == downcount, downuris = downuris }; }) .subscribe(x => { console.writeline(" sources down ({0}): {1}", x.alldown ? "that's of them" : "some still up", x.downuris.aggregate("", (y, z) => y += (z + " | "))); }); }
the neatest way use rxx extension in this answer. alternative below, keeps list of sites down/up.
var downstream = statusstream .aggregate<websitestatus, ienumerable<string>>(new string[0], (down, newstatus) => { if (newstatus.isup) return down.where(uri => uri != newstatus.uri); else if (!down.contains(newstatus.uri)) return down.concat(new string[] { newstatus.uri }); else return down; }); var upstream = statusstream .aggregate<websitestatus, ienumerable<string>>(new string[0], (up, newstatus) => { if (!newstatus.isup) return up.where(uri => uri != newstatus.uri); else if (!up.contains(newstatus.uri)) return down.concat(new string[] { newstatus.uri }); else return up; }); var alldown = upstream.select(up => !up.any());
Comments
Post a Comment