Commit 8f5107bd by wangkai

Merge branch 'master' of 10.197.236.199:wangkai/update_hbase

git commit -m "xiugaifenzhihebing"
parents c308a80a 1d4fc257
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.AcdAll;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface AcdAllRepository extends ElasticsearchRepository<AcdAll,String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.AcdHuman;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface AcdHumanRepository extends ElasticsearchRepository<AcdHuman,String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.DrvFlow;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface DrvFlowRepository extends ElasticsearchRepository<DrvFlow, String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.Surveil;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface SurveilEs extends ElasticsearchRepository<Surveil, String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.Surveil;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface SurveilRepository extends ElasticsearchRepository<Surveil,String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.VehFlow;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface VehFlowRepository extends ElasticsearchRepository<VehFlow,String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.VehicleEs;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface VehicleEsRepository extends ElasticsearchRepository<VehicleEs,String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.Vio;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface VioRepository extends ElasticsearchRepository<Vio,String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.Violation;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
@Component
public interface ViolationEs extends ElasticsearchRepository<Violation, String> {
}
package com.hikcreate.update_hbase.Repo;
import com.hikcreate.update_hbase.entity.Violation;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface ViolationRepository extends ElasticsearchRepository<Violation,String> {
}
package com.hikcreate.update_hbase;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication(exclude = {
DataSourceAutoConfiguration.class
})
@EnableConfigurationProperties
@EnableScheduling
@MapperScan("com.hikcreate.update_hbase.dao")
public class UpdateHbaseApplication {
public static void main(String[] args) {
System.setProperty("es.set.netty.runtime.available.processors", "false");
SpringApplication.run(UpdateHbaseApplication.class, args);
}
}
package com.hikcreate.update_hbase.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Created by lifuyi on 2018/10/24.
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DBSource {
String name();
}
package com.hikcreate.update_hbase.aspect;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
/**
* Created by lifuyi on 2018/10/24.
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
//保存自己线程的本地变量
private static final ThreadLocal<String> datasourceHolder = new ThreadLocal<>();
@Override
protected Object determineCurrentLookupKey() {
return datasourceHolder.get();
}
static void setDataSource(String sourceName) {
datasourceHolder.set(sourceName);
}
static void clearDataSource() {
datasourceHolder.remove();
}
}
package com.hikcreate.update_hbase.aspect;
import com.hikcreate.update_hbase.annotation.DBSource;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* Created by lifuyi on 2018/10/24.
*/
@Aspect
@Order(-1)// 保证该AOP在@Transactional之前执行
@Component
public class DynamicDataSourceAspect {
@Before(value = "execution(* com.hikcreate.update_hbase.dao..*.*(..))")
public void changeDataSource(JoinPoint point) throws Throwable {
String sourceName = null;
//获得当前访问的class
Class<?> classes = point.getTarget().getClass();
//获得访问的方法名称
String methodName = point.getSignature().getName();
//定义的接口方法
Method abstractMethod = ((MethodSignature) point.getSignature()).getMethod();
//是否含有注解DBSource
if (abstractMethod.isAnnotationPresent(DBSource.class)) {
sourceName = abstractMethod.getAnnotation(DBSource.class).name();
}
//接口方法参数类型
Class<?>[] parameterTypes = abstractMethod.getParameterTypes();
try {
//实现类中的该方法
Method method = classes.getMethod(methodName, parameterTypes);
if (method.isAnnotationPresent(DBSource.class)) {
sourceName = method.getAnnotation(DBSource.class).name();
}
} catch (Exception e) {
e.printStackTrace();
}
if (sourceName != null) {
DynamicDataSource.setDataSource(sourceName);
}
}
@Pointcut("execution(* com.hikcreate.update_hbase.dao..*.*(..))")
public void pointCut() {
}
@After("pointCut()")
public void after(JoinPoint point) {
DynamicDataSource.clearDataSource();
}
}
package com.hikcreate.update_hbase.conf;
import com.hikcreate.update_hbase.aspect.DynamicDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
//import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
/**
* Created by lifuyi on 2018/10/24.
*/
@Configuration
public class DataSourceConfig {
@Value("${spring.datasource.type}")
private Class<? extends DataSource> dataSourceType;
@Bean(name="masterDataSource", destroyMethod = "close", initMethod="init")
@ConfigurationProperties(prefix = "spring.datasource.hik")
public DataSource masterDataSource() {
return DataSourceBuilder.create().type(dataSourceType).build();
}
@Bean(name="slaveDataSource", destroyMethod = "close", initMethod="init")
@ConfigurationProperties(prefix = "spring.datasource.bokang")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().type(dataSourceType).build();
}
@Bean(name = "dataSource")
public DataSource dataSource() {
DynamicDataSource dynamicDataSource = new DynamicDataSource();
// 配置多数据源
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("hik", masterDataSource());
targetDataSources.put("bokang", slaveDataSource());
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(slaveDataSource());
return dynamicDataSource;
}
}
package com.hikcreate.update_hbase.conf;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* @author alanc
* @ClassName MyConfig
* @Description TODO 配置类
* @date: 2018/11/3 11:03
*/
//@Configuration
public class MyConfig {
// @Value("${esIpCluster}")
// String ipStr;
// @Value("${esName}")
// String clusterName;
// @Bean
// public TransportClient client() throws UnknownHostException {
// Settings settings = Settings.builder().put("cluster.name", clusterName)
// .put("client.transport.sniff", true).build();
// TransportClient client = new PreBuiltTransportClient(settings);
// String[] split = ipStr.split(",");
// for(String string:split){
// client.addTransportAddress( new TransportAddress(InetAddress.getByName(string), 9300));
// }
// return client;
// }
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.DmsmpService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 放管服的预警点
*/
@RestController
@RequestMapping("/dmsmp")
public class DmsmpController {
@Autowired
private DmsmpService dmsmpService;
/**
* 临时号牌:同一机动车车辆四次临牌办理
* @param times 次数
*/
@GetMapping("/tempplate")
public void tempplateWarn(@RequestParam("times") Integer times){
dmsmpService.tempplateWarn(times);
}
/**
* 所有人和车牌都改变预警
*/
@GetMapping("/plateAndSyrChange")
public void plateAndSyrChange(){
dmsmpService.plateAndSyrChange(0);
}
/**
* 共同所有人
*/
@GetMapping("/commonSyr")
public void commonSyr(){
dmsmpService.commonSyr(0);
}
/**
* 姓名和身份证号码同时改变
*/
@GetMapping("/drvNameAndCard")
public void drvNameAndCard(){
dmsmpService.drvNameAndCard(0);
}
/**
* 制证后推办
*/
@GetMapping("/certicateQuit")
public void certicateQuit(){
dmsmpService.certicateQuit(0);
}
/**
* 同一所有人多次变更号牌
*/
@GetMapping("/hisSyrPlateChangeMany")
public void hisSyrPlateChangeMany(){
dmsmpService.syrPlateChangeMany(0);
}
/**
*车辆短期解押
*/
@GetMapping("/getHisMortgageCancel")
public void getHisMortgageCancel(){
dmsmpService.getHisMortgageCancel(0);
}
/**
*退办业务短期办结
*/
@GetMapping("/getHisRetireDealAgain")
public void getHisRetireDealAgain(){
dmsmpService.getHisRetireDealAgain(0);
}
/**
* 同步残疾人车辆:规则---审批通过,符合条件,核发通过
*/
@GetMapping("/synDeformedproposer")
public void synDeformedproposer(){
dmsmpService.synDeformedproposer(0);
}
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.DriverService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/driver")
public class DriverController {
@Autowired
DriverService driverService;
@RequestMapping("/insertDriver")
void insertDriver(String idcard) {
driverService.insertDriver2Hbase(idcard);
}
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.VioEsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/es")
public class EsController {
@Autowired
private VioEsService vioEsService;
@GetMapping("/insert2es12")
public void getHisVioToEs(@RequestParam("start") String start,@RequestParam("end") String end){
vioEsService.getHisVioToEs(start,end);
}
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.EveryDayScheduleService;
import com.hikcreate.update_hbase.service.VehicleIndexService;
import com.hikcreate.update_hbase.service.VioIndexService;
import com.hikcreate.update_hbase.service.impl.VehicleIndexServiceImpl;
import com.hikcreate.update_hbase.service.impl.VioIndexServiceImpl;
import com.hikcreate.update_hbase.utils.HbaseUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(path = "/dataStation")
public class OracleToHbaseController {
private Logger logger = LoggerFactory.getLogger(RestController.class);
@Autowired
private VehicleIndexService vehicleIndexService;
@Autowired
private VioIndexService vioIndexService;
@Autowired
private EveryDayScheduleService everyDayScheduleService;
/**
* 从Oracle导入全量机动车索引数据到hbase
*/
@RequestMapping(path = "/insertVehicleIndex/all")
public void insertVehicleIndexAll() {
vehicleIndexService.listVehIndexall();
}
/**
* 从Oracle导入昨日机动车索引数据到hbase
*/
@RequestMapping(path = "/insertVehicleIndex/yesterday")
public void insertVehicleIndexYesterday() {
vehicleIndexService.listVehIndexyesterday();
}
/**
* 从Oracle导入全量违法索引数据到hbase
*/
@RequestMapping(path = "/insertVioIndex/all")
public void insertVioIndexAll() {
vioIndexService.listVioIndexAll();
}
/**
* 从Oracle导入昨日违法索引数据到hbase
*/
@RequestMapping(path = "/insertVioIndex/yesterday")
public void insertVioIndexYesterday() {
vioIndexService.listVioIndexYesterday();
}
/**
* 从Oracle导入几天前的当日数据 ep:days=1 表示导入昨日数据,插入hbase ,发Kafka
*
* @param days 天
*/
@RequestMapping(path = "/insertVio/DaysAgo")
public void insertVioDaysAgo(@RequestParam("days") int days) {
everyDayScheduleService.getViolationData(days);
}
/**
* 从Oracle导入几天前的当日数据 ep:days=1 表示导入昨日数据,插入hbase ,发Kafka
*
* @param days 天
*/
@RequestMapping(path = "/insertVeh/DaysAgo")
public void insertVehDaysAgo(@RequestParam("days") int days) {
everyDayScheduleService.getVehicleData(days);
}
/**
* 从Oracle导入几天前的当日数据 ep:days=1 表示导入昨日数据,插入hbase ,发Kafka
*
* @param days 天
*/
@RequestMapping(path = "/insertDrv/DaysAgo")
public void insertDrvDaysAgo(@RequestParam("days") int days) {
everyDayScheduleService.getDrivinglicenseData(days);
}
/**
* 从Oracle导入 days天前的当日数据
*
* @param days 天
*/
@RequestMapping(path = "/insertDrvFlow/DaysAgo")
public void insertDrvFlowDaysAgo(@RequestParam("days") int days) {
everyDayScheduleService.getDrvFlowDayData(days);
}
/**
* 导入全量的驾驶证业务流水索引
*/
@RequestMapping(path = "/insertDrvFlow/All")
public void insertDrvFlowIndexAll() {
everyDayScheduleService.getDrvFlowIndexAll();
}
@RequestMapping(path = "/insertVehIndex/Extra")
public void insertVehIndexExtra() {
vehicleIndexService.listVehIndexExtra();
}
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.SurveilService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yulong shu
* @version 1.0
* @date 2021/5/12 9:20
*/
@RestController
@RequestMapping("/surveil")
public class SurveilCOntroller {
@Autowired
SurveilService surveilService;
@RequestMapping("/insertSurveil2Es")
public void insertSurveil2Es(@RequestParam("ids") String... ids) {
surveilService.imDataOracle2EsById(ids);
}
@Autowired
SurveilService surServiceBatch;
@RequestMapping("/insertDataOrc2EsBatch")
public void insertDataOrc2EsBatch(@RequestParam("day") String day){
surServiceBatch.insertDataOrc2EsBatch(day);
}
}
package com.hikcreate.update_hbase.controller;
import antlr.ASdebug.IASDebugStream;
import com.hikcreate.update_hbase.service.EveryDayScheduleService;
import com.hikcreate.update_hbase.service.UpdateHbaseService;
import com.hikcreate.update_hbase.service.VehFlowService;
import org.apache.zookeeper.ZooDefs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/updateHbase")
public class UpdateHbaseController {
@Autowired
private VehFlowService vehFlowService;
@Autowired
private UpdateHbaseService updateHbaseService;
@Autowired
private EveryDayScheduleService everyDayScheduleService;
@GetMapping("/testVio")
public void testVio(@RequestParam("day") String day){
updateHbaseService.vioUpdateHbase(day+" 00:00:00");
}
@GetMapping("/testVehicle")
public void testVehicle(@RequestParam("day") String day){
updateHbaseService.vehicleUpdateHbase(day+" 00:00:00");
}
@GetMapping("/testDrv")
public void testDrv(@RequestParam("day") String day){
updateHbaseService.driverUpdateHbase(day+" 00:00:00");
}
@GetMapping("/delColumn")
public ResponseEntity delColumn(@RequestParam("table") String tableStr,@RequestParam("family") String family, @RequestParam("col") String col){
updateHbaseService.delColumn(tableStr,family,col);
return new ResponseEntity("complete", HttpStatus.OK);
}
@GetMapping("/putVehTOHbase")
public ResponseEntity putVehTOHbase(){
everyDayScheduleService.getVehicleData(10);
return new ResponseEntity("complete", HttpStatus.OK);
}
@GetMapping("putDrivinglicense")
public ResponseEntity putDrivinglicense(){
everyDayScheduleService.getDrivinglicenseData(5);
return new ResponseEntity("complete", HttpStatus.OK);
}
/* @RequestMapping("/putDriverById")
public ResponseEntity putDriverById(@RequestParam("ids") String... ids){
updateHbaseService.insertDriverById(ids);
return new ResponseEntity("complete",HttpStatus.OK);
}*/
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.VehFlowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/veh_flow")
public class VehFlowController {
@Autowired
private VehFlowService vehFlowService;
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.SurveilService;
import com.hikcreate.update_hbase.service.VehicleService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/vehicle")
public class VehicleController {
@Autowired
private VehicleService vehicleService;
@GetMapping("/getVehicle2Hbase")
public void getVehicle2Hbase(@RequestParam("hphm") String hphm, @RequestParam("hpzl") String hpzl){
vehicleService.getVehicle2Hbase(hphm,hpzl);
}
@RequestMapping("/insertVehicle2Es")
public void insertVehicle2Es(@RequestParam("ids") String... ids) {
vehicleService.imDataOracle2EsById(ids);
}
@GetMapping("/getVmVehicle2Hbase")
public void getVmVehicle2Hbase(@RequestParam("start") String start, @RequestParam("end") String end){
vehicleService.getVmVehicle2Hbase(start,end);
}
/**
* 同步实时表所有数据
* @param day
*/
@GetMapping("/getVehicle2HbaseAllRealTime")
public void getVehicle2HbaseAllRealTime(@RequestParam("day") String day){
vehicleService.getVehicle2HbaseAllRealTime(day);
}
@GetMapping("/delVehCheck")
public ResponseEntity delVehCheck(){
try {
vehicleService.delVehCheck();
return new ResponseEntity("ok", HttpStatus.OK);
}catch (Exception e){
return new ResponseEntity("failed", HttpStatus.OK);
}
}
/**
* 将veh_out表中
* @return
*/
@GetMapping("/vehEsOutMerge")
public ResponseEntity vehEsOutMerge(){
try {
vehicleService.vehDay();
return new ResponseEntity("ok", HttpStatus.OK);
}catch (Exception e){
e.printStackTrace();
return new ResponseEntity("failed", HttpStatus.OK);
}
}
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.entity.Violation;
import com.hikcreate.update_hbase.service.SurveilService;
import com.hikcreate.update_hbase.service.VioEsService;
import com.hikcreate.update_hbase.service.ViolationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/vio")
public class VioController {
@Autowired
private VioEsService vioEsService;
@Autowired
private ViolationService violationService;
@Autowired
private SurveilService surveilService;
@Value("${thread.num}")
private int threadNum;
@GetMapping("/insert2es12")
public void getHisVioToEs(@RequestParam("start") String start,@RequestParam("end") String end,@RequestParam("table") String table){
vioEsService.getHisVioToEsByTable(start,end,table);
}
@GetMapping("/insert2es")
public void getHisVioToEsByHphm(@RequestParam("start") String start,@RequestParam("end") String end,@RequestParam("table") String table,
@RequestParam("hphm") String hphm){
vioEsService.getHisVioToEsByHphm(start,end,table,hphm);
}
@GetMapping("/vioDelHisRange")
public ResponseEntity vioDelHisRange(@RequestParam("start") String start, @RequestParam("end") String end, @RequestParam("table") String table){
vioEsService.vioDelHisRange(start,end,table);
return new ResponseEntity("complete", HttpStatus.OK);
}
@GetMapping("/testVioDay")
public ResponseEntity testVioDay(){
long s = System.currentTimeMillis();
for(int i=0;i<threadNum;i++){
int finalI = i;
new Thread(() -> {vioEsService.toEsPastDay(finalI);}).start();
}
long e = System.currentTimeMillis();
return new ResponseEntity(s+"------"+e, HttpStatus.OK);
}
@GetMapping("/delVio")
public ResponseEntity delVio(){
vioEsService.vioDelDay();
return new ResponseEntity("---okok---", HttpStatus.OK);
}
/**
* 同步小数据量表到Es的现场违法表:vio_violation,vm_vio_violation_del
* @return
*/
@GetMapping("/sysViolationSmallData")
public ResponseEntity sysViolationSmallData(){
violationService.sysViolationSmallData();
return new ResponseEntity("---okok---", HttpStatus.OK);
}
/**
* 同步小数据量表到Es的非现场违法表:vio_surveil,vm_vio_surveil_del
* @return
*/
@GetMapping("/sysSurSmallData")
public ResponseEntity sysSurSmallData(){
surveilService.sysSurSmallData();
return new ResponseEntity("---okok---", HttpStatus.OK);
}
}
package com.hikcreate.update_hbase.controller;
import com.hikcreate.update_hbase.service.SurveilService;
import com.hikcreate.update_hbase.service.ViolationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yulong shu
* @version 1.0
* @date 2021/6/4 15:47
*/
@RestController
@RequestMapping("/violation")
public class ViolationController {
@Autowired
ViolationService violationService;
@RequestMapping("/insertViolation2Es")
public void insertViolation2Es(@RequestParam("ids") String... ids) {
System.out.println("ids"+ids);
violationService.imDataOracle2EsById(ids);
}
@RequestMapping("/insertViolation2EsByDte")
public void insertViolation2EsByDte(@RequestParam("start") String start,@RequestParam("end") String end){
violationService.insertViolation2EsByDte(start, end);
};
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.annotation.DBSource;
import com.hikcreate.update_hbase.entity.AcdAll;
import com.hikcreate.update_hbase.entity.AcdHuman;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public interface AcdMapper {
@DBSource(name = "bokang")
List<AcdAll> getFileAcdPastDay(@Param("past") int past);
@DBSource(name = "bokang")
List<AcdAll> getDutySimpleAcdPastDay(@Param("past") int past);
@DBSource(name = "bokang")
List<AcdAll> getQuickAcdPastDay(@Param("past") int past);
@DBSource(name = "bokang")
List<AcdAll> getIncreAcd(@Param("GXTIME") String GXTIME);
@DBSource(name = "bokang")
@Select({
"<script>",
"select sgbh sgbh,xzqh xzqh,djbh djbh,'null' kskcsj_1,'null' jskcsj_1,xq xq,sgfssj sgfssj,lh lh,lm lm,gls gls,ms ms,'null' qdms_1,jdwz jdwz,sgdd sgdd,'null' zhdmwz_1,'null' zyglss_1,'null' dlaqsx_1,'null' jtxhfs_1,'null' fhsslx_1,'null' dlwlgl_1,'null' lmzk_1,lbqk lbqk,'null' lmjg_1,'null' lkldlx_1,'null' dlxx_1,dllx dllx,'null' kcr1_1,'null' kcr2_1,'null' bar1_1,'null' bar2_1,'null' swrs_1,'null' swrsq_1,'null' swrs24_1,'null' ssrs24_1,'null' swrs3_1,'null' ssrs3_1,'null' swrs7_1,'null' ssrs7_1,'null' swrs30_1,'null' ssrs30_1,'null' szrs_1,'null' zsrs_1,'null' qsrs_1,'null' ssrs_1,'null' jdcsl_1,'null' fjdcsl_1,'null' xrsl_1,'null' xsglbm_1,'null' xsbadw_1,'null' xsbar_1,'null' tpzs_1,'null' xctzs_1,'null' xczpzs_1,zjccss zjccss,'null' sglx_1,lwsglx lwsglx,'null' ccyyfl_1,rdyyfl rdyyfl,'null' sgccyy_1,sgrdyy sgrdyy,'null' jyaq_1,tq tq,'null' njd_1,xc xc,swsg swsg,sgxt sgxt,'null' sfty_1,cljsg cljsg,dcsg dcsg,pzfs pzfs,'null' tysgzp_1,'null' tyzpsj_1,'null' dx_1,'null' zmtj_1,tjr1 tjr1,'null' tjr2_1,'null' yzwxp_1,'null' yzwxphg_1,cclrsj cclrsj,jllx jllx,scsjd scsjd,jbr jbr,'null' tjsj_1,gxsj gxsj,sszd sszd,glxzdj glxzdj,dah dah,'null' jnh_1,'null' sxxz_1,sb sb,tjsgbh tjsgbh,glbm glbm,'null' ylzd1_1,'null' ylzd2_1,'null' ylzd3_1,'null' ylzd4_1,'null' ylzd5_1,dzzb dzzb,'null' tdyl1_1,'null' tdyl2_1,'null' tdyl3_1,'null' tdyl4_1,'null' tdyl5_1,'null' tdyl6_1,'null' tdyl7_1,'null' tdyl8_1,'null' tdyl9_1,'null' tdyl10_1,'null' balxfs_1,badw badw,'null' xyxdm_1,jyw jyw,'null' sfecsg_1,'null' sfdxsg_1,'null' dlaqyhdj_1,'null' dsrzs_1,'null' fsjg_1,'null' ffjg_1,'null' sdsgdscwz_1,'null' hlzt_1,ssrs ssrs_2,wsbh wsbh_2,sgss sgss_2,zrtjjg zrtjjg_2,jar1 jar1_2,jar2 jar2_2,jafs jafs_2,tjfs tjfs_2,'null' driver1infoid_3,'null' driver2infoid_3,'null' finishtime_3,'null' accepttime_3,'null' userid_3,'null' status_3,'null' driver1fault_3,'null' driver2fault_3,'null' driver1responsibility_3,'null' driver2responsibility_3,'null' driver3infoid_3,'null' driver3fault_3,'null' driver3responsibility_3,'null' recordtype_3,'null' customid_3,'null' kcbh_3,'null' serialnumber_3,'null' kccxm_3,'null' handlestatus_3,'null' username_3,'null' zrpduserid_3,'null' zrpdusername_3,'null' modifydate_3,'null' modifyusername_3,'null' modifyuserid_3,'null' driver1pzbw_3,'null' driver2pzbw_3,'null' driver3pzbw_3,'null' addrtype_3,'null' longitude_3,'null' latitude_3,'null' caseyjstatus_3,'null' caseyjreason_3,'2' data_source,concat('2#',sgbh) esid from ZCKJ.VM_ACD_DUTYSIMPLE_A where GXSJ<![CDATA[ > ]]>to_date(#{GXTIME}, 'YYYY-MM-DD HH24:MI:SS') order by GXSJ desc",
"</script>"
})
List<AcdAll> getIncreDutySimpleAcd(@Param("GXTIME") String updateTime);
@DBSource(name = "bokang")
@Select({
"<script>",
"select accidentinfoid sgbh,'null' xzqh,'null' djbh,'null' kskcsj_1,'null' jskcsj_1,'null' xq,accidenttime sgfssj,'null' lh,'null' lm,'null' gls,'null' ms,'null' qdms_1,'null' jdwz,accidentaddr sgdd,'null' zhdmwz_1,'null' zyglss_1,'null' dlaqsx_1,'null' jtxhfs_1,'null' fhsslx_1,'null' dlwlgl_1,'null' lmzk_1,'null' lbqk,'null' lmjg_1,'null' lkldlx_1,'null' dlxx_1,'null' dllx,'null' kcr1_1,'null' kcr2_1,'null' bar1_1,'null' bar2_1,'null' swrs_1,'null' swrsq_1,'null' swrs24_1,'null' ssrs24_1,'null' swrs3_1,'null' ssrs3_1,'null' swrs7_1,'null' ssrs7_1,'null' swrs30_1,'null' ssrs30_1,'null' szrs_1,'null' zsrs_1,'null' qsrs_1,'null' ssrs_1,'null' jdcsl_1,'null' fjdcsl_1,'null' xrsl_1,'null' xsglbm_1,'null' xsbadw_1,'null' xsbar_1,'null' tpzs_1,'null' xctzs_1,'null' xczpzs_1,'null' zjccss,'null' sglx_1,'null' lwsglx,'null' ccyyfl_1,'null' rdyyfl,'null' sgccyy_1,'null' sgrdyy,'null' jyaq_1,weather tq,'null' njd_1,'null' xc,'null' swsg,'null' sgxt,'null' sfty_1,'null' cljsg,'null' dcsg,'null' pzfs,'null' tysgzp_1,'null' tyzpsj_1,'null' dx_1,'null' zmtj_1,'null' tjr1,'null' tjr2_1,'null' yzwxp_1,'null' yzwxphg_1,'null' cclrsj,'null' jllx,'null' scsjd,'null' jbr,'null' tjsj_1,finishtime gxsj,'null' sszd,'null' glxzdj,'null' dah,'null' jnh_1,'null' sxxz_1,'null' sb,'null' tjsgbh,'null' glbm,'null' ylzd1_1,'null' ylzd2_1,'null' ylzd3_1,'null' ylzd4_1,'null' ylzd5_1,'null' dzzb,'null' tdyl1_1,'null' tdyl2_1,'null' tdyl3_1,'null' tdyl4_1,'null' tdyl5_1,'null' tdyl6_1,'null' tdyl7_1,'null' tdyl8_1,'null' tdyl9_1,'null' tdyl10_1,'null' balxfs_1,'null' badw,'null' xyxdm_1,'null' jyw,'null' sfecsg_1,'null' sfdxsg_1,'null' dlaqyhdj_1,'null' dsrzs_1,'null' fsjg_1,'null' ffjg_1,'null' sdsgdscwz_1,'null' hlzt_1,'null' ssrs_2,'null' wsbh_2,'null' sgss_2,'null' zrtjjg_2,'null' jar1_2,'null' jar2_2,'null' jafs_2,'null' tjfs_2,driver1infoid driver1infoid_3,driver2infoid driver2infoid_3,finishtime finishtime_3,accepttime accepttime_3,userid userid_3,status status_3,driver1fault driver1fault_3,driver2fault driver2fault_3,driver1responsibility driver1responsibility_3,driver2responsibility driver2responsibility_3,driver3infoid driver3infoid_3,driver3fault driver3fault_3,driver3responsibility driver3responsibility_3,recordtype recordtype_3,customid customid_3,kcbh kcbh_3,serialnumber serialnumber_3,kccxm kccxm_3,handlestatus handlestatus_3,username username_3,zrpduserid zrpduserid_3,zrpdusername zrpdusername_3,modifydate modifydate_3,modifyusername modifyusername_3,modifyuserid modifyuserid_3,driver1pzbw driver1pzbw_3,driver2pzbw driver2pzbw_3,driver3pzbw driver3pzbw_3,addrtype addrtype_3,longitude longitude_3,latitude latitude_3,caseyjstatus caseyjstatus_3,caseyjreason caseyjreason_3,'3' data_source,concat('3#',accidentinfoid) esid from ZCKJ.VM_QUICK_ACCIDENTINFO where FINISHTIME<![CDATA[ > ]]>to_date(#{GXTIME}, 'YYYY-MM-DD HH24:MI:SS') order by FINISHTIME desc",
"</script>"
})
List<AcdAll> getIncreQuickAcd(@Param("GXTIME") String updateTime);
@DBSource(name = "bokang")
List<AcdHuman> getIncreAcdHuman(@Param("past")int past);
@DBSource(name = "bokang")
List<AcdHuman> getIncreAcdDutySimpleHuman(@Param("past") int past);
@DBSource(name = "bokang")
List<AcdHuman> getIncreAcdQuickHuman(@Param("past") int past);
@DBSource(name = "bokang")
List<AcdHuman> getIncreAcdHuman20Min(@Param("GXTIME") String GXTIME);
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.annotation.DBSource;
import com.hikcreate.update_hbase.entity.*;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
@Component
public interface DmsmpMapper {
@DBSource(name = "bokang")
List<VehTempPlate> getTempPlateWarn(@Param("times") int times);
@DBSource(name = "hik")
void insertTempPlateWarn(VehTempPlate vehTempPlate);
@DBSource(name = "bokang")
List<PlateAndSyrChange> getPlateAndSyrChange(List<String> xhList);
@DBSource(name = "bokang")
String getGlbmMcByCode(@Param("glbm") String glbm);
@DBSource(name = "bokang")
String getDmsmByDmz(@Param("dmz") String dmz,@Param("xtlb") String xtlb,@Param("dmlb") String dmlb);
@DBSource(name = "hik")
void insertPlateAndSyrChange(PlateAndSyrChange plateAndSyrChange);
@DBSource(name = "bokang")
String getLastSyr(@Param("xh") String xh,@Param("lsh") String lsh);
@DBSource(name = "bokang")
List<String> getYNXh(@Param("day") int day);
@DBSource(name = "hik")
List<String> getMysqlYNXh();
@DBSource(name = "bokang")
List<VehModify> getVehModifyXhYN(@Param("xh") String xh);
@DBSource(name = "bokang")
List<CommonSyr> getHisCommonSyr();
@DBSource(name = "bokang")
List<CommonSyr> getIncrCommonSyr(@Param("day") int day);
@DBSource(name = "hik")
void insertCommonSyr(CommonSyr commonSyr);
@DBSource(name = "bokang")
List<DrvSfzmhmXmChange> getHisDrvNameCardChange(List<String> dabhList);
@DBSource(name = "bokang")
List<String> getHisDrvNameCardChangeDabh();
@DBSource(name = "hik")
void insertDrvSfzmhmXmChange(DrvSfzmhmXmChange drvSfzmhmXmChange);
@DBSource(name = "hik")
List<CerticateQuit> getAllCerticateQuit();
@DBSource(name = "hik")
void saveCerticateQuit(CerticateQuit certicateQuit);
@DBSource(name = "bokang")
List<String> getPlateAndSyrChangeXh();
@DBSource(name = "bokang")
List<CerticateQuit> getIncrCerticateQuit(int day);
@DBSource(name = "bokang")
List<SyrPlateChangeMany> hisSyrPlateChangeMany();
@DBSource(name = "hik")
void insertSyrPlateChangeMany(SyrPlateChangeMany syrPlateChangeMany);
@DBSource(name = "hik")
void delSyrPlateChangeManyBySyr(String id_);
@DBSource(name = "hik")
List<SyrPlateChangeMany> getAllSyrPlateChangeMany();
@DBSource(name = "bokang")
List<MortgageCancel> getHisMortgageCancel();
@DBSource(name = "hik")
void insertMortgageCancel(MortgageCancel mortgageCancel);
@DBSource(name = "bokang")
VehLog getNextVehLog4h(@Param("xh") String xh,@Param("ywlx") String ywlx,@Param("ywyy") String ywyy,
@Param("bjrq") Date bjrq,@Param("jbr") String jbr);
@DBSource(name = "bokang")
Integer getCountNextVehLog4h(@Param("xh") String xh,@Param("ywlx") String ywlx,@Param("ywyy") String ywyy,
@Param("bjrq") Date bjrq,@Param("jbr") String jbr);
@DBSource(name = "hik")
void insertRetireDealAgain(RetireDealAgain retireDealAgain);
@DBSource(name = "hik")
void insertDeformedproposer(Deformedproposer deformedproposer);
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.annotation.DBSource;
import com.hikcreate.update_hbase.entity.Driver;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public interface DriverHBMapper {
@DBSource(name = "bokang")
List<Driver> getDriverById(@Param("ids") List<String> ids);
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.entity.DrvFlow;
import org.springframework.stereotype.Component;
@Component
public interface DrvFlowMapper {
DrvFlow getByLsh(String lsh);
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.annotation.DBSource;
import com.hikcreate.update_hbase.entity.Surveil;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public interface SurveilMapper {
@DBSource(name = "bokang")
List<Surveil> getVmSurveil(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Surveil> getSurveil(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Surveil> getVmSurveilHis(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Surveil> getVmSurveilDel(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Surveil> getSurveilDelAll();
@DBSource(name="bokang")
List<Surveil> getSurveilById(@Param("ids") List<String> ids);
@DBSource(name="bokang")
List<Surveil> insertSurveilBatch(@Param("day") String day);
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.annotation.DBSource;
import com.hikcreate.update_hbase.entity.IndexHtable;
import com.hikcreate.update_hbase.entity.VehFlow;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public interface VehFlowMapper {
@DBSource(name = "bokang")
List<VehFlow> getVmPast(@Param("day") int day);
@DBSource(name = "bokang")
List<VehFlow> getTablePast(@Param("day") int day);
@DBSource(name = "bokang")
VehFlow getByLsh(@Param("lsh") String lsh);
@DBSource(name = "bokang")
List<VehFlow> getRetireFlow();
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.annotation.DBSource;
import com.hikcreate.update_hbase.entity.*;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
@Component
public interface VehicleMapper {
Vehicle getVehicle2Hbase(String hphm, String hpzl);
List<Vehicle> getVehicle2HbaseAllRealTime(String day);
@DBSource(name = "bokang")
Set<String> getAllPlateNum();
@DBSource(name = "bokang")
List<VehicleEs> getVmVeh(@Param("day") int day);
@DBSource(name = "bokang")
List<Vehicle> getVmVehicle2Hbase(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<VehicleEs> getVeh(@Param("day") int day);
@DBSource(name = "bokang")
Vehicle getVehByXh(@Param("xh") String xh);
@DBSource(name = "bokang")
VehOut getClshdhByXhFromVehOut(@Param("xh") String xh);
@DBSource(name = "bokang")
VehLog getVehLogByLshFrist(@Param("id")String id_);
@DBSource(name = "bokang")
List<Deformedproposer> getHisDeformedCar();
@DBSource(name = "bokang")
List<Deformedproposer> getDeformedCar(int day);
@DBSource(name="bokang")
List<VehicleEs> getVehicleById(@Param("ids") List<String> ids);
}
package com.hikcreate.update_hbase.dao;
import com.hikcreate.update_hbase.annotation.DBSource;
import com.hikcreate.update_hbase.entity.Surveil;
import com.hikcreate.update_hbase.entity.Violation;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public interface ViolationMapper {
@DBSource(name = "bokang")
List<Violation> getVmViolation(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Violation> getViolation(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Violation> getVmViolationHis(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Violation> getVmViolationDel(@Param("start") String start,@Param("end") String end);
@DBSource(name = "bokang")
List<Violation> getViolationDelAll();
@DBSource(name="bokang")
List<Violation> getViolationById(@Param("ids") List<String> ids);
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
/**
* 制证后退办
*/
@Data
public class CerticateQuit {
String id_;
String process_id; //
String insert_date; //
String glbm;
String glbm_value; //
String clxh;
String sfzmhm; //
String syr;
String hphm;
String hpzl;
String ywlx;
Date zzrq;
String xh;
Date tbrq;
String jbr;
String lsh;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
/**
* 共同所有人预警
*/
@Data
public class CommonSyr {
String id_;
String process_id; //
String insert_date; //
String glbm; //
String glbm_value; //
String xh;
String fzjg;
String hpzl;
String hphm;
String clsbdh; //
String clxh; //
String cllx; //
String cllx_value; //
String syxz; //
String syxz_value; //
String sfzmhm;
String sfzmmc;
String syr;
String syq;
Date ccdjrq; //
Date bgrq;
String bgnr;
String bgqcs;
String xcs;
String lsh;
String jbr;
Integer cs;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
/**
* 残疾人车辆
*/
@Data
public class Deformedproposer {
String deformedproposerid;
String proposername;
String proposeraddress;
String idcard;
String drivecartype;
String deformedcard;
String linktel;
String deformedtype;
String deformedtypeother;
String socialsecuritypay;
Date proposaldate;
String proposalno;
String auditingstatus;
Date auditingdate;
String auditingusername;
String auditingremark;
Date createdate;
String createuserid;
String status;
Date modifydate;
String modifyuserid;
String approvestatus;
Date approvedate;
String approveremark;
String approveuserid;
String approveusername;
String issuedstatus;
Date issueddate;
String issuedremark;
String issueduserid;
String issuedusername;
String identifycode;
String carcode;
String firstauditingstatus;
Date firstauditingdate;
String firstauditingremark;
String firstauditinguserid;
String firstauditingusername;
String whetherpda;
}
package com.hikcreate.update_hbase.entity;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
@Data
public class Driver implements Serializable {
private static final long serialVersionUID = 7198721568676225642L;
@JSONField(name="info:dabh")
String dabh;
@JSONField(name="info:sfzmhm")
String sfzmhm;
@JSONField(name="info:zjcx")
String zjcx;
@JSONField(name="info:yzjcx")
String yzjcx;
@JSONField(name="info:qfrq")
String qfrq;
@JSONField(name="info:syrq")
String syrq;
@JSONField(name="info:cclzrq")
String cclzrq;
@JSONField(name="info:ccfzjg")
String ccfzjg;
@JSONField(name="info:jzqx")
String jzqx;
@JSONField(name="info:yxqs")
String yxqs;
@JSONField(name="info:yxqz")
String yxqz;
@JSONField(name="info:ljjf")
String ljjf;
@JSONField(name="info:cfrq")
String cfrq;
@JSONField(name="info:xxtzrq")
String xxtzrq;
@JSONField(name="info:bzcs")
String bzcs;
@JSONField(name="info:zt")
String zt;
@JSONField(name="info:ly")
String ly;
@JSONField(name="info:jxmc")
String jxmc;
@JSONField(name="info:jly")
String jly;
@JSONField(name="info:xzqh")
String xzqh;
@JSONField(name="info:xzqj")
String xzqj;
@JSONField(name="info:fzrq")
String fzrq;
@JSONField(name="info:jbr")
String jbr;
@JSONField(name="info:glbm")
String glbm;
@JSONField(name="info:fzjg")
String fzjg;
@JSONField(name="info:gxsj")
String gxsj;
@JSONField(name="info:lsh")
String lsh;
@JSONField(name="info:xgzl")
String xgzl;
@JSONField(name="info:bz")
String bz;
@JSONField(name="info:jyw")
String jyw;
@JSONField(name="info:ydabh")
String ydabh;
@JSONField(name="info:sqdm")
String sqdm;
@JSONField(name="info:zxbh")
String zxbh;
@JSONField(name="info:xh")
String xh;
@JSONField(name="info:sfzmmc")
String sfzmmc;
@JSONField(name="info:hmcd")
String hmcd;
@JSONField(name="info:xm")
String xm;
@JSONField(name="info:xb")
String xb;
@JSONField(name="info:csrq")
String csrq;
@JSONField(name="info:gj")
String gj;
@JSONField(name="info:djzsxzqh")
String djzsxzqh;
@JSONField(name="info:djzsxxdz")
String djzsxxdz;
@JSONField(name="info:lxzsxzqh")
String lxzsxzqh;
@JSONField(name="info:lxzsxxdz")
String lxzsxxdz;
@JSONField(name="info:lxzsyzbm")
String lxzsyzbm;
@JSONField(name="info:lxdh")
String lxdh;
@JSONField(name="info:sjhm")
String sjhm;
@JSONField(name="info:dzyx")
String dzyx;
@JSONField(name="info:zzzm")
String zzzm;
@JSONField(name="info:zzfzjg")
String zzfzjg;
@JSONField(name="info:zzfzrq")
String zzfzrq;
@JSONField(name="info:sfbd")
String sfbd;
@JSONField(name="info:dwbh")
String dwbh;
@JSONField(name="info:syyxqz")
String syyxqz;
@JSONField(name="info:xczg")
String xczg;
@JSONField(name="info:xczjcx")
String xczjcx;
@JSONField(name="info:ryzt")
String ryzt;
@JSONField(name="info:jzqx_value")
String jzqx_value;
@JSONField(name="info:zt_value")
String zt_value;
@JSONField(name="info:ly_value")
String ly_value;
@JSONField(name="info:jxmc_value")
String jxmc_value;
@JSONField(name="info:xzqh_value")
String xzqh_value;
@JSONField(name="info:xzqj_value")
String xzqj_value;
@JSONField(name="info:glbm_value")
String glbm_value;
@JSONField(name="info:xgzl_value")
String xgzl_value;
@JSONField(name="info:sfzmmc_value")
String sfzmmc_value;
@JSONField(name="info:hmcd_value")
String hmcd_value;
@JSONField(name="info:xb_value")
String xb_value;
@JSONField(name="info:gj_value")
String gj_value;
@JSONField(name="info:djzsxzqh_value")
String djzsxzqh_value;
@JSONField(name="info:lxzsxzqh_value")
String lxzsxzqh_value;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
@Data
public class DrvChangeBgnr {
String item;
String before;
String after;
}
package com.hikcreate.update_hbase.entity;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
@Data
@Document(indexName = "es_drv_flow", type = "es_drv_flow_doc")
public class DrvFlow implements Serializable {
private static final long serialVersionUID = -5599425880598899728L;
@JSONField(name = "lsh")
@Field(type = FieldType.Text)
private String lsh;
@JSONField(name = "sfzmhm")
@Field(type = FieldType.Text)
private String sfzmhm;
@JSONField(name = "dabh")
@Field(type = FieldType.Text)
private String dabh;
@JSONField(name = "xm")
@Field(type = FieldType.Text)
private String xm;
@JSONField(name = "ywlx")
@Field(type = FieldType.Text)
private String ywlx;
@JSONField(name = "ywyy")
@Field(type = FieldType.Text)
private String ywyy;
@JSONField(name = "kssj")
@Field(type = FieldType.Text)
private String kssj;
@JSONField(name = "jssj")
@Field(type = FieldType.Text)
private String jssj;
@JSONField(name = "ywgw")
@Field(type = FieldType.Text)
private String ywgw;
@JSONField(name = "kskm")
@Field(type = FieldType.Text)
private String kskm;
@JSONField(name = "xygw")
@Field(type = FieldType.Text)
private String xygw;
@JSONField(name = "glbm")
@Field(type = FieldType.Text)
private String glbm;
@JSONField(name = "ffbz")
@Field(type = FieldType.Text)
private String ffbz;
@JSONField(name = "rkbz")
@Field(type = FieldType.Text)
private String rkbz;
@JSONField(name = "hdbz")
@Field(type = FieldType.Text)
private String hdbz;
@JSONField(name = "xgzl")
@Field(type = FieldType.Text)
private String xgzl;
@JSONField(name = "zjcx")
@Field(type = FieldType.Text)
private String zjcx;
@JSONField(name = "ywzt")
@Field(type = FieldType.Text)
private String ywzt;
@JSONField(name = "ywblbm")
@Field(type = FieldType.Text)
private String ywblbm;
@JSONField(name = "fzjg")
@Field(type = FieldType.Text)
private String fzjg;
@JSONField(name = "dcbj")
@Field(type = FieldType.Text)
private String dcbj;
@JSONField(name = "gxsj")
@Field(type = FieldType.Text)
private String gxsj;
@Id
private String id;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
/**
* 驾驶人信息变更:姓名与身份证号同时变更,同时不要求同一时刻
*/
@Data
public class DrvSfzmhmXmChange {
String id_;
String process_id; //
String insert_date; //
String glbm; //
String glbm_value;//
String fzjg;
String dabh;
String xm;
String sfzmmc;
String sfzmhm;
String lsh;
String jbr;
String bgnr;
Date bgrq;
String scjbr;
String scbgnr;
Date scbgrq;
String ywlx;
}
package com.hikcreate.update_hbase.entity;
import org.springframework.stereotype.Component;
@Component
public class IndexHtable {
private String rowkey;
private String col;
private String value;
public IndexHtable(String rowkey, String col, String value) {
this.rowkey = rowkey;
this.col = col;
this.value = value;
}
public IndexHtable() {
}
@Override
public String toString() {
return "IndexHtable{" +
"rowkey='" + rowkey + '\'' +
", col='" + col + '\'' +
", value='" + value + '\'' +
'}';
}
public String getRowkey() {
return rowkey;
}
public void setRowkey(String rowkey) {
this.rowkey = rowkey;
}
public String getCol() {
return col;
}
public void setCol(String col) {
this.col = col;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
package com.hikcreate.update_hbase.entity;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
@Data
public class KafkaJson implements Serializable {
private static final long serialVersionUID = 1541597316430548824L;
@JSONField(name="row")
String row;
@JSONField(name="row_index")
String row_index;
@JSONField(name="data")
Vehicle data;
}
package com.hikcreate.update_hbase.entity;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
@Data
public class KafkaJsonConNewAndOld implements Serializable {
@JSONField(name = "row")
String row;
@JSONField(name = "row_index")
String row_index;
@JSONField(name = "new_data")
JSONObject new_data;
@JSONField(name = "old_data")
JSONObject old_data;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
@Data
public class MortgageCancel {
String id_;
String process_id;
String insert_date; //
String glbm;
String glbm_value; //
String sqlx;
String xh;
String clsbdh;
String clxh;
String sfzmhm; //
String jbr; //
String syr;
Integer cs;
Date scblsj;
String yjbs;
Date blsj;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
/**
* 所有人和车牌号都改变预警
*/
@Data
public class PlateAndSyrChange {
String id_;
String process_id; //
String glbm; //
String xh;
Date blsj;
String fzjg;
String hpzl;
String hphm;
String clsbdh; //
String clxh; //
String cllx; //
String syxz; //
String sfzmhm;
String sfzmmc;
String syr; //
String syq;
String ccdjrq; //
Date bgrq;
String bgnr;
String bgqnr;
String xnr;
String lsh;
String jbr;
Date scbgrq;
String scbgnr;
String scbgqnr;
String scxnr;
String sclsh;
String scjbr;
String glbm_value; //
String cllx_value; //
String syxz_value; //
String sfzmmc_value; //
String insert_date; //
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
@Data
public class RetireDealAgain {
String id_ ;
String process_id;
String insert_date;
String glbm;
String glbm_value;
String sqlx;
String clxh;
String sfzmhm;
String syr;
Integer cs;
Date blsj;
String jbr;
String bjzt;
Date scblsj;
String scjbr;
String scbjzt;
String xh;
String lsh;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
@Data
@Document(indexName = "surveil", type = "doc")
public class Surveil {
@Id
private String xh;
private String bz;
private String cjfs;
private String cjjg;
private String cjjgmc;
private String clbj;
private String cldxbj;
private String clfl;
private String cljg;
private String cljgmc;
private String clpp;
private String clsbdh;
private String clsj;
private String clyt;
private String csys;
private String ddms;
private String dh;
private String dkbj;
private String dllx;
private String dsr;
private String esid;
private String fdjh;
private String fsjg;
private String fzjg;
private String glxzdj;
private String gxsj;
private String hphm;
private String hpzl;
private String jbr;
private String jdcsyr;
private String jdsbh;
private String jdslb;
private String jkbj;
private String jkfs;
private String jkrq;
private String jllx;
private String jsjg;
private String jtfs;
private String lddm;
private String lrr;
private String lrsj;
private String lxfs;
private String pzbh;
private String qzclbj;
private String sbbh;
private String spdz;
private String syxz;
private String type;
private String tzbj;
private String tzrq;
private String tzsh;
private String wfbh;
private String wfdd;
private String wfdz;
private String wfsj;
private String wfxw;
private String wsjyw;
private String xcfw;
private String xrms;
private String xzqh;
private String ydclbj;
private String ylzz1;
private String ylzz12;
private String ylzz13;
private String ylzz14;
private String ylzz15;
private String ylzz16;
private String ylzz17;
private String ylzz18;
private String ylzz2;
private String ylzz4;
private String ylzz5;
private String ylzz6;
private String ylzz7;
private String ylzz8;
private String zdbj;
private String zdjlbj;
private String zqmj;
private String zsxxdz;
private String zsxzqh;
private long bzz;
private long fkje;
private long scz;
private long update_time;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
/**
* 同一所有人多次变更号牌
*/
@Data
public class SyrPlateChangeMany {
String id_; //
String process_id;
String insert_date;
String glbm;
String glbm_value; //
String sqlx;
String xh;
String clsbdh;
String clxh;
String sfzmhm; //
String syr;
Date bgrq;
Integer bgcs;
String bgqnr;
String xnr;
String jbr;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
@Data
@Document(indexName = "Vehicle_gy", type = "doc")
public class VehEs {
@Id
private String xh;
private String hpzl;
private String hphm;
private String clpp1;
private String clxh;
private String clpp2;
private String gcjk;
private String zzg;
private String zzcmc;
private String clsbdh;
private String fdjh;
private String cllx;
private String csys;
private String syxz;
private String sfzmhm;
private String sfzmmc;
private String syr;
private String syq;
private String ccdjrq;
private String djrq;
private String yxqz;
private String qzbfqz;
private String fzjg;
private String glbm;
private String fprq;
private String fzrq;
private String fdjrq;
private String fhgzrq;
private String bxzzrq;
private String bpcs;
private String bzcs;
private String bdjcs;
private String djzsbh;
private String zdjzshs;
private String dabh;
private String xzqh;
private String zt;
private String dybj;
private String jbr;
private String clly;
private String lsh;
private String fdjxh;
private String rlzl;
private String pl;
private String gl;
private String zxxs;
private String cwkc;
private String cwkk;
private String cwkg;
private String hxnbcd;
private String hxnbkd;
private String hxnbgd;
private String gbthps;
private String zs;
private String zj;
private String qlj;
private String hlj;
private String ltgg;
private String lts;
private String zzl;
private String zbzl;
private String hdzzl;
private String hdzk;
private String zqyzl;
private String qpzk;
private String hpzk;
private String hbdbqk;
private String ccrq;
private String hdfs;
private String llpz1;
private String pzbh1;
private String llpz2;
private String pzbh2;
private String xsdw;
private String xsjg;
private String xsrq;
private String jkpz;
private String jkpzhm;
private String hgzbh;
private String nszm;
private String nszmbh;
private String gxrq;
private String xgzl;
private String qmbh;
private String hmbh;
private String bz;
private String jyw;
private String zsxzqh;
private String zsxxdz;
private String yzbm1;
private String lxdh;
private String zzz;
private String zzxzqh;
private String zzxxdz;
private String yzbm2;
private String zdyzt;
private String yxh;
private String cyry;
private String dphgzbh;
private String sqdm;
private String clyt;
private String ytsx;
private String dzyx;
private String xszbh;
private String sjhm;
private String jyhgbzbh;
private String dwbh;
private String syqsrq;
private String yqjyqzbfqz;
private String yqjyqz2;
private String fdjgs;
private String sfyzhgn;
private String zzjglx;
private String wxmbc;
private String ncdqsy;
private String hpqysj;
private String dzbsxlh;
private String sfxny;
private String xnyzl;
private String qddjxh;
private String qddjh;
private String qddjgl;
private String cnzzzl;
private String cnzzxs;
private String cnzzzdl;
private String cnzzzdy;
private String cnzzdtdy;
private String hdwjcd;
private String cdqdxslcgk;
private String del_flag;
private String type;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import java.util.Date;
@Data
@Document(indexName = "veh_flow",type = "_doc")
public class VehFlow {
private Date bjrq;
private String cllx;
private String clpp1;
private String clsbdh;
private String clxh;
private String ffbj;
private String fpbj;
private String glbm;
private Date gxrq;
private String hphm;
private String hpzl;
@Id
private String lsh;
private String lszt;
private String ly;
private String rkbj;
private Date sqrq;
private String syr;
private String type;
private long update_time;
private String xh;
private String xygw;
private String xzqh;
private String ywlc;
private String ywlx;
private String ywyy;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
@Data
public class VehLog {
String lsh;
Date clrq;
String ywlx;
String ywyy;
String hpzl;
String hphm;
String xh;
String ywgw;
String czgw;
String jbr;
String glbm;
String bz;
String ip;
String ly;
String ywlybz;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import javax.swing.*;
import java.util.Date;
/**
* 机动车变更历史记录信息表
*/
@Data
public class VehModify {
String xh;
String yhpzl;
String yhphm;
String bgnr;
String bgqcs;
String xcs;
String xgzl;
Date bgrq;
String jbr;
String fzjg;
String bz;
String lsh;
String llpz1;
String pzbh1;
String llpz2;
String pzbh2;
Date gxrq;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
@Data
public class VehOut {
String xh;
String hpzl;
String hphm;
String clsbdh;
String fdjh;
String zylx;
String syr;
String sfzmmc;
String sfzmhm;
String zrd;
String hdfs;
Date djrq;
String xsdw;
Long xsjg;
String jbr;
String fzjg;
String xgzl;
String lsh;
String xxdz;
String yzbm;
String lxdh;
String llpz1;
String pzbh1;
String llpz2;
String pzbh2;
Date gxrq;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import java.util.Date;
/**
* 临时号牌
*/
@Data
public class VehTempPlate {
String hphm;
Date djrq;
String sqlx;
String syr;
String sfzmhm;
String clpp1;
String clxh;
String cllx;
String fdjh;
String clsbdh;
Integer hdzzl;
Integer hdzk;
String xsqd;
String xszd;
Integer yxq;
String xh;
String jbr;
String fzjg;
String lsh;
String bz;
String xxdz;
String yzbm;
String lxdh;
String bxgs;
Date sxrq;
Date zzrq;
String bxpzh;
String sfzmmc;
String xstjd;
Date yxqz;
String glbm;
String lszxlh;
String hplx;
String yhpzl;
String yhphm;
String hgzbh;
String jkpz;
String jkpzhm;
Date gxrq;
Integer cs;
}
package com.hikcreate.update_hbase.entity;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
/**
* @author lifuyi
* @date ${Date}
* @description
*/
@Data
public class Vehicle implements Serializable{
private static final long serialVersionUID = 2182693969801815163L;
@JSONField(name="info:xh")
String xh;
@JSONField(name="info:hpzl")
String hpzl;
@JSONField(name="info:hphm")
String hphm;
@JSONField(name="info:clpp1")
String clpp1;
@JSONField(name="info:clxh")
String clxh;
@JSONField(name="info:clpp2")
String clpp2;
@JSONField(name="info:gcjk")
String gcjk;
@JSONField(name="info:zzg")
String zzg;
@JSONField(name="info:zzcmc")
String zzcmc;
@JSONField(name="info:clsbdh")
String clsbdh;
@JSONField(name="info:fdjh")
String fdjh;
@JSONField(name="info:cllx")
String cllx;
@JSONField(name="info:csys")
String csys;
@JSONField(name="info:syxz")
String syxz;
@JSONField(name="info:sfzmhm")
String sfzmhm;
@JSONField(name="info:sfzmmc")
String sfzmmc;
@JSONField(name="info:syr")
String syr;
@JSONField(name="info:syq")
String syq;
@JSONField(name="info:ccdjrq")
String ccdjrq;
@JSONField(name="info:djrq")
String djrq;
@JSONField(name="info:yxqz")
String yxqz;
@JSONField(name="info:qzbfqz")
String qzbfqz;
@JSONField(name="info:fzjg")
String fzjg;
@JSONField(name="info:glbm")
String glbm;
@JSONField(name="info:fprq")
String fprq;
@JSONField(name="info:fzrq")
String fzrq;
@JSONField(name="info:fdjrq")
String fdjrq;
@JSONField(name="info:fhgzrq")
String fhgzrq;
@JSONField(name="info:bxzzrq")
String bxzzrq;
@JSONField(name="info:bpcs")
String bpcs;
@JSONField(name="info:bzcs")
String bzcs;
@JSONField(name="info:bdjcs")
String bdjcs;
@JSONField(name="info:djzsbh")
String djzsbh;
@JSONField(name="info:zdjzshs")
String zdjzshs;
@JSONField(name="info:dabh")
String dabh;
@JSONField(name="info:xzqh")
String xzqh;
@JSONField(name="info:zt")
String zt;
@JSONField(name="info:dybj")
String dybj;
@JSONField(name="info:jbr")
String jbr;
@JSONField(name="info:clly")
String clly;
@JSONField(name="info:lsh")
String lsh;
@JSONField(name="info:fdjxh")
String fdjxh;
@JSONField(name="info:rlzl")
String rlzl;
@JSONField(name="info:pl")
String pl;
@JSONField(name="info:gl")
String gl;
@JSONField(name="info:zxxs")
String zxxs;
@JSONField(name="info:cwkc")
String cwkc;
@JSONField(name="info:cwkk")
String cwkk;
@JSONField(name="info:cwkg")
String cwkg;
@JSONField(name="info:hxnbcd")
String hxnbcd;
@JSONField(name="info:hxnbkd")
String hxnbkd;
@JSONField(name="info:hxnbgd")
String hxnbgd;
@JSONField(name="info:gbthps")
String gbthps;
@JSONField(name="info:zs")
String zs;
@JSONField(name="info:zj")
String zj;
@JSONField(name="info:qlj")
String qlj;
@JSONField(name="info:hlj")
String hlj;
@JSONField(name="info:ltgg")
String ltgg;
@JSONField(name="info:lts")
String lts;
@JSONField(name="info:zzl")
String zzl;
@JSONField(name="info:zbzl")
String zbzl;
@JSONField(name="info:hdzzl")
String hdzzl;
@JSONField(name="info:hdzk")
String hdzk;
@JSONField(name="info:zqyzl")
String zqyzl;
@JSONField(name="info:qpzk")
String qpzk;
@JSONField(name="info:hpzk")
String hpzk;
@JSONField(name="info:hbdbqk")
String hbdbqk;
@JSONField(name="info:ccrq")
String ccrq;
@JSONField(name="info:hdfs")
String hdfs;
@JSONField(name="info:llpz1")
String llpz1;
@JSONField(name="info:pzbh1")
String pzbh1;
@JSONField(name="info:llpz2")
String llpz2;
@JSONField(name="info:pzbh2")
String pzbh2;
@JSONField(name="info:xsdw")
String xsdw;
@JSONField(name="info:xsjg")
String xsjg;
@JSONField(name="info:xsrq")
String xsrq;
@JSONField(name="info:jkpz")
String jkpz;
@JSONField(name="info:jkpzhm")
String jkpzhm;
@JSONField(name="info:hgzbh")
String hgzbh;
@JSONField(name="info:nszm")
String nszm;
@JSONField(name="info:nszmbh")
String nszmbh;
@JSONField(name="info:gxrq")
String gxrq;
@JSONField(name="info:xgzl")
String xgzl;
@JSONField(name="info:qmbh")
String qmbh;
@JSONField(name="info:hmbh")
String hmbh;
@JSONField(name="info:bz")
String bz;
@JSONField(name="info:jyw")
String jyw;
@JSONField(name="info:zsxzqh")
String zsxzqh;
@JSONField(name="info:zsxxdz")
String zsxxdz;
@JSONField(name="info:yzbm1")
String yzbm1;
@JSONField(name="info:lxdh")
String lxdh;
@JSONField(name="info:zzz")
String zzz;
@JSONField(name="info:zzxzqh")
String zzxzqh;
@JSONField(name="info:zzxxdz")
String zzxxdz;
@JSONField(name="info:yzbm2")
String yzbm2;
@JSONField(name="info:zdyzt")
String zdyzt;
@JSONField(name="info:yxh")
String yxh;
@JSONField(name="info:cyry")
String cyry;
@JSONField(name="info:dphgzbh")
String dphgzbh;
@JSONField(name="info:sqdm")
String sqdm;
@JSONField(name="info:clyt")
String clyt;
@JSONField(name="info:ytsx")
String ytsx;
@JSONField(name="info:dzyx")
String dzyx;
@JSONField(name="info:xszbh")
String xszbh;
@JSONField(name="info:sjhm")
String sjhm;
@JSONField(name="info:jyhgbzbh")
String jyhgbzbh;
@JSONField(name="info:dwbh")
String dwbh;
@JSONField(name="info:syqsrq")
String syqsrq;
@JSONField(name="info:yqjyqzbfqz")
String yqjyqzbfqz;
@JSONField(name="info:yqjyqz2")
String yqjyqz2;
@JSONField(name="info:fdjgs")
String fdjgs;
@JSONField(name="info:sfyzhgn")
String sfyzhgn;
@JSONField(name="info:zzjglx")
String zzjglx;
@JSONField(name="info:wxmbc")
String wxmbc;
@JSONField(name="info:ncdqsy")
String ncdqsy;
@JSONField(name="info:hpqysj")
String hpqysj;
@JSONField(name="info:dzbsxlh")
String dzbsxlh;
@JSONField(name="info:sfxny")
String sfxny;
@JSONField(name="info:xnyzl")
String xnyzl;
@JSONField(name="info:qddjxh")
String qddjxh;
@JSONField(name="info:qddjh")
String qddjh;
@JSONField(name="info:qddjgl")
String qddjgl;
@JSONField(name="info:cnzzzl")
String cnzzzl;
@JSONField(name="info:cnzzxs")
String cnzzxs;
@JSONField(name="info:cnzzzdl")
String cnzzzdl;
@JSONField(name="info:cnzzzdy")
String cnzzzdy;
@JSONField(name="info:cnzzdtdy")
String cnzzdtdy;
@JSONField(name="info:hdwjcd")
String hdwjcd;
@JSONField(name="info:cdqdxslcgk")
String cdqdxslcgk;
@JSONField(name="info:cdqdxslcds")
String cdqdxslcds;
}
package com.hikcreate.update_hbase.entity;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
@Data
@Document(indexName = "violation", type = "doc")
public class Violation {
@Id
private String wfbh;
private String bz;
private String cclzrq;
private String cfzl;
private String cjfs;
private String clbj;
private String cldxbj;
private String clfl;
private String cljg;
private String cljgmc;
private String clsj;
private String clyt;
private String dabh;
private String ddms;
private String ddms1;
private String dh;
private String dkbj;
private String dllx;
private String dsr;
private String esid;
private String fsjg;
private String fxjg;
private String fxjgmc;
private String fzjg;
private String glxzdj;
private String gxsj;
private String hcbj;
private String hphm;
private String hpzl;
private String jbr1;
private String jbr2;
private String jd;
private String jdccldxbj;
private String jdcsyr;
private String jdsbh;
private String jdslb;
private String jkbj;
private String jkfs;
private String jkrq;
private String jllx;
private String jmznjbj;
private String jsjg;
private String jsjqbj;
private String jsrxz;
private String jszh;
private String jtfs;
private String lddm;
private String lddm1;
private String lrr;
private String lrsj;
private String lxfs;
private String nl;
private String pzbh;
private String ryfl;
private String sgdj;
private String syxz;
private String type;
private String wd;
private String wfdd;
private String wfdd1;
private String wfdz;
private String wfsj;
private String wfsj1;
private String wfxw;
private String wsjyw;
private String xb;
private String xcfw;
private String xrms;
private String xxly;
private String xyxm;
private String xzqh;
private String ylzz1;
private String ylzz11;
private String ylzz12;
private String ylzz13;
private String ylzz14;
private String ylzz15;
private String ylzz16;
private String ylzz17;
private String ylzz18;
private String ylzz2;
private String ylzz3;
private String ylzz4;
private String ylzz5;
private String ylzz6;
private String ylzz7;
private String ylzz8;
private String ywjyw;
private String zdbj;
private String zdjlbj;
private String zjcx;
private String zjmc;
private String zqmj;
private String zsxxdz;
private String zsxzqh;
private long bzz;
private long fkje;
private long scz;
private long update_time;
private long wfjfs;
private long znj;
}
package com.hikcreate.update_hbase.entity.test;
import lombok.Data;
import java.util.Date;
@Data
public class PlateNumHis {
private String plateNum;
private String plateType;
private Date startDate;
private Date endDate;
}
package com.hikcreate.update_hbase.entity.test;
import lombok.Data;
/**
* @author MOUBK
* @create 2019/7/5 15:01
*/
@Data
public class ViolationRes {
// 1代表此字段VIO_VIOLATION中有,2代表此字段VIO_SURVEIL中有
// rowkey
private String id;
// 1 2 违法编号
private String wfbh;
//1 2 决定书类别
private String jdslb;
//1 2 决定书编号
private String jdsbh;
//1 2 文书校验位
private String wsjyw;
//1 #N/A 人员分类
private String ryfl;
//1 #N/A 驾驶证号
private String jszh;
//1 #N/A 档案编号
private String dabh;
//1 2 发证机关
private String fzjg;
//1 #N/A 准驾车型
private String zjcx;
//1 2 当事人
private String dsr;
//1 2 住所行政区划
private String zsxzqh;
//1 2 住所详细地址
private String zsxxdz;
//1 2 电话
private String dh;
//1 2 联系方式
private String lxfs;
//1 2 车辆分类
private String clfl;
//1 2 号牌种类
private String hpzl;
//1 2 号牌号码
private String hphm;
//1 2 机动车所有人
private String jdcsyr;
//1 2 机动车使用性质
private String syxz;
//1 2 交通方式
private String jtfs;
//1 2 违法时间
private String wfsj;
//1 2 行政区划
private String xzqh;
//1 2 道路类型 DMLB = 3124
private String dllx;
//1 2 公路行政等级 DMLB3116
private String glxzdj;
//1 2 违法地点
private String wfdd;
//1 2 路口路段代码 当为城市道路时存放路口号,为高速、省道等时存放公里数
private String lddm;
//1 2 地点米数
private String ddms;
//1 2 地点绝对位置
private String ddjdwz;
//1 2 违法地址
private String wfdz;
//1 2 违法行为
private String wfxw;
//1 #N/A 违法记分数
private String wfjfs;
//1 2 罚款金额
private String fkje;
//1 2 实测值
private String scz;
//1 2 标准值
private String bzz;
//1 #N/A 滞纳金
private String znj;
//1 2 执勤民警
private String zqmj;
//1 2 交款方式
private String jkfs;
//1 #N/A 发现机关
private String fxjg;
//1 #N/A 发现机关名称
private String fxjgmc;
//1 2 处理机关
private String cljg;
//1 2 处理机关名称
private String cljgmc;
//1 #N/A 处罚种类
private String cfzl;
//1 2 处理时间
private String clsj;
//1 2 交款标记
private String jkbj;
//1 2 交款日期
private String jkrq;
//1 2 强制措施凭证号
private String pzbh;
//1 #N/A 拒收拒签标记
private String jsjqbj;
//1 2 记录类型
private String jllx;
//1 2 录入人
private String lrr;
//1 2 录入时间
private String lrsj;
//1 #N/A 经办人1
private String jbr1;
//1 #N/A 经办人2
private String jbr2;
//1 #N/A 事故等级 code61
private String sgdj;
//1 2 处理对象标记 0-本地;1-本省外地市;2-外省
private String cldxbj;
//1 #N/A 机动车处理对象标记
private String jdccldxbj;
//1 2 转递记录标记 0-本地;1-异地转入
private String zdjlbj;
//1 #N/A 信息来源 1-现场处罚,2-非现场处罚
private String xxly;
//1 2 写入模式 1-系统写入;2-接口写入;3-内部其他系统写入
private String xrms;
//1 2 导库标记
private String dkbj;
//1 #N/A 减免滞纳金标记 code64
private String jmznjbj;
//1 2 转递标记
private String zdbj;
//1 2 接受机关
private String jsjg;
//1 2 发送机关
private String fsjg;
//1 2 更新时间
private String gxsj;
//1 2 备注
private String bz;
//1 2 校验位
private String ywjyw;
//1 #N/A 证件名称
private String zjmc;
//1 #N/A 初次领证日期
private String cclzrq;
//1 #N/A 年龄
private String nl;
//1 #N/A 性别 code50
private String xb;
//1 #N/A 0-否;1-是;2-未核查
private String hcbj;
//1 #N/A 精度
private String jd;
//1 #N/A 维度
private String wd;
//1 2 预留字段
private String ylzz1;
//1 2 预留字段
private String ylzz2;
//1 2 预留字段
private String ylzz3;
//1 2 预留字段
private String ylzz4;
//1 2 预留字段
private String ylzz5;
//1 2 存放支队工作库接受时间
private String ylzz6;
//1 2 预留字段
private String ylzz7;
//1 2 预留字段
private String ylzz8;
//1 2 noKey
private String cjfs;
//1 #N/A noKey
private String wfsj1;
//1 #N/A noKey
private String wfdd1;
//1 #N/A noKey
private String lddm1;
//1 #N/A noKey
private String ddms1;
//1 #N/A 驾驶人性质
private String jsrxz;
//1 2 车辆用途
private String clyt;
//1 2 是否提供校车服务
private String xcfw;
//1 2 电子坐标
private String dzzb;
//1 #N/A 是否指导人员
private String sfzdry;
//1 #N/A 学员身份证明号码
private String xysfzmhm;
//1 #N/A 学员姓名
private String xyxm;
//1 2 ylzz11
private String ylzz11;
//1 2 ylzz12
private String ylzz12;
//1 2 ylzz13
private String ylzz13;
//1 2 ylzz14
private String ylzz14;
//1 2 ylzz15
private String ylzz15;
//1 2 ylzz16
private String ylzz16;
//1 2 ylzz17
private String ylzz17;
//1 2 ylzz18
private String ylzz18;
//#N/A 2 序号
private String xh;
//#N/A 2 违法采集机关
private String cjjg;
//#N/A 2 采集机关名称
private String cjjgmc;
//#N/A 2 发动机号
private String fdjh;
//#N/A 2 车辆识别代号
private String clsbdh;
//#N/A 2 车身颜色
private String csys;
//#N/A 2 车辆品牌
private String clpp;
//#N/A 2 通知书号
private String tzsh;
//#N/A 2 通知标记:0未通知,1已通知,2无需通知
private String tzbj;
//#N/A 2 通知日期
private String tzrq;
//#N/A 2 处理标记
private String clbj;
//#N/A 2 强制措施处理标记: 0,简易处罚;1,一般处罚完毕;2,转入一般处罚但尚未完毕)
private String qzclbj;
//#N/A 2 经办人
private String jbr;
//#N/A 2 异地处理标记:0未异地处理,1本地处理本省外地市记录,2本地处理外省记录,3本省外地市处理本地记录,4发现地处理记录
private String ydclbj;
// #N/A 2 视频地址
private String spdz;
// #N/A 2 设备编号
private String sbbh;
// 图片
private String fastDfsUrl1;
private String fastDfsUrl2;
private String fastDfsUrl3;
private String fastDfsUrl4;
private String fastDfsUrl5;
private String fastDfsUrl6;
}
package com.hikcreate.update_hbase.schedule;
import com.hikcreate.update_hbase.service.DmsmpService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class DmsmpSchedule {
@Autowired
private DmsmpService dmsmpService;
/**
* 临时号牌办理次数预警
*/
@Scheduled(cron = "0 10 9 * * ?")
public void tempPlateWarn(){
dmsmpService.tempplateWarn(4);
}
/**
* 制证后退办预警
*/
// @Scheduled(cron = "0 13 9 * * ?")
public void certicateQuit(){
dmsmpService.certicateQuit(5);
}
/**
* 共同所有人变更次数预警
*/
// @Scheduled(cron = "0 15 9 * * ?")
public void commonSyr(){
dmsmpService.commonSyr(5);
}
/**
* 驾驶证名称和身份证明号码同时变化预警
*/
// @Scheduled(cron = "0 17 9 * * ?")
public void drvNameAndCard(){
dmsmpService.drvNameAndCard(5);
}
/**
* 号牌号码和所有人同时变化预警
*/
// @Scheduled(cron = "0 17 9 * * ?")
public void plateAndSyrChange(){
dmsmpService.plateAndSyrChange(10);
}
/**
* 同一所有人多次变更号牌
*/
// @Scheduled(cron = "0 19 9 * * ?")
public void syrPlateChangeMany(){
dmsmpService.syrPlateChangeMany(5);
}
/**
* 车辆短期解押
*/
// @Scheduled(cron = "0 21 9 * * ?")
public void getHisMortgageCancel(){
dmsmpService.getHisMortgageCancel(5);
}
/**
* 退办业务短期办结
*/
// @Scheduled(cron = "0 23 9 * * ?")
public void getHisRetireDealAgain(){
dmsmpService.getHisRetireDealAgain(5);
}
}
package com.hikcreate.update_hbase.schedule;
import com.hikcreate.update_hbase.service.EveryDayScheduleService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class EveryDayScheduleTask {
@Autowired
private EveryDayScheduleService everyDayScheduleService;
/**
* 机动车 每天
*/
@Scheduled(cron = "0 30 06 * * ?")
public void putVehTOHbase() {
everyDayScheduleService.getVehicleData(5);
}
/**
* 违法 每天
*/
@Scheduled(cron = "0 30 06 * * ?")
public void putVioToHbase() {
everyDayScheduleService.getViolationData(1);
}
/**
* 驾驶证 每天
*/
@Scheduled(cron = "0 30 6 * * ?")
public void putDrivinglicense() {
everyDayScheduleService.getDrivinglicenseData(5);
}
}
package com.hikcreate.update_hbase.schedule;
import com.hikcreate.update_hbase.service.UpdateEsAcdService;
import com.hikcreate.update_hbase.service.VehFlowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class UpdateEs {
@Autowired
private UpdateEsAcdService updateEsAcdService;
@Autowired
private VehFlowService vehFlowService;
@Value("${es.acd.day.int}")
private int pastDayAcd;
@Value("${es.vehflow.day.int}")
private int pastDayVehflow;
@Scheduled(cron = "0 10 8 * * ?")
public void AcdEs10Day() {
updateEsAcdService.AcdEsDay(pastDayAcd);
}
@Scheduled(cron = "0 5/20 * * * ?")
public void AcdEs20Min(){
updateEsAcdService.AcdEs20Min();
}
@Scheduled(cron = "0 15 8 * * ?")
public void AcdHuman10day(){
updateEsAcdService.AcdHumanDay(pastDayAcd);
}
}
package com.hikcreate.update_hbase.schedule;
import com.hikcreate.update_hbase.service.UpdateHbaseService;
import com.hikcreate.update_hbase.service.VioEsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class UpdateHbase {
@Autowired
private UpdateHbaseService updateHbaseService;
@Autowired
private VioEsService vioEsService;
@Value("${thread.num}")
private int threadNum;
/**
* 机动车调度
*/
@Scheduled(cron = "0 3/20 * * * ?")
public void vehicleUpdateHbase(){
updateHbaseService.vehicleUpdateHbase(null);
}
/**
* 违法调度
*/
@Scheduled(cron = "0 0/20 * * * ?")
public void vioUpdateHbase(){
updateHbaseService.vioUpdateHbase(null);
}
/**
* 驾驶人调度
*/
@Scheduled(cron = "0 3/20 * * * ?")
public void driverUpdateHbase(){
updateHbaseService.driverUpdateHbase(null);
}
/**
* 违法每天
*/
@Scheduled(cron = "0 0 8 * * ?")
public void toEsVio() {
for(int i=0;i<threadNum;i++){
int finalI = i;
new Thread(() -> {vioEsService.toEsPastDay(finalI);}).start();
}
}
}
package com.hikcreate.update_hbase.schedule;
import com.hikcreate.update_hbase.service.VehFlowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class VehFlowSchedule {
@Autowired
private VehFlowService vehFlowService;
// @Scheduled(cron = "0 0 8 * * ?")
public void vehFlow2EsDay(){
vehFlowService.vehFlow2EsDay(10);
}
}
package com.hikcreate.update_hbase.schedule;
import com.hikcreate.update_hbase.service.VehicleService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class VehSchedule {
@Autowired
private VehicleService vehicleService;
/**
* 机动车每天同步
*/
@Scheduled(cron = "0 0 7 * * ?")
public void vehDay(){
try {
vehicleService.vehDay();
}catch (Exception e){
e.printStackTrace();
}
}
}
package com.hikcreate.update_hbase.schedule;
import com.hikcreate.update_hbase.service.SurveilService;
import com.hikcreate.update_hbase.service.VioEsService;
import com.hikcreate.update_hbase.service.ViolationService;
import com.hikcreate.update_hbase.utils.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class VioSchedule {
@Autowired
private VioEsService vioEsService;
@Autowired
private ViolationService violationService;
@Autowired
private SurveilService surveilService;
/**
* 违法删除数据每天同步
*/
@Scheduled(cron = "0 0 11 * * ?")
public void vioDelDay(){
vioEsService.vioDelDay();
}
/**
* violation表ES数据同步:vm_vio_violation,vm_vio_violation_his,vm_vio_violation_del,vio_violation
*/
@Scheduled(cron = "0 0 8 * * ?")
public void violationDay(){
violationService.violationDay(5);
}
/**
* surveil表数据同步:vm_vio_surveil,vm_vio_surveil_his,vm_vio_surveil_del,vio_surveil
*/
@Scheduled(cron = "0 0 7 * * ?")
public void surveilDay(){
surveilService.surveilDay(5);
}
}
package com.hikcreate.update_hbase.service;
public interface DmsmpService {
void tempplateWarn(int i);
void plateAndSyrChange(int i);
void commonSyr(int i);
void drvNameAndCard(int day);
void certicateQuit(int i);
void syrPlateChangeMany(int i);
void getHisMortgageCancel(int i);
void getHisRetireDealAgain(int i);
void synDeformedproposer(int i);
}
package com.hikcreate.update_hbase.service;
/**
* @author yulong shu
* @version 1.0
* @date 2021/5/12 16:26
*/
public interface DriverService {
void insertDriver2Hbase(String idcard);
}
package com.hikcreate.update_hbase.service;
public interface EveryDayScheduleService {
void getViolationData(int days);
void getVehicleData(int days);
void getDrivinglicenseData(int days);
void getDrvFlowDayData(int day);
void getDrvFlowMinuData(String day);
void getDrvFlowIndexAll();
}
package com.hikcreate.update_hbase.service;
public interface SurveilService {
void surveilDay(int i);
void sysSurSmallData();
void imDataOracle2EsById(String... ids);
void insertDataOrc2EsBatch(String day);
}
package com.hikcreate.update_hbase.service;
public interface UpdateEsAcdService {
void AcdEsDay(int i);
void AcdEs20Min();
void AcdHumanDay(int pastDay);
}
package com.hikcreate.update_hbase.service;
public interface UpdateHbaseService {
void vehicleUpdateHbase(String day);
void driverUpdateHbase(String day);
void vioUpdateHbase(String day);
void delColumn(String tableStr, String family, String col);
void insertDriverById(String... ids);
}
package com.hikcreate.update_hbase.service;
public interface VehFlowService {
void vehFlow2EsDay(int i);
}
package com.hikcreate.update_hbase.service;
import com.hikcreate.update_hbase.entity.IndexHtable;
import java.sql.SQLException;
import java.util.List;
public interface VehicleIndexService {
void listVehIndexall();
void listVehIndexyesterday();
void listVehIndexExtra();
}
package com.hikcreate.update_hbase.service;
import java.io.IOException;
public interface VehicleService {
void getVehicle2Hbase(String hphm, String hpzl);
void getVehicle2HbaseAllRealTime(String day);
void delVehCheck() throws Exception;
void vehDay() throws Exception;
void getVmVehicle2Hbase(String start, String end);
void imDataOracle2EsById(String... ids);
}
package com.hikcreate.update_hbase.service;
public interface VioEsService {
void getHisVioToEs(String start, String end);
void toEsPastDay(int finalI);
void getHisVioToEsByTable(String start, String end, String table);
void vioDelHisRange(String start, String end, String table);
void vioDelDay();
void getHisVioToEsByHphm(String start, String end, String table, String hphm);
}
package com.hikcreate.update_hbase.service;
import com.hikcreate.update_hbase.entity.IndexHtable;
import java.sql.SQLException;
import java.util.List;
public interface VioIndexService {
void listVioIndexAll();
void listVioIndexYesterday();
}
package com.hikcreate.update_hbase.service;
public interface ViolationService {
void violationDay(int len);
void sysViolationSmallData();
void imDataOracle2EsById(String... ids);
void insertViolation2EsByDte(String start,String end);
}
package com.hikcreate.update_hbase.service.impl;
import com.hikcreate.update_hbase.dao.OracleDao;
import com.hikcreate.update_hbase.entity.Driver;
import com.hikcreate.update_hbase.service.DriverService;
import com.hikcreate.update_hbase.utils.HbaseEntityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;
import java.util.List;
/**
* @author yulong shu
* @version 1.0
* @date 2021/5/12 16:37
*/
@Service("DriverServiceImpl")
public class DriverServiceImpl implements DriverService {
private static Configuration hbaseConf;
@Value("${hbase.zookeeper.property.clientPort}")
private String hbaseAddress;
@Value("${hbase.zookeeper.quorum}")
private String hbaseZkQuorum;
private static org.apache.hadoop.hbase.client.Connection hbaseConn;
@Value("${hbase.driver.table}")
private String driverTableStr;
@Value("${hbase.driver_index.table}")
private String driverIndexTableStr;
private static TableName driverTableName;
private static Table driverTable;
private static TableName driverIndexTableName;
private static Table driverIndexTable;
@Autowired
private OracleDao oracleDao;
private void openHbaseCon() {
hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseAddress);
hbaseConf.set("hbase.zookeeper.quorum", hbaseZkQuorum);
}
@Override
public void insertDriver2Hbase(String idcard) {
String rowkey;
//flag=0:不插入,flag=1:插入hbase并发kafka并写索引表,
int flag;
Put put;
openHbaseCon();
try {
hbaseConn = ConnectionFactory.createConnection(hbaseConf);
driverTable = hbaseConn.getTable(TableName.valueOf(driverTableStr));
driverIndexTable = hbaseConn.getTable(TableName.valueOf(driverIndexTableStr));
Driver driver = oracleDao.getDriverByIdcard(idcard);
int count = 0;
driver.setSfzmhm(driver.getSfzmhm().trim());
driver.setXm(driver.getXm().trim());
flag = 1;
rowkey = (driver.getSfzmhm() == null ? "null" : driver.getSfzmhm());
Get get = new Get(rowkey.getBytes());
get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gxsj"));
Result result = driverTable.get(get);
if (result.size() > 0) {//已存在hbase中,比较gxsj
for (Cell cell : result.rawCells()) {
String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
if ("gxsj".equals(colName) && value.length() >= 19 && driver.getGxsj().length() >= 19
&& driver.getGxsj().substring(0, 19).compareTo(value.substring(0, 19)) < 0) {
flag = 0;
} else {
flag = 1;
}
}
} else {
flag = 1;
}
if (flag == 1) {
put = new Put(rowkey.getBytes());
put = HbaseEntityUtil.getHbaseEntityPut(driver, put);
driverTable.put(put);
// logger.info("drv-1--rowkey:" + rowkey);
Put put1 = new Put((driver.getDabh() == null ? "null" : driver.getDabh()).getBytes());
put1.addColumn("key_fam".getBytes(), "keyone".getBytes(), rowkey.getBytes());
driverIndexTable.put(put1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
closeHbase();
}
}
private void closeHbase() {
if (hbaseConn != null) {
try {
hbaseConn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
closeHbaseTable(driverTable);
closeHbaseTable(driverIndexTable);
}
private void closeHbaseTable(Table table) {
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.hikcreate.update_hbase.service.impl;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
/**
* @author lifuyi
* @date ${Date}
* @description
*/
@Component
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
}
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send error : " + producerRecord.toString());
}
}
package com.hikcreate.update_hbase.service.impl;
import com.hikcreate.update_hbase.Repo.SurveilRepository;
import com.hikcreate.update_hbase.dao.SurveilMapper;
import com.hikcreate.update_hbase.entity.Surveil;
import com.hikcreate.update_hbase.entity.Violation;
import com.hikcreate.update_hbase.service.SurveilService;
import com.hikcreate.update_hbase.utils.DateUtil;
import com.hikcreate.update_hbase.utils.EsUtils;
import com.hikcreate.update_hbase.utils.HbaseEntityUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class SurveilServiceImpl implements SurveilService {
@Autowired
private SurveilMapper surveilMapper;
@Autowired
private SurveilRepository surveilRepository;
@Value("${hbase.es.surveil}")
private String hbaseTableStr;
@Autowired
private EsUtils esUtils;
@Value("${hbase.zookeeper.property.clientPort}")
private String hbaseAddress;
@Value("${hbase.zookeeper.quorum}")
private String hbaseZkQuorum;
@Override
public void imDataOracle2EsById(String... ids) {
System.out.println(Arrays.toString(ids));
List<Surveil> list = surveilMapper.getSurveilById(Arrays.asList(ids));
System.out.println("list size:"+list.size());
for (Surveil surveil : list) {
surveilRepository.save(surveil);
}
}
/**
* 指定同步莫一天的数据
* @param day
*/
@Override
public void insertDataOrc2EsBatch(String day){
try {
List<Surveil> list = surveilMapper.insertSurveilBatch(day);
System.out.println("list size:" + list.size());
if (list.size() == 0) {
System.out.println("当天没有数据");
} else {
for (Surveil surveil : list) {
Optional<Surveil> byId = surveilRepository.findById(surveil.getWfbh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(surveil.getGxsj()) >= 0) {
continue;
} else {
surveilRepository.save(surveil);
}
}
}
}
catch(Exception e){
e.printStackTrace();
}
}
@Override
public void surveilDay(int len) {
for (int i = 0; i < len; i++) {
int finalI = i;
new Thread(() -> {
try {
List<Surveil> vmList = surveilMapper.getVmSurveil(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Surveil> list = surveilMapper.getSurveil(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Surveil> vmHisList = surveilMapper.getVmSurveilHis(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Surveil> vmDelList = surveilMapper.getVmSurveilDel(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Surveil> finalList = new ArrayList<>();
finalList.addAll(vmList);
finalList.addAll(list);
finalList.addAll(vmHisList);
finalList.addAll(vmDelList);
org.apache.hadoop.hbase.client.Connection hbaseConn = esUtils.getHbseCon(hbaseAddress, hbaseZkQuorum);
Table violationTable = hbaseConn.getTable(TableName.valueOf(hbaseTableStr));
for (Surveil surveil : finalList) {
Optional<Surveil> byId = surveilRepository.findById(surveil.getXh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(surveil.getGxsj()) >= 0) {
continue;
}else {
surveilRepository.save(surveil);
Put put=new Put(surveil.getXh().getBytes());
put= HbaseEntityUtil.getHbaseEntityPutNoPrefix(surveil,put);
violationTable.put(put);
}
}
esUtils.closeHbaseTable(violationTable);
esUtils.closeHbaseCon(hbaseConn);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
/**
* 同步小数据量表到Es的非现场违法表:vio_surveil,vm_vio_surveil_del
* @return
*/
@Override
public void sysSurSmallData() {
//vio_surveil
for (int i = 1; i < 16; i++) {
int finalI = i;
new Thread(() -> {
try {
List<Surveil> list = surveilMapper.getSurveil(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
org.apache.hadoop.hbase.client.Connection hbaseConn = esUtils.getHbseCon(hbaseAddress, hbaseZkQuorum);
Table violationTable = hbaseConn.getTable(TableName.valueOf(hbaseTableStr));
for (Surveil surveil : list) {
Optional<Surveil> byId = surveilRepository.findById(surveil.getXh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(surveil.getGxsj()) >= 0) {
continue;
} else {
surveilRepository.save(surveil);
Put put = new Put(surveil.getXh().getBytes());
put = HbaseEntityUtil.getHbaseEntityPutNoPrefix(surveil, put);
violationTable.put(put);
}
}
esUtils.closeHbaseTable(violationTable);
esUtils.closeHbaseCon(hbaseConn);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
//vm_vio_surveil_del
try {
List<Surveil> vmDelList = surveilMapper.getSurveilDelAll();
org.apache.hadoop.hbase.client.Connection hbaseConn = esUtils.getHbseCon(hbaseAddress, hbaseZkQuorum);
Table violationTable = hbaseConn.getTable(TableName.valueOf(hbaseTableStr));
for (Surveil surveil : vmDelList) {
Optional<Surveil> byId = surveilRepository.findById(surveil.getXh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(surveil.getGxsj()) >= 0) {
continue;
} else {
surveilRepository.save(surveil);
Put put = new Put(surveil.getXh().getBytes());
put = HbaseEntityUtil.getHbaseEntityPutNoPrefix(surveil, put);
violationTable.put(put);
}
}
esUtils.closeHbaseTable(violationTable);
esUtils.closeHbaseCon(hbaseConn);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.hikcreate.update_hbase.service.impl;
import com.hikcreate.update_hbase.Repo.AcdAllRepository;
import com.hikcreate.update_hbase.Repo.AcdHumanRepository;
import com.hikcreate.update_hbase.dao.AcdMapper;
import com.hikcreate.update_hbase.entity.AcdAll;
import com.hikcreate.update_hbase.entity.AcdHuman;
import com.hikcreate.update_hbase.redis.RedisUtil;
import com.hikcreate.update_hbase.service.UpdateEsAcdService;
import com.hikcreate.update_hbase.utils.EsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Optional;
@Service("updateEsAcdServiceImpl")
public class UpdateEsAcdServiceImpl implements UpdateEsAcdService {
private static Logger logger = LoggerFactory.getLogger(UpdateEsAcdServiceImpl.class);
@Autowired
private RedisUtil redisUtil;
@Autowired
private AcdMapper acdDao;
@Autowired
private AcdAllRepository acdAllRepository;
@Autowired
private AcdHumanRepository acdHumanRepository;
@Autowired
private EsUtils esUtils;
@Value("${app.kafka.acd.topic}")
private String acdTopic;
@Value("${app.kafka.acd_human.topic}")
private String acdHumanTopic;
/**
* 事故每天调度
* @param i
*/
@Override
public void AcdEsDay(int i) {
//查询三个事故表中前10天数据
List<AcdAll> acdAlls = acdDao.getFileAcdPastDay(i);
List<AcdAll> dutySimpleAcdAlls = acdDao.getDutySimpleAcdPastDay(i);
List<AcdAll> quickAcdAlls = acdDao.getQuickAcdPastDay(i);
acdAlls.addAll(dutySimpleAcdAlls);
acdAlls.addAll(quickAcdAlls);
if (acdAlls.size() > 0) {
for (AcdAll acdAll : acdAlls) {
insertEsAcd(acdAll);
}
}
}
private void insertEsAcd(AcdAll acdAll){
Optional<AcdAll> byId = acdAllRepository.findById(acdAll.getId());
if(byId.isPresent()){
AcdAll acdAll1 = byId.get();
if(acdAll1.getGxsj().substring(0,19).compareTo(acdAll.getGxsj().substring(0,19))<0){
acdAllRepository.save(acdAll);
esUtils.sendKafka(acdAll.getId(),acdAll,acdAll1,acdTopic);
}
}else {
acdAllRepository.save(acdAll);
esUtils.sendKafka(acdAll.getId(),acdAll,null,acdTopic);
}
}
private void insertEsAcdHuman(AcdHuman acdHuman){
Optional<AcdHuman> byId = acdHumanRepository.findById(acdHuman.getEsid());
if(byId.isPresent()){
acdHumanRepository.save(acdHuman);
}else {
acdHumanRepository.save(acdHuman);
esUtils.sendKafka(acdHuman.getId(),acdHuman,null,acdHumanTopic);
}
}
/**
* 事故定时调度
*/
@Override
public void AcdEs20Min() {
//获取时间
String updateTimeFile = esUtils.getGxsj(redisUtil, null, "es_acdFile_updateTime");
//查询增量数据
new Thread(() -> {
List<AcdAll> acdAlls = acdDao.getIncreAcd(updateTimeFile);
List<AcdHuman> acdHumanList=acdDao.getIncreAcdHuman20Min(updateTimeFile);
if (acdAlls.size() > 0) {
// acdAlls.forEach(acdAll -> EsUtils.insertEsIdVo(acdAll, transportClient, acdIndex, acdType, kafkaTemplate, acdTopic, "esid", "gxsj"));
acdAlls.forEach(acdAll -> {
insertEsAcd(acdAll);
});
redisUtil.set("es_acdFile_updateTime", acdAlls.get(0).getGxsj());
}
if(acdHumanList.size()>0){
acdHumanList.forEach(acdHuman -> insertEsAcdHuman(acdHuman));
}
}).start();
}
/**
* 事故人员每天调度
* @param pastDay
*/
@Override
public void AcdHumanDay(int pastDay) {
List<AcdHuman> acdHumanList= acdDao.getIncreAcdHuman(pastDay);
List<AcdHuman> acdDutySimpleHumanList= acdDao.getIncreAcdDutySimpleHuman(pastDay);
List<AcdHuman> acdQuickHumanList=acdDao.getIncreAcdQuickHuman(pastDay);
new Thread(()->{
if(acdHumanList.size()>0){
acdHumanList.forEach(acdHuman -> insertEsAcdHuman(acdHuman));
}
}).start();
new Thread(()->{
if(acdDutySimpleHumanList.size()>0){
acdDutySimpleHumanList.forEach(acdHuman -> insertEsAcdHuman(acdHuman));
}
}).start();
new Thread(()->{
if(acdQuickHumanList.size()>0){
acdQuickHumanList.forEach(acdHuman -> insertEsAcdHuman(acdHuman));
}
}).start();
}
}
package com.hikcreate.update_hbase.service.impl;
import com.hikcreate.update_hbase.Repo.VehFlowRepository;
import com.hikcreate.update_hbase.dao.VehFlowMapper;
import com.hikcreate.update_hbase.entity.VehFlow;
import com.hikcreate.update_hbase.service.VehFlowService;
import com.hikcreate.update_hbase.utils.EsConfig;
import org.apache.lucene.index.Terms;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Service("vehFlowServiceImpl")
public class VehFlowServiceImpl implements VehFlowService {
private static Logger logger = LoggerFactory.getLogger(VehFlowServiceImpl.class);
@Autowired
private VehFlowMapper vehFlowMapper;
@Autowired
private VehFlowRepository vehFlowRepository;
/**
* 每天同步机动车流水表数据,现在只同步到了ES
* @param day
*/
@Override
public void vehFlow2EsDay(int day) {
List<VehFlow> vehFlowList=vehFlowMapper.getVmPast(day);
List<VehFlow> vehFlowList1=vehFlowMapper.getTablePast(day);
vehFlowList.addAll(vehFlowList1);
if(vehFlowList.size()>0){
for(VehFlow vehFlow:vehFlowList){
Optional<VehFlow> byId = vehFlowRepository.findById(vehFlow.getLsh());
if(byId.isPresent() && byId.get().getGxrq().compareTo(vehFlow.getGxrq())>0){
continue;
}else {
vehFlowRepository.save(vehFlow);
}
}
}
logger.info("veh_flow完成");
}
}
package com.hikcreate.update_hbase.service.impl;
import com.hikcreate.update_hbase.entity.IndexHtable;
import com.hikcreate.update_hbase.service.VehicleIndexService;
import com.hikcreate.update_hbase.utils.ConnectUtil;
import com.hikcreate.update_hbase.utils.HbaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@Service("VehicleIndexServiceImpl")
public class VehicleIndexServiceImpl implements VehicleIndexService {
private static Logger logger = LoggerFactory.getLogger(VehicleIndexServiceImpl.class);
@Autowired
private ConnectUtil connectUtil;
@Value("${listVehIndexall.sql}")
private String listVehIndexall_sql;
@Value("${listVehIndexyesterday.sql}")
private String listVehIndexyesterday_sql;
@Value("${listVehIndexExtra.sql}")
private String listVehIndexExtra_sql;
@Autowired
private HbaseUtil hbaseUtil;
/**
* 机动车全量数据
*/
@Override
public void listVehIndexall() {
logger.info("开始全量查询查询Oracle表:ZCKJ.VM_VEHICLE_GY");
List<IndexHtable> indexHtables = null;
try {
indexHtables = selectVeh(listVehIndexall_sql);
} catch (SQLException e) {
e.printStackTrace();
}
List<Put> puts = new ArrayList<>();
logger.info("ZCKJ.VM_VEHICLE_GY全量查询结果组建put集合");
if (indexHtables != null && indexHtables.size() > 0) {
for (IndexHtable indexHtable : indexHtables) {
Put put = new Put(Bytes.toBytes(indexHtable.getRowkey()));
put.addColumn(Bytes.toBytes("key_fam"), Bytes.toBytes(indexHtable.getCol()), Bytes.toBytes(indexHtable.getValue()));
puts.add(put);
}
logger.info("put集合put到hbase表:veh:veh_index 中");
}
try {
hbaseUtil.addPuts("veh:veh_index", puts);
logger.info("put集合put到hbase表:veh:veh_index 成功");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 机动车昨日数据
*/
@Override
public void listVehIndexyesterday() {
logger.info("开始查询Oracle表昨天数据:ZCKJ.VM_VEHICLE_GY");
List<IndexHtable> indexHtables = null;
try {
indexHtables = selectVeh(listVehIndexyesterday_sql);
} catch (SQLException e) {
e.printStackTrace();
}
List<Put> puts = new ArrayList<>();
logger.info("ZCKJ.VM_VEHICLE_GY昨日查询结果组建put集合");
if (indexHtables != null && indexHtables.size() > 0) {
for (IndexHtable indexHtable : indexHtables) {
Put put = new Put(Bytes.toBytes(indexHtable.getRowkey()));
put.addColumn(Bytes.toBytes("key_fam"), Bytes.toBytes(indexHtable.getCol()), Bytes.toBytes(indexHtable.getValue()));
puts.add(put);
}
logger.info("put集合put到hbase表:veh:veh_index 中");
}
try {
hbaseUtil.addPuts("veh:veh_index", puts);
logger.info("put集合put到hbase表:veh:veh_index 成功");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void listVehIndexExtra() {
logger.info("开始查询Oracle表实时表额外数据:ZCKJ.VEHICLE_GY");
List<IndexHtable> indexHtables = null;
try {
indexHtables = selectVeh(listVehIndexExtra_sql);
} catch (SQLException e) {
e.printStackTrace();
}
List<Put> puts = new ArrayList<>();
logger.info("ZCKJ.VEHICLE_GY相对于ZCKJ.VM_VEHICLE_GY额外数据组建put集合");
if (indexHtables != null && indexHtables.size() > 0) {
for (IndexHtable indexHtable : indexHtables) {
Put put = new Put(Bytes.toBytes(indexHtable.getRowkey()));
put.addColumn(Bytes.toBytes("key_fam"), Bytes.toBytes(indexHtable.getCol()), Bytes.toBytes(indexHtable.getValue()));
puts.add(put);
}
logger.info("put集合put到hbase表:veh:veh_index 中");
}
try {
hbaseUtil.addPuts("veh:veh_index", puts);
logger.info("put集合put到hbase表:veh:veh_index 成功");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 查询Oracle的机动车基本信息表
*
* @param sql 查询语句
* @return 索引字段结果
* @throws SQLException sql异常
*/
private List<IndexHtable> selectVeh(String sql) throws SQLException {
Connection connection = connectUtil.getConnect();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
List<IndexHtable> Indexlist = new ArrayList<>();
while (resultSet.next()) {
IndexHtable indexHtable = new IndexHtable();
indexHtable.setCol(resultSet.getString("ccdjrq"));
indexHtable.setRowkey(resultSet.getString("sfzmhm"));
indexHtable.setValue(resultSet.getString("hphm") + "#" + resultSet.getString("hpzl"));
Indexlist.add(indexHtable);
}
ConnectUtil.close(preparedStatement);
ConnectUtil.close(connection);
return Indexlist;
}
}
package com.hikcreate.update_hbase.service.impl;
import com.hikcreate.update_hbase.entity.IndexHtable;
import com.hikcreate.update_hbase.utils.ConnectUtil;
import com.hikcreate.update_hbase.utils.HbaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@Component
public class VioIndexServiceImpl implements com.hikcreate.update_hbase.service.VioIndexService {
@Autowired
private ConnectUtil connectUtil;
@Value("${listVioIndexAll.sql}")
private String listVioIndexAll_sql;
@Value("${listVioIndexYesterday.sql}")
private String listVioIndexYesterday_sql;
@Autowired
private HbaseUtil hbaseUtil;
private static Logger logger = LoggerFactory.getLogger(VioIndexServiceImpl.class);
@Override
public void listVioIndexAll() {
List<IndexHtable> indexHtables = null;
try {
logger.info("start query -->> " + listVioIndexAll_sql);
indexHtables = selectVio(listVioIndexAll_sql);
} catch (SQLException e) {
e.printStackTrace();
}
List<Put> puts = new ArrayList<>();
logger.info("getting List<IndexHtable>");
if (indexHtables != null && indexHtables.size() > 0) {
for (IndexHtable indexHtable : indexHtables) {
Put put = new Put(Bytes.toBytes(indexHtable.getRowkey()));
put.addColumn(Bytes.toBytes("key_fam"), Bytes.toBytes(indexHtable.getCol()), Bytes.toBytes(indexHtable.getValue()));
puts.add(put);
}
logger.info("get List<vio_index> Success");
}
try {
logger.info("Starting put List<vio_index> to Hbase");
hbaseUtil.addPuts("vio:vio_index", puts);
logger.info("Put vio:vio_index 全表 Success");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void listVioIndexYesterday() {
List<IndexHtable> indexHtables = null;
try {
logger.info("start query -->> " + listVioIndexYesterday_sql);
indexHtables = selectVio(listVioIndexYesterday_sql);
logger.info("query success");
} catch (SQLException e) {
e.printStackTrace();
}
List<Put> puts = new ArrayList<>();
logger.info("getting List<IndexHtable>");
if (indexHtables != null && indexHtables.size() > 0) {
for (IndexHtable indexHtable : indexHtables) {
Put put = new Put(Bytes.toBytes(indexHtable.getRowkey()));
put.addColumn(Bytes.toBytes("key_fam"), Bytes.toBytes(indexHtable.getCol()), Bytes.toBytes(indexHtable.getValue()));
puts.add(put);
}
logger.info("get List<vio_index> Success");
}
try {
logger.info("Starting put List<vio_index> to Hbase");
hbaseUtil.addPuts("vio:vio_index", puts);
logger.info("Put vio:vio_index 天表 Success");
} catch (IOException e) {
e.printStackTrace();
}
}
//查违法vio_vioaltion表
private List<IndexHtable> selectVio(String sql) throws SQLException {
Connection connection = connectUtil.getConnect();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
List<IndexHtable> Indexlist = new ArrayList<>();
while (resultSet.next()) {
IndexHtable indexHtable = new IndexHtable();
indexHtable.setCol(resultSet.getString("col"));
indexHtable.setRowkey(resultSet.getString("rowkey"));
indexHtable.setValue(resultSet.getString("value"));
Indexlist.add(indexHtable);
}
ConnectUtil.close(preparedStatement);
ConnectUtil.close(connection);
return Indexlist;
}
}
package com.hikcreate.update_hbase.service.impl;
import com.hikcreate.update_hbase.Repo.ViolationEs;
import com.hikcreate.update_hbase.Repo.ViolationRepository;
import com.hikcreate.update_hbase.dao.ViolationMapper;
import com.hikcreate.update_hbase.entity.Violation;
import com.hikcreate.update_hbase.service.ViolationService;
import com.hikcreate.update_hbase.utils.DateUtil;
import com.hikcreate.update_hbase.utils.EsUtils;
import com.hikcreate.update_hbase.utils.HbaseEntityUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class ViolationServiceImpl implements ViolationService {
@Autowired
private ViolationMapper violationMapper;
@Autowired
private ViolationEs violationEs;
@Autowired
private ViolationRepository violationRepository;
@Value("${hbase.es.violation}")
private String hbaseTableStr;
@Autowired
private EsUtils esUtils;
@Value("${hbase.zookeeper.property.clientPort}")
private String hbaseAddress;
@Value("${hbase.zookeeper.quorum}")
private String hbaseZkQuorum;
@Override
public void imDataOracle2EsById(String... ids) {
System.out.println(Arrays.toString(ids));
List<Violation> list = violationMapper.getViolationById(Arrays.asList(ids));
System.out.println("list size:"+list.size());
for (Violation violation : list) {
violationEs.save(violation);
}
}
/**
* 同步指定时间段的violation数据:vm_vio_violation,vm_vio_violation_his,vm_vio_violation_del,vio_violation
*/
@Override
public void violationDay(int len) {
for (int i = 0; i < len; i++) {
int finalI = i;
new Thread(() -> {
try {
List<Violation> vmList = violationMapper.getVmViolation(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Violation> list = violationMapper.getViolation(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Violation> vmHisList = violationMapper.getVmViolationHis(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Violation> vmDelList = violationMapper.getVmViolationDel(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
List<Violation> finalList = new ArrayList<>();
finalList.addAll(vmList);
finalList.addAll(list);
finalList.addAll(vmHisList);
finalList.addAll(vmDelList);
org.apache.hadoop.hbase.client.Connection hbaseConn = esUtils.getHbseCon(hbaseAddress, hbaseZkQuorum);
Table violationTable = hbaseConn.getTable(TableName.valueOf(hbaseTableStr));
for (Violation violation : finalList) {
Optional<Violation> byId = violationRepository.findById(violation.getWfbh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(violation.getGxsj()) >= 0) {
continue;
} else {
violationRepository.save(violation);
Put put = new Put(violation.getWfbh().getBytes());
put = HbaseEntityUtil.getHbaseEntityPutNoPrefix(violation, put);
violationTable.put(put);
}
}
esUtils.closeHbaseTable(violationTable);
esUtils.closeHbaseCon(hbaseConn);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
/**
*指定时间段同步到violation数据到es
*/
@Override
public void insertViolation2EsByDte(String start,String end){
try {
List<Violation> list = violationMapper.getVmViolation(start, end);
for (Violation violation : list) {
Optional<Violation> byId = violationRepository.findById(violation.getWfbh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(violation.getGxsj()) >= 0) {
continue;
} else {
violationRepository.save(violation);
}
}
}
catch(Exception e){
e.printStackTrace();
}
}
/**
* 同步小数据量表到Es的现场违法表:vio_violation,vm_vio_violation_del
*/
@Override
public void sysViolationSmallData() {
//vio_violation
for (int i = 1; i < 16; i++) {
int finalI = i;
new Thread(() -> {
try {
List<Violation> list = violationMapper.getViolation(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI)), DateUtil.formatDate(DateUtil.getPastDate(new Date(), -finalI + 1)));
org.apache.hadoop.hbase.client.Connection hbaseConn = esUtils.getHbseCon(hbaseAddress, hbaseZkQuorum);
Table violationTable = hbaseConn.getTable(TableName.valueOf(hbaseTableStr));
for (Violation violation : list) {
Optional<Violation> byId = violationRepository.findById(violation.getWfbh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(violation.getGxsj()) >= 0) {
continue;
} else {
violationRepository.save(violation);
Put put = new Put(violation.getWfbh().getBytes());
put = HbaseEntityUtil.getHbaseEntityPutNoPrefix(violation, put);
violationTable.put(put);
}
}
esUtils.closeHbaseTable(violationTable);
esUtils.closeHbaseCon(hbaseConn);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
//vm_vio_violation_del
try {
List<Violation> vmDelList = violationMapper.getViolationDelAll();
org.apache.hadoop.hbase.client.Connection hbaseConn = esUtils.getHbseCon(hbaseAddress, hbaseZkQuorum);
Table violationTable = hbaseConn.getTable(TableName.valueOf(hbaseTableStr));
for (Violation violation : vmDelList) {
Optional<Violation> byId = violationRepository.findById(violation.getWfbh());
if (byId.isPresent() && byId.get().getGxsj().compareTo(violation.getGxsj()) >= 0) {
continue;
} else {
violationRepository.save(violation);
Put put = new Put(violation.getWfbh().getBytes());
put = HbaseEntityUtil.getHbaseEntityPutNoPrefix(violation, put);
violationTable.put(put);
}
}
esUtils.closeHbaseTable(violationTable);
esUtils.closeHbaseCon(hbaseConn);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.hikcreate.update_hbase;
import com.alibaba.druid.support.json.JSONUtils;
import java.util.Arrays;
public class testCase {
public static void main(String[] args) {
String[] ids=new String[]{"1","2","3"};
StringBuffer sb=new StringBuffer();
for (String id : ids) {
sb.append(id+',');
}
sb.deleteCharAt(sb.length()-1);
System.out.println(sb.toString());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment