17

RxJS进阶——关于流的理解和应用

 5 years ago
source link: http://yalishizhude.github.io/2018/09/30/rxjs-stream/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RxJS是微软公司推出的响应式编程的JavaScript库。

对于它的学习,最开始我的理解是把它当成是 能优雅地解决异步问题的lodash

随着学习的深入,发现它采用了订阅者模式,其中也带有纯函数的思想。

直到在使用了RxJS 6之后才了解其少有人意识到的另一面——流。

什么是流?这里我们不用专业术语来解释,用生活中大家熟悉的的例子来类比,比如“河流”。

河流有什么特点?

至少有两个特点:

有朝向。

水往低处流,河流虽然可能会蜿蜒盘旋,但是朝向是固定的,比如我国的长江和黄河就都是由西往东流。

在RxJS中数据的流向也是固定的,就是从发送者到订阅者。基本都如下面这种形式:

from(Promise.resolve(1)) // 流的源头
......
.subscribe(x => console.log(x)); // 流的终点

有分支。

大的河流一般有干流和支流,大大小小的支流汇入干流。

RxJS中的数据则可以通过操作符将数据流进行聚合或拆分。

// 流的聚合
mergeMap(from(Promise.resolve(1)), from(Promise.resolve(2)))
......
.subscribe(x => console.log(x))

// 流的拆分
const obs$ = from(Promise.resolve(1)
obs$.subscribe(x => console.log(x))
obs$.subscribe(x => {
  // do sth
})/

RxJS 6 相对于 RxJS 5(这里指5.5以下的版本,因为pipe函数在RxJS 5.5中作为新特性已被引入。) 来说不仅修改了一部分操作符的名称,同时做了一个较大的改动,引入了管道(pipe)。这个改动到底有多大?

首先是写法上的变化。

RxJS 5的这种操作符的调用方式有没有一种似曾相识的感觉?

是的,它看上去很像JQuery那种意大利面条式的链式调用

而RxJS 6和Gulp的写法有些像,想想Gulp是什么?基于流的构建工具!

// RxJS 5 伪代码
myObservable
  .map(data => data * 2)
  .switchMap(...)
  .throttle(...))
  .subscribe(...);

// RxJS 6 伪代码
myObservable
  .pipe(map(data => data * 2), switchMap(...), throttle(...))
  .subscribe(...);

这种写法上的变化就带来了用法上的变化,以前的固定“河流”可以通过“管道”(pipe)来控制形成灵活的“水流”。

下面举个例子来更加形象地阐述加入管道之后流的灵活性。

现在有一个这样的业务场景:

点击按钮之后发送一个请求,让服务端开始执行任务,然后轮询发送请求查询任务执行状态,根据不同状态进行不同操作。有3种状态”controlling”——继续轮询, “stop”——停止轮询,”finish”——停止轮询,并进行后续操作。

不考虑判断条件,伪代码是下面这样子:

// 开始任务
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  switchMap(() => getStatus$()), // 查询状态
  )
.subscribe(x => {
  // 后续操作
})

这段代码有一个问题没有解决,根据状态进行相应操作。

先来看看这3种状态对应的操作。

  • controlling。继续轮询很好处理,不进行任何操作即可。
  • stop。停止轮询的操作符有3个: take ,需要固定次数,这个次数没法预先确定。 takeUntil ,需要创建一个额外的subject来进行停止,应该可以实现,不过代码量比较大。 takeWhile ,只需简单的逻辑判断即可,比较合适。
  • finish。问题来了, 如过我们在管道操作符中判断状态的并停止流的话,那么订阅者将无法收到消息,意味着后续操作无法执行。

解决方法就是把后续操作放到管道中。代码如下:

// 开始任务
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  switchMap(() => getStatus$()), // 查询状态
  filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操作
  takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
  tap(() => {
    // 后续操作
  })
  takeWhile(() => false) // 操作完成结束轮询
  )
.subscribe();

现在需求变化了,在另一段代码中,我们也要通过查询状态并根据状态进行,但是不再需要开始任务和轮询了。

那么上面的代码查询和操作部分可以利用pipe方法抽取出来。

const handle = pipe(
  switchMap(() => getStatus$()), // 查询状态
  filter(x => x==='stop' || x==='finish') // 'controlling'状态下继续轮询,其它状态进行对应操作
  takeWhile(x => x!=='stop') // 当为'stop'时结束轮询
  tap(() => {
    // 后续操作
  })
  takeWhile(() => false) // 操作完成结束轮询
)
start$().pipe(
  switchMap(() => interval(1000)), // 开始轮询
  handle
).subscribe();
// 直接复用查询状态代码和后续操作部分代码
other.pipe(
  handle
).subscribe()

总结一下。RxJS比较完整的理解应该是基于流的订阅者模式,而流的灵活性体现在可拆分和聚合,有了pipe管道的加入,流的可复用性增强,因此更容易对代码逻辑进行抽象。


Recommend

  • 60
    • www.tuicool.com 6 years ago
    • Cache

    RxJS: Avoiding Unbound Methods

    RxJS: Avoiding Unbound Methods Photo by

  • 59
    • 掘金 juejin.im 6 years ago
    • Cache

    RxJS教程

    入门 基本概念: Observable(可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。 Observer(观察者): 一个回调函数的集合,它知道如何去监听由Observable提供的值。 **Subscription(订阅):**

  • 62
    • zhuanlan.zhihu.com 6 years ago
    • Cache

    [译] RxJS 高级缓存

  • 46
    • semlinker.com 5 years ago
    • Cache

    RxJS Subject

    观察者模式,它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。 我们可以使用日常生活中,期刊订阅的例子来形象地解释一...

  • 36
    • 微信 mp.weixin.qq.com 5 years ago
    • Cache

    RxJS进阶——关于流的理解

  • 35
    • insights.thoughtworks.cn 5 years ago
    • Cache

    RxJS 快速入门

    这是一篇给新手的 RxJS 快速入门,它可能不精确、不全面,但力求对新手友好。 异步与“回调地狱” 我们都知道 JavaScript 是个多范式语言,它既支持过程式编程,又支持函数式编程,两者分别适用于不同的场合。在同步环...

  • 67
    • Github github.com 5 years ago
    • Cache

    GitHub - xripcsu/rxjs-watcher

    README.md RxJS watcher Simple chrome devtools extension to visualize Rxjs observables. Installation Install npm package ...

  • 16
    • quickapp.vivo.com.cn 4 years ago
    • Cache

    在快应用中使用 RxJS

    在快应用中使用 RxJS 快应用 • May 09,...

  • 16
    • Github github.com 3 years ago
    • Cache

    构建流式应用—RxJS详解

    原文地址 最近在 Alloyteam Conf 2016 分享了《使用RxJS构建流式前端应用》,会后在线上线下跟大家交流时发现对于 RxJS 的态度呈现出两大类:有用过的都表达了 RxJS 带来的优雅编码体验,未用过...

  • 10
    • discretetom.github.io 2 years ago
    • Cache

    关于open gl中变换应用顺序的理解

    关于open gl中变换应用顺序的理解 图像变换顺序与机制、视角变换相关 OPEN GL中的变换相关代码需要反着写 比如,我们需要(以下过程不需要脑补画面,只需要稍微记一下顺序: 创建一个几...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK