Commit bcc380a2 by 杜发飞

1

parent 58e16078
{"appId":"test123456789","businessTime":"2019-09-09 17:02:10","dataType":"UP_WARN_MSG_ADPT_INFO","infoContent":"报警信息内容:2019-09-09 17:02:10","infoId":"3780","msgId":"UP_WARN_MSG","vehicleColor":"4","vehicleNo":"浙AL0Z96","warnSrc":"2","warnTime":"2019-09-09 17:02:10","warnType":"9"} mavan打包
mvn clean scala:compile compile package
kafka写入数据
/home/kafka/soft/kafka_2.11/bin/kafka-console-producer.sh --broker-list 10.197.236.154:9092 --topic operating_vehicle
kafka消费数据
/home/kafka/soft/kafka_2.11/bin/kafka-console-consumer.sh --bootstrap-server 10.197.236.154:9092 --topic operating_vehicle --from-beginning
ignite使用注意事项 ignite使用注意事项
1. 1.
...@@ -18,8 +14,3 @@ WARN TcpCommunicationSpi: Message queue limit is set to 0 which may lead to pote ...@@ -18,8 +14,3 @@ WARN TcpCommunicationSpi: Message queue limit is set to 0 which may lead to pote
class org.apache.ignite.binary.BinaryInvalidTypeException: TestIgnite$1 class org.apache.ignite.binary.BinaryInvalidTypeException: TestIgnite$1
每个闭包都是一个特定类的对象。当它要被发送时会序列化成二进制的形式,通过线路发送到一个远程节点然后在那里反序列化。该远程节点在类路径中应该有该闭包类,或者开启peerClassLoading以从发送端加载该类。 每个闭包都是一个特定类的对象。当它要被发送时会序列化成二进制的形式,通过线路发送到一个远程节点然后在那里反序列化。该远程节点在类路径中应该有该闭包类,或者开启peerClassLoading以从发送端加载该类。
cdh - spark2 目录
mvn clean scala:compile compile package
/opt/cloudera-manager/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/bin/spark2-submit
...@@ -40,8 +40,8 @@ public class AttachmentInfo implements Serializable { ...@@ -40,8 +40,8 @@ public class AttachmentInfo implements Serializable {
@QuerySqlField @QuerySqlField
private String businessTime; //业务发生时间,格式 yyyy-MM-dd HH:mm:ss private String businessTime; //业务发生时间,格式 yyyy-MM-dd HH:mm:ss
public AttachmentInfo(String appId, String vehicleNo, String vehicleColor, String vehicleType, String deviceId, String warnTime, String warnSeq,
public AttachmentInfo(String appId, String vehicleNo, String vehicleColor, String vehicleType, String deviceId, String warnTime, String warnSeq, String fileCount, String fileIndex, String filePath, String businessTime) { String fileCount, String fileIndex, String filePath, String businessTime) {
this.appId = appId; this.appId = appId;
this.vehicleNo = vehicleNo; this.vehicleNo = vehicleNo;
this.vehicleColor = vehicleColor; this.vehicleColor = vehicleColor;
......
...@@ -2,7 +2,9 @@ package com.hikcreate.ignite.domain.alarm; ...@@ -2,7 +2,9 @@ package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
/**
* 每日报警数
*/
public class DailyAlarm implements Serializable { public class DailyAlarm implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -23,14 +25,14 @@ public class DailyAlarm implements Serializable { ...@@ -23,14 +25,14 @@ public class DailyAlarm implements Serializable {
private String date; //统计日期 - 当天 private String date; //统计日期 - 当天
@QuerySqlField @QuerySqlField
private Long alarmNumber; //对应报警数 private Long number; //对应报警数
public DailyAlarm(String province, String city, String area, String useNature, String date, Long alarmNumber) { public DailyAlarm(String province, String city, String area, String useNature, String date, Long number) {
this.province = province; this.province = province;
this.city = city; this.city = city;
this.area = area; this.area = area;
this.useNature = useNature; this.useNature = useNature;
this.date = date; this.date = date;
this.alarmNumber = alarmNumber; this.number = number;
} }
} }
...@@ -3,7 +3,9 @@ package com.hikcreate.ignite.domain.alarm; ...@@ -3,7 +3,9 @@ package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
/**
* 每日报警处理处
*/
public class DailyAlarmDeal implements Serializable { public class DailyAlarmDeal implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -2,7 +2,9 @@ package com.hikcreate.ignite.domain.alarm; ...@@ -2,7 +2,9 @@ package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
/**
* 每日报警详情
*/
public class DailyAlarmDetail implements Serializable { public class DailyAlarmDetail implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -29,15 +31,15 @@ public class DailyAlarmDetail implements Serializable { ...@@ -29,15 +31,15 @@ public class DailyAlarmDetail implements Serializable {
private String deviceId; //终端ID private String deviceId; //终端ID
@QuerySqlField @QuerySqlField
private String warnTime; //报警时间
@QuerySqlField
private Double lon; //经度 private Double lon; //经度
@QuerySqlField @QuerySqlField
private Double lat; //纬度 private Double lat; //纬度
@QuerySqlField @QuerySqlField
private String warnTime; //报警时间
@QuerySqlField
private Long warnType; //报警类型 private Long warnType; //报警类型
@QuerySqlField @QuerySqlField
......
package com.hikcreate.ignite.processor; package com.hikcreate.ignite.domain.alarm.processor;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
......
package com.hikcreate.ignite.processor; package com.hikcreate.ignite.domain.alarm.processor;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
...@@ -20,25 +20,23 @@ public class DailyAlarmUpdate implements CacheEntryProcessor<BinaryObject,Binary ...@@ -20,25 +20,23 @@ public class DailyAlarmUpdate implements CacheEntryProcessor<BinaryObject,Binary
public Void process(MutableEntry<BinaryObject, BinaryObject> mutableEntry, Object... objects) throws EntryProcessorException { public Void process(MutableEntry<BinaryObject, BinaryObject> mutableEntry, Object... objects) throws EntryProcessorException {
BinaryObject value = mutableEntry.getValue(); BinaryObject value = mutableEntry.getValue();
BinaryObjectBuilder builder = value.toBuilder(); BinaryObjectBuilder builder = value.toBuilder();
LocalDate today = LocalDate.now();//今日日期
LocalDate today = LocalDate.now(); //今日日期 LocalDate statisticalDate = LocalDate.parse(value.<String>field("date"),DateTimeFormat.forPattern("yyyy-MM-dd"));//统计日期
LocalDate statisticalDate = LocalDate.parse(value.<String>field("date"),DateTimeFormat.forPattern("yyyy-MM-dd")); //统计日期 LocalDateTime eventTime = LocalDateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));
LocalDateTime eventTime = LocalDateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));//事件日期 LocalDate eventDate = eventTime.toLocalDate();//事件日期
LocalDate eventDate = eventTime.toLocalDate();
if(statisticalDate.isBefore(today)){ //统计时间不是当前日,需要重置 if(statisticalDate.isBefore(today)){ //统计时间不是当前日,需要重置
builder.setField("date",today.toString("yyyy-MM-dd")); builder.setField("date",today.toString("yyyy-MM-dd"));
if(eventDate.equals(today)){ if(eventDate.equals(today)){
builder.setField("alarmNumber",1L); builder.setField("number",1L);
mutableEntry.setValue(builder.build()); mutableEntry.setValue(builder.build());
}else{ }else{
builder.setField("alarmNumber",0L); builder.setField("number",0L);
mutableEntry.setValue(builder.build()); mutableEntry.setValue(builder.build());
} }
}else{ }else{
if(eventDate.equals(today)){ if(eventDate.equals(today)){
Long old = value.<Long>field("alarmNumber"); Long old = value.<Long>field("number");
builder.setField("alarmNumber",old +1); builder.setField("number",old +1);
mutableEntry.setValue(builder.build()); mutableEntry.setValue(builder.build());
} }
} }
......
...@@ -3,7 +3,7 @@ package com.hikcreate.ignite.domain.vehicles; ...@@ -3,7 +3,7 @@ package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
public class DailyMileage implements Serializable { public class DailyTravel implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -37,7 +37,17 @@ public class DailyMileage implements Serializable { ...@@ -37,7 +37,17 @@ public class DailyMileage implements Serializable {
@QuerySqlField @QuerySqlField
private Long travelTime; //当日累计行驶时间,单位秒 private Long travelTime; //当日累计行驶时间,单位秒
public DailyMileage(String province, String city, String area, String useNature, String date, Double lat, Double lng, String time, Double travelMileage, Long travelTime) { @QuerySqlField
private String gpsProvince; //车牌当前所处省
@QuerySqlField
private String gpsCity; //车牌当前所处市
@QuerySqlField
private String gpsArea; //车牌当前所处区
public DailyTravel(String province, String city, String area, String useNature, String date, Double lat, Double lng, String time,
Double travelMileage, Long travelTime, String gpsProvince, String gpsCity, String gpsArea) {
this.province = province; this.province = province;
this.city = city; this.city = city;
this.area = area; this.area = area;
...@@ -48,5 +58,8 @@ public class DailyMileage implements Serializable { ...@@ -48,5 +58,8 @@ public class DailyMileage implements Serializable {
this.time = time; this.time = time;
this.travelMileage = travelMileage; this.travelMileage = travelMileage;
this.travelTime = travelTime; this.travelTime = travelTime;
this.gpsProvince = gpsProvince;
this.gpsCity = gpsCity;
this.gpsArea = gpsArea;
} }
} }
package com.hikcreate.ignite.processor; package com.hikcreate.ignite.domain.vehicles.processor;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
......
package com.hikcreate.ignite.processor; package com.hikcreate.ignite.domain.vehicles.processor;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
...@@ -8,13 +8,13 @@ import org.joda.time.format.DateTimeFormat; ...@@ -8,13 +8,13 @@ import org.joda.time.format.DateTimeFormat;
import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry; import javax.cache.processor.MutableEntry;
public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,BinaryObject,Void> { public class DailyTravelUpdate implements CacheEntryProcessor<BinaryObject,BinaryObject,Void> {
private double lat; private double lat;
private double lng; private double lng;
private String time; private String time;
public DailyMileageUpdate(double lat, double lng, String time){ public DailyTravelUpdate(double lat, double lng, String time){
this.lat = lat; this.lat = lat;
this.lng = lng; this.lng = lng;
this.time = time; this.time = time;
...@@ -24,18 +24,15 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina ...@@ -24,18 +24,15 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina
public Void process(MutableEntry<BinaryObject, BinaryObject> mutableEntry, Object... objects) throws EntryProcessorException { public Void process(MutableEntry<BinaryObject, BinaryObject> mutableEntry, Object... objects) throws EntryProcessorException {
BinaryObject value = mutableEntry.getValue(); BinaryObject value = mutableEntry.getValue();
BinaryObjectBuilder builder = value.toBuilder(); BinaryObjectBuilder builder = value.toBuilder();
LocalDate today = LocalDate.now(); //今日日期 LocalDate today = LocalDate.now(); //今日日期
LocalDate statisticalDate = LocalDate.parse(value.<String>field("date"),DateTimeFormat.forPattern("yyyy-MM-dd")); //统计日期 LocalDate statisticalDate = LocalDate.parse(value.<String>field("date"),DateTimeFormat.forPattern("yyyy-MM-dd")); //统计日期
DateTime eventTime = DateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")); //事件日期 DateTime eventTime = DateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")); //事件日期
time = eventTime.toString("yyyy-MM-dd HH:mm:ss"); time = eventTime.toString("yyyy-MM-dd HH:mm:ss");
LocalDate eventDate = eventTime.toLocalDate(); LocalDate eventDate = eventTime.toLocalDate();
if(statisticalDate.isBefore(today)){ //统计时间不是当前日,需要重置 if(statisticalDate.isBefore(today)){ //统计时间不是当前日,需要重置
builder.setField("date",today.toString("yyyy-MM-dd")); builder.setField("date",today.toString("yyyy-MM-dd"));
builder.setField("travelMileage",0D); builder.setField("travelMileage",0D);
builder.setField("travelTime",0L); builder.setField("travelTime",0L);
if(eventDate.equals(today)){ //代表这条数据是今天的 if(eventDate.equals(today)){ //代表这条数据是今天的
builder.setField("lat",lat); builder.setField("lat",lat);
builder.setField("lng",lng); builder.setField("lng",lng);
...@@ -47,7 +44,6 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina ...@@ -47,7 +44,6 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina
builder.setField("time",null); builder.setField("time",null);
mutableEntry.setValue(builder.build()); mutableEntry.setValue(builder.build());
} }
}else if(eventDate.equals(today)){ }else if(eventDate.equals(today)){
if(value.<Double>field("lat") == null){ if(value.<Double>field("lat") == null){
builder.setField("lat",lat); builder.setField("lat",lat);
...@@ -55,9 +51,7 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina ...@@ -55,9 +51,7 @@ public class DailyMileageUpdate implements CacheEntryProcessor<BinaryObject,Bina
builder.setField("time",time); builder.setField("time",time);
mutableEntry.setValue(builder.build()); mutableEntry.setValue(builder.build());
}else{ }else{
DateTime lastTime = DateTime.parse(value.<String>field("time"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")); DateTime lastTime = DateTime.parse(value.<String>field("time"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));
if(lastTime.isBefore(eventTime)){ if(lastTime.isBefore(eventTime)){
Double differDistance = getDistance(value.<Double>field("lat"), value.<Double>field("lng"), lat, lng); Double differDistance = getDistance(value.<Double>field("lat"), value.<Double>field("lng"), lat, lng);
Interval interval = new Interval(lastTime,eventTime); Interval interval = new Interval(lastTime,eventTime);
......
...@@ -7,12 +7,13 @@ import com.hikcreate.data.constant.Const ...@@ -7,12 +7,13 @@ import com.hikcreate.data.constant.Const
import com.hikcreate.data.model.TableKey import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{Tools, ZkManager} import com.hikcreate.data.util.{Tools, ZkManager}
import com.hikcreate.ignite.domain.PrimaryKey import com.hikcreate.ignite.domain.PrimaryKey
import com.hikcreate.ignite.domain.alarm.{AttachmentInfo, DailyAlarm, DailyAlarmDeal, DailyAlarmDetail} import com.hikcreate.ignite.domain.alarm._
import com.hikcreate.ignite.domain.vehicles.{AlarmNumber, DailyMileage, DriverNumber, VehicleNumber} import com.hikcreate.ignite.domain.vehicles._
import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat import org.joda.time.format.DateTimeFormat
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
object SyncIgnite extends Sparking with Logging{ object SyncIgnite extends Sparking with Logging{
...@@ -54,39 +55,26 @@ object SyncIgnite extends Sparking with Logging{ ...@@ -54,39 +55,26 @@ object SyncIgnite extends Sparking with Logging{
val lon = json.getDouble("lon")/1000000 val lon = json.getDouble("lon")/1000000
val lat = json.getDouble("lat")/1000000 val lat = json.getDouble("lat")/1000000
val code = Tools.getLocationCode(lon,lat) val code = Tools.getLocationCode(lon,lat)
val time = json.getString("dateTime")//定位时间 val dateTime = DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"))//定位时间
val date = DateTime.parse(time,DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toLocalDate.toString("yyyy-MM-dd") val time = dateTime.toString("yyyy-MM-dd HH:mm:ss")
val date = dateTime.toLocalDate.toString("yyyy-MM-dd")
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor) val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val vehicleProvince = vehicleInfoOptional.map(x=>x.getProvince).getOrElse("无")
val vehicleCity = vehicleInfoOptional.map(x=>x.getCity).getOrElse("无")
val vehicleArea = vehicleInfoOptional.map(x=>x.getArea).getOrElse("无")
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无") val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无")
//累计行驶车辆数 //累计行驶车辆数
val vehicleNumberKey = IgniteClient.getBinaryObject(new PrimaryKey(code._1,code._2,code._3,vehicleNo,vehicleColor)) val vehicleNumberKey = IgniteClient.getBinaryObject(new PrimaryKey(code._1,code._2,code._3,vehicleNo,vehicleColor))
val vehicleNumberValue = new VehicleNumber(code._1,code._2,code._3,useNature) val vehicleNumberValue = new VehicleNumber(code._1,code._2,code._3,useNature)
IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue) IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue)
//当前在线车辆 今日车辆在线情况 累计行驶 累计安全行驶里程 //累计行驶 累计安全行驶里程 今日车辆在线情况
val mileageKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor))
val mileageValue = new DailyMileage("33","01","22",useNature,date,lat,lon,time,0D,0L)
if(!IgniteClient.mileageCache.withKeepBinary().putIfAbsent(mileageKey,mileageValue)){
IgniteClient.updateMileageCache(mileageKey,lat,lon,time)
}
/*
if(vehicleInfoOptional.isDefined){
val vehicleInfo = vehicleInfoOptional.get
val vehicleProvince = vehicleInfo.getProvince
val vehicleCity = vehicleInfo.getCity
val vehicleArea = vehicleInfo.getArea
val useNature = vehicleInfo.getUseNature
val vehicleNumberKey = new PrimaryKey(code._1,code._2,code._3,vehicleNo,vehicleColor)
val vehicleNumberValue = new VehicleNumber(code._1,code._2,code._3,useNature)
IgniteClient.vehicleNumberCache.put(vehicleNumberKey,vehicleNumberValue)//累计行驶车辆数
val mileageKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor)) val mileageKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor))
if(!IgniteClient.mileageCache.withKeepBinary().containsKey(mileageKey)){ //val mileageValue = new DailyTravel(vehicleProvince,vehicleCity,vehicleArea,useNature,date,lat,lon,time,0D,0L)
val mileageValue = new DailyMileage(vehicleProvince,vehicleCity,vehicleArea,useNature,date,lat,lon,time,0D,0L) val mileageValue = new DailyTravel("33","01","02",useNature,date,lat,lon,time,
IgniteClient.mileageCache.withKeepBinary().put(mileageKey,mileageValue) 0D,0L,code._1,code._2,code._3)
}else{ if(!IgniteClient.dailyTravelCache.withKeepBinary().putIfAbsent(mileageKey,mileageValue)){
IgniteClient.updateMileageCache(mileageKey,lat,lon,time) IgniteClient.updateMileageCache(mileageKey,lat,lon,time)
} }
}*/
//车辆定位消息补报 //车辆定位消息补报
case tableKey if tableKey == TableKey(Some("0x1200"),Some("0x1203")) => case tableKey if tableKey == TableKey(Some("0x1200"),Some("0x1203")) =>
Tools.addLocation(json).foreach{ x => Tools.addLocation(json).foreach{ x =>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment