1

PowerJob高级特性-MapReduce完整示例 - 风小雅

 2 years ago
source link: https://www.cnblogs.com/ylty/p/16374927.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

由于网上搜索 PowerJob MapReduce 都是设计原理,demo也展示个空壳子,没有演示Map到Reduce结果怎么传递,对于没有MR开发经验的人来说并没有什么帮助,所以这里写了一个有完整计算意义的demo供参考。

代码功能:

实现一个sum累加。

任务输入参数:

batchSize=100&batchNum=10,
其中batchSize表示每个子任务大小,这里就是一个子任务负责100个数据累加。
batchNum表示批次数量(即子任务个数),也就是本次分发为10个子任务来完成。
执行过程就是:Map过程是将本次任务划分为10个子任务,每个子任务分别完成1累加到100,101累加到200,...,以此类推。Reduce过程获取每个子任务的执行结果并汇总累加,返回最终结果值。

package org.example.demo;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.log.OmsLogger;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * MapReduce demo processor that sums consecutive integers.
 *
 * <p>Console job parameters have the form {@code batchSize=100&batchNum=10}.
 * The root task dispatches {@code batchNum} sub-tasks; sub-task {@code j} (0-based) sums the
 * integers from {@code j * batchSize + 1} to {@code (j + 1) * batchSize} inclusive, and
 * {@link #reduce} adds the partial sums together.
 *
 * @author zhengqian
 * @date 2022.05.30
 */
@Component
public class MRSumProcessor implements MapReduceProcessor {

    /** Name attached to the mapped sub-tasks; {@link #process} uses it to recognise them. */
    private static final String SUB_TASK_NAME = "INFO";

    /** Default used when {@code batchSize} is absent from the job parameters. */
    private static final String DEFAULT_BATCH_SIZE = "100";

    /** Default used when {@code batchNum} is absent from the job parameters. */
    private static final String DEFAULT_BATCH_NUM = "10";

    /**
     * Entry point for both the root task and the mapped sub-tasks.
     *
     * @param context task context supplied by the framework
     * @return {@code MAP_SUCCESS} for the root task, the partial sum (as a decimal string) for a
     *         sub-task, or a failed result when the task is neither
     * @throws Exception if the job parameters cannot be parsed or dispatching a sub-task fails
     */
    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();

        System.out.println("============== TestMapReduceProcessor#process ==============");
        System.out.println("isRootTask:" + isRootTask());
        System.out.println("taskContext:" + JsonUtils.toJSONString(context));

        if (isRootTask()) {
            return mapSubTasks(context, omsLogger);
        }
        // Constant-first comparison so a null task name falls through to the failure result.
        if (SUB_TASK_NAME.equals(context.getTaskName())) {
            return sumSubTaskRange(context, omsLogger);
        }
        return new ProcessResult(false);
    }

    /**
     * Aggregates the sub-task results into the final sum.
     *
     * <p>Only results consisting purely of digits are added, so non-numeric results such as
     * the root task's {@code MAP_SUCCESS} are skipped.
     *
     * @param context     task context supplied by the framework
     * @param taskResults results of the tasks belonging to this job instance
     * @return a successful result carrying the total
     */
    @Override
    public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
        // The class declares no "log" field, so write through the job's OmsLogger instead.
        OmsLogger omsLogger = context.getOmsLogger();
        omsLogger.info("================ MapReduceProcessorDemo#reduce ================");
        omsLogger.info("TaskContext: {}", JsonUtils.toJSONString(context));
        omsLogger.info("List<TaskResult>: {}", JsonUtils.toJSONString(taskResults));
        omsLogger.info("MapReduce job finished, result is {}.", taskResults);

        long sum = 0L;
        for (TaskResult taskResult : taskResults) {
            String result = taskResult.getResult();
            if (NumberUtils.isDigits(result)) {
                sum += Long.parseLong(result);
            }
        }

        return new ProcessResult(true, sum + ": " + sum);
    }

    /** Root-task step: reads the job parameters and dispatches one sub-task per batch. */
    private ProcessResult mapSubTasks(TaskContext context, OmsLogger omsLogger) throws Exception {
        System.out.println("==== MAP ====");
        omsLogger.info("[DemoMRProcessor] start root task~");

        // Read the batch count and per-sub-task size from the console parameters.
        Map<String, String> jobParams = parseJobParams(context.getJobParams());
        int batchSize = Integer.parseInt(jobParams.getOrDefault("batchSize", DEFAULT_BATCH_SIZE));
        int batchNum = Integer.parseInt(jobParams.getOrDefault("batchNum", DEFAULT_BATCH_NUM));

        // Range bounds are stored as int; refuse parameters whose bounds would overflow
        // instead of silently dispatching wrapped-around ranges.
        if (batchNum > 0 && Math.abs((long) batchNum * batchSize) > Integer.MAX_VALUE) {
            omsLogger.error("[DemoMRProcessor] batchSize={} * batchNum={} exceeds the int range",
                    batchSize, batchNum);
            return new ProcessResult(false, "batchSize * batchNum exceeds the int range");
        }

        for (int j = 0; j < batchNum; j++) {
            int start = j * batchSize + 1;
            int end = (j + 1) * batchSize;
            // A fresh list per call: nothing handed to map() is mutated afterwards.
            map(Lists.newArrayList(new SubTaskParam(start, end)), SUB_TASK_NAME);
        }
        omsLogger.info("[DemoMRProcessor] map success~");
        return new ProcessResult(true, "MAP_SUCCESS");
    }

    /** Sub-task step: sums the inclusive integer range carried by the sub-task. */
    private ProcessResult sumSubTaskRange(TaskContext context, OmsLogger omsLogger) {
        SubTaskParam subTaskParam = (SubTaskParam) context.getSubTask();
        omsLogger.info(subTaskParam.toString());

        long sum = 0L;
        // long counter: an int counter could never exceed an end of Integer.MAX_VALUE.
        for (long x = subTaskParam.getStart(); x <= subTaskParam.getEnd(); x++) {
            sum += x;
        }
        omsLogger.info("[DemoMRProcessor] start={}, end={}, sum={}",
                subTaskParam.getStart(), subTaskParam.getEnd(), sum);
        return new ProcessResult(true, String.valueOf(sum));
    }

    /**
     * Parses {@code key=value&key=value} job parameters.
     *
     * <p>Null or blank input yields an empty map so the caller's defaults apply; surrounding
     * whitespace around entries, keys and values is ignored.
     */
    private static Map<String, String> parseJobParams(String rawParams) {
        return Splitter.on("&")
                .trimResults()
                .omitEmptyStrings()
                .withKeyValueSeparator(Splitter.on("=").trimResults())
                .split(rawParams == null ? "" : rawParams);
    }

    /** Inclusive integer range {@code [start, end]} handed to one sub-task. */
    private static class SubTaskParam implements Serializable {
        private int start;
        private int end;

        /** No-arg constructor kept alongside the setters for frameworks that need one. */
        public SubTaskParam() {}

        public SubTaskParam(int start, int end) {
            this.start = start;
            this.end = end;
        }

        public int getStart() {
            return start;
        }

        public void setStart(int start) {
            this.start = start;
        }

        public int getEnd() {
            return end;
        }

        public void setEnd(int end) {
            this.end = end;
        }

        /** Renders the range as {@code start:end}. */
        @Override
        public String toString() {
            return start + ":" + end;
        }
    }

}


执行日志如图

执行日志

执行结果如图

执行结果

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK