3

使用Java虚拟线程实现Actor模型保护状态 - Adam

 1 year ago
source link: https://www.jdon.com/63148
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用Java虚拟线程实现Actor模型保护状态 - Adam


Java 19 包含Project Loom核心组件的预览:虚拟线程。我们现在可以随意创建线程,因为我们不再(或至少少得多)受它们在内存和上下文切换时间方面的成本的限制。

哪里有线程,哪里就有并发。那么这对于 JVM 上的并发性意味着什么呢?可以肯定的是,阻塞 API 将卷土重来,取代异步、基于 Future 或反应式的代码。但是到什么程度呢?这还有待观察。

但是,无论我们编写代码的方式如何(阻塞线程、使用Futures 或响应式扩展),作为一个行业,我们在处理并发时已经制定了一些最佳实践。在高层次上,这样的一种做法是避免使用共享内存协调并发运行的进程。这也意味着避免使用锁来同步对该内存的访问。

共享内存和锁的主要问题是死锁的可能性(如果锁并不总是以相同的顺序获取)。这种方法也可能导致高争用和阻塞线程:即使线程是虚拟的,它们也可能实现重要的业务功能,除非绝对必要,否则不应阻塞。最后,我们可能会遇到数据竞争,如果不是所有必需的锁都被获取或持有的时间不够长。

我们可以将设计基于消息传递,而不是共享内存和锁。这是Actor模型的基石,在Erlang和Akka中实现,但它也是 Go 的goroutines的基础。

在最初的草稿中,Project Loom 还包括一个类似 Go 的通道实现;但是,它不包含在预览中。不过,我们可能会怀疑,通道将在稍后的某个时间点添加到 Java 的 std 库中。

Actor
好消息是,使用 Loom 实现一个Actor,或者至少是一个类似Actor的抽象,变得更加容易。但首先,什么是Actor?

Actor 包含了一些(可能是可变的)状态,在按顺序处理来自其邮箱的传入消息时使用该状态。发送消息(并可选择接收回复)是与参与者交互的唯一选项。消息是逐个串行处理的,因此无法同时访问actor的状态。

将其转换为更专业的术语,参与者运行一个无限循环,该循环使用并处理来自队列的消息。以前,实现这一点需要编写一个自定义调度程序,它将许多参与者多路复用到一个有界线程池中。使用 Java 19,我们可以为每个参与者创建一个虚拟线程并依赖 JVM 的调度。

让我们看看如何使用 Loom 实现一个基本的 Actor 实现。当然,它远不及像 Akka 这样的生产级 Actor 实现,后者包括监督、错误处理、远程处理、位置透明性等功能。尽管如此,如果你有一些可变状态,它的访问应该受到保护和序列化,它可能会起作用。

使用 Loom 实现
在我们的实现中,actor 将是一对:一个队列(我们将使用 unbounded LinkedBlockingQueue)和一个从该队列消费的虚拟线程。我们至少需要两个类:Actor实现运行循环,以及ActorRef(遵循 Akka 的术语),它允许向参与者发送消息。它Actor是单线程的,ActorRef可以在多个线程之间自由共享。

actors.png?g-da16a08f

我们将为所有消息定义一个基本接口,要求它们指定发送者在响应正在处理的消息时可能期望的回复类型。这与 Actor 通常的实现方式不同——使用单向开火并忘记。然而,在我们的Actor微库的简单用例中,访问回复可能很有用:

public interface Message<REPLY> {
    default Reply reply(REPLY r) {
        return new Reply(r);
    }
}

public class Reply {
    private final Object reply;
    Reply(Object reply) { this.reply = reply; }
    Object getReply() { return reply; }
}

课程更多的Reply是技术性。它旨在包装对消息的回复,以便发回。它只能使用 来创建Message.reply,它采用正确类型的参数(注意包私有构造函数)。如果我们希望将参与者的行为表示为简单的模式匹配(参见下面的示例),我们无法使用泛型来表达这一点,因为 Java 中没有流类型。

