7

基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】 - 实习小生

 1 year ago
source link: https://www.cnblogs.com/sxxs/p/17309745.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

基于chunjun纯钧的增量数据同步

目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步

chunjun的官网文档对增量同步已经做出了一定的说明

纯钧官方
根据文档我编写了一个SQL脚本

create table `source` (
        `sfzh` STRING COMMENT '',
        `xm` STRING COMMENT '',
        `xb` STRING COMMENT '',
        `xbdm` STRING COMMENT '',
        `jzdz` STRING COMMENT '',
        `fzrq` DATE COMMENT '',
        `dsc_biz_record_id` STRING COMMENT ''
) with (
        'connector' = 'mysql-x',
        'url' = 'jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
        'table-name' = '',
        'username' = '',
        'password' = '',
        'scan.fetch-size' = '1024',
        'scan.increment.column' = 'fzrq',
        --'scan.increment.column-type' = 'date',
        'scan.start-location' = '1659974400000'
);

create table `sink` (
        `sfzh` STRING COMMENT '',
        `xm` STRING COMMENT '',
        `xb` STRING COMMENT '',
        `xbdm` STRING COMMENT '',
        `jzdz` STRING COMMENT '',
        `fzrq` DATE COMMENT '',
        `dsc_biz_record_id` STRING COMMENT '',
        PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED
) with (
        'connector' = 'stream-x'
);

然后提交任务的时候发现已经记录了start-locationstart-location的指标信息了,但是并没有上报到Prometheus!

在本地调试源码解决问题的大致过程

在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量

/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
protected transient CustomReporter customReporter;

该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法

    @Override
    public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true

 /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
    @Override
    protected boolean useCustomReporter() {
        return jdbcConf.isIncrement() && jdbcConf.getInitReporter();
    }

    /** 增量同步或者间隔轮询时,是否初始化外部存储 */
    protected Boolean initReporter = true;

经过查找 initReporter 属性的set方法调用,找到了下面的问题
在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式

1920321-20230412142015665-310064444.png

尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了

1920321-20230412143101149-1793566892.png

那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests

打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader

public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

    public static CustomReporter discoverMetric(
            ChunJunCommonConf commonConf,
            RuntimeContext context,
            boolean makeTaskFailedWhenReportFailed) {
        try {
            String pluginName = commonConf.getMetricPluginName();
            // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
            String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric);
            MetricParam metricParam =
                    new MetricParam(
                            context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps());

            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            Class<?> clazz = classLoader.loadClass(pluginClassName);
            Constructor<?> constructor = clazz.getConstructor(MetricParam.class);

            return (CustomReporter) constructor.newInstance(metricParam);
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
    }

在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK