18

异步编程二:promise模式 - 简书

 4 years ago
source link: https://www.jianshu.com/p/b3febf70c09e?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

异步编程二:promise模式

0.1922020.01.18 22:51:51字数 1,656阅读 1,080

书接上回,我们聊了为什么要有异步编程模式以及异步编程的魅力。
本文主要聊一聊异步编程中的promise模式;promise模式广泛应用于前端开发
,所以故事先从前端开发讲起。
因为前端的ajax技术,与服务端通信皆是基于异步编程,需要处理各种回调;
回调本身并不可怕,但是如果需要对多个回调结果进行compose, 那就比较麻烦了,会出现所谓的回调地狱;在异步编程方面,前端同学先痛起来了,所以,在这个方向上,前端同学确实走的也比较靠前,模式也比较成熟。
那么对于java而言,先不谈协程的玩法,异步编程可选的模式就不多了,其中一个便是promise模式,以及接下来会进行介绍的reactor模式。

接下来将按照案例对promise模式进行安利。
(特别提示,本系列文章皆是采用kotlin编写,写过kotlin之后,真的很难切回java,由俭入奢易,由奢入俭难啊)

有一远程服务名曰“计算器”,提供加法和减法两个http接口

  • 计算 n + m :/add?a=n&b=m
  • 计算 n - m :/sub?a=n&b=m
    基于上面的计算器服务,需要在我们的服务中开发一个接口,计算 m + n - l, 采用异步代码该如何写呢?此处基于vert.x进行示范
 override fun start(startPromise: Promise<Void>?) {
    webClient = WebClient.create(vertx)

    var eventBus = vertx.eventBus()
    eventBus.consumer<JsonObject>("calc.promise"){ msg ->
        var msgBody = msg.body()
        var m = msgBody.getInteger("m", 0)
        var n = msgBody.getInteger("n", 0)
        var l = msgBody.getInteger("l", 0)

        webClient.get(7777, "pi", "/add?a=$m&b=$n").send{
            if (it.succeeded()) {
                var `m + n` = it.result().bodyAsString().toInt()
                // 此处是vertx的webClient,是一个异步的http client
                // 7777是端口,pi是host(在下部署在树莓派上做的测试),后面是uri
                webClient.get(7777, "pi", "/sub?a=$`m + n`&b=$l").send{
                    if (it.succeeded()) {
                        var `m + n - l` = it.result().bodyAsString().toInt()
                        replyHandler(`m + n - l`.toString())
                    } else {
                        replyHandler("calc error: m + n - l")
                    }
                }
            } else {
                replyHandler("calc error: m + n")
            }
        }
    }
}

上面是一段基于callback的写法,略微有一些回调地狱迷之缩进
那么如果采用promise模式,在本案例的场景下,是可以缓解回调地狱迷之缩进,大家感受一下(此处同样也是使用了vert.x里的promise类):

 override fun start(startPromise: Promise<Void>?) {
    webClient = WebClient.create(vertx)

    var eventBus = vertx.eventBus()
    eventBus.consumer<JsonObject>("calc.promise"){ msg ->
        var msgBody = msg.body()
        var m = msgBody.getInteger("a", 0)
        var n = msgBody.getInteger("b", 0)
        var l = msgBody.getInteger("c", 0)

        add(m, n).future().onSuccess {
            sub(it, l).future().onSuccess {
                msg.reply(it.toString())
            }
        }
    }
}

fun add(a: Int, b: Int) : Promise<Int> {
    var promise = Promise.promise<Int>()

    webClient.get(7777, "pi", "/add?a=$a&b=$b").send{
        if (it.succeeded()) {
            var addResult = it.result().bodyAsString().toInt()
            promise.complete(addResult)
        } else {
            promise.fail("calc failed $a add $b")
        }
    }

    return promise
}

fun sub(a: Int, b: Int) : Promise<Int> {
    var promise = Promise.promise<Int>()

    webClient.get(7777, "pi", "/sub?a=$a&b=$b").send{
        if (it.succeeded()) {
            var addResult = it.result().bodyAsString().toInt()
            promise.complete(addResult)
        } else {
            promise.fail("calc failed $a sub $b")
        }
    }

    return promise
}

上面这种写法是基于promise模式的,虽然有所缓解,但主要功效还是在把一些代码抽离了,核心的计算逻辑,依然还是迷之缩进的,莫着急,后面还有内容。
此处先仔细看下这两段代码,vertx原生提供的io.vertx.core.Promise类,就是用来做promise模式的, 那么promise相比于callback,改变到底是啥?经过日夜揣摩,在下得出结论:

当调用一个异步方法时,promise 模式可以把针对该方法的执行结果的处理逻辑从入参(回调函数),变为返回值;这样带来的编程变化是:基于返回值的处理,更方便的支持链式写法,把缩进改为一条链

这个话题先聊到这里,稍微找一找promise的感觉,咱们接着案例往下聊

基于计算器服务,实现一个接口: a + ((b -c)+ d) -e -f + g
先看看基于回调怎么写:

//源码地址:https://github.com/HongkaiWen/vertx-kotlin/blob/promise/src/main/kotlin/com/github/hongkaiwen/reactor/vk/calc/CallbackVerticle.kt

class CallbackVerticle : AbstractVerticle(){

    lateinit var webClient: WebClient

    override fun start(startPromise: Promise<Void>?) {
        webClient = WebClient.create(vertx)

        var eventBus = vertx.eventBus()
        eventBus.consumer<JsonObject>("calc.callback"){
            var msgBody = it.body()
            var a = msgBody.getInteger("a", 0)
            var b = msgBody.getInteger("b", 0)
            var c = msgBody.getInteger("c", 0)
            var d = msgBody.getInteger("d", 0)
            var e = msgBody.getInteger("e", 0)
            var f = msgBody.getInteger("f", 0)
            var g = msgBody.getInteger("g", 0)
            calc(a, b, c, d, e, f, g){reply ->
                it.reply(reply)
            }
        }
    }

    fun calc(a: Int, b: Int, c: Int, d: Int, e: Int, f: Int, g: Int, replyHandler: (String) -> Unit){
        webClient.get(7777, "pi", "/sub?a=$b&b=$c").send{
            if (it.succeeded()) {
                var `b-c` = it.result().bodyAsString().toInt()
                webClient.get(7777, "pi", "/add?a=$`b-c`&b=$d").send{
                    if (it.succeeded()) {
                        var `(b-c)+d` = it.result().bodyAsString().toInt()
                        webClient.get(7777, "pi", "/add?a=$a&b=$`(b-c)+d`").send{
                            if (it.succeeded()) {
                                var `a+(b-c)+d` = it.result().bodyAsString().toInt()
                                webClient.get(7777, "pi", "/sub?a=$`a+(b-c)+d`&b=$e").send{
                                    if (it.succeeded()) {
                                        var `a+(b-c)+d-e` = it.result().bodyAsString().toInt()
                                        webClient.get(7777, "pi", "/sub?a=$`a+(b-c)+d-e`&b=$f").send{
                                            if (it.succeeded()) {
                                                var `a+(b-c)+d-e-f` = it.result().bodyAsString().toInt()
                                                webClient.get(7777, "pi", "/add?a=$`a+(b-c)+d-e-f`&b=$g").send{
                                                    if (it.succeeded()) {
                                                        var `a+(b-c)+d-e-f+g` = it.result().bodyAsString().toInt()
                                                        replyHandler(`a+(b-c)+d-e-f+g`.toString())
                                                    } else {
                                                        replyHandler("calc error: a + (b - c) + d -e -f + g")
                                                    }
                                                }
                                            } else {
                                                replyHandler("calc error: a + (b - c) + d -e -f")
                                            }
                                        }

                                    } else {
                                        replyHandler("calc error: a + (b - c) + d -e")
                                    }
                                }
                            } else {
                                replyHandler("calc error: a + (b - c) + d")
                            }
                        }
                    } else {
                        replyHandler("calc error: (b - c) + d")
                    }
                }
            } else {
                replyHandler("calc error: b - c")
            }
        }
    }

}

上面这段代码,实在让人吐血,终于知道啥叫地狱了, 针对上面的方法,基于promise来做改进,效果如何呢:

//源码地址:https://github.com/HongkaiWen/vertx-kotlin/blob/promise/src/main/kotlin/com/github/hongkaiwen/reactor/vk/calc/PromiseVerticle.kt

class PromiseVerticle : AbstractVerticle(){


    lateinit var webClient: WebClient

    override fun start(startPromise: Promise<Void>?) {
        webClient = WebClient.create(vertx)

        var eventBus = vertx.eventBus()
        eventBus.consumer<JsonObject>("calc.promise"){ msg ->
            var msgBody = msg.body()
            var a = msgBody.getInteger("a", 0)
            var b = msgBody.getInteger("b", 0)
            var c = msgBody.getInteger("c", 0)
            var d = msgBody.getInteger("d", 0)
            var e = msgBody.getInteger("e", 0)
            var f = msgBody.getInteger("f", 0)
            var g = msgBody.getInteger("g", 0)

            sub(b, c).future().onSuccess {
                add(it, d).future().onSuccess {
                    add(a, it).future().onSuccess {
                        sub(it, e).future().onSuccess {
                            sub(it, f).future().onSuccess {
                                add(it, g).future().onSuccess {
                                    msg.reply(it.toString())
                                }
                            }
                        }
                    }
                }
            }

        }
    }

    fun add(a: Int, b: Int) : Promise<Int> {
        var promise = Promise.promise<Int>()

        webClient.get(7777, "pi", "/add?a=$a&b=$b").send{
            if (it.succeeded()) {
                var addResult = it.result().bodyAsString().toInt()
                promise.complete(addResult)
            } else {
                promise.fail("calc failed $a add $b")
            }
        }

        return promise
    }

    fun sub(a: Int, b: Int) : Promise<Int> {
        var promise = Promise.promise<Int>()

        webClient.get(7777, "pi", "/sub?a=$a&b=$b").send{
            if (it.succeeded()) {
                var addResult = it.result().bodyAsString().toInt()
                promise.complete(addResult)
            } else {
                promise.fail("calc failed $a sub $b")
            }
        }

        return promise
    }


}

大地狱和小地狱的区别而已,因此就在思忖,貌似使用姿势不对,基于promise的代码依然是迷之缩进的,除非我们可以把promise的处理逻辑做成链式的

JavaScript的promise

感觉上面的路子还是不对,所以我们来参考下前端同学是如何玩promise的;因为在下对JavaScript不感兴趣,所以不求甚解的从大神的博客 摘取一段,如下:

function multiply(input) {
    return new Promise(function (resolve, reject) {
        log('calculating ' + input + ' x ' + input + '...');
        setTimeout(resolve, 500, input * input);
    });
}

// 0.5秒后返回input+input的计算结果:
function add(input) {
    return new Promise(function (resolve, reject) {
        log('calculating ' + input + ' + ' + input + '...');
        setTimeout(resolve, 500, input + input);
    });
}

var p = new Promise(function (resolve, reject) {
    log('start new Promise...');
    resolve(123);
});

p.then(multiply)
 .then(add)
 .then(multiply)
 .then(add)
 .then(function (result) {
    log('Got value: ' + result);
});

发现JavaScript里的promise是链式的(就应该这样嘛),那么上面的例子里的姿势一定是错了,问题在哪呢?
JavaScript里的用法,promise是通过then函数进行顺序链式条用的,那么这个then函数,输入是一个处理函数,返回的是一个新的promise才行,这样可以针对一环一环的处理作出管道的效果来。
因为我采用的vertx的promise来做研究,这个类本身处理promise的结果的方法 onSuccess返回的是future本身,并不是一个新的promise的future,所以不好做链式调用:

  @Fluent
  default Future<T> onSuccess(Handler<T> handler) {
    return onComplete(ar -> {
      if (ar.succeeded()) {
        handler.handle(ar.result());
      }
    });
  }

仔细看了一圈io.vertx.core.Promise的相关方法, 发现compose正是我们的解药。

class PromiseLineVerticle3 : AbstractVerticle(){


    lateinit var webClient: WebClient

    override fun start(startPromise: Promise<Void>?) {
        webClient = WebClient.create(vertx)

        var eventBus = vertx.eventBus()
        eventBus.consumer<JsonObject>("calc.promise.line3"){ msg ->
            var msgBody = msg.body()
            var a = msgBody.getInteger("a", 0)
            var b = msgBody.getInteger("b", 0)
            var c = msgBody.getInteger("c", 0)
            var d = msgBody.getInteger("d", 0)
            var e = msgBody.getInteger("e", 0)
            var f = msgBody.getInteger("f", 0)
            var g = msgBody.getInteger("g", 0)

            //只要一个onFailure即可
            (b asyncSub c)
                .compose { it asyncAdd d }
                .compose { it asyncAdd a }
                .compose { it asyncSub e }
                .compose { it asyncSub f }
                .compose { it asyncAdd g }
                .onSuccess { msg.reply(it.toString()) }
                .onFailure{msg.fail(500, it.message)}
        }
    }

    infix fun Int.asyncAdd(input : Int) : Future<Int> {
        return calc(this, input, CalcOperator.add)
    }

    infix fun Int.asyncSub(input : Int) : Future<Int> {
        return calc(this, input, CalcOperator.sub)
    }

    /**
     * 所有异常必须被处理
     */
    fun calc(a: Int, b: Int, operator: CalcOperator) : Future<Int> {
        var promise = Promise.promise<Int>()

        webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b").send{
            if (it.succeeded()) {
                try{
                    var addResult = it.result().bodyAsString().toInt()
                    promise.complete(addResult)
                    println("$a - $b = $addResult")
                } catch (e: Exception) {
                    promise.fail(e)
                }
            } else {
                it.cause().printStackTrace()
                promise.fail("calc failed $a add $b")
            }
        }

        return promise.future()
    }
}

enum class CalcOperator {
    add, sub
}

通过compose函数代码清爽了许多。
特别要提一下的是,上面的写法it asyncAdd d是kotlin的中缀表达式,另外这个地方的it,java程序员一定会问怎么没有声明过就使用?对的,kotlin中的lambda如果只有一个参数,就可以默认用it代替,写法简单而且避免不知道该其起啥变量名。

基于CompletableFuture实现promise

因为笔者最近vert.x用的多,各种例子都是基于vert.x的;但其实如果把java和promise一起做搜索的话,得到的结果一般可能是jdk的CompletableFuture实现;而且vert.x社区也有roadmap准备向jdk的CompletionStage靠拢(RFC),所以再写一个CompletableFuture的实现版本:

class PromiseLineVerticle4 : AbstractVerticle(){


    lateinit var webClient: WebClient

    override fun start(startPromise: Promise<Void>?) {
        webClient = WebClient.create(vertx)

        var eventBus = vertx.eventBus()
        eventBus.consumer<JsonObject>("calc.promise.line4"){ msg ->
            var msgBody = msg.body()
            var a = msgBody.getInteger("a", 0)
            var b = msgBody.getInteger("b", 0)
            var c = msgBody.getInteger("c", 0)
            var d = msgBody.getInteger("d", 0)
            var e = msgBody.getInteger("e", 0)
            var f = msgBody.getInteger("f", 0)
            var g = msgBody.getInteger("g", 0)


            (b asyncSub c)
                .thenCompose { it asyncAdd d }
                .thenCompose { it asyncAdd a }
                .thenCompose { it asyncSub e }
                .thenCompose { it asyncSub f }
                .thenCompose { it asyncAdd g }
                .thenAccept { msg.reply(it.toString()) }
                .exceptionally {
                    msg.fail(500, it.message)
                    null
                }
        }
    }



    infix fun Int.asyncAdd(input : Int) : CompletableFuture<Int> {
        return calc(this, input, CalcOperator.add)
    }

    infix fun Int.asyncSub(input : Int) : CompletableFuture<Int> {
        return calc(this, input, CalcOperator.sub)
    }

    /**
     * 所有异常必须被处理
     */
    fun calc(a: Int, b: Int, operator: CalcOperator) : CompletableFuture<Int> {
        var promise = CompletableFuture<Int>()

        webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b")
            .expect(ResponsePredicate.SC_OK).send{
            if (it.succeeded()) {
                try{
                    var addResult = it.result().bodyAsString().toInt()
                    promise.complete(addResult)
                    println("$a - $b = $addResult")
                } catch (e: Exception) {
                    promise.completeExceptionally(e)
                }
            } else {
                it.cause().printStackTrace()
                promise.completeExceptionally(RuntimeException("calc failed $a add $b"))
            }
        }

        return promise
    }
}

其实无论是vert.x的promise还是jdk的CompletableFuture, 都是promise模式,写发生也比较类似,代码整洁度上也相似,当然笔者很乐于看到java生态圈能够对此形成标准,就和JS的ES6统一定义了前端的promise一样,免得很多口水。

实践上升理论

基于上面的实践,相信对promise是有一点感觉的了。那么需要实践上升理论了,理论参见:
https://en.wikipedia.org/wiki/Futures_and_promises
理论说明相关文章很多,笔者理解主要是状态机的维护,笔者就不再献丑了,后续如果得空在写一篇vert.x的promise源码分析的文章。

promise是异步编程的一个模式,在某些场景下可以解决回调地狱的问题。
接下来的一篇文章讲来聊聊reactor模式。

系列文章快速导航:
异步编程一:异步编程的魅力
异步编程二:promise模式
异步编程三:reactor模式
异步编程四:协程


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK