C# - Removal of nested observable from stream


I have a class ObservableCollection<T> that represents a changing collection:

    public interface IObservableCollection<T> : IObservable<IEnumerable<T>>
    {
        void Add(T item);
        void Remove(T item);
    }

When an item is added or removed, an internal Subject<IEnumerable<T>> has its OnNext method called with the new IEnumerable<T>, which is exposed through the Subscribe method of IObservableCollection<T>.
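To make the behaviour described above concrete, here is a minimal sketch of such a collection. It is not the asker's actual class: `SnapshotCollection<T>` and `CallbackObserver<T>` are hypothetical names, and a hand-rolled observer list stands in for `Subject<IEnumerable<T>>` so the example needs nothing beyond the base class library.

```csharp
using System;
using System.Collections.Generic;

public interface IObservableCollection<T> : IObservable<IEnumerable<T>>
{
    void Add(T item);
    void Remove(T item);
}

// Hypothetical stand-in for the class described above. A plain list of
// observers replaces Subject<IEnumerable<T>>. Not thread-safe.
public sealed class SnapshotCollection<T> : IObservableCollection<T>
{
    private readonly List<T> items = new List<T>();
    private readonly List<IObserver<IEnumerable<T>>> observers =
        new List<IObserver<IEnumerable<T>>>();

    public void Add(T item) { items.Add(item); Publish(); }

    public void Remove(T item) { if (items.Remove(item)) Publish(); }

    public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    // Push a copy of the current contents to every observer.
    private void Publish()
    {
        var snapshot = items.ToArray();
        foreach (var observer in observers.ToArray()) observer.OnNext(snapshot);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Hypothetical helper: adapts a delegate to IObserver<T>.
public sealed class CallbackObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public CallbackObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Program
{
    public static void Main()
    {
        var names = new SnapshotCollection<string>();
        names.Subscribe(new CallbackObserver<IEnumerable<string>>(
            snapshot => Console.WriteLine(string.Join(", ", snapshot))));

        names.Add("alice");     // prints: alice
        names.Add("bob");       // prints: alice, bob
        names.Remove("alice");  // prints: bob
    }
}
```

Every change publishes a whole new snapshot rather than an add or remove event, which is why a downstream query has to decide for itself what to do with subscriptions it made for an earlier snapshot.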

I also have a class Person:

    public interface IPerson
    {
        string Name { get; }
        IObservable<int> Position { get; }
    }

What I want to do is produce a stream of IEnumerable<Tuple<string, int>> representing each person's position, for every person in the collection. This seems relatively straightforward:

    var peopleCollectionStream = new ObservableCollection<IPerson>();

    var peoplePositions = from people in peopleCollectionStream
                          from updateList in
                              (from person in people
                               select person.Position.Select(pos => Tuple.Create(person.Name, pos)))
                               .CombineLatest()
                          select updateList;

I can now subscribe to the stream like so:

    peoplePositions
        .Subscribe(people =>
        {
            Console.WriteLine("something updated");
            foreach (var personTuple in people)
                Console.WriteLine("{0} -> {1}", personTuple.Item1, personTuple.Item2);
        });

And I get the desired output:

    var alice = new Person() { Name = "alice" };
    peopleCollectionStream.Add(alice);        // alice -> 0
    alice.Move(2);                            // alice -> 2
    var bob = new Person() { Name = "bob" };
    peopleCollectionStream.Add(bob);          // alice -> 2, bob -> 0
    bob.Move(3);                              // alice -> 2, bob -> 3

The problem arises when I wish to remove a person from the collection, and therefore exclude their updates from the stream:

    peopleCollectionStream.Remove(bob);       // alice -> 2
    bob.Move(4);                              // alice -> 2, bob -> 4

I want to stop Bob's position updates from being included once he has been removed from the collection. How can I do this?

I've found that trying to work with Add and Remove events is a bad idea if you want to do these sorts of functional things. Matching up the removes with the adds, and making sure the underlying code does it too, is a lot of work.

What I do instead is use perishable items / collections. I pair each item with a lifetime (cancellation token), and the item is considered removed when its lifetime ends. Then I use those lifetimes when wiring up other things. I use a collection type, called PerishableCollection<T>, that takes items paired with lifetimes and allows you to watch its contents as an IObservable<Perishable<T>>.
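The pairing idea can be sketched with the base class library alone, using `CancellationToken` in the role of the lifetime. This is only an illustration of the concept: `TokenScopedCollection<T>` is a hypothetical name, not the library's `PerishableCollection<T>`, and it is not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified analogue of a perishable collection: each item
// is added together with a token, and leaves the collection on its own
// when that token is cancelled. Not thread-safe.
public sealed class TokenScopedCollection<T>
{
    private readonly List<T> live = new List<T>();

    // Snapshot of the items whose lifetimes have not ended yet.
    public IReadOnlyList<T> CurrentItems
    {
        get { return live.ToArray(); }
    }

    public void Add(T item, CancellationToken lifetime)
    {
        live.Add(item);
        // If the token is already cancelled, Register runs the callback
        // immediately, so a dead item never stays in the list.
        lifetime.Register(() => live.Remove(item));
    }
}

public static class Program
{
    public static void Main()
    {
        var people = new TokenScopedCollection<string>();
        var bobLife = new CancellationTokenSource();

        people.Add("alice", CancellationToken.None); // never removed
        people.Add("bob", bobLife.Token);
        Console.WriteLine(string.Join(", ", people.CurrentItems)); // alice, bob

        bobLife.Cancel(); // ending bob's lifetime removes him
        Console.WriteLine(string.Join(", ", people.CurrentItems)); // alice
    }
}
```

There is no Remove method to call and nothing to match up with an earlier Add: whoever holds the token source decides when the item goes away, and anything else wired to the same token is torn down at the same moment.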

I wrote a blog post about perishable collections, and published a NuGet library you can reference.

Here's code that should flatten a perishable collection of perishable collections:

    public static PerishableCollection<T> Flattened<T>(this PerishableCollection<PerishableCollection<T>> collectionOfCollections, Lifetime lifetimeOfResult) {
        if (collectionOfCollections == null) throw new ArgumentNullException("collectionOfCollections");

        var flattenedCollection = new PerishableCollection<T>();
        collectionOfCollections.CurrentAndFutureItems().Subscribe(
            c => c.Value.CurrentAndFutureItems().Subscribe(

                // OnItem: include in result, but prevent lifetimes from exceeding source's lifetime
                e => flattenedCollection.Add(
                    item: e.Value,
                    lifetime: e.Lifetime.Min(c.Lifetime)),

                // subscription to c ends when c's lifetime ends or result is no longer needed
                c.Lifetime.Min(lifetimeOfResult)),

            // subscription ends when result is no longer needed
            lifetimeOfResult);

        return flattenedCollection;
    }

The above works by subscribing to receive the collections added to the collection of collections, and then, for each of those, subscribing to receive its items. The items are placed into the resulting collection, with a lifetime that ends when either the item dies or its collection dies. All the subscriptions die when the lifetime given to the method dies.
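The rule that a flattened item's lifetime ends when either the item or its containing collection dies can be approximated with linked cancellation tokens. The sketch below is an analogy in plain base-class-library terms, not the library's own implementation; `LifetimeSketch.Min` is a hypothetical helper.

```csharp
using System;
using System.Threading;

public static class LifetimeSketch
{
    // Hypothetical analogue of taking the shorter of two lifetimes:
    // the returned token is cancelled as soon as either input is cancelled.
    // For brevity the linked source is never disposed; real code should
    // keep it and dispose it when it is no longer needed.
    public static CancellationToken Min(CancellationToken a, CancellationToken b)
    {
        return CancellationTokenSource.CreateLinkedTokenSource(a, b).Token;
    }
}

public static class Program
{
    public static void Main()
    {
        var itemLife = new CancellationTokenSource();
        var collectionLife = new CancellationTokenSource();

        var flattenedLife = LifetimeSketch.Min(itemLife.Token, collectionLife.Token);
        flattenedLife.Register(() => Console.WriteLine("item leaves the flattened result"));

        Console.WriteLine(flattenedLife.IsCancellationRequested); // False

        // The item itself is still alive, but its collection dies,
        // so the item's lifetime in the flattened result ends too.
        collectionLife.Cancel(); // prints: item leaves the flattened result

        Console.WriteLine(flattenedLife.IsCancellationRequested); // True
    }
}
```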

Another way to approach the problem would be to write a method that flattens an IObservable<Perishable<IObservable<Perishable<T>>>>. That would have the benefit of not requiring the caller to manage the lifetime of the result so explicitly, and of being applicable in more situations. However, that method is a lot harder to write, because you must deal with sequences failing/completing in a thread-safe way.

Here's an example of using the flatten method (make a new console application, reference Perishable Collections, and paste in the above method and this one):

    using TwistedOak.Collections;
    using TwistedOak.Util;

    static void Main() {
        var p = new PerishableCollection<PerishableCollection<string>>();
        var f = p.Flattened(Lifetime.Immortal);
        f.CurrentAndFutureItems().Subscribe(e => {
            Console.WriteLine("{0} added flattened", e.Value);
            e.Lifetime.WhenDead(() => Console.WriteLine("{0} removed flattened", e.Value));
        });

        // add some 'c' items to f via p
        var c = new PerishableCollection<string>();
        var cLife = new LifetimeSource();
        c.Add("candy", Lifetime.Immortal);
        p.Add(c, cLife.Lifetime);
        c.Add("cane", Lifetime.Immortal);

        // add some 'd' items to f via p
        var d = new PerishableCollection<string>();
        p.Add(d, Lifetime.Immortal);
        d.Add("door", Lifetime.Immortal);
        d.Add("dock", Lifetime.Immortal);

        // should remove c's items from f via removing c from p
        cLife.EndLifetime();
    }

The code should output:

    candy added flattened
    cane added flattened
    door added flattened
    dock added flattened
    candy removed flattened
    cane removed flattened

Hopefully that's enough to get you started down an easier path.

