1

저는 Angular2 앱을 제작 중이므로 Observables와 Reactive Extensions 전체에 익숙해 져 있습니다. TypeScript와 rxjs를 사용하고 있습니다.서로 다른 작업/작업으로 여러 Observables를 결합하십시오.

이제 일부 객체의 배열을 관측 할 수있는 스트림이나 스트림이 생겼습니다. Person 객체를 예로 들어 보겠습니다. 지금은 사람 - 객체의 두 스트림을 가지고 이러한 그래서 난 항상 최신 상태로 스트림 얻을 결합하려는 것 : 사람들 - 스트림의 배열을 방출하는 경우의 말 (5)을하자,

var people$ = getPeople();     // Observable<Person[]> 
var personAdded$ = eventHub.personAdded; // Observable<Person>; 
var personRemoved$ = eventHub.personRemoved // Observable<Person>; 

var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...); 

을 사람들은 personAdded-stream이 사람을 방출하면 allPeople-stream은 6의 배열을 방출합니다. personRemoved-stream이 사람을 방출하면 allPeople-stream은 Person 객체의 배열을 방금 배출하지 않고 방출해야합니다 personRemoved-stream에 의해 방출된다.

rxjs에는이 동작을위한 방법이 내장되어 있습니까?

답변

0

제 제안은 action의 아이디어를 스트림으로 결합한 다음 병합하여 Array에 직접 적용하는 것입니다.

첫 번째 단계는 당신의 행동을 설명하는 몇 가지 기능을 정의하는 것입니다 :

function add(people, person) { 
    return people.concat([people]); 
} 

function remove(people, person) { 
    const index = people.indexOf(person); 
    return index < 0 ? people : people.splice(index, 1); 
} 

참고 :이 예기치 못한 부작용이있을 수 있기 때문에 우리는 장소에 배열 돌연변이 마십시오. 순결은 우리가 대신 배열의 복사본을 생성 할 것을 요구합니다. 입력 및 출력 배열 될 것입니다 people => people :

const added$ = eventHub.personAdded.map(person => people => add(people, person)); 
const removed$ = eventHub.personAdded.map(person => people => remove(people, person)); 

이제 우리는 형태의 이벤트를 얻을 :

이제 우리는 기능을 방출하는 Observable을 만들려면 다음 함수를 사용하여 스트림을 올릴 수 (이 예에서는 문자열 배열로 단순화되었습니다.)

이제 어떻게 연결하나요? 즉, 함수 태닝을 방지 또는 사용 (필요에 따라이 기술의

const currentPeople = 

    // Resets this stream if a new set of people comes in 
    people$.switchMap(peopleArray => 

    // Merge the actions together 
    Rx.Observable.merge(added$, removed$) 

     // Pass in the starting Array and apply each action as it comes in 
     .scan((current, op) => op(current), peopleArray) 

     // Always emit the starting array first 
     .startWith(people) 
) 
    // This just makes sure that every new subscription doesn't restart the stream 
    // and every subscriber always gets the latest value 
    .shareReplay(1); 

가 몇 가지 최적화 : 그럼 우리가 정말에만 후 이러한 이벤트 를 추가하거나 제거하는 방법에 대한 관심이 우리에 적용 할 수있는 배열을 이진 검색),하지만 위의 경우는 비교적 일반적인 경우에 대해 비교적 우아합니다.

+0

좋은 설명, 나는 이것을 시험해 볼 것입니다. 그러나 나는 당신이 말하는 것을 얻습니다! 나는 이것이 사람들이 더 자주하고 싶어하는 것이지 그렇지 않다고 생각할 것이다. – YentheO

1

모든 스트림 (Ghostbusters 스타일)을 병합 한 다음 스캔 작업자를 사용하여 상태를 파악하고 싶습니다. 스캔 연산자는 Javascript reduce처럼 작동합니다. 여기

당신은 데이터의 구조를 언급하지 않았다
const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4']; 

const initialPeople$ = Rx.Observable.from(initialPeople); 

const addPeople = ['Person 5', 'Person 6', 'Person 7']; 

const addPeople$ = Rx.Observable.from(addPeople) 
      .concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async 

const removePeople = ['Person 2x', 'Person 4x']; 

const removePeople$ = Rx.Observable.from(removePeople) 
               .delay(5000) 
               .concatMap(x => Rx.Observable.of(x).delay(1000)); 

const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$) 

mergedStream$ 
    .scan((acc, stream) => { 
     if (stream.includes('x') && acc.length > 0) { 
      const index = acc.findIndex(person => person === stream.replace('x', '')) 
      acc.splice(index, 1); 
     } else { 
      acc.push(stream); 
     } 
     return acc; 
    }, []) 
    .subscribe(x => console.log(x)) 

// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"] 

http://jsbin.com/rozetoy/edit?js,console

... 데모입니다. 내 "x"를 깃발로 사용하는 것은 다소 복잡하고 문제가 많습니다. 하지만 데이터에 맞게 스캔 연산자를 수정하는 방법을 알고 있다고 생각합니다.

관련 문제