0

스트림의 필터 목록을 구성해야합니다.스트림의 사용자 정의 필터링

스트림에는 처음에는 필터가 없지만 시간이 지나면 활성 필터 목록이 있습니다.

각 필터에는 엄격한 유효 기간이 있습니다.

테스트 케이스는 다음과 같습니다

var scheduler = new TestScheduler(); 

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0200.Ms(), 'A'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0400.Ms(), 'B'), 
    ReactiveTest.OnNext(0500.Ms(), 'A'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

// filters 
// A between 70ms -> 550ms 
// B between 330ms -> 400ms 
// modeled as a string observable where: 
// first word is the char to filter 
// second word are the msecs duration of the filter 

var filters = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0070.Ms(), "A 480"), 
    ReactiveTest.OnNext(0330.Ms(), "B 70") 
); 

var expected = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

은 당신이 할 수있는 최선의 RX-솔루션 인 저를 제안 할 수 있습니까?

PS : 나는

public static class TickExtensions 
{ 
    public static long Ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms).Ticks; 
    } 
} 
+0

코드가 컴파일되지 않습니다. 나는'Ms()'가 길게 돌아올 것이라고 생각합니다. 'filters'는'CreateColdObservable '이어야합니다. –

+0

@LeeCampbell 맞아, 이걸 고쳤어. – zpul

답변

1

먼저 다음과 같은 확장 방법을 사용하고, 귀하의 의견은 다음과 같아야합니다 @LeeCampbell가 언급 한 바와 같이

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0200.Ms(), 'A'), 
    ReactiveTest.OnNext(0300.Ms(), '2'), 
    ReactiveTest.OnNext(0400.Ms(), 'B'), //You forgot this line 
    ReactiveTest.OnNext(0500.Ms(), 'A'), 
    ReactiveTest.OnNext(0600.Ms(), 'B'), 
    ReactiveTest.OnNext(0700.Ms(), '5'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(0900.Ms(), 'C')); 

또한, 귀하의 Ms 확장 메서드는 형식을 반환해야 long, 아니요 TimeSpan.

여기에 내가 생각 해낸 해결책이다 :

var activeFilters = filters 
    .Select(s => s.Split(' ')) 
    .Select(s => Tuple.Create(s[0][0], TimeSpan.FromMilliseconds(int.Parse(s[1])))) 
    .Select(t => Observable.Timer(t.Item2, scheduler).Select(_ => t.Item1).StartWith(t.Item1)) 
    .MergeCombineLatest(true) 
    .StartWith(new List<char>()); 

var output = activeFilters.Publish(_activeFilters => 
     input.Join(_activeFilters, 
      _ => Observable.Return(1), 
      t => _activeFilters, 
      (c, filterList) => Tuple.Create(c, filterList) 
     ) 
    ) 
    .Where(t => !t.Item2.Contains(t.Item1)) 
    .Select(t => t.Item1); 

var observer = scheduler.CreateObserver<char>(); 
output.Subscribe(observer); 
scheduler.Start(); 

ReactiveAssert.AreElementsEqual(
    expected.Messages, 
    observer.Messages); 

activeFilters 현재 필터에서 문자 목록을 방출이 관측이다. 활성 필터링 된 목록이 변경되면 activeFilters이 새 목록을 내 보냅니다. 동일한 문자가 여러 개의 필터를 가질 수 있으므로 목록이 반드시 고유하지는 않습니다.

주어진 시간에 어떤 필터가 활성 상태인지 파악한 후에는 해당 목록을 입력에 참여시킬 수 있습니다.

public static IObservable<IList<T>> MergeCombineLatest<T>(this IObservable<IObservable<T>> outer, bool removeCompleted) 
{ 
    return outer 
     .SelectMany((inner, i) => inner 
      .Materialize() 
      .SelectMany(nt => nt.Kind == NotificationKind.OnNext 
       ? Observable.Return(Tuple.Create(i, nt.Value, true)) 
       : nt.Kind == NotificationKind.OnCompleted 
        ? removeCompleted 
         ? Observable.Return(Tuple.Create(i, default(T), false)) 
         : Observable.Empty<Tuple<int, T, bool>>() 
        : Observable.Throw<Tuple<int, T, bool>>(nt.Exception) 
      ) 
     ) 
     .Scan(ImmutableDictionary<int, T>.Empty, (dict, t) => t.Item3 ? dict.SetItem(t.Item1, t.Item2) : dict.Remove(t.Item1)) 
     .Select(dict => dict.Values.ToList()); 
} 

MergeCombineLatest는 관찰 가능한의 관찰을 취하고 자식 관찰 가능한 각각의 최신 값 목록을 방출 :

코드는 System.Collections.Immutable Nuget 패키지를 사용하여이 확장 방법을 필요로한다. removeCompleted이 참일 경우 하위 관찰 가능 항목이 완료되면 목록이 1 씩 줄어 듭니다. removeCompleted이 false이면 마지막 값은 목록에 유지되고 연속적으로 영원히 나열됩니다.

이 작업을 수행하는 좀 더 친숙한 방법이있는 경우 많은 의무가 있습니다.

+0

매우 유용한 답변 주셔서 감사합니다 !! 최대한 빨리 확인하겠습니다. – zpul

+0

당신의 솔루션에서 영감을 얻어 다음과 같은 결과를 얻었습니다. https://gist.github.com/zpul/fcc82cb0e963ab69ebf7ae8fc7d95b2f 어떻게 생각하십니까? – zpul

+0

Rx (또는 기능 코드)의 장점은 스레드 안전 문제뿐만 아니라 부작용/상태 문제도 피할 수 있다는 것입니다. 'List '는 스레드로부터 안전하지 않습니다. https://msdn.microsoft.com/en-us/library/6sh2ey19.aspx#Anchor_10을 참조하십시오. 'excludeFilters' 변수를 우연히 조작 한 다른 기능뿐만 아니라 동시에 필터를 추가/제거하려고하면 코드에 문제가 발생할 수 있습니다. – Shlomo

관련 문제