2

使用 Vert.x 处理 Kafka 和数据库之间的背压

 1 year ago
source link: https://www.jdon.com/67037.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用 Vert.x 处理 Kafka 和数据库之间的背压

异步编程在开发反应式和响应式应用程序方面带来了许多优点。然而,它也存在缺点和挑战,其中主要的问题之一是背压问题。

什么是背压?
在物理学中定义是:

它是与管道中所需的流体流动相反的阻力或力

我们可以把这个问题转化为一个已知的场景:
从总线上轮询的消息并将其保存到数据库中,但是在总线上有大量的消息,我们的应用程序轮询的速度非常快,但保存消息到的数据库却非常慢。

在同步情况下,没有背压的问题,计算的同步性阻止了来自总线的轮询,直到当前消息被处理。

但是,在异步情况下,轮询的执行不是与数据库保存发生在同一线程。因此,如果数据库不能处理来自总线的所有消息,这些消息就会滞留在 "中间",也就是在我们服务的内存中。

这可能会导致失败,或者在最坏的情况下,导致服务故障。

自动轮询
起初,我们的轮询会做这些操作:

  • 初始化JDBC客户端
  • 初始化Kafka客户端

这段代码非常简单,而且在处理少量消息时效果很好。
当负载越来越大时,问题就出现了:使用Vertx Kafka消费者的处理程序意味着没有对消息比例的控制,所以它会连续轮询而不考虑持久化率,导致内存过载。

让我们尝试开发一个将消息持久化在数据库中的应用程序,并使其进化到处理背压的程度:

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise)
      .handler(record -> {
        persist(jdbc, record)
          .onSuccess(result -> System.out.println("Message persisted"))
          .onFailure(cause -> System.err.println("Message not persisted " + cause));
      });
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }


  private Future<UpdateResult> persist(JDBCClient jdbc, KafkaConsumerRecord<String, String> record) {
    Promise<UpdateResult> promise = Promise.promise();
    JsonArray params = toParams(record);
    jdbc.updateWithParams("insert or update query to persist record", params, promise);
    return promise.future();
  }

  private JsonObject datasourceConfiguration() {
    // TODO datasource configuration
    return null;
  }

  private JsonArray toParams(KafkaConsumerRecord<String, String> record) {
    // TODO: convert the record into params for the sql command
    return null;
  }
}

显式轮询
为了处理背压,应使用显式轮询,这可以通过避开kafka消费者的处理程序设置和手动调用轮询(在下面的例子中,每100ms)来实现。
通过使用这种方法,可以使每次轮询只有在先前轮询的消息批次被持久化时才被执行。

这种行为可以通过处理每个消息的持久化未来,并用CompositeFuture.all收集所有的消息来实现,只有当所有的消息都完成时才会成功,而且只有在这种情况下才能进行下一次轮询。

如果至少有一个未来失败了,那么一切都会失败,轮询也会停止。

有各种解决方案可以使服务处理失败,例如,将消息发送到死信队列,但我们不会涉及这种情况。

这段代码的问题是,如果一个消息失败了,我们将失去这个消息,因为消费者被设置为自动提交,所以,是vertx提交了主题偏移。

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .onSuccess(composite -> {
        System.out.println("All messages persisted");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting messages: " + cause))
    ;
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  ...
}

手动提交

将ENABLE_AUTO_COMMIT_CONFIG属性设置为false,我们的服务就会掌管对主题偏移提交的所有权。

只有当每条消息都将被持久化时,才会进行提交,通过这种技巧,至少实现了一次交付。

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .compose(composite -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause))
    ;
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
  ...
}

奖励功能:实现排序
只要稍加努力,就有可能实现排序:
future组合允许强迫每个持久化操作等待其前一个操作的完成。

这可以通过将异步计算一个接一个地串联起来来实现,所以每一个都会在前一个未来成功时被执行。

这是一个聪明的模式,在需要序列化的时候使用。

public class MainVerticle extends AbstractVerticle {
  ...
  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> IntStream.range(0, records.size())
        .mapToObj(records::recordAt)
        .reduce(Future.<UpdateResult>succeededFuture(),
          (acc, record) -> acc.compose(it -> persist(jdbc, record)),
          (a,b) -> a
        )
      )
      .compose(composite -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }
  ...
}

结论
在使用异步编程时,背压是一个基本话题。
它不是从vert.x的免费功能,但它可以通过一些简单的技巧来实现。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK