Commit d4b37168 by 杜发飞

1

parent 786c4450
...@@ -68,6 +68,11 @@ ...@@ -68,6 +68,11 @@
<version>1.1.0</version> <version>1.1.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId> <groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId> <artifactId>scalikejdbc_2.11</artifactId>
<version>3.3.0</version> <version>3.3.0</version>
...@@ -87,6 +92,13 @@ ...@@ -87,6 +92,13 @@
<artifactId>ignite-zookeeper</artifactId> <artifactId>ignite-zookeeper</artifactId>
<version>${ignite.version}</version> <version>${ignite.version}</version>
</dependency> </dependency>
<!--<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spark_2.11</artifactId>
<version>2.4.0</version>
</dependency>-->
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId> <artifactId>curator-framework</artifactId>
......
...@@ -3,7 +3,7 @@ package com.hikcreate.ignite.pojo.alarm; ...@@ -3,7 +3,7 @@ package com.hikcreate.ignite.pojo.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
public class AlarmDetail implements Serializable { public class DailyAlarmDetail implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -26,27 +26,35 @@ public class AlarmDetail implements Serializable { ...@@ -26,27 +26,35 @@ public class AlarmDetail implements Serializable {
private String useNature; //使用性质 公交 private String useNature; //使用性质 公交
@QuerySqlField @QuerySqlField
private String deviceId; //终端ID
@QuerySqlField
private String warnTime; //报警时间 private String warnTime; //报警时间
@QuerySqlField @QuerySqlField
private String warnType; //报警类型 private Double lon; //经度
@QuerySqlField @QuerySqlField
private String warnInfo; //风险因素 private Double lat; //纬度
@QuerySqlField @QuerySqlField
private String filePath; //报警文件附件地址相对目录 private Long warnType; //报警类型
@QuerySqlField
private String warnInfo; //风险因素
public AlarmDetail(String province, String city, String area, String vehicleNo, String vehicleColor, String useNature, String warnTime, String warnType, String warnInfo, String filePath) { public DailyAlarmDetail(String province, String city, String area, String vehicleNo, String vehicleColor, String useNature, String deviceId, String warnTime, Double lon, Double lat, Long warnType, String warnInfo) {
this.province = province; this.province = province;
this.city = city; this.city = city;
this.area = area; this.area = area;
this.vehicleNo = vehicleNo; this.vehicleNo = vehicleNo;
this.vehicleColor = vehicleColor; this.vehicleColor = vehicleColor;
this.useNature = useNature; this.useNature = useNature;
this.deviceId = deviceId;
this.warnTime = warnTime; this.warnTime = warnTime;
this.lon = lon;
this.lat = lat;
this.warnType = warnType; this.warnType = warnType;
this.warnInfo = warnInfo; this.warnInfo = warnInfo;
this.filePath = filePath;
} }
} }
...@@ -9,7 +9,7 @@ public class AlarmInfo implements Serializable { ...@@ -9,7 +9,7 @@ public class AlarmInfo implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@QuerySqlField @QuerySqlField
private String id; //主键字段 private Long id; //主键字段
@QuerySqlField @QuerySqlField
private String warningBigType; //源数据报警大类型 private String warningBigType; //源数据报警大类型
@QuerySqlField @QuerySqlField
...@@ -17,7 +17,7 @@ public class AlarmInfo implements Serializable { ...@@ -17,7 +17,7 @@ public class AlarmInfo implements Serializable {
@QuerySqlField @QuerySqlField
private String warningTypeCode; //平台报警类型code private String warningTypeCode; //平台报警类型code
@QuerySqlField @QuerySqlField
private String warningLevelCode; //平台报警等级code private Long warningLevelCode; //平台报警等级code
@QuerySqlField @QuerySqlField
private String warningTypeName; //平台报警类型名称 private String warningTypeName; //平台报警类型名称
@QuerySqlField @QuerySqlField
...@@ -25,7 +25,7 @@ public class AlarmInfo implements Serializable { ...@@ -25,7 +25,7 @@ public class AlarmInfo implements Serializable {
@QuerySqlField @QuerySqlField
private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss
public AlarmInfo(String id, String warningBigType, String warningSmallType, String warningTypeCode, String warningLevelCode, String warningTypeName, String status, String businessTime) { public AlarmInfo(Long id, String warningBigType, String warningSmallType, String warningTypeCode, Long warningLevelCode, String warningTypeName, String status, String businessTime) {
this.id = id; this.id = id;
this.warningBigType = warningBigType; this.warningBigType = warningBigType;
this.warningSmallType = warningSmallType; this.warningSmallType = warningSmallType;
...@@ -36,11 +36,11 @@ public class AlarmInfo implements Serializable { ...@@ -36,11 +36,11 @@ public class AlarmInfo implements Serializable {
this.businessTime = businessTime; this.businessTime = businessTime;
} }
public String getId() { public Long getId() {
return id; return id;
} }
public void setId(String id) { public void setId(Long id) {
this.id = id; this.id = id;
} }
...@@ -68,11 +68,11 @@ public class AlarmInfo implements Serializable { ...@@ -68,11 +68,11 @@ public class AlarmInfo implements Serializable {
this.warningTypeCode = warningTypeCode; this.warningTypeCode = warningTypeCode;
} }
public String getWarningLevelCode() { public Long getWarningLevelCode() {
return warningLevelCode; return warningLevelCode;
} }
public void setWarningLevelCode(String warningLevelCode) { public void setWarningLevelCode(Long warningLevelCode) {
this.warningLevelCode = warningLevelCode; this.warningLevelCode = warningLevelCode;
} }
......
...@@ -54,92 +54,4 @@ public class AttachmentInfo implements Serializable { ...@@ -54,92 +54,4 @@ public class AttachmentInfo implements Serializable {
this.filePath = filePath; this.filePath = filePath;
this.businessTime = businessTime; this.businessTime = businessTime;
} }
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getVehicleNo() {
return vehicleNo;
}
public void setVehicleNo(String vehicleNo) {
this.vehicleNo = vehicleNo;
}
public String getVehicleColor() {
return vehicleColor;
}
public void setVehicleColor(String vehicleColor) {
this.vehicleColor = vehicleColor;
}
public String getVehicleType() {
return vehicleType;
}
public void setVehicleType(String vehicleType) {
this.vehicleType = vehicleType;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getWarnTime() {
return warnTime;
}
public void setWarnTime(String warnTime) {
this.warnTime = warnTime;
}
public String getWarnSeq() {
return warnSeq;
}
public void setWarnSeq(String warnSeq) {
this.warnSeq = warnSeq;
}
public String getFileCount() {
return fileCount;
}
public void setFileCount(String fileCount) {
this.fileCount = fileCount;
}
public String getFileIndex() {
return fileIndex;
}
public void setFileIndex(String fileIndex) {
this.fileIndex = fileIndex;
}
public String getFilePath() {
return filePath;
}
public void setFilePath(String filePath) {
this.filePath = filePath;
}
public String getBusinessTime() {
return businessTime;
}
public void setBusinessTime(String businessTime) {
this.businessTime = businessTime;
}
} }
...@@ -2,13 +2,14 @@ package com.hikcreate.ignite.pojo.basic; ...@@ -2,13 +2,14 @@ package com.hikcreate.ignite.pojo.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
import java.sql.Date;
public class CompanyInfo implements Serializable { public class CompanyInfo implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@QuerySqlField @QuerySqlField
private String id; //主键字段 private Long id; //主键字段
@QuerySqlField @QuerySqlField
private String enterpriseCode; //企业编码 private String enterpriseCode; //企业编码
@QuerySqlField @QuerySqlField
...@@ -16,7 +17,7 @@ public class CompanyInfo implements Serializable { ...@@ -16,7 +17,7 @@ public class CompanyInfo implements Serializable {
@QuerySqlField @QuerySqlField
private String socialCreditCode; //统一社会信用代码 private String socialCreditCode; //统一社会信用代码
@QuerySqlField @QuerySqlField
private String intoTime; //接入时间yyyy-MM-dd private Date intoTime; //接入时间yyyy-MM-dd
@QuerySqlField @QuerySqlField
private String province; //省 private String province; //省
@QuerySqlField @QuerySqlField
...@@ -24,7 +25,7 @@ public class CompanyInfo implements Serializable { ...@@ -24,7 +25,7 @@ public class CompanyInfo implements Serializable {
@QuerySqlField @QuerySqlField
private String area; //区 private String area; //区
@QuerySqlField @QuerySqlField
private String intoStatus; //是否接入 1是0否 private Boolean intoStatus; //是否接入 1是0否
@QuerySqlField @QuerySqlField
private String status; //是否启用(状态NORMAL-已启用 INITIAL-未启用) private String status; //是否启用(状态NORMAL-已启用 INITIAL-未启用)
@QuerySqlField @QuerySqlField
...@@ -32,103 +33,21 @@ public class CompanyInfo implements Serializable { ...@@ -32,103 +33,21 @@ public class CompanyInfo implements Serializable {
@QuerySqlField @QuerySqlField
private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss
public String getId() { public CompanyInfo(String enterpriseCode, String enterpriseName, String socialCreditCode, Date intoTime, String province, String city, String area, Boolean intoStatus, String status, String address, String businessTime) {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getEnterpriseCode() {
return enterpriseCode;
}
public void setEnterpriseCode(String enterpriseCode) {
this.enterpriseCode = enterpriseCode; this.enterpriseCode = enterpriseCode;
}
public String getEnterpriseName() {
return enterpriseName;
}
public void setEnterpriseName(String enterpriseName) {
this.enterpriseName = enterpriseName; this.enterpriseName = enterpriseName;
}
public String getSocialCreditCode() {
return socialCreditCode;
}
public void setSocialCreditCode(String socialCreditCode) {
this.socialCreditCode = socialCreditCode; this.socialCreditCode = socialCreditCode;
}
public String getIntoTime() {
return intoTime;
}
public void setIntoTime(String intoTime) {
this.intoTime = intoTime; this.intoTime = intoTime;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province; this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city; this.city = city;
}
public String getArea() {
return area;
}
public void setArea(String area) {
this.area = area; this.area = area;
}
public String getIntoStatus() {
return intoStatus;
}
public void setIntoStatus(String intoStatus) {
this.intoStatus = intoStatus; this.intoStatus = intoStatus;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status; this.status = status;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address; this.address = address;
}
public String getBusinessTime() {
return businessTime;
}
public void setBusinessTime(String businessTime) {
this.businessTime = businessTime; this.businessTime = businessTime;
} }
public CompanyInfo(String id, String enterpriseCode, String enterpriseName, String socialCreditCode, String intoTime, String province, String city, String area, String intoStatus, String status, String address, String businessTime) { public CompanyInfo(Long id, String enterpriseCode, String enterpriseName, String socialCreditCode, Date intoTime, String province, String city, String area, Boolean intoStatus, String status, String address, String businessTime) {
this.id = id; this.id = id;
this.enterpriseCode = enterpriseCode; this.enterpriseCode = enterpriseCode;
this.enterpriseName = enterpriseName; this.enterpriseName = enterpriseName;
......
...@@ -2,6 +2,7 @@ package com.hikcreate.ignite.pojo.basic; ...@@ -2,6 +2,7 @@ package com.hikcreate.ignite.pojo.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
import java.sql.Date;
public class VehicleInfo implements Serializable{ public class VehicleInfo implements Serializable{
...@@ -16,7 +17,7 @@ public class VehicleInfo implements Serializable{ ...@@ -16,7 +17,7 @@ public class VehicleInfo implements Serializable{
@QuerySqlField @QuerySqlField
private String plateColor; //号牌颜色 private String plateColor; //号牌颜色
@QuerySqlField @QuerySqlField
private String intoTime; //接入时间yyyy-MM-dd private Date intoTime; //接入时间yyyy-MM-dd
@QuerySqlField @QuerySqlField
private String businessScope; //经营范围 private String businessScope; //经营范围
@QuerySqlField @QuerySqlField
...@@ -24,7 +25,7 @@ public class VehicleInfo implements Serializable{ ...@@ -24,7 +25,7 @@ public class VehicleInfo implements Serializable{
@QuerySqlField @QuerySqlField
private String indentifier; //车架号 private String indentifier; //车架号
@QuerySqlField @QuerySqlField
private String intoStatus; //是否接入1是0否 private Boolean intoStatus; //是否接入1是0否
@QuerySqlField @QuerySqlField
private String status; //是否启用(状态NORMAL-已启用 INITIAL-未启用) private String status; //是否启用(状态NORMAL-已启用 INITIAL-未启用)
@QuerySqlField @QuerySqlField
...@@ -49,7 +50,7 @@ public class VehicleInfo implements Serializable{ ...@@ -49,7 +50,7 @@ public class VehicleInfo implements Serializable{
@QuerySqlField @QuerySqlField
private String area; //区 private String area; //区
public VehicleInfo(String id, String vehicleCode, String plateNum, String plateColor, String intoTime, String businessScope, String businessScopeDetail, String indentifier, String intoStatus, String status, String operatingCertificateNo, String classLine, String useNature, String enterpriseCode, String drivingPermitNo, String vehicleBrand, String businessTime, String province, String city) { public VehicleInfo(String id, String vehicleCode, String plateNum, String plateColor, Date intoTime, String businessScope, String businessScopeDetail, String indentifier, Boolean intoStatus, String status, String operatingCertificateNo, String classLine, String useNature, String enterpriseCode, String drivingPermitNo, String vehicleBrand, String businessTime, String province, String city) {
this.id = id; this.id = id;
this.vehicleCode = vehicleCode; this.vehicleCode = vehicleCode;
this.plateNum = plateNum; this.plateNum = plateNum;
...@@ -71,102 +72,6 @@ public class VehicleInfo implements Serializable{ ...@@ -71,102 +72,6 @@ public class VehicleInfo implements Serializable{
this.city = city; this.city = city;
} }
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getVehicleCode() {
return vehicleCode;
}
public void setVehicleCode(String vehicleCode) {
this.vehicleCode = vehicleCode;
}
public String getPlateNum() {
return plateNum;
}
public void setPlateNum(String plateNum) {
this.plateNum = plateNum;
}
public String getPlateColor() {
return plateColor;
}
public void setPlateColor(String plateColor) {
this.plateColor = plateColor;
}
public String getIntoTime() {
return intoTime;
}
public void setIntoTime(String intoTime) {
this.intoTime = intoTime;
}
public String getBusinessScope() {
return businessScope;
}
public void setBusinessScope(String businessScope) {
this.businessScope = businessScope;
}
public String getBusinessScopeDetail() {
return businessScopeDetail;
}
public void setBusinessScopeDetail(String businessScopeDetail) {
this.businessScopeDetail = businessScopeDetail;
}
public String getIndentifier() {
return indentifier;
}
public void setIndentifier(String indentifier) {
this.indentifier = indentifier;
}
public String getIntoStatus() {
return intoStatus;
}
public void setIntoStatus(String intoStatus) {
this.intoStatus = intoStatus;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getOperatingCertificateNo() {
return operatingCertificateNo;
}
public void setOperatingCertificateNo(String operatingCertificateNo) {
this.operatingCertificateNo = operatingCertificateNo;
}
public String getClassLine() {
return classLine;
}
public void setClassLine(String classLine) {
this.classLine = classLine;
}
public String getUseNature() { public String getUseNature() {
return useNature; return useNature;
} }
...@@ -174,60 +79,4 @@ public class VehicleInfo implements Serializable{ ...@@ -174,60 +79,4 @@ public class VehicleInfo implements Serializable{
public void setUseNature(String useNature) { public void setUseNature(String useNature) {
this.useNature = useNature; this.useNature = useNature;
} }
public String getEnterpriseCode() {
return enterpriseCode;
}
public void setEnterpriseCode(String enterpriseCode) {
this.enterpriseCode = enterpriseCode;
}
public String getDrivingPermitNo() {
return drivingPermitNo;
}
public void setDrivingPermitNo(String drivingPermitNo) {
this.drivingPermitNo = drivingPermitNo;
}
public String getVehicleBrand() {
return vehicleBrand;
}
public void setVehicleBrand(String vehicleBrand) {
this.vehicleBrand = vehicleBrand;
}
public String getBusinessTime() {
return businessTime;
}
public void setBusinessTime(String businessTime) {
this.businessTime = businessTime;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getArea() {
return area;
}
public void setArea(String area) {
this.area = area;
}
} }
...@@ -26,8 +26,6 @@ public class DailyAlarmUpdate implements CacheEntryProcessor<BinaryObject,Binary ...@@ -26,8 +26,6 @@ public class DailyAlarmUpdate implements CacheEntryProcessor<BinaryObject,Binary
LocalDateTime eventTime = LocalDateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));//事件日期 LocalDateTime eventTime = LocalDateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));//事件日期
LocalDate eventDate = eventTime.toLocalDate(); LocalDate eventDate = eventTime.toLocalDate();
System.out.println(statisticalDate.isBefore(today));
if(statisticalDate.isBefore(today)){ //统计时间不是当前日,需要重置 if(statisticalDate.isBefore(today)){ //统计时间不是当前日,需要重置
builder.setField("date",today.toString("yyyy-MM-dd")); builder.setField("date",today.toString("yyyy-MM-dd"));
if(eventDate.equals(today)){ if(eventDate.equals(today)){
......
...@@ -55,25 +55,23 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina ...@@ -55,25 +55,23 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina
builder.setField("time",time); builder.setField("time",time);
mutableEntry.setValue(builder.build()); mutableEntry.setValue(builder.build());
}else{ }else{
Double differDistance = getDistance(value.<Double>field("lat"), value.<Double>field("lng"), lat, lng);
DateTime lastTime = DateTime.parse(value.<String>field("time"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));
Interval interval = new Interval(lastTime,eventTime);
Period period = interval.toPeriod(PeriodType.millis());
int differTime = period.getMillis() / 1000;
System.out.println(value.<String>field("time")); DateTime lastTime = DateTime.parse(value.<String>field("time"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));
System.out.println(time);
System.out.println(differTime);
builder.setField("lat",lat); if(lastTime.isBefore(eventTime)){
builder.setField("lng",lng); Double differDistance = getDistance(value.<Double>field("lat"), value.<Double>field("lng"), lat, lng);
builder.setField("time",time); Interval interval = new Interval(lastTime,eventTime);
Object travelMileage = value.<Double>field("travelMileage") == null ? 0D : value.<Double>field("travelMileage") + differDistance; Period period = interval.toPeriod(PeriodType.millis());
builder.setField("travelMileage",travelMileage); int differTime = period.getMillis() / 1000;
Long travelTime = value.<Long>field("travelTime") == null ? 0L : value.<Long>field("travelTime") + differTime; builder.setField("lat",lat);
builder.setField("travelTime",travelTime); builder.setField("lng",lng);
mutableEntry.setValue(builder.build()); builder.setField("time",time);
Object travelMileage = value.<Double>field("travelMileage") == null ? 0D : value.<Double>field("travelMileage") + differDistance;
builder.setField("travelMileage",travelMileage);
Long travelTime = value.<Long>field("travelTime") == null ? 0L : value.<Long>field("travelTime") + differTime;
builder.setField("travelTime",travelTime);
mutableEntry.setValue(builder.build());
}
} }
} }
return null; return null;
......
# Set everything to be logged to the console # Set everything to be logged to the console
log4j.rootCategory=info, console log4j.rootCategory=warn, console
log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout=org.apache.log4j.PatternLayout
......
package com.hikcreate.data
import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.Sparking
import com.hikcreate.ignite.pojo.basic.{AlarmInfo, CompanyInfo, VehicleInfo}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
*
* -DIGNITE_REST_START_ON_CLIENT=true
*/
object Etl extends Sparking{
def main(args: Array[String]): Unit = {
Logger.getRootLogger.setLevel(Level.OFF)
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
//基本企业信息表
/*sparkSession.sqlContext.read.format("jdbc").options(Map(
"driver"->"com.mysql.jdbc.Driver",
"url"->"jdbc:mysql://10.197.236.152:3306/db_tbd",
"dbtable"->"base_into_enterprise_info",
"user"->"root",
"password"->"123456"
)).load().rdd.foreach{ row =>
val id = row.getLong(row.fieldIndex("id"))
val enterprise_code = row.getString(row.fieldIndex("enterprise_code"))
val enterprise_name = row.getString(row.fieldIndex("enterprise_name"))
val social_credit_code = row.getString(row.fieldIndex("social_credit_code"))
val into_time = row.getDate(row.fieldIndex("into_time"))
val province = row.getString(row.fieldIndex("province"))
val city = row.getString(row.fieldIndex("city"))
val area = row.getString(row.fieldIndex("area"))
val into_status = row.getBoolean(row.fieldIndex("into_status"))
val status = row.getString(row.fieldIndex("status"))
val address = row.getString(row.fieldIndex("address"))
val value = new CompanyInfo(enterprise_code,enterprise_name,social_credit_code,into_time,province,city,area,into_status,status,
address,"")
IgniteClient.basicCompanyInfo.withKeepBinary().put(id,value)
}*/
//基本车辆信息表
sparkSession.sqlContext.read.format("jdbc").options(Map(
"driver"->"com.mysql.jdbc.Driver",
"url"->"jdbc:mysql://10.197.236.152:3306/db_tbd",
"dbtable"->"base_into_vehicle_info",
"user"->"root",
"password"->"123456"
)).load().rdd.foreach{ row =>
val id = row.getLong(row.fieldIndex("id")).toString
val vehicle_code = row.getString(row.fieldIndex("vehicle_code"))
val plate_num = row.getString(row.fieldIndex("plate_num"))
val plate_color = row.getString(row.fieldIndex("plate_color"))
val into_time = row.getDate(row.fieldIndex("into_time"))
val into_status = row.getBoolean(row.fieldIndex("into_status"))
val business_scope = row.getString(row.fieldIndex("business_scope"))
val business_scope_detail = row.getString(row.fieldIndex("business_scope"))
val indentifier = row.getString(row.fieldIndex("business_scope"))
val operating_certificate_no = row.getString(row.fieldIndex("business_scope"))
val class_line = row.getString(row.fieldIndex("business_scope"))
val use_nature = row.getString(row.fieldIndex("business_scope"))
val enterprise_code = row.getString(row.fieldIndex("business_scope"))
val driving_permit_no = row.getString(row.fieldIndex("business_scope"))
val status = row.getString(row.fieldIndex("business_scope"))
val vehicle_brand = row.getString(row.fieldIndex("business_scope"))
val value = new VehicleInfo(id,vehicle_code,plate_num,plate_color,into_time,business_scope,business_scope_detail,indentifier,into_status,
status,operating_certificate_no,class_line,use_nature,enterprise_code,driving_permit_no,vehicle_brand,"","","")
IgniteClient.basicVehicleInfo.withKeepBinary().put(id,value)
}
//报警类型配置基础表
sparkSession.sqlContext.read.format("jdbc").options(Map(
"driver"->"com.mysql.jdbc.Driver",
"url"->"jdbc:mysql://10.197.236.152:3306/db_tbd",
"dbtable"->"base_warning_type",
"user"->"root",
"password"->"123456"
)).load().rdd.foreach{ row =>
val id = row.getLong(row.fieldIndex("id"))
val warning_type_name = row.getString(row.fieldIndex("warning_type_name"))
val warning_level_id = row.getLong(row.fieldIndex("warning_level_id"))
val status = row.getString(row.fieldIndex("status"))
val warning_big_type = row.getString(row.fieldIndex("warning_big_type"))
val warning_small_type = row.getString(row.fieldIndex("warning_small_type"))
val warning_type_code = row.getString(row.fieldIndex("warning_type_code"))
val value = new AlarmInfo(id,warning_big_type,warning_small_type,warning_type_code,warning_level_id,warning_type_name,status,"")
IgniteClient.basicAlarmInfo.withKeepBinary().put(id,value)
}
}
}
...@@ -15,6 +15,7 @@ import scala.collection.mutable.ArrayBuffer ...@@ -15,6 +15,7 @@ import scala.collection.mutable.ArrayBuffer
object SyncBasic extends Sparking with Logging{ object SyncBasic extends Sparking with Logging{
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
IgniteClient.active()
val zkManager = ZkManager(Const.zkKafka) val zkManager = ZkManager(Const.zkKafka)
val kafkaParams = getKafkaParams(Const.bootstrap,Const.basicsGroupId) val kafkaParams = getKafkaParams(Const.bootstrap,Const.basicsGroupId)
val offsets = zkManager.getBeginOffset(Const.basicsInfoTopic,Const.basicsGroupId) val offsets = zkManager.getBeginOffset(Const.basicsInfoTopic,Const.basicsGroupId)
...@@ -30,7 +31,6 @@ object SyncBasic extends Sparking with Logging{ ...@@ -30,7 +31,6 @@ object SyncBasic extends Sparking with Logging{
rdd rdd
}.map(x=>x.value()).foreachRDD{ rdd => }.map(x=>x.value()).foreachRDD{ rdd =>
if(!rdd.isEmpty()){ if(!rdd.isEmpty()){
IgniteClient.active()
rdd.foreachPartition(iterator=>processRow(iterator)) rdd.foreachPartition(iterator=>processRow(iterator))
zkManager.saveEndOffset(offsetRanges,Const.basicsGroupId) zkManager.saveEndOffset(offsetRanges,Const.basicsGroupId)
} }
...@@ -41,7 +41,6 @@ object SyncBasic extends Sparking with Logging{ ...@@ -41,7 +41,6 @@ object SyncBasic extends Sparking with Logging{
def processRow(iterator:Iterator[String]): Unit = { def processRow(iterator:Iterator[String]): Unit = {
iterator.foreach{ x => iterator.foreach{ x =>
println(x)
val json = JSON.parseObject(x) val json = JSON.parseObject(x)
val tableName = json.getString("dataType") val tableName = json.getString("dataType")
val operation = json.getString("operationType") val operation = json.getString("operationType")
...@@ -73,8 +72,8 @@ object SyncBasic extends Sparking with Logging{ ...@@ -73,8 +72,8 @@ object SyncBasic extends Sparking with Logging{
case "add" => case "add" =>
val companyInfo = getCompanyInfo(json.getString("enterpriseCode")) val companyInfo = getCompanyInfo(json.getString("enterpriseCode"))
if(companyInfo.isDefined){ if(companyInfo.isDefined){
json.put("province",companyInfo.get.getProvince) //json.put("province",companyInfo.get.getProvince)
json.put("city",companyInfo.get.getCity) //json.put("city",companyInfo.get.getCity)
} }
IgniteClient.basicVehicleInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo])) IgniteClient.basicVehicleInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
case "update" => case "update" =>
......
...@@ -126,7 +126,7 @@ object SyncHive extends Sparking with Logging { ...@@ -126,7 +126,7 @@ object SyncHive extends Sparking with Logging {
val day = if (keys.contains("dateTime")) { val day = if (keys.contains("dateTime")) {
DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd") DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
} else if (keys.contains("warnTime")) { } else if (keys.contains("warnTime")) {
new DateTime(json.getLong("warnTime")).toString("yyyy-MM-dd",Locale.CHINESE) new DateTime(json.getLong("warnTime")*1000).toString("yyyy-MM-dd",Locale.CHINESE)
} else { } else {
DateTime.parse(json.getString("businessTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd") DateTime.parse(json.getString("businessTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
} }
......
package com.hikcreate.data.client package com.hikcreate.data.client
import java.util
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import com.hikcreate.ignite.pojo.PrimaryKey import com.hikcreate.ignite.pojo.PrimaryKey
import com.hikcreate.ignite.pojo.alarm.{AlarmDetail, DailyAlarm, DailyAlarmDeal} import com.hikcreate.ignite.pojo.alarm.{DailyAlarm, DailyAlarmDeal, DailyAlarmDetail}
import com.hikcreate.ignite.pojo.basic._ import com.hikcreate.ignite.pojo.basic._
import com.hikcreate.ignite.pojo.vehicles.{AlarmNumber, DailyMileage, DriverNumber, VehicleNumber} import com.hikcreate.ignite.pojo.vehicles.{AlarmNumber, DailyMileage, DriverNumber, VehicleNumber}
import com.hikcreate.ignite.processor._ import com.hikcreate.ignite.processor._
import javax.cache.expiry.{CreatedExpiryPolicy, Duration} import javax.cache.expiry.{CreatedExpiryPolicy, Duration}
import org.apache.ignite.binary.BinaryObject import org.apache.ignite.binary.{BinaryObject, BinaryTypeConfiguration}
import org.apache.ignite.cache.CacheMode import org.apache.ignite.cache.{CacheMode, QueryEntity}
import org.apache.ignite.configuration.CacheConfiguration import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.{Ignite, IgniteCache, Ignition} import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import scala.collection.JavaConverters._
/** /**
* binary type无法自动更改
* 闭包序列化发送到服务器只会发送一次,代码结构改变需要重启ignite服务(待解决) * 闭包序列化发送到服务器只会发送一次,代码结构改变需要重启ignite服务(待解决)
*/ */
object IgniteClient { object IgniteClient {
lazy val ignite: Ignite = Ignition.start("ignite.xml") lazy val ignite: Ignite = Ignition.start("ignite.xml")
def active(): Unit = if (!ignite.cluster.active) ignite.cluster.active(true) def active(): Unit = if(!ignite.cluster.active) ignite.cluster.active(true)
def getBinaryObject(key:Object):BinaryObject = { def getBinaryObject(key:Object):BinaryObject = {
val binary = ignite.binary().builder(key.getClass.getSimpleName) val binary = ignite.binary().builder(key.getClass.getSimpleName)
...@@ -59,7 +63,7 @@ object IgniteClient { ...@@ -59,7 +63,7 @@ object IgniteClient {
.setCacheMode(CacheMode.REPLICATED) .setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[Long],classOf[VehicleInfo]) .setIndexedTypes(classOf[Long],classOf[VehicleInfo])
) )
//报警类型配置 //报警类型配置基础表
lazy val basicAlarmInfo: IgniteCache[Long, AlarmInfo] = ignite.getOrCreateCache( lazy val basicAlarmInfo: IgniteCache[Long, AlarmInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,AlarmInfo]() new CacheConfiguration[Long,AlarmInfo]()
.setSqlSchema("BasicInfo") .setSqlSchema("BasicInfo")
...@@ -144,11 +148,25 @@ object IgniteClient { ...@@ -144,11 +148,25 @@ object IgniteClient {
/*********************************安全行驶报警监测****************************************************************/ /*********************************安全行驶报警监测****************************************************************/
/** /**
* 今日报警数 今日区域报警数 --完成 * 附件历史记录
* 主键:省 市 区 使用性质 +1操作 * 主键:车牌号 车牌颜色 报警时间
*/
lazy val attachmentCache: IgniteCache[PrimaryKey, AttachmentInfo] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,AttachmentInfo]()
.setSqlSchema("Alarm")
.setName("AttachmentInfo")
.setIndexedTypes(classOf[PrimaryKey],classOf[AttachmentInfo])
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
/**
* 今日报警数 今日区域报警数
* 主键:省 市 区 使用性质
* 数据来源:上报报警信息消息 * 数据来源:上报报警信息消息
*/ */
lazy val dailyAlarmCache: IgniteCache[PrimaryKey, DailyAlarm] = ignite.getOrCreateCache( lazy val dailyAlarmNumberCache: IgniteCache[PrimaryKey, DailyAlarm] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,DailyAlarm]() new CacheConfiguration[PrimaryKey,DailyAlarm]()
.setSqlSchema("Alarm") .setSqlSchema("Alarm")
.setName("DailyAlarm") .setName("DailyAlarm")
...@@ -158,39 +176,25 @@ object IgniteClient { ...@@ -158,39 +176,25 @@ object IgniteClient {
) )
def updateDailyAlarmCache(key:BinaryObject,time:String): Unit = { def updateDailyAlarmCache(key:BinaryObject,time:String): Unit = {
dailyAlarmCache dailyAlarmNumberCache
.withKeepBinary() .withKeepBinary()
.invoke(key,new DailyAlarmUpdate(time)) .invoke(key,new DailyAlarmUpdate(time))
} }
/** /**
* 报警详情 --完成 * 今日报警详情
* 主键:车牌号 车牌颜色 报警时间 信息 ID * 主键:车牌号 车牌颜色 报警时间 信息 ID
* 数据来源:上报报警信息消息,智能视频报警附件上传结果上报 * 数据来源:上报报警信息消息,智能视频报警附件上传结果上报
* 根据 车牌号,车牌颜色,报警时间 关联 智能视频报警附件上传结果上报 获取视频地址信息 * 根据 车牌号,车牌颜色,报警时间 关联 智能视频报警附件上传结果上报 获取视频地址信息
*/ */
lazy val alarmDetailCache: IgniteCache[PrimaryKey, AlarmDetail] = ignite.getOrCreateCache( lazy val dailyAlarmDetailCache: IgniteCache[PrimaryKey, DailyAlarmDetail] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,AlarmDetail]() new CacheConfiguration[PrimaryKey,DailyAlarmDetail]()
.setSqlSchema("Alarm") .setSqlSchema("Alarm")
.setName("AlarmDetail") .setName("DailyAlarmDetail")
.setIndexedTypes(classOf[PrimaryKey],classOf[AlarmDetail]) .setIndexedTypes(classOf[PrimaryKey],classOf[DailyAlarmDetail])
.setDataRegionName("500MB_Region") .setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED) .setCacheMode(CacheMode.REPLICATED)
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24))) //.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
/**
* 附件历史记录
* 主键:车牌号 车牌颜色 报警时间
*/
lazy val attachmentCache: IgniteCache[PrimaryKey, AttachmentInfo] = ignite.getOrCreateCache(
new CacheConfiguration[PrimaryKey,AttachmentInfo]()
.setSqlSchema("Alarm")
.setName("AttachmentInfo")
.setIndexedTypes(classOf[PrimaryKey],classOf[AttachmentInfo])
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
) )
/** /**
...@@ -205,7 +209,7 @@ object IgniteClient { ...@@ -205,7 +209,7 @@ object IgniteClient {
.setIndexedTypes(classOf[PrimaryKey],classOf[DailyAlarmDeal]) .setIndexedTypes(classOf[PrimaryKey],classOf[DailyAlarmDeal])
.setDataRegionName("500MB_Region") .setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED) .setCacheMode(CacheMode.REPLICATED)
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24))) //.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
) )
def updateDailyAlarmDealCache(key:BinaryObject): Unit = { def updateDailyAlarmDealCache(key:BinaryObject): Unit = {
...@@ -216,7 +220,16 @@ object IgniteClient { ...@@ -216,7 +220,16 @@ object IgniteClient {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
mileageCache.clear() val a = ignite.binary.`type`("com.hikcreate.ignite.pojo.alarm.AlarmDetail")
val builder = ignite.binary().builder("com.hikcreate.ignite.pojo.alarm.AlarmDetail")
val build =builder.setField("warnType",1L,classOf[Long]).build()
println( a.fieldTypeName("warnType")) //无法自动更新修改后的字段类型
//alarmDetailCache.destroy()
//basicCompanyInfo.clear()
//basicVehicleInfo.clear()
//mileageCache.clear()
//driverNumberCache.clear() //driverNumberCache.clear()
//vehicleNumberCache.clear() //vehicleNumberCache.clear()
//alarmNumberCache.clear() //alarmNumberCache.clear()
......
...@@ -50,7 +50,7 @@ object Const { ...@@ -50,7 +50,7 @@ object Const {
//智能视频动态信息交换消息类 //智能视频动态信息交换消息类
TableKey(Some("0x1D00"),Some("0x1D01"))->Config.getString("hive.UP_PREVENTION_EXG_MSG_DEVICE_PARAM.table"),//视频报警设备参数查询请求 TableKey(Some("0x1D00"),Some("0x1D01"))->Config.getString("hive.UP_PREVENTION_EXG_MSG_DEVICE_PARAM.table"),//视频报警设备参数查询请求
TableKey(Some("0x1D00"),Some("0x1d02"))->Config.getString("hive.UP_PREVENTION_EXG_MSG_REPORT_DRIVER.table"),//定时上传驾驶员身份识别信息 TableKey(Some("0x1D00"),Some("0x1d02"))->Config.getString("hive.UP_PREVENTION_EXG_MSG_REPORT_DRIVER.table"),//定时上传驾驶员身份识别信息
TableKey(Some("0x1C00"),Some("0x1C02"))->Config.getString("hive.UP_PREVENTION_MSG_FILE_COMPLETE.table"),//智能视频报警附件上传结果上报 TableKey(Some("0x1C00"),Some("0x1c02"))->Config.getString("hive.UP_PREVENTION_MSG_FILE_COMPLETE.table"),//智能视频报警附件上传结果上报
//基础信息类 //基础信息类
TableKey(None,Some("baseIntoPlatformInfo"))->Config.getString("hive.KAFKA_base_into_platform_info.table"),//接入平台 TableKey(None,Some("baseIntoPlatformInfo"))->Config.getString("hive.KAFKA_base_into_platform_info.table"),//接入平台
TableKey(None,Some("baseIntoEnterpriseInfo"))->Config.getString("hive.KAFKA_base_into_enterprise_info.table"),//企业信息 TableKey(None,Some("baseIntoEnterpriseInfo"))->Config.getString("hive.KAFKA_base_into_enterprise_info.table"),//企业信息
......
...@@ -29,7 +29,12 @@ object Tools extends Logging{ ...@@ -29,7 +29,12 @@ object Tools extends Logging{
lonAndLat.put("latitude",lat) lonAndLat.put("latitude",lat)
arr.add(lonAndLat) arr.add(lonAndLat)
json.put("locations",arr) json.put("locations",arr)
val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type","application/json").asString val response = Http(Const.areaCodeAndAddressUrl)
.postData(json.toJSONString)
.header("content-type","application/json")
//.charset("ISO-8859-1")
.timeout(connTimeoutMs = 8000, readTimeoutMs = 8000)
.asString
if(response.code == 200){ if(response.code == 200){
val body = JSON.parseObject(response.body) val body = JSON.parseObject(response.body)
val item = body.getJSONObject("result").getJSONArray("regeoItems").getJSONObject(0) val item = body.getJSONObject("result").getJSONArray("regeoItems").getJSONObject(0)
...@@ -41,7 +46,7 @@ object Tools extends Logging{ ...@@ -41,7 +46,7 @@ object Tools extends Logging{
} }
} }
def getLocationCode(lon:Double,lat:Double): (String,String,String) ={ def getLocationCode(lon:Double,lat:Double): (String,String,String) = {
val locationCode = getAddressAndLocationCode(lon,lat)._2 val locationCode = getAddressAndLocationCode(lon,lat)._2
val provinceCode = locationCode.substring(0,2) val provinceCode = locationCode.substring(0,2)
val cityCode = locationCode.substring(2,4) val cityCode = locationCode.substring(2,4)
...@@ -49,7 +54,7 @@ object Tools extends Logging{ ...@@ -49,7 +54,7 @@ object Tools extends Logging{
(provinceCode,cityCode,areaCode) (provinceCode,cityCode,areaCode)
} }
def getInfoContentJsonobj(infoStr:String):JSONObject={ def getInfoContentJsonobj(infoStr:String):JSONObject = {
val jsonStr=("{\""+infoStr.replace(":=","\":\"").replace(";","\",\"")+"\"}").replace(",\"\"}","}") val jsonStr=("{\""+infoStr.replace(":=","\":\"").replace(";","\",\"")+"\"}").replace(",\"\"}","}")
val jSONObject = JSON.parseObject(jsonStr) val jSONObject = JSON.parseObject(jsonStr)
jSONObject jSONObject
......
import java.util
import java.util.{ArrayList, List}
import com.alibaba.fastjson.JSONObject
import com.hikcreate.data.util.Tools
import org.apache.commons.lang3.tuple.Pair
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
object Test {
def main(args: Array[String]): Unit = {
//println(Tools.getAddressAndLocationCode(116.481488,39.990464))
//println(Tools.getAddressAndLocationCode(120680600/1000000,30916400/1000000))
//val time = "2019-2-16 13:2:57"
//println(DateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toLocalDate.toString("yyyy-MM-dd"))
}
}
import com.hikcreate.data.util.Tools
object Test1 {
def main(args: Array[String]): Unit = {
val a = Tools.getAddressAndLocationCode(115.0031,27.2773)
println(a._1+"----"+a._2)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment