7

自动生成获取检查点更新图数据的任务支持减T操作

 2 years ago
source link: https://crazyyanchao.github.io/blog/2022/01/15/%E8%87%AA%E5%8A%A8%E7%94%9F%E6%88%90%E8%8E%B7%E5%8F%96%E6%A3%80%E6%9F%A5%E7%82%B9%E6%9B%B4%E6%96%B0%E5%9B%BE%E6%95%B0%E6%8D%AE%E7%9A%84%E4%BB%BB%E5%8A%A1%E6%94%AF%E6%8C%81%E5%87%8FT%E6%93%8D%E4%BD%9C.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

自动生成获取检查点更新图数据的任务支持减T操作

 

Here’s the table of contents:

// Register the two tasks: one adds and one removes the `PRE删除` label on `PRE中文全称` nodes.
// Cypher comments start with `//`; the `--` marker and the text that trailed each CALL in
// the original listing are not valid Cypher, so that text is kept here as a comment.
// The first trailing string is the same value used as task_hcode further down.
// NOTE(review): presumably these are the hcodes custom.task.deploy assigns — confirm.
// -> HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除
CALL custom.task.deploy(NULL,NULL,'PRE中文全称','_set_PRE删除','`PRE中文全称`节点动态增加`PRE删除`标签','mayc01');
// -> HGRAPHTASK()-[]->(PRE中文全称)_remove_PRE删除
CALL custom.task.deploy(NULL,NULL,'PRE中文全称','_remove_PRE删除','`PRE中文全称`节点动态移除`PRE删除`标签','mayc01');
// Generate the checkpoint-driven task through the wrapper procedure.
// task_cql defines the placeholder {fragment}; the generated task replaces it with the
// checkpoint time before the statement is executed.
// subtracted_t is the "minus T" value: the checkpoint is rolled back by that many days.
// The wrapper splices task_cql into a quoted '{task_cql}' literal of the generated Cypher,
// so its quotes carry one more level of escaping than when the same statement is executed
// directly (this is the escaping the production example of this article uses).
// NOTE(review): the JDBC URL embeds a user name and password — keep real credentials out of shared scripts.
// NOTE(review): `t.hisvaild` is spelled differently from `f.hisvalid` — confirm the property name.
WITH
'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql,
2 AS subtracted_t
CALL custom.task.update.by.check_point.sub.t(jdbc_etl_url,task_hcode,task_cql,subtracted_t) YIELD procedure_name,cypher_task RETURN procedure_name,cypher_task
// "Minus T" expressed in SQL: select every row of ONGDB_TASK_CHECK_POINT with
// node_check_point moved back by the bound number of days (100 here), together with
// the database's current time.
WITH
  'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS checkpoint_db,
  'SELECT DATE_SUB(DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\'),INTERVAL ? day) AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT' AS minus_t_sql,
  [100] AS sql_params
CALL apoc.load.jdbc(checkpoint_db,minus_t_sql,sql_params) YIELD row
RETURN row

编写一个测试脚本

// Test script for one checkpoint-driven task run, executed as a single Cypher pipeline:
//   1. take the task lock in ONGDB_TASK_CHECK_POINT_LOCK,
//   2. read node_check_point moved back by subtracted_t days plus the database's current time,
//   3. replace {fragment} in task_cql with that time and execute the statement,
//   4. write the checkpoint (rollback or advance) depending on the summed batch failures,
//   5. release the lock.
// Every step consumes the rows of the previous one, so a step that yields no row stops
// the pipeline before the lock-release step.
// NOTE(review): the JDBC URL embeds a user name and password — keep real credentials out of shared scripts.
// NOTE(review): `t.hisvaild` is spelled differently from `f.hisvalid` — confirm the property name.
WITH
'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\'PRE删除\\\') RETURN t \', \'WITH {t} AS t SET t:PRE删除 \', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql,
2 AS subtracted_t
// =========================== Acquire the lock and run the TASK ===========================
// Acquire the task lock and mark the task as locked.
// The UPDATE only touches a lock row whose task_lock is 0; the pipeline continues only
// when lock.count>0 (presumably the number of rows the UPDATE changed — confirm).
CALL apoc.load.jdbcUpdate(jdbc_etl_url,'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql,subtracted_t
// Fetch the checkpoint time [for a full reload, set the CHECK_POINT time to the earliest time]
// [if the data volume exceeds the heap limit, a chunked-data approach must be used].
// The selected check_point is node_check_point minus subtracted_t days.
CALL apoc.load.jdbc(jdbc_etl_url,'SELECT DATE_SUB(DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\'),INTERVAL ? day) AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',[subtracted_t,task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql
// Run the batched iteration that builds the nodes.
// The `fragment` alias introduced here is not referenced by any later clause.
WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql
// Substitute {fragment} with the checkpoint converted from 'yyyy-MM-dd HH:mm:ss' to 'yyyyMMddHHmmss'.
WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,'{fragment}',TOSTRING(apoc.date.convertFormat(rawCheckPoint,'yyyy-MM-dd HH:mm:ss','yyyyMMddHHmmss'))) AS task_cql,currentTime,rawCheckPoint
CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint
// batchFailedSize>0: roll the task state back [set node_check_point, rel_check_point to rawCheckPoint]
// batchFailedSize<=0: [update node_check_point, rel_check_point to the system time]
// NOTE(review): rollbackSql below writes only node_check_point, and rawCheckPoint is the
// value already moved back by subtracted_t days — confirm both are intended.
WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint
WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint,
    'CALL apoc.load.jdbcUpdate(\''+jdbc_etl_url+'\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\''+task_hcode+'\']) YIELD row RETURN row' AS rollbackSql,
    'CALL apoc.load.jdbcUpdate(\''+jdbc_etl_url+'\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$currentTime,\''+task_hcode+'\']) YIELD row RETURN row' AS updateSql
// rollbackSql is the branch for batchFailedSize>0; updateSql is the else branch.
CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint})
    YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,rawCheckPoint
// Release the lock [performed when the TASK has finished running]
CALL apoc.load.jdbcUpdate(jdbc_etl_url,'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',[task_hcode]) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;

将测试脚本配置为一个任务

// Turn the test script into a registered custom procedure (the "task").
// The script is held as a string template (cypher_task) with the placeholders
// {jdbc_etl_url}, {task_hcode}, {task_cql} and {subtracted_t}; olab.replace fills them in
// and apoc.custom.asProcedure registers the result under taskName.
// task_cql is spliced into a quoted '{task_cql}' literal of the template, which is why its
// quotes are escaped one level deeper here than in the directly executed test script.
// NOTE(review): the JDBC URL embeds a user name and password — keep real credentials out of shared scripts.
// NOTE(review): `t.hisvaild` is spelled differently from `f.hisvalid` — confirm the property name.
WITH
'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS jdbc_etl_url,
'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS task_hcode,
'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS task_cql,
2 AS subtracted_t
// taskName = fixed prefix + each regex group match found in task_hcode, each followed by '_'.
// For the hcode above this gives the procedure name called in the last statement of this listing.
// REPLACE('','-','_')+'' operates on an empty string and contributes nothing to the name.
// The template literal that follows contains a line break inside the string
// (after "rel_check_point=? "), so no comment may be inserted between those two lines.
WITH jdbc_etl_url,task_hcode,task_cql,subtracted_t,'task.update.by.check_point.sub.t.'+REPLACE('','-','_')+''+REDUCE(hcode='',hcodeStrs IN apoc.text.regexGroups(task_hcode,'((?!=,)([A-Za-z0-9_\u4e00-\u9fa5]+))+') | hcode+hcodeStrs[0]+'_') AS taskName
    ,'WITH \'{jdbc_etl_url}\' AS jdbc_etl_url, \'{task_hcode}\' AS task_hcode, \'{task_cql}\' AS task_cql,\'{subtracted_t}\' AS subtracted_t CALL apoc.load.jdbcUpdate(jdbc_etl_url,\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1\',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql,subtracted_t CALL apoc.load.jdbc(jdbc_etl_url,\'SELECT DATE_SUB(DATE_FORMAT(node_check_point,\\\'%Y-%m-%d %H:%i:%s\\\'),INTERVAL ? day) AS check_point,DATE_FORMAT(NOW(),\\\'%Y-%m-%d %H:%i:%s\\\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?\',[subtracted_t,task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,\'{fragment}\',TOSTRING(apoc.date.convertFormat(rawCheckPoint,\'yyyy-MM-dd HH:mm:ss\',\'yyyyMMddHHmmss\'))) AS task_cql,currentTime,rawCheckPoint CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint,     \'CALL apoc.load.jdbcUpdate(\\\'\'+jdbc_etl_url+\'\\\',\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\\\',[$rawCheckPoint,\\\'\'+task_hcode+\'\\\']) YIELD row RETURN row\' AS rollbackSql,     \'CALL apoc.load.jdbcUpdate(\\\'\'+jdbc_etl_url+\'\\\',\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? 
WHERE hcode=?\\\',[$currentTime,$currentTime,\\\'\'+task_hcode+\'\\\']) YIELD row RETURN row\' AS updateSql CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint})     YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,currentTime,rawCheckPoint CALL apoc.load.jdbcUpdate(jdbc_etl_url,\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?\',[task_hcode]) YIELD row AS releaseLock RETURN {releaseLock:releaseLock,value:value,batchFailedSize:batchFailedSize,currentTime:currentTime,rawCheckPoint:rawCheckPoint} AS result;' AS cypher_task
// Fill the four placeholders of the template with the concrete values.
// NOTE(review): {subtracted_t} sits inside quotes in the template, so the generated task
// binds it as a string — confirm olab.replace accepts the integer `rep` value.
WITH taskName,olab.replace(cypher_task,[{raw:'{jdbc_etl_url}',rep:jdbc_etl_url},{raw:'{task_hcode}',rep:task_hcode},{raw:'{task_cql}',rep:task_cql},{raw:'{subtracted_t}',rep:subtracted_t}]) AS cypher_task
// Register the generated statement: WRITE mode, one MAP output named `result`, no inputs.
// NOTE(review): the description text names {jdbc_task_url} while the placeholder used above is {jdbc_etl_url}.
CALL apoc.custom.asProcedure(
  taskName,
  cypher_task,
  'WRITE',
  [['result','MAP']],
  [],
  '自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL'
)
RETURN 'CALL custom.'+taskName AS procedure_name,cypher_task
// Separate statement (run after the registration above): invoke the generated task.
CALL custom.task.update.by.check_point.sub.t.HGRAPHTASK_PRE中文全称__set_PRE删除_

将测试脚本使用参数化方式封装为过程

@param {jdbc_etl_url}-任务状态表与任务锁表所在数据库的JDBC地址(原文标注支持MySQL、Oracle、SqlServer;但内置SQL使用DATE_SUB、DATE_FORMAT等MySQL语法,过程描述中也注明默认只支持MySQL)
@param {task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel)
@param {task_cql}-任务CQL:需要获取检查点并定期执行的CQL
@param {subtracted_t}-减T操作:当前执行时的检查点向前滚回指定天数
// Register the parameterised wrapper procedure `task.update.by.check_point.sub.t`
// (invoked elsewhere in this listing as custom.task.update.by.check_point.sub.t).
// Inputs : jdbc_etl_url STRING, task_hcode STRING, task_cql STRING, subtracted_t INTEGER.
// Outputs: procedure_name STRING ('CALL custom.' + generated task name), cypher_task STRING.
// The statement argument is the "configure as a task" script escaped one level deeper,
// reading its values from $jdbc_etl_url, $task_hcode, $task_cql and $subtracted_t; it
// builds taskName from task_hcode, fills the template through olab.replace and registers
// the generated task with a nested apoc.custom.asProcedure call.
// The statement literal spans three physical lines; the two line breaks are inside the
// string, so no comment may be inserted between those lines.
// NOTE(review): both description strings name {jdbc_task_url} (the input is jdbc_etl_url)
// and do not mention subtracted_t.
CALL apoc.custom.asProcedure(
  'task.update.by.check_point.sub.t',
  'WITH $jdbc_etl_url AS jdbc_etl_url, $task_hcode AS task_hcode, $task_cql AS task_cql,$subtracted_t AS subtracted_t WITH jdbc_etl_url,task_hcode,task_cql,subtracted_t,\'task.update.by.check_point.sub.t.\'+REPLACE(\'\',\'-\',\'_\')+\'\'+REDUCE(hcode=\'\',hcodeStrs IN apoc.text.regexGroups(task_hcode,\'((?!=,)([A-Za-z0-9_\u4e00-\u9fa5]+))+\') | hcode+hcodeStrs[0]+\'_\') AS taskName     ,\'WITH \\\'{jdbc_etl_url}\\\' AS jdbc_etl_url, \\\'{task_hcode}\\\' AS task_hcode, \\\'{task_cql}\\\' AS task_cql,\\\'{subtracted_t}\\\' AS subtracted_t CALL apoc.load.jdbcUpdate(jdbc_etl_url,\\\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1\\\',[task_hcode]) YIELD row AS lock WHERE lock.count>0 WITH lock,jdbc_etl_url,task_hcode,task_cql,subtracted_t CALL apoc.load.jdbc(jdbc_etl_url,\\\'SELECT  DATE_SUB(DATE_FORMAT(node_check_point,\\\\\\\'%Y-%m-%d %H:%i:%s\\\\\\\'),INTERVAL ? 
day) AS check_point,DATE_FORMAT(NOW(),\\\\\\\'%Y-%m-%d %H:%i:%s\\\\\\\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?\\\',[subtracted_t,task_hcode]) YIELD row WITH row.currentTime AS currentTime,row.check_point AS rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH rawCheckPoint AS fragment,currentTime,rawCheckPoint,jdbc_etl_url,task_hcode,task_cql WITH jdbc_etl_url,task_hcode,REPLACE(task_cql,\\\'{fragment}\\\',TOSTRING(apoc.date.convertFormat(rawCheckPoint,\\\'yyyy-MM-dd HH:mm:ss\\\',\\\'yyyyMMddHHmmss\\\'))) AS task_cql,currentTime,rawCheckPoint CALL apoc.cypher.doIt(task_cql,{}) YIELD value WITH value,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH SUM(value.batch.failed) AS batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint WITH batchFailedSize,jdbc_etl_url,task_hcode,task_cql,currentTime,rawCheckPoint,     \\\'CALL apoc.load.jdbcUpdate(\\\\\\\'\\\'+jdbc_etl_url+\\\'\\\\\\\',\\\\\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\\\\\\\',[$rawCheckPoint,\\\\\\\'\\\'+task_hcode+\\\'\\\\\\\']) YIELD row RETURN row\\\' AS rollbackSql,     \\\'CALL apoc.load.jdbcUpdate(\\\\\\\'\\\'+jdbc_etl_url+\\\'\\\\\\\',\\\\\\\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? 
WHERE hcode=?\\\\\\\',[$currentTime,$currentTime,\\\\\\\'\\\'+task_hcode+\\\'\\\\\\\']) YIELD row RETURN row\\\' AS updateSql CALL apoc.do.case([batchFailedSize>0,rollbackSql],updateSql,{currentTime:currentTime,rawCheckPoint:rawCheckPoint})     YIELD value WITH value,batchFailedSize,jdbc_etl_url,task_hcode,currentTime,rawCheckPoint CALL apoc.load.jdbcUpdate(jdbc_etl_url,\\\'UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?\\\',[task_hcode]) YIELD row AS releaseLock RETURN {releaseLock:releaseLock,value:value,batchFailedSize:batchFailedSize,currentTime:currentTime,rawCheckPoint:rawCheckPoint} AS result;\' AS cypher_task WITH taskName,olab.replace(cypher_task,[{raw:\'{jdbc_etl_url}\',rep:jdbc_etl_url},{raw:\'{task_hcode}\',rep:task_hcode},{raw:\'{task_cql}\',rep:task_cql},{raw:\'{subtracted_t}\',rep:subtracted_t}]) AS cypher_task CALL apoc.custom.asProcedure(   taskName,   cypher_task,   \'WRITE\',   [[\'result\',\'MAP\']],   [],   \'自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL\' ) RETURN \'CALL custom.\'+taskName AS procedure_name,cypher_task',
  // mode of the wrapper procedure
  'WRITE',
  // output columns
  [['procedure_name','STRING'],['cypher_task','STRING']],
  // input parameters, in call order
  [['jdbc_etl_url','STRING'],['task_hcode','STRING'],['task_cql','STRING'],['subtracted_t','INTEGER']],
  // description of the wrapper procedure
  '自动生成获取检查点更新图数据的任务:{jdbc_task_url}-任务状态表状态锁表位置默认只支持MySQL ;{task_hcode}-HGRAPHTASK(FromLabel)-[RelaName]->(ToLabel);{task_cql}-任务CQL:需要获取检查点并定期执行的CQL'
);

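生产中使用时需要注意:1、需要按照检查点时间进行更新时,必须在task_cql中设置{fragment}占位符,任务执行时会用检查点时间替换它;2、如果需要任务锁正常释放,则必须使task_cql有返回值(释放锁的语句位于apoc.cypher.doIt ... YIELD value之后,task_cql不返回任何行时后续语句不会执行,task_lock将一直保持为1)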

  • 可以正常释放锁并替换检查点时间【生产可用】
    // Production-ready invocation of the wrapper procedure: the lock is released normally
    // and {fragment} is replaced with the checkpoint time.
    // cql_template keeps the extra level of quote escaping because the wrapper splices it
    // into a quoted literal of the generated task.
    WITH
    2 AS rollback_days,
    'HGRAPHTASK()-[]->(PRE中文全称)_set_PRE删除' AS hcode,
    'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC' AS checkpoint_db_url,
    'CALL apoc.periodic.iterate( \\\'MATCH (f:PREPCODE)-[:中文全称]->(t:PRE中文全称) WHERE f.hupdatetime>{fragment} AND f.hisvalid=1 AND t.hisvaild=0 AND NOT apoc.coll.contains(LABELS(t),\\\\\\\'PRE删除\\\\\\\') RETURN t \\\', \\\'WITH {t} AS t SET t:PRE删除 \\\', {parallel:false,batchSize:1000}) YIELD  batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;' AS cql_template
    CALL custom.task.update.by.check_point.sub.t(checkpoint_db_url,hcode,cql_template,rollback_days)
    YIELD procedure_name,cypher_task
    RETURN procedure_name,cypher_task
    

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK