Commit 26a16293 by 杜发飞

1

parent aa830172
......@@ -132,7 +132,7 @@
<build>
<plugins>
<plugin>
<!--<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
......@@ -146,12 +146,17 @@
<includeScope>runtime</includeScope>
<excludeTransitive>false</excludeTransitive>
</configuration>
</plugin>
</plugin>-->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions><execution><goals><goal>compile</goal></goals></execution>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
......
......@@ -208,7 +208,7 @@ object IgniteClient {
)
def main(args: Array[String]): Unit = {
//ignite.cacheNames().asScala.foreach(x=>ignite.destroyCache(x))
ignite.cacheNames().asScala.foreach(x=>ignite.getOrCreateCache(x).clear())
ignite.close()
}
}
package com.hikcreate.data.common
import org.apache.log4j.{Level, Logger}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
trait Sparking {
// 屏蔽不必要的日志 ,在终端上显示需要的日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)
val conf: SparkConf = new SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("hive.exec.dynamici.partition","true")
.set("hive.exec.dynamic.partition.mode","nonstrict")
.setAppName("test")
.setMaster("local[*]")
//.setAppName("test")
//.setMaster("local[*]")
def getKafkaParams(servers:String,groupId: String):Map[String,Object] = {
Map[String,Object](
......
......@@ -22,7 +22,8 @@ object SyncIgnite extends Sparking with Logging{
val kafkaParams = getKafkaParams(Const.bootstrap, Const.igniteGroupId)
val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.igniteGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]()
val ssc = new StreamingContext(conf, Seconds(1))
val ssc = new StreamingContext(conf, Seconds(2))
//ssc.sparkContext.setLogLevel("WARN")
val inputStream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Const.applicationTopic,kafkaParams,offsets))
......@@ -43,7 +44,6 @@ object SyncIgnite extends Sparking with Logging{
def processRow(iterator:Iterator[String]): Unit = {
iterator.foreach{ x =>
try {
println(x)
val json = JSON.parseObject(x)
TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))) match {
//车辆定位消息
......@@ -137,7 +137,7 @@ object SyncIgnite extends Sparking with Logging{
val eventType = infoJson.getString("EVENT_TYPE")
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val alarmInfoOptional = Tools.getAlarmInfo(warnType,eventType)
if(alarmInfoOptional.isDefined){//是否关联到报警信息基础表
if(alarmInfoOptional.isDefined){ //是否关联到报警信息基础表
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("没有关联到车辆性质")
val alarmKey = IgniteClient.getBinaryObject(new PrimaryKey(code._1,code._2,code._3,useNature))
//累计行驶报警数
......@@ -183,7 +183,7 @@ object SyncIgnite extends Sparking with Logging{
case tableKey if tableKey == TableKey(Some("0x1400"),Some("0x1401")) =>
val vehicleNo = json.getString("vehicleNo")
val vehicleColor = json.getString("vehicleColor")
val superVisionId = json.getString("superVisionId")
val superVisionId = json.getString("supervisionId")
val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
val value = new DailyAlarmDeal(true)
if(!IgniteClient.dailyAlarmDealCache.withKeepBinary().putIfAbsent(key,value)){
......@@ -193,6 +193,8 @@ object SyncIgnite extends Sparking with Logging{
}
}catch {
case e:Exception =>
println(x)
println(e.getMessage)
e.printStackTrace()
}
}
......
package com.hikcreate.data.sync
import com.alibaba.fastjson.JSON
import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const
import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{Tools, ZkManager}
import com.hikcreate.ignite.domain.PrimaryKey
import com.hikcreate.ignite.domain.alarm.{AttachmentInfo, DailyAlarm, DailyAlarmDeal, DailyAlarmDetail}
import com.hikcreate.ignite.domain.vehicles.{AlarmNumber, DailyTravel, DriverNumber, VehicleNumber}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.mutable.ArrayBuffer
object SyncIgnite1 extends Sparking with Logging{
def main(args: Array[String]): Unit = {
val zkManager = ZkManager(Const.zkKafka)
val kafkaParams = getKafkaParams(Const.bootstrap, Const.igniteGroupId)
val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.igniteGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]()
val ssc = new StreamingContext(conf, Seconds(1))
val inputStream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Const.applicationTopic,kafkaParams,offsets))
inputStream.transform { rdd =>
offsetRanges.clear()
offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
rdd
}.map(x => x.value()).foreachRDD{ rdd =>
if (!rdd.isEmpty()) {
rdd.map(JSON.parseObject)
.groupBy(json=>TableKey(Option(json.getString("msgId")),Option(json.getString("dataType"))))
.foreachPartition( x=>
x.foreach{ x =>
try{
x._1 match {
case TableKey(Some("0x1200"),Some("0x1202")) => //车辆定位消息
val value = x._2.toList.grouped(20)
value.foreach{ sub =>
val codes = Tools.getLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
sub.zip(codes).foreach{ x=>
val json = x._1
val code = x._2
val vehicleNo = json.getString("vehicleNo")
val vehicleColor = json.getString("vehicleColor")
val lon = json.getDouble("lon")/1000000
val lat = json.getDouble("lat")/1000000
val dateTime = DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"))//定位时间
val time = dateTime.toString("yyyy-MM-dd HH:mm:ss")
val date = dateTime.toLocalDate.toString("yyyy-MM-dd")
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val vehicleProvince = vehicleInfoOptional.map(x=>x.getProvince).getOrElse("无")
val vehicleCity = vehicleInfoOptional.map(x=>x.getCity).getOrElse("无")
val vehicleArea = vehicleInfoOptional.map(x=>x.getArea).getOrElse("无")
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无")
//累计行驶车辆数
val vehicleNumberKey = IgniteClient.getBinaryObject(new PrimaryKey(code._1,code._2,code._3,vehicleNo,vehicleColor))
val vehicleNumberValue = new VehicleNumber(code._1,code._2,code._3,useNature)
IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue)
//累计行驶 累计安全行驶里程 今日车辆在线情况
val mileageKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor))
val mileageValue = new DailyTravel(vehicleProvince,vehicleCity,vehicleArea,useNature,
date, lat,lon,time, 0D,0L,code._1,code._2,code._3)
if(!IgniteClient.dailyTravelCache.withKeepBinary().putIfAbsent(mileageKey,mileageValue)){
IgniteClient.updateMileageCache(mileageKey,lat,lon,time)
}
}
}
case TableKey(Some("0x1200"),Some("0x1203")) => //车辆定位消息补报
val flat = x._2.flatMap(x=>Tools.addLocation(x))
val value = flat.toList.grouped(20)
value.foreach{ sub =>
val codes = Tools.getLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
sub.zip(codes).foreach{ x=>
val json = x._1
val code = x._2
val vehicleNo = json.getString("vehicleNo")
val vehicleColor = json.getString("vehicleColor")
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无")
//累计行驶车辆数
val vehicleNumberKey = IgniteClient.getBinaryObject(new PrimaryKey(code._1,code._2,code._3,vehicleNo,vehicleColor))
val vehicleNumberValue = new VehicleNumber(code._1,code._2,code._3,useNature)
IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue)
}
}
case tableKey if tableKey == TableKey(Some("0x1D00"),Some("0x1d02")) => //定时上传驾驶员身份识别信息
x._2.foreach{ x=>
val vehicleNo = x.getString("vehicleNo")
val vehicleColor = x.getString("vehicleColor")
val driverId = x.getString("driverId")
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val vehicleProvince = vehicleInfoOptional.map(x=>x.getProvince).getOrElse("无")
val vehicleCity = vehicleInfoOptional.map(x=>x.getCity).getOrElse("无")
val vehicleArea = vehicleInfoOptional.map(x=>x.getArea).getOrElse("无")
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无")
val key = IgniteClient.getBinaryObject(new PrimaryKey(driverId))
val driverNumber = new DriverNumber(vehicleProvince,vehicleCity,vehicleArea,useNature)
IgniteClient.driverNumberCache.withKeepBinary().put(key,driverNumber)
}
case TableKey(Some("0x1C00"),Some("0x1c02")) => //智能视频报警附件上传结果上报
x._2.foreach{ x=>
x.remove("msgId")
x.remove("dataType")
val timestamp = x.getLong("warnTime") * 1000
val dateTime = new DateTime(timestamp)
val warnTime = dateTime.toString("yyyy-MM-dd HH:mm:ss")
x.put("warnTime",warnTime)
val key = IgniteClient.getBinaryObject(
new PrimaryKey(x.getString("vehicleNo"),
x.getString("vehicleColor"),
x.getString("deviceId"),
x.getString("warnTime"),
x.getString("fileIndex"))
)
val value = JSON.parseObject(x.toJSONString,classOf[AttachmentInfo])
IgniteClient.attachmentCache.withKeepBinary().put(key,value)
}
case TableKey(Some("0x1400"), Some("0x1402")) => //上报报警信息消息
val value = x._2.filter(x=>Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
value.foreach{ sub=>
val codes = Tools.getLocationCodes(sub
.map{ x=>
val infoStr = x.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
})
sub.zip(codes).foreach{ x=>
val json = x._1
val code = x._2
val vehicleNo = json.getString("vehicleNo")
val vehicleColor = json.getString("vehicleColor")
val warnType = json.getString("warnType")
val timestamp = json.getLong("warnTime") * 1000
val time = new DateTime(timestamp)
val warnTime = time.toString("yyyy-MM-dd HH:mm:ss")
val date = time.toString("yyyy-MM-dd")
val infoStr = json.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
val eventType = infoJson.getString("EVENT_TYPE")
val deviceId = infoJson.getString("DEVICE_ID")
val lon = infoJson.getDouble("LONGITUDE")/1000000
val lat = infoJson.getDouble("LATITUDE")/1000000
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val alarmInfoOptional = Tools.getAlarmInfo(warnType,eventType)
if(alarmInfoOptional.isDefined){ //是否关联到报警信息基础表
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("没有关联到车辆性质")
val alarmKey = IgniteClient.getBinaryObject(new PrimaryKey(code._1,code._2,code._3,useNature))
//累计行驶报警数
val alarmNumber = new AlarmNumber(code._1,code._2,code._3,useNature,1L)
if(!IgniteClient.alarmNumberCache.withKeepBinary().putIfAbsent(alarmKey,alarmNumber)){
IgniteClient.updateAlarmNumberCache(alarmKey)
}
//今日报警数
val dailyAlarmNumber = new DailyAlarm(code._1,code._2,code._3,useNature,date,1L)
if(!IgniteClient.dailyAlarmNumberCache.withKeepBinary().putIfAbsent(alarmKey,dailyAlarmNumber)){
IgniteClient.updateDailyAlarmNumberCache(alarmKey,warnTime)
}
//今日报警详情
val alarmType = alarmInfoOptional.get.getWarningLevelCode
val alarmContent = alarmInfoOptional.get.getWarningTypeName
val alarmDetailKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,warnTime))
val dailyAlarmDetail = new DailyAlarmDetail(code._1,code._2,code._3,vehicleNo,vehicleColor,useNature,
deviceId,warnTime,lon,lat,alarmType,alarmContent)
IgniteClient.dailyAlarmDetailCache.withKeepBinary().put(alarmDetailKey,dailyAlarmDetail)
}
}
}
//报警督办请求信息
case TableKey(Some("0x9400"),Some("0x9401")) =>
x._2.foreach{ x=>
val vehicleNo = x.getString("vehicleNo")
val vehicleColor = x.getString("vehicleColor")
val superVisionId = x.getString("superVisionId")
val timestamp = x.getLong("warnTime") * 1000
val warnTime = new DateTime(timestamp).toString("yyyy-MM-dd HH:mm:ss")
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无")
val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
val value = new DailyAlarmDeal(vehicleNo,vehicleColor,useNature,superVisionId,warnTime,false)
if(!IgniteClient.dailyAlarmDealCache.withKeepBinary().putIfAbsent(key,value)){
IgniteClient.updateDailyAlarmDealCache(key,Map(
"useNature"->useNature,
"superVisionId"->superVisionId,
"warnTime"->warnTime)
)
}
}
//报警督办应答消息
case tableKey if tableKey == TableKey(Some("0x1400"),Some("0x1401")) =>
x._2.foreach{ x=>
val vehicleNo = x.getString("vehicleNo")
val vehicleColor = x.getString("vehicleColor")
val superVisionId = x.getString("supervisionId")
val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
val value = new DailyAlarmDeal(true)
if(!IgniteClient.dailyAlarmDealCache.withKeepBinary().putIfAbsent(key,value)){
IgniteClient.updateDailyAlarmDealCache(key,Map("isDeal"-> (true:java.lang.Boolean)))
}
}
case _ =>
}
}catch {
case e:Exception => e.printStackTrace()
}
}
)
zkManager.saveEndOffset(offsetRanges,Const.igniteGroupId)
}
}
ssc.start()
ssc.awaitTermination()
}
}
......@@ -8,8 +8,11 @@ import com.hikcreate.data.common.Logging
import com.hikcreate.data.constant.Const
import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
import org.apache.ignite.cache.query.SqlQuery
import org.joda.time.{DateTime, Duration}
import scalaj.http.Http
import scala.collection.mutable.ArrayBuffer
object Tools extends Logging{
def addLocation(json:JSONObject): Array[JSONObject] = {
......@@ -25,25 +28,29 @@ object Tools extends Logging{
}
def getAddressAndLocationCode(lon:Double,lat:Double):(String,String) = {
val json = new JSONObject()
val arr = new JSONArray()
val lonAndLat = new JSONObject()
lonAndLat.put("longitude",lon)
lonAndLat.put("latitude",lat)
arr.add(lonAndLat)
json.put("locations",arr)
val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString)
.header("content-type","application/json")//.charset("ISO-8859-1")
.timeout(connTimeoutMs = 8000, readTimeoutMs = 8000)
.asString
if(response.code == 200){
try{
val json = new JSONObject()
val arr = new JSONArray()
val lonAndLat = new JSONObject()
lonAndLat.put("longitude",lon)
lonAndLat.put("latitude",lat)
arr.add(lonAndLat)
json.put("locations",arr)
val starttime = DateTime.now()
val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString)
.header("content-type","application/json")
//.timeout(connTimeoutMs = 1000,readTimeoutMs = 1000)
.asString
val endtime = DateTime.now()
println("http请求时间:"+new Duration(starttime,endtime).getMillis)
val body = JSON.parseObject(response.body)
val item = body.getJSONObject("result").getJSONArray("regeoItems").getJSONObject(0)
val address = item.getString("formattedAddress")
val locationCode = item.getJSONObject("addressComponent").getString("adcode")
(address,locationCode)
}else{
throw new RuntimeException("http请求城市区编码出错")
}catch{
case e:Exception =>
throw new RuntimeException(e)
}
}
......@@ -56,6 +63,39 @@ object Tools extends Logging{
(provinceCode,cityCode,areaCode)
}
def getAddressAndLocationCodes(buffer:List[(Double,Double)]): IndexedSeq[(String,String)] = {
val json = new JSONObject()
val arr = new JSONArray()
buffer.foreach{x =>
val lonAndLat = new JSONObject()
lonAndLat.put("longitude",x._1)
lonAndLat.put("latitude",x._2)
arr.add(lonAndLat)
}
json.put("locations",arr)
val startTime = DateTime.now()
val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type","application/json").asString
val endTime = DateTime.now()
println("http请求时间:"+new Duration(startTime,endTime).getMillis)
val body = JSON.parseObject(response.body)
val items = body.getJSONObject("result").getJSONArray("regeoItems")
(0 until items.size()).map{ index =>
val item = items.getJSONObject(index)
val address = item.getString("formattedAddress")
val locationCode = item.getJSONObject("addressComponent").getString("adcode")
(address,locationCode)
}
}
def getLocationCodes(buffer:List[(Double,Double)]): IndexedSeq[(String,String,String)] = {
getAddressAndLocationCodes(buffer).map(x=>x._2).map{ locationCode =>
val provinceCode = locationCode.substring(0,2)
val cityCode = locationCode.substring(2,4)
val areaCode = locationCode.substring(4,6)
(provinceCode,cityCode,areaCode)
}
}
def getInfoContentJsonobj(infoStr:String):JSONObject = {
val jsonStr=("{\""+infoStr.replace(":=","\":\"").replace(";","\",\"")+"\"}").replace(",\"\"}","}")
val jSONObject = JSON.parseObject(jsonStr)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment