Solving an Aggregation Strategy Problem with the Camel Multicast Component

 2 years ago
source link: https://my.oschina.net/u/4526289/blog/5275799

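Abstract: This article works through an example that exposes a problem with aggregation strategies in the Camel Multicast component. By reading the Camel source code, we trace the cause and present a fix. We hope it helps Camel users who run into the same problem.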

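This article is shared from the Huawei Cloud community post 《使用Apache Camel Multicast组件遇到的一个问题》 ("A Problem Encountered When Using the Apache Camel Multicast Component"), by 中间件小哥.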

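It is a translation of "ROUTING MULTICAST OUTPUT AFTER ENCOUNTERING PARTIAL FAILURES", published in the Apache Camel community by Reji Mathews of the Huawei Canada Research Institute. With the original author's consent, a few parts of the original have been lightly modified.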

2 Introduction to the Multicast Component

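Multicast is a powerful EIP component in Apache Camel (hereafter "Camel"). It sends a message to multiple sub-paths and can execute them in parallel when parallelProcessing() is enabled.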

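Following the official documentation, the Multicast component can be configured in two ways:

  • Run all sub-paths independently and use the result of the last sub-path to reply as the final output. This is the default configuration of the Multicast component.
  • Implement Camel's aggregation strategy (Aggregation Strategy) and use a custom aggregator to process the output of all sub-paths.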
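
The difference between the two modes can be sketched without a Camel runtime. The following is a simplified model, not the Camel API: a reply is a plain String, and the names MulticastModesSketch, Strategy, USE_LATEST, COLLECT_ALL and fold are hypothetical. It only mirrors the shape of an aggregate(old, new) callback that receives null as the old value on the first call.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not the Camel API): replies are Strings that are
// folded one by one in the order in which the sub-paths reply.
class MulticastModesSketch {

    // Hypothetical stand-in for an aggregation callback.
    interface Strategy<T> {
        T aggregate(T oldResult, String newReply);
    }

    // Default mode: the latest reply replaces whatever came before.
    static final Strategy<String> USE_LATEST = (oldResult, newReply) -> newReply;

    // Custom mode: every reply is collected into one list.
    static final Strategy<List<String>> COLLECT_ALL = (oldResult, newReply) -> {
        List<String> all = (oldResult == null) ? new ArrayList<>() : oldResult;
        all.add(newReply);
        return all;
    };

    // Feeds the replies to the strategy; the first call sees null as oldResult.
    static <T> T fold(List<String> repliesInArrivalOrder, Strategy<T> strategy) {
        T result = null;
        for (String reply : repliesInArrivalOrder) {
            result = strategy.aggregate(result, reply);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> replies = List.of("DATA_FROM_PATH_1", "DATA_FROM_PATH_2");
        System.out.println(fold(replies, USE_LATEST));   // DATA_FROM_PATH_2
        System.out.println(fold(replies, COLLECT_ALL));  // [DATA_FROM_PATH_1, DATA_FROM_PATH_2]
    }
}
```

With the default mode only the last reply survives, which is why a custom strategy is needed as soon as the route must see the output of every sub-path.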

3 Problem Description

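The example used in this article is as follows. An API is published with the Jetty component. When the API is called, the message is sent to two sub-paths, "direct:A" and "direct:B". Their results are processed by a custom aggregation strategy, after which the route continues with the subsequent steps. "direct:A" throws an exception to simulate a failure, while "direct:B" runs normally. An exception handling policy is also defined in onException.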

The Camel version used in this article is 3.8.0.

@Override
public void configure() throws Exception {
    onException(Exception.class)
        .useOriginalMessage()
        .handled(true)
        .log("Exception handler invoked")
        .transform().constant("{\"data\" : \"err\"}")
        .end();
 
    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
        .log("received request")
        .log("Entering multicast")
        .multicast(new SimpleFlowMergeAggregator())
        .parallelProcessing().to("direct:A", "direct:B")
        .end()
        .log("Aggregated results ${body}")
        .log("Another log")
        .transform(simple("{\"result\" : \"success\"}"))
        .end();
 
    from("direct:A")
        .log("Executing PATH_1 - exception path")
        .transform(constant("DATA_FROM_PATH_1"))
        .log("Starting exception throw")
        .throwException(new Exception("USER INITIATED EXCEPTION"))
        .log("PATH_1")
        .end();
 
    from("direct:B")
        .log("Executing PATH_2 - success path")
        .delayer(1000)
        .transform(constant("DATA_FROM_PATH_2"))
        .log("PATH_2")
        .end();
}

The custom aggregator SimpleFlowMergeAggregator is defined as follows. It puts the results of all sub-paths into a single list object.

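import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleFlowMergeAggregator implements AggregationStrategy {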
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
        if(oldExchange == null) {
            String data = newExchange.getIn().getBody(String.class);
            List<String> aggregatedDataList = new ArrayList<>();
            aggregatedDataList.add(data);
            newExchange.getIn().setBody(aggregatedDataList);
            return newExchange;
        }
 
        List<String> oldData = oldExchange.getIn().getBody(List.class);
        oldData.add(newExchange.getIn().getBody(String.class));
        oldExchange.getIn().setBody(oldData);
 
        return oldExchange;
    }
}

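Based on our understanding of how the Multicast component works, we expected the following behavior with multiple sub-paths: if at least one sub-path succeeds, the route continues with the aggregated result; if all sub-paths fail, the whole route stops. In this example "direct:A" fails and "direct:B" succeeds, so the two subsequent steps, the log and the transform, should run normally.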

Running the example produces the following log:

2021-05-06 12:43:18.565 INFO 13956 --- [qtp916897446-42] route1 : received request
2021-05-06 12:43:18.566 INFO 13956 --- [qtp916897446-42] route1 : Entering multicast
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Starting exception throw
2021-05-06 12:43:18.578 INFO 13956 --- [ #4 - Multicast] route2 : Exception handler invoked
2021-05-06 12:43:18.579 INFO 13956 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:43:19.575 INFO 13956 --- [ #3 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] route3 : PATH_2
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2

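The log shows that once the results of the two sub-paths have been aggregated, the two subsequent steps, the log and the transform, are not executed. This is not the result we expected.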

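After repeated testing we also found that the subsequent steps are skipped only when the first sub-path to reach the aggregator SimpleFlowMergeAggregator ("direct:A") fails. If the first sub-path ("direct:A") succeeds, the subsequent steps still run even if the other sub-path ("direct:B") fails.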

4 Problem Analysis

Next, we read the Camel source code to find the cause of this behavior.

In Pipeline.java in the camel-core-processors module, the run() method contains the following code:

@Override
public void run() {
    boolean stop = exchange.isRouteStop();
    int num = index;
    boolean more = num < size;
    boolean first = num == 0;
 
    if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
 
        // prepare for next run
        if (exchange.hasOut()) {
            exchange.setIn(exchange.getOut());
            exchange.setOut(null);
        }
 
        // get the next processor
        AsyncProcessor processor = processors.get(index++);
 
        processor.process(exchange, this);
    } else {
        // copyResults is needed in case MEP is OUT and the message is not an OUT message
        ExchangeHelper.copyResults(exchange, exchange);
 
        // logging nextExchange as it contains the exchange that might have altered the payload and since
        // we are logging the completion if will be confusing if we log the original instead
        // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
        if (LOG.isTraceEnabled()) {
            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
        }
 
        AsyncCallback cb = callback;
        taskFactory.release(this);
        reactiveExecutor.schedule(cb);
    }
}

The following if condition decides whether the subsequent steps are executed:

if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG)))

It shows that the subsequent steps are skipped in any of the following three cases:

1. An earlier step has already marked the exchange object as stopped.

boolean stop = exchange.isRouteStop();

2. There are no further steps to execute.

boolean more = num < size;

3. The current step is not the first one and continueProcessing() returns false.
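
The three cases can be checked with a small truth-table sketch. It is only a model of the condition quoted above: the class name PipelineGuardSketch and the method runsNextStep are hypothetical, and the four booleans stand in for exchange.isRouteStop(), num < size, num == 0 and the result of continueProcessing().

```java
// Minimal model of the guard in Pipeline.run(); not Camel code.
class PipelineGuardSketch {

    // Same boolean shape as: !stop && more && (first || continueProcessing(...))
    static boolean runsNextStep(boolean stop, boolean more, boolean first, boolean canContinue) {
        return !stop && more && (first || canContinue);
    }

    public static void main(String[] args) {
        // Case 1: the exchange was marked as stopped.
        System.out.println(runsNextStep(true, true, false, true));    // false
        // Case 2: no steps are left.
        System.out.println(runsNextStep(false, false, false, true));  // false
        // Case 3: not the first step and continueProcessing() returned false.
        System.out.println(runsNextStep(false, true, false, false));  // false
        // The first step runs even when continueProcessing() would return false.
        System.out.println(runsNextStep(false, true, true, false));   // true
    }
}
```

The last line shows why the third case only applies from the second step onward: for the first step the check is bypassed.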

Let's look at the code of the continueProcessing() method.

public final class PipelineHelper {
    public static boolean continueProcessing(Exchange exchange, String message, Logger log) {
        ExtendedExchange ee = (ExtendedExchange) exchange;
        boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()
                || (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled());
        if (stop) {
            if (log.isDebugEnabled()) {
                StringBuilder sb = new StringBuilder();
                sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange);
                if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {
                    sb.append(" Marked as rollback only.");
                }
                if (exchange.getException() != null) {
                    sb.append(" Exception: ").append(exchange.getException());
                }
                if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {
                    sb.append(" Handled by the error handler.");
                }
                log.debug(sb.toString());
            }
 
            return false;
        }
        if (ee.isRouteStop()) {
            if (log.isDebugEnabled()) {
                log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
            }
            return false;
        }
 
        return true;
    }
}

As the code shows, continueProcessing() returns false when an exception occurs during execution and is handled by the error handler.

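Back to our example. The first sub-path to reach the aggregator SimpleFlowMergeAggregator ("direct:A") becomes the base for the rest of the aggregation, and the other sub-path ("direct:B") appends its body data to that base. Many Camel users implement custom aggregation strategies this way, but the approach has a problem. During exception handling, the exchange object of "direct:A" is given a status flag, and that flag is passed downstream, where it is used to decide whether the subsequent steps run. Because the exchange of "direct:A", which serves as the aggregation base, is in the "exception" state, continueProcessing() eventually returns false and the subsequent steps are not executed.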
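
The way the flag travels with the base exchange can be reproduced in a reduced model. The sketch below is not Camel code: FakeExchange is a hypothetical stand-in that holds only a body list and one "handled by the error handler" flag, and continueProcessing() is cut down to that single flag.

```java
import java.util.ArrayList;
import java.util.List;

// Reduced model of the original aggregator; not the Camel API.
class BaseExchangeSketch {

    // Hypothetical stand-in for an Exchange: a body plus one status flag.
    static final class FakeExchange {
        final List<String> body = new ArrayList<>();
        final boolean errorHandled;

        FakeExchange(String data, boolean errorHandled) {
            this.body.add(data);
            this.errorHandled = errorHandled;
        }
    }

    // Same shape as the original strategy: the first arrival becomes the base
    // and later arrivals only contribute their body.
    static FakeExchange aggregate(FakeExchange oldExchange, FakeExchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        oldExchange.body.addAll(newExchange.body);
        return oldExchange;
    }

    // Aggregates the exchanges in arrival order.
    static FakeExchange multicast(FakeExchange... arrivals) {
        FakeExchange result = null;
        for (FakeExchange arrival : arrivals) {
            result = aggregate(result, arrival);
        }
        return result;
    }

    // Only the "handled" flag is modelled here.
    static boolean continueProcessing(FakeExchange exchange) {
        return !exchange.errorHandled;
    }

    public static void main(String[] args) {
        FakeExchange failedFirst = multicast(
                new FakeExchange("{\"data\" : \"err\"}", true),
                new FakeExchange("DATA_FROM_PATH_2", false));
        System.out.println(continueProcessing(failedFirst));  // false: the flag of the base survives

        FakeExchange failedSecond = multicast(
                new FakeExchange("DATA_FROM_PATH_2", false),
                new FakeExchange("{\"data\" : \"err\"}", true));
        System.out.println(continueProcessing(failedSecond)); // true: the base is the healthy exchange
    }
}
```

Both runs aggregate the same two bodies. Only the arrival order differs, which matches the behavior observed in the tests described in section 3.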

5 Solution

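There are several ways to control the state that the exchange object ends up in after exception handling. This article adopts the following approach: if the first sub-path succeeded, continue with the subsequent steps; if the first sub-path failed, swap it with another sub-path that succeeded so that the successful exchange becomes the base, and then continue with the subsequent steps.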

The updated custom aggregator SimpleFlowMergeAggregator is as follows:

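import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleFlowMergeAggregator implements AggregationStrategy {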
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
        if(oldExchange == null) {
            String data = newExchange.getIn().getBody(String.class);
            List<String> aggregatedDataList = new ArrayList<>();
            aggregatedDataList.add(data);
            newExchange.getIn().setBody(aggregatedDataList);
            return newExchange;
        }
 
        if(hadException(oldExchange)) {
            if(!hadException(newExchange)) {
                // aggregate and swap the base
                LOGGER.info("Found new exchange with success. swapping the base exchange");
                List<String> oldData = oldExchange.getIn().getBody(List.class);
                oldData.add(newExchange.getIn().getBody(String.class));
                // swapped the base here
                newExchange.getIn().setBody(oldData);
                return newExchange;
            }
        }
 
        List<String> oldData = oldExchange.getIn().getBody(List.class);
        oldData.add(newExchange.getIn().getBody(String.class));
        oldExchange.getIn().setBody(oldData);
 
        return oldExchange;
    }
 
 
    private boolean hadException(Exchange exchange) {
 
        if(exchange.isFailed()) {
            return true;
        }
 
        if(exchange.isRollbackOnly()) {
            return true;
        }
 
        if(exchange.isRollbackOnlyLast()) {
            return true;
        }
 
        if(((ExtendedExchange)exchange).isErrorHandlerHandledSet()
                && ((ExtendedExchange)exchange).isErrorHandlerHandled()) {
            return true;
        }
 
        return false;
    }
}

Running the example again produces the following log:

2021-05-06 12:46:19.122 INFO 2576 --- [qtp174245837-45] route1 : received request
2021-05-06 12:46:19.123 INFO 2576 --- [qtp174245837-45] route1 : Entering multicast
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Starting exception throw
2021-05-06 12:46:19.134 INFO 2576 --- [ #3 - Multicast] route2 : Exception handler invoked
2021-05-06 12:46:19.135 INFO 2576 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:46:20.130 INFO 2576 --- [ #4 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] route3 : PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Found new exchange with success. swapping the base exchange
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Aggregated results {"data" : "err"},DATA_FROM_PATH_2
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Another log

As the log shows, with the new custom aggregation strategy the subsequent log and transform steps are executed successfully.
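
The swap logic can also be sanity-checked for other arrival orders without starting a route. The sketch below is a reduced model under stated assumptions, not Camel code: FakeExchange is a hypothetical stand-in, and its single failed flag represents everything that hadException() checks.

```java
import java.util.ArrayList;
import java.util.List;

// Reduced model of the updated aggregator; not the Camel API.
class SwapBaseSketch {

    // Hypothetical stand-in for an Exchange: a body plus one failure flag.
    static final class FakeExchange {
        List<String> body = new ArrayList<>();
        final boolean failed;

        FakeExchange(String data, boolean failed) {
            this.body.add(data);
            this.failed = failed;
        }
    }

    static FakeExchange aggregate(FakeExchange oldExchange, FakeExchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        // The data is always merged into the list held by the current base.
        oldExchange.body.addAll(newExchange.body);
        if (oldExchange.failed && !newExchange.failed) {
            // Swap: the healthy exchange takes over the merged list and becomes the base.
            newExchange.body = oldExchange.body;
            return newExchange;
        }
        return oldExchange;
    }

    // Aggregates the exchanges in arrival order.
    static FakeExchange multicast(FakeExchange... arrivals) {
        FakeExchange result = null;
        for (FakeExchange arrival : arrivals) {
            result = aggregate(result, arrival);
        }
        return result;
    }

    static boolean continueProcessing(FakeExchange exchange) {
        return !exchange.failed;
    }

    public static void main(String[] args) {
        FakeExchange mixed = multicast(
                new FakeExchange("err", true), new FakeExchange("ok", false));
        System.out.println(continueProcessing(mixed) + " " + mixed.body);   // true [err, ok]

        FakeExchange allFailed = multicast(
                new FakeExchange("err1", true), new FakeExchange("err2", true));
        System.out.println(continueProcessing(allFailed) + " " + allFailed.body); // false [err1, err2]
    }
}
```

In this model the route continues as soon as one sub-path succeeds and stops only when every sub-path fails, which is the behavior expected in section 3.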

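This article worked through an example that exposed a problem with aggregation strategies in the Camel Multicast component. By reading the Camel source code, we found the cause and presented a fix.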

We hope it helps Camel users who run into the same problem.
