[rxjs] Creating An Observable with RxJS
Creating an Observable with RxJS
2023-09-14 09:00:55 时间
Create an observable
// Create an observable that emits a single person object and then completes.
var Observable = Rx.Observable;

var source = Observable.create(function (observer) {
  var greeter = {
    name: "Zhentian",
    message: "Hello World!"
  };

  observer.onNext(greeter);
  observer.onCompleted();
});

// Subscribe with three callbacks: next value, error, and completion.
var sub = source.subscribe(
  function onNext(person) {
    var line = person.name + ' say ' + person.message;
    console.log(line);
  },
  function onError(err) {
    console.log(err);
  },
  function onCompleted() {
    console.log("done");
  }
);

// Output:
//   Zhentian say Hello World!
//   done
Async
// Same observable as before, but the value is emitted from a 1000 ms timer.
var Observable = Rx.Observable;

var source = Observable.create(function (observer) {
  // Timer callback: emit the person, then signal completion.
  function emitPerson() {
    var greeter = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(greeter);
    observer.onCompleted();
  }

  setTimeout(emitPerson, 1000);

  // Logged right after the timer is scheduled, before it fires.
  console.log("ansyc finished!");
});

var sub = source.subscribe(
  function onNext(person) {
    var line = person.name + ' say ' + person.message;
    console.log(line);
  },
  function onError(err) {
    console.log(err);
  },
  function onCompleted() {
    console.log("done");
  }
);

// Output:
//   "ansyc finished!"
//   "Zhentian say Hello World!"
//   "done"
Dispose the async
When we dispose the subscription, we can see that it still logs "Starat timeout". That is not good: the subscriber's onNext() will never be called, so what we want is for the setTimeout callback not to run at all.
// Demonstrates disposing a subscription while the timer is still pending.
// No cleanup function is returned from the create callback, so the timer
// callback still runs after dispose() (see the output at the bottom).
var Observable = Rx.Observable;

var source = Observable.create(function (observer) {
  // Timer callback: log, emit the person, then signal completion.
  function emitPerson() {
    console.log("Starat timeout");

    var greeter = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(greeter);
    observer.onCompleted();
  }

  setTimeout(emitPerson, 1000);
  console.log("ansyc finished!");
});

var sub = source.subscribe(
  function onNext(person) {
    var line = person.name + ' say ' + person.message;
    console.log(line);
  },
  function onError(err) {
    console.log(err);
  },
  function onCompleted() {
    console.log("done");
  }
);

// Dispose after 500 ms, i.e. before the 1000 ms timer fires.
setTimeout(function () {
  sub.dispose();
}, 500);

/*
Output:
"ansyc finished!"
"Starat timeout"
*/
Define the dispose
We can store the id returned by setTimeout and, in the function returned from the create callback, clear that timeout.
// Dispose with cleanup: the create callback returns a function that clears
// the pending timer, so disposing before the timer fires prevents the timer
// callback from running at all.
//
// Each statement is on its own line so the single-line "//" comment below
// no longer comments out the cleanup return and the rest of the program.
var Observable = Rx.Observable;

var source = Observable.create(function (observer) {
  // Keep the timer id so the cleanup function can cancel it.
  var id = setTimeout(function () {
    console.log("Starat timeout");

    var person = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(person);
    observer.onCompleted();
  }, 1000);

  console.log("ansyc finished!");

  // Note that this is optional, you do not have to return this if you
  // require no cleanup
  return function () {
    clearTimeout(id);
  };
});

var sub = source.subscribe(
  function onNext(person) {
    console.log(person.name + ' say ' + person.message);
  },
  function onError(err) {
    console.log(err);
  },
  function onCompleted() {
    console.log("done");
  }
);

// Dispose after 500 ms, i.e. before the 1000 ms timer fires.
setTimeout(function () {
  sub.dispose();
}, 500);

/*
Output:
"ansyc finished!"
*/
Catch error
If we throw an error inside the timer callback, we find that it is not actually caught by the onError handler.
// Demonstrates that an error thrown inside the setTimeout callback is NOT
// delivered to the subscriber's onError handler; it surfaces as an uncaught
// error instead (see the output at the bottom).
//
// Each statement is on its own line so the single-line "//" comments no
// longer comment out the rest of the program.
var Observable = Rx.Observable;

var source = Observable.create(function (observer) {
  var id = setTimeout(function () {
    throw "there is an error"; // Throw an error here

    // Intentionally unreachable: kept to show what would have been emitted
    // had the throw above not happened.
    var person = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(person);
    observer.onCompleted();
  }, 1000);

  // Note that this is optional, you do not have to return this if you
  // require no cleanup
  return function () {
    clearTimeout(id);
  };
});

var sub = source.subscribe(
  function onNext(person) {
    console.log(person.name + ' say ' + person.message);
  },
  function onError(err) {
    console.log("Error: " + err);
  },
  function onCompleted() {
    console.log("done");
  }
);

/*
Output:
"error"
"Uncaught there is an error (line 6)"
*/
What we can do is wrap the body of the timer callback in a try/catch block and pass the caught error to the observer's onError().
// Error handling: the timer callback wraps its work in try/catch and passes
// any caught error to the observer via onError(), so the subscriber's
// onError handler receives it.
//
// Each statement is on its own line so the single-line "//" comments no
// longer comment out the catch clause and the rest of the program.
var Observable = Rx.Observable;

var source = Observable.create(function (observer) {
  var id = setTimeout(function () {
    try {
      throw "there is an error"; // Throw an error here

      // Intentionally unreachable: kept to show what would have been
      // emitted had the throw above not happened.
      var person = {
        name: "Zhentian",
        message: "Hello World!"
      };

      observer.onNext(person);
      observer.onCompleted();
    } catch (error) {
      // Route the failure through the observable's error channel.
      observer.onError(error);
    }
  }, 1000);

  // Note that this is optional, you do not have to return this if you
  // require no cleanup
  return function () {
    clearTimeout(id);
  };
});

var sub = source.subscribe(
  function onNext(person) {
    console.log(person.name + ' say ' + person.message);
  },
  function onError(err) {
    console.log("Error: " + err);
  },
  function onCompleted() {
    console.log("done");
  }
);

/*
Output:
"Error: there is an error"
*/
相关文章
- [Android Pro] 完美解决 No toolchains found in the NDK toolchains folder for ABI with prefix: mips64el-linux-android
- [Functional Programming] Discard the High and Low Values From an Array of Numbers with Ramda
- [Node.js] Mock an API for Local Development in React with Mirage JS
- [XState] Multiple Simultaneous States with Parallel States
- [Javascript] Wrap an API with a Proxy
- [PWA] Add Push Notifications to a PWA with React in Chrome and on Android
- [Algorithms] Divide and Recurse Over an Array with Merge Sort in JavaScript
- [Python] Working with file
- [Nuxt] Navigate with nuxt-link and Customize isClient Behavior in Nuxt and Vue.js
- [React] Normalize Events with Reacts Synthetic Event System
- [RxJS] Split an RxJS observable conditionally with windowToggle
- [Webpack 2] Tree shaking with Webpack 2
- [Redux] Navigating with React Router <Link>
- [Javascript] Automating Releases with semantic-release
- [Javascript] Manage Application State with Immutable.js
- [CSS 3] Add a Cutout Notch to an HTML Element with a CSS Polygon Clip-Path
- [RxJS] Build an Event Combo Observable with RxJS (takeWhile, takeUntil, take, skip)
- [Angular 8] Lazy loading with dynamic loading syntax
- [Algorithm] Tree Width with Level Width
- [Javascript] Create an Image with JavaScript Using Fetch and URL.createObjectURL
- [Algorithms] Divide and Recurse Over an Array with Merge Sort in JavaScript
- [Tool] Open Multiple Terminal Tabs on npm Start with ttab and npm-run-all
- [Parcel] Bundle a React App with Parcel
- [AngularFire] Firebase OAuth Login With Custom Firestore User Data
- [D3] Build an Area Chart with D3 v4
- [RxJS] Split an RxJS observable with window
- [RxJS] Updating Data with Scan
- 成功解决ImportError: Something is wrong with the numpy installation. While importing we detected an olde
- 成功解决frozen importlib._bootstrap“, line 2 in _call_with_frames_removed ImportError: DLL lo(.dll文件简介)
- 【第12篇】Sparse R-CNN: End-to-End Object Detection with Learnable Proposals
- The differentiation program with abstract data
- 论文解读(PGNN)《Rumor detection based on propagation graph neural network with attention mechanism》
- CentOS安装mysql*.rpm提示conflicts with file from package的解决的方法
- 开发报错记录解决(三):编译python出现“SyntaxError: Non-UTF-8 code starting with ‘xcc‘ in file D”的统一解决办法