<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.9.3</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.9.3</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.9.3</version> </dependency>
# 调度配置 -- 将任务等保存化到数据库 spring.quartz.job-store-type=jdbc #spring.quartz.jdbc.initialize-schema=always #程序结束时会等待quartz相关的内容结束 spring.quartz.wait-for-jobs-to-complete-on-shutdown=true # 修改定时触发时间能随时生效 spring.quartz.overwrite-existing-jobs=true # scheduler实例名 调度器实例名称 spring.quartz.properties.org.quartz.scheduler.instanceName=myScheduler # 调度器实例编号自动生成 spring.quartz.properties.org.quartz.scheduler.instanceId=AUTO # 持久化方式配置 -- 数据保存方式为数据库持久化 spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX # 数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库 spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate # 数据表的前缀,默认QRTZ_ spring.quartz.properties.org.quartz.jobStore.tablePrefix=qrtz_ # JobDataMaps是否都为String类型 spring.quartz.properties.org.quartz.jobStore.useProperties=false spring.quartz.properties.org.quartz.jobStore.clusterCheckinInterval=10000 # 是否支持集群 spring.quartz.properties.org.quartz.jobStore.isClustered=false # 线程池相关 -- 线程池的实现类 spring.quartz.properties.org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool # 线程池中的线程数量 spring.quartz.properties.org.quartz.threadPool.threadCount=10 # 线程优先级 spring.quartz.properties.org.quartz.threadPool.threadPriority=5 # 配置是否启动自动加载数据库内的定时任务,默认true spring.quartz.properties.org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
package com.example.gauditdemo.config; import com.example.gauditdemo.task.MysqlAddEsScheduler; import org.quartz.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author Frederic.Hu * @Description Quartz的相关配置,注册JobDetail和Trigger * 注意JobDetail和Trigger是org.quartz包下的,不是spring包下的,不要导入错误 * @date 2021/12/08 17:32 */ @Configuration public class QuartzConfig { /** builder 类创建了一个JobDetail和一个Trigger并注册成为Spring bean, * 这些bean会自动关联到调度器上,JobDetail和Trigger需要设置组名和自己的名字,用来作为唯一标识 * JobDetail里有一个StartOfDayJob类,这个类就是job接口的一个实现类,里面定义了任务的具体内容 */ @Bean public JobDetail jobDetail() { // 指定具体的定时任务类 JobDetail jobDetail = JobBuilder.newJob(MysqlAddEsScheduler.class) .withIdentity("myJob1", "myJobGroup1") .storeDurably() .build(); return jobDetail; } /** Trigger通过corn表达式指定了任务执行的周期。 */ @Bean public Trigger trigger() { Trigger trigger = TriggerBuilder.newTrigger() .forJob(jobDetail()) .withIdentity("myTrigger1", "myTriggerGroup1") .startNow() // 0 */1 * * * ? 每分钟执行 // */5 * * * * ? 每5s执行 .withSchedule(CronScheduleBuilder.cronSchedule("*/5 * * * * ?")) .build(); // 返回任务触发器 return trigger; } /** 这边你可以写另外一个定时任务 */ // @Bean // public JobDetail jobDetail2() { // // 指定具体的定时任务类 // JobDetail jobDetail = JobBuilder.newJob(ElasticSearchUtil.class) // .withIdentity("myJob2", "myJobGroup2") // .storeDurably() // .build(); // return jobDetail; // } // // @Bean // public Trigger trigger2() { // Trigger trigger = TriggerBuilder.newTrigger() // .forJob(jobDetail2()) // .withIdentity("myTrigger2", "myTriggerGroup2") // .startNow() // // 每天0点执行 0 0 0 * * ? 这里设定执行方式 // // 0 */1 * * * ? 每分钟执行 // // */5 * * * * ? 每5s执行 // .withSchedule(CronScheduleBuilder.cronSchedule("*/5 * * * * ?")) // .build(); // // 返回任务触发器 // return trigger; // } }
package com.example.gauditdemo.config; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; /** * @author Frederic.Hu * @Description * @date 2021/12/02 13:37 */ @Component public class ESConfigClient { public final Logger logger = LoggerFactory.getLogger(this.getClass()); @Bean public RestHighLevelClient esClient(){ return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http"))); } }
package com.example.gauditdemo.utils; import com.example.gauditdemo.config.ESConfigClient; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.io.IOException; import java.util.function.BiConsumer; /** * @author Frederic.Hu * @Description * @date 2021/12/16 10:19 */ @Component public class ESCommonConfig { public final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private ESConfigClient esConfigClient; @PreDestroy public void destory() { try { esConfigClient.esClient().close(); logger.info("esClient客户端已经关闭:{}", esConfigClient.esClient()); } catch (Exception e) { logger.error("关闭restHighLevelClient异常:", e); } } /** * @return org.elasticsearch.action.bulk.BulkProcessor * @description: 构建bulkProcessor接口 异步执行 * */ @Bean @Scope("prototype") public BulkProcessor getBulkAsyncProcessor( ) { RestHighLevelClient restHighLevelClient = esConfigClient.esClient(); BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (bulkRequest, bulkResponseActionListener) -> restHighLevelClient.bulkAsync( bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener); logger.info("getBulkAsyncProcessor 中 ES客户端地址:{}", restHighLevelClient); return BulkProcessor.builder(consumer, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest bulkRequest) { //重写beforeBulk,在每次bulk request发出前执行,在这个方法里面可以知道在本次批量操作中有多少操作数 int numberOfActions = bulkRequest.numberOfActions(); logger.info("同步数量 Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) { //重写afterBulk方法,每次批量请求结束后执行,可以在这里知道是否有错误发生。 if (bulkResponse.hasFailures()) { logger.error("Bulk [{}] executed with failures,response = {}", executionId, bulkResponse.buildFailureMessage()); } else { logger.info("写入成功 Bulk [{}] completed in {} milliseconds", executionId, bulkResponse.getTook().getMillis()); try { restHighLevelClient.close(); logger.info("运行到最后时的es客户端地址:{}", restHighLevelClient); } catch (IOException e) { e.printStackTrace(); } } } @Override public void afterBulk(long l, BulkRequest bulkRequest, Throwable failure) { //重写方法,如果发生错误就会调用。 logger.error("写入失败 Failed to execute bulk", failure); } }).setBulkActions(20000) // 达到刷新的条数 .setBulkSize(new ByteSizeValue(15L, ByteSizeUnit.MB)) // 达到 刷新的大小 .setConcurrentRequests(100) // 并发请求数量, 0不并发, 1并发允许执行 .setFlushInterval(TimeValue.timeValueSeconds(20)) // 固定刷新的时间频率 .setBackoffPolicy(BackoffPolicy.constantBackoff( TimeValue.timeValueSeconds(100L), 3)) // 重试补偿策略 .build(); } }
package com.example.gauditdemo.task; import com.example.gauditdemo.dao.OperationDao; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.xcontent.XContentType; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.List; import java.util.Map; /** * @author Frederic.Hu * @Description * @date 2021/12/15 10:32 */ @Component public class MysqlAddEsScheduler extends QuartzJobBean { public final Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private OperationDao operationDao; @Value("${audit.index.prefix.env}") private String auditIndexPrefixEnv; @Autowired private BulkProcessor bulkProcessor; @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { long startTime = System.currentTimeMillis(); List<Map<String, Object>> mapList = selectAll(); logger.info("同步数据 tongBuSize:{}条", mapList.size()); try { if (!mapList.isEmpty()) { mapList.parallelStream().forEach(item -> { try { SimpleDateFormat sdFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Calendar cal = Calendar.getInstance(); // 自动生成的operationtime自动映射成date类型 cal.setTime(sdFormat.parse(item.get("operationtime").toString())); // 插入es数据时间相差8小时 cal.add(Calendar.HOUR_OF_DAY, +8); item.replace("operationtime", cal.getTime()); } catch (ParseException e) { logger.error("Failed to convert time:", e); } bulkProcessor.add(new IndexRequest().index(auditIndexPrefixEnv + item.get("indexsuffix").toString()).source(item, XContentType.JSON)); }); } // 刷新 bulkProcessor.flush(); bulkProcessor.close(); } catch (Exception e) { logger.error("BulkProcessor,插入数据异常", e); } logger.info("tongbu use time: " + (System.currentTimeMillis() - startTime) + "ms"); } @Transactional List<Map<String, Object>> selectAll() { return operationDao.selectOperationAndChangeDate(); } }
原因及解决办法:mysql中有一个时间date类型,同步到es中,这个时间类型在es中是text类型,导致查询会报错,预想应该在es中也是date类型才对。解决办法:mybatis中查询出来的结果将时间进行转换,代码中有,我写了注释。最好将es中的索引先删除掉,然后es会自动创建索引和字段类型的。
原因及解决办法:同步的时候发现es中时间少8小时。解决办法:mybatis中查询出来的结果将时间加8个小时,代码中有,可以参考一下,我写了注释。
原因及解决办法:自己搜索0.0
原因及解决办法:比较方便,方便我调式
原因及解决办法:当你数据量特别大的时候,不用bulkProcessor,如果一次性同步几百万条数据,会将es弄崩掉的。解决办法:加上bulkProcessor,可以自行设置多少条同步一次,或者几s自动同步一次,也比较方便
原因及解决办法:定时任务类中,批量塞入数据时,只创建一个bulkProcessor对象,就不需要批量多少条数据就创建多少个对象。解决办法:将 bulkProcessor 对象给 Spring 容器管理
原因及解决办法:每次批量同步完,记得最后要将bulkProcessor关闭掉,不然长时间同步下去,服务就会宕机。解决办法:在最后同步完,加上bulkProcessor关闭掉。
现象及解决办法:当我定时任务设置每5分钟执行一次,都会整点执行,不会说你启动服务是在8点53分启动,定时任务就会在8点58分执行,不是这样的,定时任务会在8点55分开始执行一次,下一次则是在9点整执行一次。解决办法:当你有两个定时任务对同一张表操作时,可以将其时间错开,设定一个特定时间。
五、测试环境同步日志