该ActorRef.send方法接受一条消息并将其放在参与者的队列中:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ActorRef {
    private final LinkedBlockingQueue<PendingMessage<?>> queue;

    ActorRef(LinkedBlockingQueue<PendingMessage<?>> queue) {
        this.queue = queue;
    }

    public <R> Future<R> send(Message<R> message) {
        var future = new CompletableFuture<R>();
        queue.add(new PendingMessage<>(message, future));
        return future;
    }
}

//

import java.util.concurrent.CompletableFuture;

record PendingMessage<R>(
    Message<R> message, 
    CompletableFuture<R> future) {}

队列包含原始消息和记录CompletableFuture中捕获的用于回复消息的PendingMessage。

每个参与者仅支持特定的Message实现子集。理想情况下,我们希望将其添加为ActorRef使用泛型参数的约束。尽管如此,由于 Java 有限的泛型(或者我有限的 Java API 建模知识),这似乎是不可能的。因此,该设计并不像我们理想的那样是类型安全的。

future!?

等等,你说——Future在那里做什么?我们不是已经完成了这个包装废话吗?我们不是要到处使用阻塞代码,因为我们有轻量级线程吗?

不总是——Future仍然是一个方便的抽象!这里它捕获了一个重要属性:消息是异步处理的。这是我们有意识的决定,可能对我们的业务流程至关重要。我们可能对回复根本不感兴趣(在这种情况下,我们只是丢弃 的结果send)。或者我们可能需要稍后某个时间点的回复。最后,我们可能会将回复通过管道发送到另一个参与者的邮箱!

轻量级线程在这里给我们带来的好处是,我们可以.get()在需要结果时调用。但是对在后台运行的计算进行抽象仍然是有效的。

行为良好的Actor

为了实现一个actor,我们微库的用户需要提供它的行为:当消息到达时要做什么。这是需要实现的接口:

import java.util.concurrent.Future;

public interface ActorBehavior<MSG extends Message<?>> {
    Future<Reply> onMessage(ActorRef self, MSG message) 
        throws Exception;
}

与 不同ActorRef的是,您可以看到行为被限制为参与者处理的消息的子类型。参与者可以选择异步提供答案,例如,如果它依赖于来自另一个参与者的回复。如上所述,用户代码创建 a 的唯一方法Reply是向方法提供正确类型的值(由Message的泛型类型指定)Message.reply。

要创建一个Actor,我们有以下静态方法:

public static <MSG extends Message<?>> ActorRef create(
        ActorBehavior<MSG> behavior) {
    var queue = new LinkedBlockingQueue<PendingMessage<?>>();
    var self = new ActorRef(queue);
    var actor = new Actor(self, queue, behavior);
    Thread.startVirtualThread(actor);
    return self;
}

我们启动一个新的虚拟线程,它从创建的队列中消费消息并将关联的消息返回ActorRef给调用者。

最后,这是Actor的运行循环。我们消费队列;如果线程被中断,它会通知actor停止。否则,我们运行提供的行为:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Actor implements Runnable {
    private final ActorRef self;
    private final LinkedBlockingQueue<PendingMessage<?>> queue;
    private final ActorBehavior<Message<?>> behavior;

    Actor(ActorRef self, LinkedBlockingQueue<PendingMessage<?>> queue, 
            ActorBehavior<?> behavior) {
        this.self = self;
        this.queue = queue;
        this.behavior = (ActorBehavior<Message<?>>) behavior;
    }

    public void run() {
        var running = true;
        while (running) {
            PendingMessage<?> pending = null;
            try {
                pending = queue.poll(1000, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                running = false;
            }

            if (pending != null) {
                try {
                    var reply = behavior
                        .onMessage(self, pending.message());
                    handleReply(pending, reply);
                } catch (Exception e) {
                    logger.error("Exception when processing: " + 
                        pending.message(), e);
                    pending.future().completeExceptionally(e);
                }
            }
        }
    }

// …
}

我们从行为中得到的回复是Future<Reply>. 完成后,我们还需要完成待处理消息的未来。我们创建一个新的虚拟线程(毕竟它们很便宜!)来阻止Future<Reply>并进一步传递结果。如果你知道flatMap——这正是我们在这里所做的:

private void handleReply(PendingMessage<?> pending, Future<Reply> reply) {
    if (reply != null) {
        Thread.startVirtualThread(() -> {
            try {
                ((CompletableFuture<Object>) pending.future())
                    .complete(reply.get().getReply());
            } catch (Exception e) {
                pending.future().completeExceptionally(e);
            }
        });
    } else pending.future().complete(null);
}

我们完成了!在大约 100 行代码中,我们创建了一个极其简化但功能强大的 Actor 实现。

Actor在行动

例如,让我们编写一个简单的计数器actor,它接受三个消息:Get、Increase和Decrease。我们将这些建模为密封接口:

sealed interface CounterMessage<R> extends Message<R> {}
record Increase(int i) implements CounterMessage<Void> {}
record Decrease(int i) implements CounterMessage<Void> {}
record Get() implements CounterMessage<Integer> {}

接下来,我们将指定actor的行为:

class CounterActorBehavior implements ActorBehavior<CounterMessage<?>> {
    int counter = 0;

    @Override
    public Future<Reply> onMessage(ActorRef self, 
            CounterMessage<?> message) {
        Reply reply = null;
        switch (message) {
            case Increase(int i) inc -> {
                System.out.println("Increase message, by: " + i);
                counter += i;
                reply = inc.reply(null);
            }
            case Decrease(int i) dec -> {
                System.out.println("Decrease message, by: " + i);
                counter -= i;
                reply = dec.reply(null);
            }
            case Get() get -> {
                System.out.println("Get message, current state: " + 
                    counter);
                reply = get.reply(counter);
            }
        }
        return CompletableFuture.completedFuture(reply);
    }

Actor 包含在可变状态 ( counter) 上,访问受到保护。保证此状态只能由单个线程访问。

的合约onMessage迫使我们返回某种回复——在这里,我们总是同步计算它。通过构造该Message.reply方法,回复是类型安全的。

最后,我们将所有内容联系在一起并进行计数:

public static void main(String args) throws Exception {
    var actor = Actor.create(new CounterActorBehavior());

    actor.send(new Increase(10));
    actor.send(new Decrease(8));
    var result = actor.send(new Get()).get();
    System.out.println("Got result: " + result);
}

我们只等待最终回复——因为我们使用 FIFO 队列,所以Get消息应该在Increase和之后处理Decrease。因此,在运行代码时,您应该会看到2.

试试看

可在 GitHub 上找到
除了反例之外,还有一个更大的例子,它为有限数量的工人实现了代理参与者(经理)。代理将等待消息与免费作品配对,因为它们变得可用。一切都以非阻塞方式发生。

陷阱

JVM 上的参与者有其陷阱。有些是 Loom 实现所独有的;由于平台和类型系统的通用性,有些是固有的。

首先,我们在actor内部进行阻塞时必须小心。使用虚拟线程,这可能比以前更成问题。请记住,如果您在 actor 的行为中调用阻塞操作,这将阻止任何消息被进一步处理。如果参与者实现的业务流程需要停止处理其他参与者的消息,这可能是可取的。但大多数情况下,您会希望异步运行阻塞操作(例如在新的虚拟线程中)并将结果作为消息返回给参与者。为此,参与者可以访问self- ActorRef。但是,我们的实现缺少 API 来优雅地表达上述内容。但是,这只是添加几个实用方法的问题。

其次(这个 hazard 与 Akka 共享),我们必须小心不要泄露 actor 的可变状态。如果参与者的内部状态是,例如,一个可变集合,在将集合发送到外部之前,我们应该总是制作一个副本。此外,我们必须小心不要从任何类型的回调或闭包中访问(读取或写入!)actor 的状态——因为这些可能是从其他线程、由其他参与者或在future完成时运行的。这可能会导致对状态的并发访问——这是我们一开始就想避免的!

未来

Actor 会成为我们在后 Loom Java 中进行并发的方式吗?还是我们会选择其他抽象,比如通道?社区会出现什么样的图书馆?

这些问题目前尚无答案,但如果您发现自己想做一些共享内存并发,可能还有其他选择。您的用例很可能需要共享内存方法,但也可能是消息传递提供了一种更简单且不易出错的解决方案。

请记住,现有的 Actor 实现使用 Java 19 可以继续正常工作。但是,创建一个非常基本的 Actor 实现也是一种选择。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK