3

源码解析springbatch的job是如何运行的? - starmoon1900

 2 years ago
source link: https://www.cnblogs.com/starmoon1994/p/16565987.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

202208-源码解析springbatch的job是如何运行的?

注,本文中的demo代码节选于图书《Spring Batch批处理框架》的配套源代码,并做并适配springboot升级版本,完全开源。

SpringBatch的背景和用法,就不再赘述了,默认本文受众都使用过batch框架。
本文仅讨论普通的ChunkStep,分片/异步处理等功能暂不讨论。

1. 表结构

Spring系列的框架代码,大多又臭又长,让人头晕。先列出整体流程,再去看源码。顺带也可以了解存储表结构。

  1. 每一个jobname,加运行参数的MD5值,被定义为一个job_instance,存储在batch_job_instance表中;
  2. job_instance每次运行时,会创建一个新的job_execution,存储在batch_job_execution / batch_job_execution_context 表中;
    1. 扩展:任务重启时,如何续作? 答,判定为任务续作,创建新的job_execution时,会使用旧job_execution的运行态ExecutionContext(通俗讲,火车出故障只换了车头,车厢货物不变。)
  3. job_execution会根据job排程中的step顺序,逐个执行,逐个转化为step_execution,并存储在batch_step_execution / batch_step_execution_context表中
  4. 每个step在执行时,会维护step运行状态,当出现异常或者整个step清单执行完成,会更新job_execution的状态
  5. 在每个step执行前后、job_execution前后,都会通知Listener做回调。

框架使用的表

batch_job_instancebatch_job_executionbatch_job_execution_contextbatch_job_execution_paramsbatch_step_executionbatch_step_execution_contextbatch_job_seqbatch_step_execution_seqbatch_job_execution_seq

2. API入口

先看看怎么调用启动Job的API,看起来非常简单,传入job信息和参数即可

@Autowired @Qualifier("billJob") private Job job; @Test public void billJob() throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addLong("currentTimeMillis", System.currentTimeMillis()) .addString("batchNo","2022080402") .toJobParameters(); JobExecution result = jobLauncher.run(job, jobParameters); System.out.println(result.toString()); Thread.sleep(6000); }
<!-- 账单作业 --> <batch:job id="billJob"> <batch:step id="billStep"> <batch:tasklet transaction-manager="transactionManager"> <batch:chunk reader="csvItemReader" writer="csvItemWriter" processor="creditBillProcessor" commit-interval="3"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job>

org.springframework.batch.core.launch.support.SimpleJobLauncher#run

// 简化部分代码(参数检查、log日志)@Overridepublic JobExecution run(final Job job, final JobParameters jobParameters){ final JobExecution jobExecution; JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters); // 上次执行存在,说明本次请求是重启job,先做检查 if (lastExecution != null) { if (!job.isRestartable()) { throw new JobRestartException("JobInstance already exists and is not restartable"); } /* 检查stepExecutions的状态 * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING * retrieve the previous execution and check */ for (StepExecution execution : lastExecution.getStepExecutions()) { BatchStatus status = execution.getStatus(); if (status.isRunning() || status == BatchStatus.STOPPING) { throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " + lastExecution); } else if (status == BatchStatus.UNKNOWN) { throw new JobRestartException( "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "); } } } // Check jobParameters job.getJobParametersValidator().validate(jobParameters); // 创建JobExecution 同一个job+参数,只能有一个Execution执行器 jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters); try { // SyncTaskExecutor 看似是异步,实际是同步执行(可扩展) taskExecutor.execute(new Runnable() { @Override public void run() { try { // 关键入口,请看[org.springframework.batch.core.job.AbstractJob#execute] job.execute(jobExecution); if (logger.isInfoEnabled()) { Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(), jobExecution.getEndTime()); } } catch (Throwable t) { rethrow(t); } } private void rethrow(Throwable t) { // 省略各类抛异常 throw new IllegalStateException(t); } }); } catch (TaskRejectedException e) { // 更新job_execution的运行状态 jobExecution.upgradeStatus(BatchStatus.FAILED); if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); } jobRepository.update(jobExecution); } return jobExecution;}

3. 深入代码流程

简单看看API入口,子类划分较多,继续往后看

