2013-11-20 2 views
0

여기 Rx를 사용하여 .NET에서 Observable Sequence가 있습니다.Observables 병합

var aSource = new Subject<int>(); 

var bSource = new Subject<int>(); 

var paired = Observable 
      .Merge(aSource, bSource) 
    .GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1)); 

paired.Subscribe(g => Console.WriteLine("{0}:{1}", g.ElementAt(0), g.ElementAt(1))); 

aSource.OnNext(4); 
bSource.OnNext(1); 
aSource.OnNext(2); 
bSource.OnNext(5); 
aSource.OnNext(3); 
bSource.OnNext(3); 
aSource.OnNext(5); 
bSource.OnNext(2); 
aSource.OnNext(1); 
bSource.OnNext(4); 

출력 : 3 : 3 5 : 5 2 : 2 1 : 1 4 : 4

우리는 이벤트에게 한 쌍의 숫자가 같은 ID로 도착할 때마다 얻을 것이다.

Perfect! 내가 원하는거야.

값으로 쌍을 이루는 그룹입니다. 값 시퀀스의 selectmany/버퍼를 얻을 수있는 방법

다음 질문입니다 ....

.

그래서 1,2,3,4,5는 OnNext()를 통해 aSource와 bSource에 모두 도착합니다. 그런 다음 ConsoleWriteLine()에 1-5를 실행합니다. 그런 다음 2,3,4,5,6이 도착하면 다른 console.writeline()을 얻습니다. 누구 한테 단서?

즉시 수신 포럼 표면에 완벽하게 보이는()

http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

.Window 보는 것이 좋습니다. 내 경우에는이 경우 값 4의 창이 필요합니다.

쿼리 시퀀스에서이 효과를 얻으려면 어디에 속합니까?

var paired = Observable.Merge (aSource, bSource) .GroupBy (i => i) .SelectMany (g => g.Buffer (2) .Take (1));

출력 1,2,3,4,5 : 1,2,3,4,5 2,3,4,5,6 : 2,3,4,5,6

감사합니다 ,

다니엘

+0

두 번째 부분은 각 소스에서 순서대로 번호가 도착합니까? 아니면 임의의 순서? –

+0

그것은 무작위 일 수 있습니다. 그것들은 다양한 길이의 긴 프로세스의 결과입니다. – WebSight

답변

1

가정 이벤트가 순서대로 이벤트를 얻을 "Reordering events with Reactive Extensions"에 내 대답을 사용하여 소스에서 무작위로 도착합니다.

이어서 슬라이딩 버퍼를 생성하도록 Observable.Buffer을 사용

여기
// get this using the OrderedCollect/Sort in the referenced question 
IObservable<int> orderedSource; 

// then subscribe to this 
orderedSource.Buffer(5, 1); 
+0

감사합니다. 오늘 밤 집에 올 때 이것을 시도 할 것입니다. 위의 편집에 따라 Window()가 정렬 된 시퀀스에 있다고 가정합니다. – WebSight

+0

올바른 'Window'는 스트림 스트림을 제공하고, 'Buffer'는리스트 스트림을 제공합니다. 특히, 닫을 때 버퍼 만 가져 오지만 창 스트림은 즉시 전송을 시작합니다. –

+0

맞아,이 공간을 보아라. 나는 더 이상의 이슈와 함께 여기에 결과를 게시 할 것이다. James에게 다시 한 번 감사드립니다. – WebSight

0

내선 방법이 화재가 n 개의 동일한 ID를 입력한다.

public static class RxExtension 
    { 

     public static IObservable<TSource> MergeBuffer<TSource>(this IObservable<TSource> source, Func<TSource, int> keySelector, Func<IList<TSource>,TSource> mergeFunction, int bufferCount) 
     { 
      return Observable.Create<TSource>(o => { 
       var buffer = new Dictionary<int, IList<TSource>>(); 
       return source.Subscribe<TSource>(i => 
       { 
        var index = keySelector(i); 
        if (buffer.ContainsKey(index)) 
        { 
         buffer[index].Add(i); 
        } 
        else 
        { 
         buffer.Add(index, new List<TSource>(){i}); 
        } 
        if (buffer.Count==bufferCount) 
        { 
         o.OnNext(mergeFunction(buffer[index])); 
         buffer.Remove(index); 
        } 
       }); 
      }); 
     } 
    } 

내선 번호로 전화 거는 중입니다.

mainInput = Observable.Merge(inputNodes.ToArray()).MergeBuffer<NodeData>(x => x.id, x => MergeData(x), 1);