Commit b937c9b0 by dufafei

1

parent bfbd3eb8
......@@ -16,6 +16,9 @@
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.0</spark.version>
<ignite.version>2.6.0</ignite.version>
......
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
public class Test {
public static void main(String[] args) {
DateTime eventTime = DateTime.parse("2019-10-17 8:51:36",DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")); //事件日期
String time = eventTime.toString("yyyy-MM-dd HH:mm:ss");
System.out.println(time);
}
}
package com.hikcreate.ignite.pojo;
package com.hikcreate.ignite.domain;
import java.io.Serializable;
......
package com.hikcreate.ignite.pojo.basic;
package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
......
package com.hikcreate.ignite.pojo.alarm;
package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
......
package com.hikcreate.ignite.pojo.alarm;
package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
public class DailyAlarmProcess implements Serializable {
public class DailyAlarmDeal implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -26,14 +26,16 @@ public class DailyAlarmProcess implements Serializable {
@QuerySqlField
private Boolean isDeal; //是否处理
public DailyAlarmProcess(String useNature, String supervisionId, String warnTime,Boolean isDeal) {
public DailyAlarmDeal(String vehicleNo, String vehicleColor, String useNature, String supervisionId, String warnTime, Boolean isDeal) {
this.vehicleNo = vehicleNo;
this.vehicleColor = vehicleColor;
this.useNature = useNature;
this.supervisionId = supervisionId;
this.warnTime = warnTime;
this.isDeal = isDeal;
}
public DailyAlarmProcess(Boolean isDeal) {
public DailyAlarmDeal(Boolean isDeal) {
this.isDeal = isDeal;
}
}
package com.hikcreate.ignite.pojo.alarm;
package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
......
package com.hikcreate.ignite.pojo.basic;
package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
public class AlarmInfo implements Serializable {
public class AlarmTypeInfo implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -25,7 +25,7 @@ public class AlarmInfo implements Serializable {
@QuerySqlField
private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss
public AlarmInfo(Long id, String warningBigType, String warningSmallType, String warningTypeCode, Long warningLevelCode, String warningTypeName, String status, String businessTime) {
public AlarmTypeInfo(Long id, String warningBigType, String warningSmallType, String warningTypeCode, Long warningLevelCode, String warningTypeName, String status, String businessTime) {
this.id = id;
this.warningBigType = warningBigType;
this.warningSmallType = warningSmallType;
......
package com.hikcreate.ignite.pojo.basic;
package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
import java.sql.Date;
public class CompanyInfo implements Serializable {
public class EnterpriseInfo implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -33,7 +33,7 @@ public class CompanyInfo implements Serializable {
@QuerySqlField
private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss
public CompanyInfo(String enterpriseCode, String enterpriseName, String socialCreditCode, Date intoTime, String province, String city, String area, Boolean intoStatus, String status, String address, String businessTime) {
public EnterpriseInfo(String enterpriseCode, String enterpriseName, String socialCreditCode, Date intoTime, String province, String city, String area, Boolean intoStatus, String status, String address, String businessTime) {
this.enterpriseCode = enterpriseCode;
this.enterpriseName = enterpriseName;
this.socialCreditCode = socialCreditCode;
......@@ -47,18 +47,15 @@ public class CompanyInfo implements Serializable {
this.businessTime = businessTime;
}
public CompanyInfo(Long id, String enterpriseCode, String enterpriseName, String socialCreditCode, Date intoTime, String province, String city, String area, Boolean intoStatus, String status, String address, String businessTime) {
this.id = id;
this.enterpriseCode = enterpriseCode;
this.enterpriseName = enterpriseName;
this.socialCreditCode = socialCreditCode;
this.intoTime = intoTime;
this.province = province;
this.city = city;
this.area = area;
this.intoStatus = intoStatus;
this.status = status;
this.address = address;
this.businessTime = businessTime;
public String getProvince() {
return province;
}
public String getCity() {
return city;
}
public String getArea() {
return area;
}
}
package com.hikcreate.ignite.pojo.basic;
package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
......
package com.hikcreate.ignite.pojo.basic;
package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
......@@ -9,7 +9,7 @@ public class VehicleInfo implements Serializable{
private static final long serialVersionUID = 1L;
@QuerySqlField
private String id; //主键字段
private Long id; //主键字段
@QuerySqlField
private String vehicleCode; //车辆编号
@QuerySqlField
......@@ -50,7 +50,7 @@ public class VehicleInfo implements Serializable{
@QuerySqlField
private String area; //区
public VehicleInfo(String id, String vehicleCode, String plateNum, String plateColor, Date intoTime, String businessScope, String businessScopeDetail, String indentifier, Boolean intoStatus, String status, String operatingCertificateNo, String classLine, String useNature, String enterpriseCode, String drivingPermitNo, String vehicleBrand, String businessTime, String province, String city) {
public VehicleInfo(Long id, String vehicleCode, String plateNum, String plateColor, Date intoTime, String businessScope, String businessScopeDetail, String indentifier, Boolean intoStatus, String status, String operatingCertificateNo, String classLine, String useNature, String enterpriseCode, String drivingPermitNo, String vehicleBrand, String businessTime, String province, String city, String area) {
this.id = id;
this.vehicleCode = vehicleCode;
this.plateNum = plateNum;
......@@ -70,6 +70,7 @@ public class VehicleInfo implements Serializable{
this.businessTime = businessTime;
this.province = province;
this.city = city;
this.area = area;
}
public String getUseNature() {
......@@ -80,11 +81,11 @@ public class VehicleInfo implements Serializable{
this.useNature = useNature;
}
public String getId() {
public Long getId() {
return id;
}
public void setId(String id) {
public void setId(Long id) {
this.id = id;
}
......
package com.hikcreate.ignite.pojo.vehicles;
package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.pojo.vehicles;
package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
......
package com.hikcreate.ignite.pojo.vehicles;
package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.pojo.vehicles;
package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.pojo.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
public class DailyAlarmDeal implements Serializable {
private static final long serialVersionUID = 1L;
@QuerySqlField
private String useNature; //使用性质 公交
@QuerySqlField
private String supervisionId; //报警督办 ID
@QuerySqlField
private String warnTime;//报警时间
@QuerySqlField
private Boolean isDeal = false; //是否处理
public DailyAlarmDeal(String useNature, String supervisionId, String warnTime, Boolean isDeal) {
this.useNature = useNature;
this.supervisionId = supervisionId;
this.warnTime = warnTime;
this.isDeal = isDeal;
}
}
......@@ -7,11 +7,11 @@ import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import java.util.Map;
public class DailyAlarmProcessUpdate implements CacheEntryProcessor<BinaryObject,BinaryObject,Void> {
public class DailyAlarmDealUpdate implements CacheEntryProcessor<BinaryObject,BinaryObject,Void> {
private Map<String,Object> parameter;
public DailyAlarmProcessUpdate(Map<String,Object> parameter){
public DailyAlarmDealUpdate(Map<String,Object> parameter){
this.parameter = parameter;
}
......
......@@ -5,6 +5,7 @@
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="dff"/>
<property name="clientMode" value="true"/>
<property name="consistentId" value="node"/>
<property name="peerClassLoadingEnabled" value="true"/>
......
package com.hikcreate.data.client
import java.util
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
import com.hikcreate.ignite.pojo.PrimaryKey
import com.hikcreate.ignite.pojo.alarm.{DailyAlarm, DailyAlarmDeal, DailyAlarmDetail, DailyAlarmProcess}
import com.hikcreate.ignite.pojo.basic._
import com.hikcreate.ignite.pojo.vehicles.{AlarmNumber, DailyMileage, DriverNumber, VehicleNumber}
import com.hikcreate.ignite.domain.PrimaryKey
import com.hikcreate.ignite.domain.alarm.{AttachmentInfo, DailyAlarm, DailyAlarmDeal, DailyAlarmDetail}
import com.hikcreate.ignite.domain.basic._
import com.hikcreate.ignite.domain.vehicles.{AlarmNumber, DailyMileage, DriverNumber, VehicleNumber}
import com.hikcreate.ignite.processor._
import javax.cache.expiry.{CreatedExpiryPolicy, Duration}
import org.apache.ignite.binary.{BinaryObject, BinaryTypeConfiguration}
import org.apache.ignite.cache.{CacheMode, QueryEntity}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.joda.time.DateTime
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
/**
* binary type无法自动更改
......@@ -26,8 +21,6 @@ object IgniteClient {
lazy val ignite: Ignite = Ignition.start("ignite.xml")
def active(): Unit = if(!ignite.cluster.active) ignite.cluster.active(true)
def getBinaryObject(key:Object):BinaryObject = {
val binary = ignite.binary().builder(key.getClass.getSimpleName)
val fields = key.getClass.getDeclaredFields
......@@ -38,8 +31,9 @@ object IgniteClient {
binary.build()
}
/*********************************基础信息表****************************************************************/
ignite.destroyCache("AlarmInfo")
/*********************************基础信息表****************************************************************/
//平台基础信息表
lazy val basicPlatformInfo: IgniteCache[Long, PlatformInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,PlatformInfo]()
......@@ -49,16 +43,18 @@ object IgniteClient {
.setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[Long],classOf[PlatformInfo])
)
//企业基础信息表 -- 企业接入情况
lazy val basicCompanyInfo: IgniteCache[Long, CompanyInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,CompanyInfo]()
//企业基础信息表--企业接入情况
lazy val basicEnterpriseInfo: IgniteCache[Long, EnterpriseInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,EnterpriseInfo]()
.setSqlSchema("BasicInfo")
.setName("CompanyInfo")
.setName("EnterpriseInfo")
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[Long],classOf[CompanyInfo])
.setIndexedTypes(classOf[Long],classOf[EnterpriseInfo])
)
//车辆基础信息表 -- 接入车辆数
//车辆基础信息表--接入车辆数
lazy val basicVehicleInfo: IgniteCache[Long, VehicleInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,VehicleInfo]()
.setSqlSchema("BasicInfo")
......@@ -67,15 +63,17 @@ object IgniteClient {
.setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[Long],classOf[VehicleInfo])
)
//报警类型配置基础表
lazy val basicAlarmInfo: IgniteCache[Long, AlarmInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,AlarmInfo]()
lazy val basicAlarmTypeInfo: IgniteCache[Long, AlarmTypeInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,AlarmTypeInfo]()
.setSqlSchema("BasicInfo")
.setName("AlarmInfo")
.setName("AlarmTypeInfo")
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[Long],classOf[AlarmInfo])
.setIndexedTypes(classOf[Long],classOf[AlarmTypeInfo])
)
/*********************************营运车辆监测****************************************************************/
/**
......@@ -153,7 +151,7 @@ object IgniteClient {
/**
* 附件历史记录
* 主键:车牌号 车牌颜色 报警时间
* 主键:车牌号 车牌颜色 设备ID 报警时间 序列号
*/
lazy val attachmentCache: IgniteCache[PrimaryKey, AttachmentInfo] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,AttachmentInfo]()
......@@ -179,7 +177,7 @@ object IgniteClient {
.setCacheMode(CacheMode.REPLICATED)
)
def updateDailyAlarmCache(key:BinaryObject,time:String): Unit = {
def updateDailyAlarmNumberCache(key:BinaryObject,time:String): Unit = {
dailyAlarmNumberCache
.withKeepBinary()
.invoke(key,new DailyAlarmUpdate(time))
......@@ -190,27 +188,26 @@ object IgniteClient {
* 主键:车牌号 车牌颜色 报警督办ID
* 数据来源: 报警督办请求消息 报警督办应答消息
*/
lazy val dailyAlarmProcessCache: IgniteCache[PrimaryKey, DailyAlarmProcess] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,DailyAlarmProcess]()
lazy val dailyAlarmDealCache: IgniteCache[PrimaryKey, DailyAlarmDeal] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,DailyAlarmDeal]()
.setSqlSchema("Alarm")
.setName("DailyAlarmProcess")
.setIndexedTypes(classOf[PrimaryKey],classOf[DailyAlarmProcess])
.setIndexedTypes(classOf[PrimaryKey],classOf[DailyAlarmDeal])
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
def updateDailyAlarmDealCache(key:BinaryObject,map:Map[String,AnyRef]): Unit = {
dailyAlarmProcessCache
dailyAlarmDealCache
.withKeepBinary()
.invoke(key,new DailyAlarmProcessUpdate(map.asJava))
.invoke(key,new DailyAlarmDealUpdate(map))
}
/**
* 今日报警详情
* 主键:车牌号 车牌颜色 报警时间 信息 ID
* 数据来源:上报报警信息消息,智能视频报警附件上传结果上报
* 根据 车牌号,车牌颜色,报警时间 关联 智能视频报警附件上传结果上报 获取视频地址信息
* 主键:车牌号 车牌颜色 报警时间
* 数据来源:上报报警信息消息
*/
lazy val dailyAlarmDetailCache: IgniteCache[PrimaryKey, DailyAlarmDetail] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,DailyAlarmDetail]()
......@@ -224,47 +221,22 @@ object IgniteClient {
def main(args: Array[String]): Unit = {
val a = ignite.binary.`type`("com.hikcreate.ignite.pojo.alarm.AlarmDetail")
ignite.binary()
println(a.fieldTypeName("warnType")) //无法自动更新修改后的字段类型
//ignite.cacheNames().asScala.foreach(println(_))
dailyAlarmProcessCache.clear()
val json1 = JSON.parseObject("{\"appId\":\"10030\",\"businessTime\":\"2019-10-18 17:42:40\",\"dataType\":\"0x9401\",\"msgId\":\"0x9400\",\"superVisionEndTime\":\"1571391220\",\"superVisionId\":\"-566810\",\"superVisionLevel\":\"0\",\"superVisor\":\"supervisor\",\"superVisorEmail\":\"supervisor\",\"superVisorTel\":\"supervisor\",\"vehicleColor\":\"2\",\"vehicleNo\":\"浙D22717\",\"warnSrc\":\"1\",\"warnTime\":\"1571391217\",\"warnType\":\"100\"}")
val vehicleNo = json1.getString("vehicleNo")
val vehicleColor = json1.getString("vehicleColor")
val superVisionId = json1.getString("superVisionId")
val timestamp = json1.getLong("warnTime") * 1000
val time = new DateTime(timestamp)
val warnTime = time.toString("yyyy-MM-dd HH:mm:ss")
val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
val value = new DailyAlarmProcess("",superVisionId,warnTime,false)
if(!IgniteClient.dailyAlarmProcessCache.withKeepBinary().putIfAbsent(key,value)){
updateDailyAlarmDealCache(key,Map(
"useNature"->"",
"superVisionId"->superVisionId,
"warnTime"->warnTime))
}
val key1 = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
val value1 = new DailyAlarmProcess(true)
if(!IgniteClient.dailyAlarmProcessCache.withKeepBinary().putIfAbsent(key1,value1)){
println("aaaaa")
updateDailyAlarmDealCache(key1,Map("isDeal"-> (true:java.lang.Boolean)))
}
//alarmDetailCache.destroy()
//basicCompanyInfo.clear()
//basicVehicleInfo.clear()
/*basicPlatformInfo.destroy()
basicEnterpriseInfo.destroy()
basicVehicleInfo.destroy()
basicAlarmTypeInfo.destroy()*/
//mileageCache.clear()
//driverNumberCache.clear()
//vehicleNumberCache.clear()
//alarmNumberCache.clear()
/*mileageCache.destroy()
driverNumberCache.destroy()
vehicleNumberCache.destroy()
alarmNumberCache.destroy()*/
//dailyAlarmCache.clear()
//alarmDetailCache.clear()
//attachmentCache.clear()
//dailyAlarmDealCache.clear()
/*attachmentCache.destroy()
dailyAlarmNumberCache.destroy()
dailyAlarmDealCache.destroy()
dailyAlarmDetailCache.destroy()*/
ignite.close()
}
......
package com.hikcreate.data
package com.hikcreate.data.offline
import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.Sparking
import com.hikcreate.ignite.pojo.basic.{AlarmInfo, CompanyInfo, VehicleInfo}
import com.hikcreate.data.util.Tools
import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
*
* -DIGNITE_REST_START_ON_CLIENT=true
*/
object Etl extends Sparking{
def main(args: Array[String]): Unit = {
object FullSync extends Sparking{
def main(args: Array[String]): Unit = {
Logger.getRootLogger.setLevel(Level.OFF)
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
//基本企业信息表
/*sparkSession.sqlContext.read.format("jdbc").options(Map(
sparkSession.sqlContext.read.format("jdbc").options(Map(
"driver"->"com.mysql.jdbc.Driver",
"url"->"jdbc:mysql://10.197.236.152:3306/db_tbd",
"dbtable"->"base_into_enterprise_info",
......@@ -36,12 +32,10 @@ object Etl extends Sparking{
val into_status = row.getBoolean(row.fieldIndex("into_status"))
val status = row.getString(row.fieldIndex("status"))
val address = row.getString(row.fieldIndex("address"))
val value = new CompanyInfo(enterprise_code,enterprise_name,social_credit_code,into_time,province,city,area,into_status,status,
address,"")
IgniteClient.basicCompanyInfo.withKeepBinary().put(id,value)
}*/
val value = new EnterpriseInfo(enterprise_code,enterprise_name,social_credit_code,
into_time,province,city,area,into_status,status, address,"")
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(id,value)
}
//基本车辆信息表
sparkSession.sqlContext.read.format("jdbc").options(Map(
"driver"->"com.mysql.jdbc.Driver",
......@@ -50,28 +44,33 @@ object Etl extends Sparking{
"user"->"root",
"password"->"123456"
)).load().rdd.foreach{ row =>
val id = row.getLong(row.fieldIndex("id")).toString
val id = row.getLong(row.fieldIndex("id"))
val vehicle_code = row.getString(row.fieldIndex("vehicle_code"))
val plate_num = row.getString(row.fieldIndex("plate_num"))
val plate_color = row.getString(row.fieldIndex("plate_color"))
val into_time = row.getDate(row.fieldIndex("into_time"))
val into_status = row.getBoolean(row.fieldIndex("into_status"))
val business_scope = row.getString(row.fieldIndex("business_scope"))
val business_scope_detail = row.getString(row.fieldIndex("business_scope"))
val indentifier = row.getString(row.fieldIndex("business_scope"))
val operating_certificate_no = row.getString(row.fieldIndex("business_scope"))
val class_line = row.getString(row.fieldIndex("business_scope"))
val use_nature = row.getString(row.fieldIndex("business_scope"))
val enterprise_code = row.getString(row.fieldIndex("business_scope"))
val driving_permit_no = row.getString(row.fieldIndex("business_scope"))
val status = row.getString(row.fieldIndex("business_scope"))
val vehicle_brand = row.getString(row.fieldIndex("business_scope"))
val value = new VehicleInfo(id,vehicle_code,plate_num,plate_color,into_time,business_scope,business_scope_detail,indentifier,into_status,
status,operating_certificate_no,class_line,use_nature,enterprise_code,driving_permit_no,vehicle_brand,"","","")
val business_scope_detail = row.getString(row.fieldIndex("business_scope_detail"))
val indentifier = row.getString(row.fieldIndex("indentifier"))
val operating_certificate_no = row.getString(row.fieldIndex("operating_certificate_no"))
val class_line = row.getString(row.fieldIndex("class_line"))
val use_nature = row.getString(row.fieldIndex("use_nature"))
val enterprise_code = row.getString(row.fieldIndex("enterprise_code"))
val driving_permit_no = row.getString(row.fieldIndex("driving_permit_no"))
val status = row.getString(row.fieldIndex("status"))
val vehicle_brand = row.getString(row.fieldIndex("vehicle_brand"))
val enterpriseInfo = Tools.getEnterpriseInfo(enterprise_code)
val province = enterpriseInfo.map(x => x.getProvince).getOrElse("")
val city = enterpriseInfo.map(x => x.getCity).getOrElse("")
val area = enterpriseInfo.map(x => x.getArea).getOrElse("")
val value = new VehicleInfo(
id,vehicle_code,plate_num,plate_color,into_time,business_scope,
business_scope_detail,indentifier,into_status,status,operating_certificate_no,
class_line,use_nature,enterprise_code,driving_permit_no,vehicle_brand,
"",province,city,area)
IgniteClient.basicVehicleInfo.withKeepBinary().put(id,value)
}
//报警类型配置基础表
sparkSession.sqlContext.read.format("jdbc").options(Map(
"driver"->"com.mysql.jdbc.Driver",
......@@ -87,9 +86,9 @@ object Etl extends Sparking{
val warning_big_type = row.getString(row.fieldIndex("warning_big_type"))
val warning_small_type = row.getString(row.fieldIndex("warning_small_type"))
val warning_type_code = row.getString(row.fieldIndex("warning_type_code"))
val value = new AlarmInfo(id,warning_big_type,warning_small_type,warning_type_code,warning_level_id,warning_type_name,status,"")
IgniteClient.basicAlarmInfo.withKeepBinary().put(id,value)
val value = new AlarmTypeInfo(id,warning_big_type,warning_small_type,warning_type_code,warning_level_id,warning_type_name,status,"")
IgniteClient.basicAlarmTypeInfo.withKeepBinary().put(id,value)
}
IgniteClient.ignite.close()
}
}
package com.hikcreate.data
package com.hikcreate.data.sync
import com.alibaba.fastjson.serializer.SerializerFeature
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature
import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const
import com.hikcreate.data.util.ZkManager
import com.hikcreate.ignite.pojo.basic.{CompanyInfo, PlatformInfo, VehicleInfo}
import org.apache.ignite.cache.query.SqlQuery
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.hikcreate.data.util.{Tools, ZkManager}
import com.hikcreate.ignite.domain.basic.{EnterpriseInfo, VehicleInfo, PlatformInfo}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ArrayBuffer
object SyncBasic extends Sparking with Logging{
def main(args: Array[String]): Unit = {
IgniteClient.active()
val zkManager = ZkManager(Const.zkKafka)
val kafkaParams = getKafkaParams(Const.bootstrap,Const.basicsGroupId)
val offsets = zkManager.getBeginOffset(Const.basicsInfoTopic,Const.basicsGroupId)
......@@ -60,17 +59,17 @@ object SyncBasic extends Sparking with Logging{
if(tableName.equals("baseIntoEnterpriseInfo")){ //企业
operation match {
case "add" =>
IgniteClient.basicCompanyInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[CompanyInfo]))
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[EnterpriseInfo]))
case "update" =>
IgniteClient.basicCompanyInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[CompanyInfo]))
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[EnterpriseInfo]))
case "delete" =>
IgniteClient.basicCompanyInfo.withKeepBinary().remove(json.getLong("id"))
IgniteClient.basicEnterpriseInfo.withKeepBinary().remove(json.getLong("id"))
}
}
if(tableName.equals("baseIntoVehicleInfo")){ //车辆
operation match {
case "add" =>
val companyInfo = getCompanyInfo(json.getString("enterpriseCode"))
val companyInfo = Tools.getEnterpriseInfo(json.getString("enterpriseCode"))
if(companyInfo.isDefined){
//json.put("province",companyInfo.get.getProvince)
//json.put("city",companyInfo.get.getCity)
......@@ -85,25 +84,13 @@ object SyncBasic extends Sparking with Logging{
if(tableName.equals("baseWarningType")){
operation match {
case "add" =>
IgniteClient.basicAlarmInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
IgniteClient.basicAlarmTypeInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
case "update" =>
IgniteClient.basicAlarmInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
IgniteClient.basicAlarmTypeInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
case "delete" =>
IgniteClient.basicAlarmInfo.withKeepBinary().remove(json.getLong("id"))
}
IgniteClient.basicAlarmTypeInfo.withKeepBinary().remove(json.getLong("id"))
}
}
}
//根据车辆信息表的单位编码关联企业基础信息表得到省 市等信息
def getCompanyInfo(enterpriseCode:String): Option[CompanyInfo] = {
val companyInfoSql = new SqlQuery[Long,CompanyInfo](classOf[CompanyInfo],"ENTERPRISECODE = ?")
val companyInfos = IgniteClient.basicCompanyInfo.query(companyInfoSql.setArgs(enterpriseCode)).getAll
if(companyInfos.size() == 1){
val companyInfo = companyInfos.get(0).getValue
Some(companyInfo)
}else{
None
}
}
}
package com.hikcreate.data
package com.hikcreate.data.sync
import java.sql.Connection
import java.util.Locale
import scala.collection.mutable.ArrayBuffer
import com.alibaba.fastjson.{JSON, JSONObject}
import com.hikcreate.data.client.DbClient
import com.hikcreate.data.common.{Logging, Sparking}
......@@ -14,6 +14,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.mutable.ArrayBuffer
object SyncHive extends Sparking with Logging {
def main(args: Array[String]): Unit = {
......
......@@ -3,8 +3,11 @@ package com.hikcreate.data.util
import scala.collection.mutable
import scala.collection.JavaConverters._
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.Logging
import com.hikcreate.data.constant.Const
import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
import org.apache.ignite.cache.query.SqlQuery
import scalaj.http.Http
object Tools extends Logging{
......@@ -46,6 +49,7 @@ object Tools extends Logging{
}
}
//根据经纬度获取地区编码
def getLocationCode(lon:Double,lat:Double): (String,String,String) = {
val locationCode = getAddressAndLocationCode(lon,lat)._2
val provinceCode = locationCode.substring(0,2)
......@@ -59,4 +63,40 @@ object Tools extends Logging{
val jSONObject = JSON.parseObject(jsonStr)
jSONObject
}
//根据企业编码得到企业信息
def getEnterpriseInfo(enterpriseCode:String): Option[EnterpriseInfo] = {
val vehicleInfoSql = new SqlQuery[Long,EnterpriseInfo](classOf[EnterpriseInfo],"enterpriseCode = ?")
val vehicleInfos = IgniteClient.basicEnterpriseInfo.query(vehicleInfoSql.setArgs(enterpriseCode)).getAll
if(vehicleInfos.size() == 1 ){
val vehicleInfo = vehicleInfos.get(0).getValue
Some(vehicleInfo)
}else{
None
}
}
//根据车牌号和车牌号码得到车辆信息
def getVehicleInfo(plateNum:String,plateColor:String): Option[VehicleInfo] = {
val vehicleInfoSql = new SqlQuery[Long,VehicleInfo](classOf[VehicleInfo],"PLATENUM = ? AND PLATECOLOR= ? ")
val vehicleInfos = IgniteClient.basicVehicleInfo.query(vehicleInfoSql.setArgs(plateNum,plateColor)).getAll
if(vehicleInfos.size() == 1 ){
val vehicleInfo = vehicleInfos.get(0).getValue
Some(vehicleInfo)
}else{
None
}
}
//根据报警类型和事件类型 得到 平台报警等级code 和 平台报警类型名称
def getAlarmInfo(warnType:String,eventType:String): Option[AlarmTypeInfo] = {
val alarmInfoSql = new SqlQuery[Long,AlarmTypeInfo](classOf[AlarmTypeInfo],"warningBigType = ? AND warningSmallType = ? ")
val alarmInfos = IgniteClient.basicAlarmTypeInfo.query(alarmInfoSql.setArgs(warnType,eventType)).getAll
if(alarmInfos.size() == 1 ){
val alarmInfo = alarmInfos.get(0).getValue
Some(alarmInfo)
}else{
None
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment