[RxJS] Combining Streams with CombineLatest
with Rxjs Streams
2023-09-14 09:00:54 时间
Two streams often need to work together to produce the values you’ll need. This lesson shows how to use an input stream and an interval stream together and push an object with both values through the stream.
const Observable = Rx.Observable; const startButton = document.querySelector('#start'); const halfButton = document.querySelector('#half'); const quarterButton = document.querySelector('#quarter'); const input = document.querySelector("#input"); const stopButton = document.querySelector('#stop'); const resetButton = document.querySelector('#reset'); const data = {count:0}; const inc = (acc)=> ({count: acc.count + 1}); const reset = (acc)=> data; const start$ = Observable.fromEvent(startButton, 'click'); const half$ = Observable.fromEvent(halfButton, 'click'); const quarter$ = Observable.fromEvent(quarterButton, 'click'); const stop$ = Observable.fromEvent(stopButton, 'click'); const reset$ = Observable.fromEvent(resetButton, 'click'); const starters$ = Observable.merge( start$.mapTo(1000), half$.mapTo(500), quarter$.mapTo(250) ); const intervalActions = (time) => { return Observable.merge( Observable.interval(time) .takeUntil(stop$) .mapTo(inc), reset$.mapTo(reset) ) }; const timer$ = starters$ .switchMap(intervalActions) .startWith(data) .scan( (acc, curr) => curr(acc)); const input$ = Observable.fromEvent(input, "input") .map(ev=>ev.target.value); Observable.combineLatest( timer$, input$ ) // We will get an array combineLatest .map((array)=>{ return {count: array[0].count, input: array[1]} }) .subscribe(x => console.log(x))
They last param of combineLatest is a result function, which can parse the result in the fucntion:
Observable.combineLatest( timer$, input$, (timer, input)=>{ return {count: timer.count, input} } ) .subscribe(x => console.log(x))
相关文章
- [RxJS] Automate unsubscribe process with takeUntil
- [SCSS] Access Theme Color Values With Sass
- [RxJS] Loading Spinner with busyDelayMs & busyMinDurationMs api
- [React Testing] Test your Custom Hook Module with react-hooks-testing-library
- [Recompose] Handle React Events as Streams with RxJS and Recompose
- [React] Make Controlled React Components with Control Props
- [RxJS] Split an RxJS Observable into groups with groupBy
- [Angular 2] Managing State in RxJS with StartWith and Scan
- [RxJS] Sharing Streams with Share
- [RxJS] Handling a Complete Stream with Reduce
- [RxJS] Combining Streams with CombineLatest
- [RxJS] Reactive Programming - Rendering on the DOM with RxJS
- [Javascript + rxjs] Simple drag and drop with Observables
- [Unit testing RxJS] Test error handling with marbles
- [RxJS] Loading Spinner with busyDelayMs & busyMinDurationMs api
- [Angular RxJS] Single data observable pattern (combineLatest with startWith)
- [CLI] Create a Hybrid Single-Multi Command Node.js CLI with Oclif and TypeScript
- [Compose] 9. Delay Evaluation with LazyBox
- [Node.js] Write or Append to a File in Node.js with fs.writeFile and fs.writeFileSync
- [Vue-rx] Handle Image Loading Errors in Vue.js with RxJS and domStreams
- [Recompose] Compose Streams of React Props with Recompose’s compose and RxJS
- [Recompose] Handle React Events as Streams with RxJS and Recompose
- [RxJS] Multicast with a selector argument, as a sandbox
- [TypeScript] Understanding Generics with RxJS
- [RxJS] Handling Multiple Streams with Merge
- [RxJS] Updating Data with Scan
- [RxJS] Reactive Programming - Sharing network requests with shareReplay()
- [RxJS] Aggregating Streams With Reduce And Scan using RxJS
- [rxjs] Creating An Observable with RxJS
- [Javascript + rxjs] Using the map method with Observable
- [Angular-Scaled web] 3. Basic State with ui-router
- 成功解决ImportError: Something is wrong with the numpy installation. While importing we detected an olde
- 100天精通Python(基础篇)——第24天:with...as语句介绍
- view(-1)函数调用报错view size is not compatible with input tensor‘s size and stride