总体代码流程

  1. org.springframework.batch.core.launch.support.SimpleJobLauncher#run 入口api,构建jobExecution
  2. org.springframework.batch.core.job.AbstractJob#execute 对jobExecution进行执行、listener的前置处理
  3. FlowJob#doExecute -> SimpleFlow#start 按顺序逐个处理Step、构建stepExecution
  4. JobFlowExecutor#executeStep -> SimpleStepHandler#handleStep -> AbstractStep#execute 执行stepExecution
  5. TaskletStep#doExecute 通过RepeatTemplate,调用TransactionTemplate方法,在事务中执行
    1. 内部类TaskletStep.ChunkTransactionCallback#doInTransaction
  6. 反复调起ChunkOrientedTasklet#execute 去执行read-process-writer方法,
    1. 通过自定义的Reader得到inputs,例如本文实现的是flatReader读取csv文件
    2. 遍历inputs,将item逐个传入,调用processor处理
    3. 调用writer,将outputs一次性写入
    4. 不同reader的实现内容不同,通过缓存读取的行数等信息,可做到分片、按数量处理chunk

JobExecution的处理过程

org.springframework.batch.core.job.AbstractJob#execute

/** 运行给定的job,处理全部listener和DB存储的调用* Run the specified job, handling all listener and repository calls, and* delegating the actual processing to {@link #doExecute(JobExecution)}.** @see Job#execute(JobExecution)* @throws StartLimitExceededException* if start limit of one of the steps was exceeded*/@Ovrridepublic final void execute(JobExecution execution) { // 同步控制器,防并发执行 JobSynchronizationManager.register(execution); // 计时器,记录耗时 LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs", Tag.of("name", execution.getJobInstance().getJobName())); LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start(); Timer.Sample timerSample = BatchMetrics.createTimerSample(); try { // 参数再次进行校验 jobParametersValidator.validate(execution.getJobParameters()); if (execution.getStatus() != BatchStatus.STOPPING) { // 更新db中任务状态 execution.setStartTime(new Date()); updateStatus(execution, BatchStatus.STARTED); // 回调所有listener的beforeJob方法 listener.beforeJob(execution); try { doExecute(execution); } catch (RepeatException e) { throw e.getCause(); // 搞不懂这里包一个RepeatException 有啥用 } } else { // 任务状态时BatchStatus.STOPPING,说明任务已经停止,直接改成STOPPED // The job was already stopped before we even got this far. Deal // with it in the same way as any other interruption. execution.setStatus(BatchStatus.STOPPED); execution.setExitStatus(ExitStatus.COMPLETED); } } catch (JobInterruptedException e) { // 任务被打断 STOPPED execution.setExitStatus(getDefaultExitStatusForFailure(e, execution)); execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus())); execution.addFailureException(e); } catch (Throwable t) { // 其他原因失败 FAILED logger.error("Encountered fatal error executing job", t); execution.setExitStatus(getDefaultExitStatusForFailure(t, execution)); execution.setStatus(BatchStatus.FAILED); execution.addFailureException(t); } finally { try { if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED) && execution.getStepExecutions().isEmpty()) { ExitStatus exitStatus = execution.getExitStatus(); ExitStatus newExitStatus = ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job."); execution.setExitStatus(exitStatus.and(newExitStatus)); } // 计时器 计算总耗时 timerSample.stop(BatchMetrics.createTimer("job", "Job duration", Tag.of("name", execution.getJobInstance().getJobName()), Tag.of("status", execution.getExitStatus().getExitCode()) )); longTaskTimerSample.stop(); execution.setEndTime(new Date()); try { // 回调所有listener的afterJob方法 调用失败也不影响任务完成 listener.afterJob(execution); } catch (Exception e) { logger.error("Exception encountered in afterJob callback", e); } // 写入db jobRepository.update(execution); } finally { // 释放控制 JobSynchronizationManager.release(); } } }

3.2何时调用Reader?

在SimpleChunkProvider#provide中会分次调用reader,并将结果包装为Chunk返回。

其中有几个细节,此处不再赘述。

  1. 如何控制一次读取几个item?
  2. 如何控制最后一行读完就不读了?
  3. 如果需要跳过文件头的前N行,怎么处理?
  4. 在StepContribution中记录读取数量
org.springframework.batch.core.step.item.SimpleChunkProcessor#process @Nullable @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { @SuppressWarnings("unchecked") Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY); if (inputs == null) { inputs = chunkProvider.provide(contribution); if (buffering) { chunkContext.setAttribute(INPUTS_KEY, inputs); } } chunkProcessor.process(contribution, inputs); chunkProvider.postProcess(contribution, inputs); // Allow a message coming back from the processor to say that we // are not done yet if (inputs.isBusy()) { logger.debug("Inputs still busy"); return RepeatStatus.CONTINUABLE; } chunkContext.removeAttribute(INPUTS_KEY); chunkContext.setComplete(); if (logger.isDebugEnabled()) { logger.debug("Inputs not busy, ended: " + inputs.isEnd()); } return RepeatStatus.continueIf(!inputs.isEnd()); }

3.3何时调用Processor/Writer?

在RepeatTemplate和外围事务模板的包装下,通过SimpleChunkProcessor进行处理:

  1. 查出若干条数的items,打包为Chunk
  2. 遍历items,逐个item调用processor
    1. 通知StepListener,环绕处理调用before/after方法
// 忽略无关代码... @Override public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception { // 输入为空,直接返回If there is no input we don't have to do anything more if (isComplete(inputs)) { return; } // Make the transformation, calling remove() on the inputs iterator if // any items are filtered. Might throw exception and cause rollback. Chunk<O> outputs = transform(contribution, inputs); // Adjust the filter count based on available data contribution.incrementFilterCount(getFilterCount(inputs, outputs)); // Adjust the outputs if necessary for housekeeping purposes, and then // write them out... write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); } // 遍历items,逐个item调用processor protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception { Chunk<O> outputs = new Chunk<>(); for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { final I item = iterator.next(); O output; String status = BatchMetrics.STATUS_SUCCESS; try { output = doProcess(item); } catch (Exception e) { /* * For a simple chunk processor (no fault tolerance) we are done here, so prevent any more processing of these inputs. */ inputs.clear(); status = BatchMetrics.STATUS_FAILURE; throw e; } if (output != null) { outputs.add(output); } else { iterator.remove(); } } return outputs; }

4. 每个step是如何与事务处理挂钩?

在TaskletStep#doExecute中会使用TransactionTemplate,包装事务操作

标准的事务操作,通过函数式编程风格,从action的CallBack调用实际处理方法

  1. 通过transactionManager获取事务
  2. 无异常,则提交事务
  3. 若异常,则回滚
// org.springframework.batch.core.step.tasklet.TaskletStep#doExecute result = new TransactionTemplate(transactionManager, transactionAttribute) .execute(new ChunkTransactionCallback(chunkContext, semaphore)); // 事务启用过程 // org.springframework.transaction.support.TransactionTemplate#execute @Override @Nullable public <T> T execute(TransactionCallback<T> action) throws TransactionException { Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) { return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action); } else { TransactionStatus status = this.transactionManager.getTransaction(this); T result; try { result = action.doInTransaction(status); } catch (RuntimeException | Error ex) { // Transactional code threw application exception -> rollback rollbackOnException(status, ex); throw ex; } catch (Throwable ex) { // Transactional code threw unexpected exception -> rollback rollbackOnException(status, ex); throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception"); } this.transactionManager.commit(status); return result; } }

5. 怎么控制每个chunk几条记录提交一次事务? 控制每个事务窗口处理的item数量

在配置任务时,有个step级别的参数,[commit-interval],用于每个事务窗口提交的控制被处理的item数量。

RepeatTemplate#executeInternal 在处理单条item后,会查看已处理完的item数量,与配置的chunk数量做比较,如果满足chunk数,则不再继续,准备提交事务。

StepBean在初始化时,会新建SimpleCompletionPolicy(chunkSize会优先使用配置值,默认是5)

在每个chunk处理开始时,都会调用SimpleCompletionPolicy#start新建RepeatContextSupport#count用于计数。

源码(简化) org.springframework.batch.repeat.support.RepeatTemplate#executeInternal

/** * Internal convenience method to loop over interceptors and batch * callbacks. * @param callback the callback to process each element of the loop. */private RepeatStatus executeInternal(final RepeatCallback callback) { // Reset the termination policy if there is one... // 此处会调用completionPolicy.start方法,更新chunk的计数器 RepeatContext context = start(); // Make sure if we are already marked complete before we start then no processing takes place. // 通过running字段来判断是否继续处理next boolean running = !isMarkedComplete(context); // 省略listeners处理.... // Return value, default is to allow continued processing. RepeatStatus result = RepeatStatus.CONTINUABLE; RepeatInternalState state = createInternalState(context); try { while (running) { /* * Run the before interceptors here, not in the task executor so * that they all happen in the same thread - it's easier for * tracking batch status, amongst other things. */ // 省略listeners处理.... if (running) { try { // callback是实际处理方法,类似函数式编程 result = getNextResult(context, callback, state); executeAfterInterceptors(context, result); } catch (Throwable throwable) { doHandle(throwable, context, deferred); } // 检查当前chunk是否处理完,决策出是否继续处理下一条item // N.B. the order may be important here: if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty() { running = false; } } } result = result.and(waitForResults(state)); // 省略throwables处理.... // Explicitly drop any references to internal state... state = null; } finally { // 省略代码... } return result;}

JSR-352标准定义了Java批处理的基本模型,包含批处理的元数据像 JobExecutions,JobInstances,StepExecutions 等等。通过此类模型,提供了许多基础组件与扩展点:

  1. 完善的基础组件
    1. Spring Batch 有很多的这类组件 例如 ItemReaders,ItemWriters,PartitionHandlers 等等对应各类数据和环境。
  2. 丰富的配置
    1. JSR-352 定义了基于XML的任务设置模型。Spring Batch 提供了基于Java (类型安全的)的配置方式
  3. 可伸缩性
    1. 伸缩性选项-Local Partitioning 已经包含在JSR -352 里面了。但是还应该有更多的选择 ,例如Spring Batch 提供的 Multi-threaded Step,Remote Partitioning ,Parallel Step,Remote Chunking 等等选项
  4. 扩展点
    1. 良好的listener模式,提供step/job运行前后的锚点,以供开发人员个性化处理批处理流程。

2013年, JSR-352标准包含在 JavaEE7中发布,到2022年已近10年,Spring也在探索新的批处理模式, 如Spring Attic /Spring Cloud Data Flow。 https://docs.spring.io/spring-batch/docs/current/reference/html/jsr-352.html

1. Job/Step运行时的上下文,是如何保存?如何控制?

整个Job在运行时,会将运行信息保存在JobContext中。 类似的,Step运行时也有StepContext。可以在Context中保存一些参数,在任务或者步骤中传递使用。

查看JobContext/StepContext源码,发现仅用了普通变量保存Execution,这个类肯定有线程安全问题。 生产环境中常常出现多个任务并处处理的情况。

SpringBatch用了几种方式来包装并发安全:

  1. 每个job初始化时,通过JobExecution新建了JobContext,每个任务线程都用自己的对象。
  2. 使用JobSynchronizationManager,内含一个ConcurrentHashMap,KEY是JobExecution,VALUE是JobContext
  3. 在任务解释时,会移除当前JobExecution对应的k-v

此处能看到,如果在JobExecution存储大量的业务数据,会导致无法GC回收,导致OOM。所以在上下文中,只应保存精简的数据。

2. step执行时,如果出现异常,如何保护运行状态?

在源码中,使用了各类同步控制和加锁、oldVersion版本拷贝,整体比较复杂(org.springframework.batch.core.step.tasklet.TaskletStep.ChunkTransactionCallback#doInTransaction)

  1. oldVersion版本拷贝:上一次运行出现异常时,本次执行时沿用上次的断点内容
// 节选部分代码oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());copy(stepExecution, oldVersion); private void copy(final StepExecution source, final StepExecution target) { target.setVersion(source.getVersion()); target.setWriteCount(source.getWriteCount()); target.setFilterCount(source.getFilterCount()); target.setCommitCount(source.getCommitCount()); target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));}
  1. 信号量控制,在每个chunk运行完成后,需先获取锁,再更新stepExecution前
    1. Shared semaphore per step execution, so other step executions can run in parallel without needing the lockSemaphore (org.springframework.batch.core.step.tasklet.TaskletStep#doExecute)
// 省略无关代码try { try { // 执行w-p-r模型方法 result = tasklet.execute(contribution, chunkContext); if (result == null) { result = RepeatStatus.FINISHED; } } catch (Exception e) { // 省略... }}finally { // If the step operations are asynchronous then we need to synchronize changes to the step execution (at a // minimum). Take the lock *before* changing the step execution. try { // 获取锁 semaphore.acquire(); locked = true; } catch (InterruptedException e) { logger.error("Thread interrupted while locking for repository update"); stepExecution.setStatus(BatchStatus.STOPPED); stepExecution.setTerminateOnly(); Thread.currentThread().interrupt(); } stepExecution.apply(contribution);}stepExecutionUpdated = true;stream.update(stepExecution.getExecutionContext());try { // 更新上下文、DB中的状态 // Going to attempt a commit. If it fails this flag will stay false and we can use that later. getJobRepository().updateExecutionContext(stepExecution); stepExecution.incrementCommitCount(); getJobRepository().update(stepExecution);}catch (Exception e) { // If we get to here there was a problem saving the step execution and we have to fail. String msg = "JobRepository failure forcing rollback"; logger.error(msg, e); throw new FatalStepExecutionException(msg, e);}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK