Commit 09c081de by dufafei

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/main/scala/com/hikcreate/data/client/IgniteClient.scala
#	src/main/scala/com/hikcreate/data/offline/FullSync.scala
parents d4d16490 09ab3bf6
 ### IntelliJ IDEA ###
 .idea/
+lib/
 target/
 *.iml
 *.ipr
......
@@ -132,7 +132,7 @@
     <build>
         <plugins>
-            <plugin>
+            <!--<plugin>
                 <artifactId>maven-dependency-plugin</artifactId>
                 <executions>
                     <execution>
@@ -146,12 +146,17 @@
                     <includeScope>runtime</includeScope>
                     <excludeTransitive>false</excludeTransitive>
                 </configuration>
-            </plugin>
+            </plugin>-->
             <plugin>
                 <groupId>org.scala-tools</groupId>
                 <artifactId>maven-scala-plugin</artifactId>
                 <version>2.15.2</version>
-                <executions><execution><goals><goal>compile</goal></goals></execution>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
                 </executions>
             </plugin>
         </plugins>
......
 kafka.bootstrap.servers=39.100.49.76:9092
 #kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
 kafka.zookerper.servers=10.197.236.211:2181
 application.kafka.topic=tbd-transport-data-gathering
 basicsInfo.kafka.topic=transport_basedata_operation
 hive.group.id=hive
-ignite.group.id=ignite1
+ignite.group.id=ignite
 basics.group.id=basics
 hive.driver=org.apache.hive.jdbc.HiveDriver
......
 package com.hikcreate.data.client
 import java.util.concurrent.TimeUnit
 import com.hikcreate.ignite.domain.PrimaryKey
 import com.hikcreate.ignite.domain.alarm._
 import com.hikcreate.ignite.domain.alarm.processor.{DailyAlarmDealUpdate, DailyAlarmUpdate}
@@ -12,7 +11,6 @@ import org.apache.ignite.binary.BinaryObject
 import org.apache.ignite.cache.CacheMode
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.{Ignite, IgniteCache, Ignition}
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
 /**
@@ -210,9 +208,7 @@ object IgniteClient {
   )
   def main(args: Array[String]): Unit = {
-    ignite.cacheNames().asScala.foreach(x=>ignite.getOrCreateCache(x).clear())
-    //ignite.cacheNames().asScala.foreach(x=>ignite.destroyCache(x))
     ignite.close()
   }
 }
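For context on the two lines removed from main() above: in Ignite, clear() empties a cache's entries but keeps the cache and its configuration, while destroyCache() removes the cache entirely. A minimal sketch (not part of the commit) contrasting the two calls:

// Sketch only; assumes the standard Ignite cache API.
ignite.cacheNames().asScala.foreach { name =>
  ignite.getOrCreateCache(name).clear() // keep the cache, drop all entries
  // ignite.destroyCache(name)          // drop the cache itself, configuration included
}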
 package com.hikcreate.data.common
+import org.apache.log4j.{Level, Logger}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.spark.SparkConf
 trait Sparking {
+  // Suppress unnecessary logging; show only the logs we need in the terminal
+  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
+  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
+  Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)
   val conf: SparkConf = new SparkConf()
     .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
     .set("hive.exec.dynamici.partition","true")
     .set("hive.exec.dynamic.partition.mode","nonstrict")
-    .setAppName("test")
-    .setMaster("local[*]")
+    //.setAppName("test")
+    //.setMaster("local[*]")
   def getKafkaParams(servers:String,groupId: String):Map[String,Object] = {
     Map[String,Object](
......
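The body of getKafkaParams is elided above. For orientation, here is a hypothetical sketch of what such a map typically contains for Spark's kafka-010 direct stream; every entry below is an assumption, not the commit's actual code:

// Hypothetical sketch only -- not the elided body itself.
def getKafkaParams(servers: String, groupId: String): Map[String, Object] =
  Map[String, Object](
    "bootstrap.servers" -> servers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",                  // assumed policy
    "enable.auto.commit" -> (false: java.lang.Boolean)  // offsets appear to be managed via ZK here
  )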
@@ -27,8 +27,8 @@ object FullSync extends Sparking{
       val social_credit_code = row.getString(row.fieldIndex("social_credit_code"))
       val into_time = row.getDate(row.fieldIndex("into_time"))
       val province = row.getString(row.fieldIndex("province"))
-      val city = row.getString(row.fieldIndex("city"))
-      val area = row.getString(row.fieldIndex("area"))
+      val city = row.getString(row.fieldIndex("city")).substring(2,4)
+      val area = row.getString(row.fieldIndex("area")).substring(4,6)
       val into_status = row.getBoolean(row.fieldIndex("into_status"))
       val status = row.getString(row.fieldIndex("status"))
       val address = row.getString(row.fieldIndex("address"))
@@ -61,23 +61,14 @@ object FullSync extends Sparking{
       val status = row.getString(row.fieldIndex("status"))
       val vehicle_brand = row.getString(row.fieldIndex("vehicle_brand"))
       val enterpriseInfo = Tools.getEnterpriseInfo(enterprise_code)
-      /*val province = enterpriseInfo.map(x => x.getProvince).getOrElse("")
-      val city = enterpriseInfo.map(x => x.getCity).getOrElse("")*/
-      val code = enterpriseInfo.map(x => x.getArea).getOrElse("")
-      if(code.length == 6) {
-        val province = code.substring(0,2)
-        val city = code.substring(2,4)
-        val area = code.substring(4,6)
+      val province = enterpriseInfo.map(x => x.getProvince).getOrElse("")
+      val city = enterpriseInfo.map(x => x.getCity).getOrElse("")
+      val area = enterpriseInfo.map(x => x.getArea).getOrElse("")
       val value = new VehicleInfo(
-          id,vehicle_code,plate_num,plate_color,into_time,business_scope,
-          business_scope_detail,indentifier,into_status,status,operating_certificate_no,
-          class_line,use_nature,enterprise_code,driving_permit_no,vehicle_brand,
-          "",province,city,area)
+        id,vehicle_code,plate_num,plate_color,into_time,business_scope,business_scope_detail,
+        indentifier,into_status,status,operating_certificate_no,class_line,use_nature,
+        enterprise_code,driving_permit_no,vehicle_brand, "",province,city,area)
       IgniteClient.basicVehicleInfo.withKeepBinary().put(id,value)
-      }
     }
     // Alarm type configuration base table
     sparkSession.sqlContext.read.format("jdbc").options(Map(
......
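The substring indices in both hunks above follow the six-digit Chinese administrative division code (adcode): digits 1-2 identify the province, 3-4 the city, and 5-6 the county or district. A small sketch with an assumed sample value:

// Splitting a 6-digit adcode; "330102" is an illustrative example, not commit data.
val code = "330102"
val province = code.substring(0, 2) // "33" -> Zhejiang
val city     = code.substring(2, 4) // "01" -> Hangzhou
val area     = code.substring(4, 6) // "02" -> Shangcheng District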
@@ -13,7 +13,6 @@ import org.apache.spark.streaming.kafka010._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import scala.collection.mutable.ArrayBuffer
 object SyncIgnite extends Sparking with Logging{
@@ -23,9 +22,9 @@ object SyncIgnite extends Sparking with Logging{
     val kafkaParams = getKafkaParams(Const.bootstrap, Const.igniteGroupId)
     val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.igniteGroupId)
     val offsetRanges = new ArrayBuffer[OffsetRange]()
-    val ssc = new StreamingContext(conf, Seconds(1))
-    val inputStream = KafkaUtils.createDirectStream[String, String](
-      ssc,
+    val ssc = new StreamingContext(conf, Seconds(2))
+    //ssc.sparkContext.setLogLevel("WARN")
+    val inputStream = KafkaUtils.createDirectStream[String, String](ssc,
       LocationStrategies.PreferConsistent,
       ConsumerStrategies.Subscribe[String, String](Const.applicationTopic,kafkaParams,offsets))
     inputStream.transform { rdd =>
@@ -45,7 +44,6 @@ object SyncIgnite extends Sparking with Logging{
   def processRow(iterator:Iterator[String]): Unit = {
     iterator.foreach{ x =>
       try {
-        println(x)
         val json = JSON.parseObject(x)
         TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))) match {
           // Vehicle location message
@@ -69,8 +67,7 @@ object SyncIgnite extends Sparking with Logging{
             IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue)
             // Cumulative mileage, cumulative safe-driving mileage, today's vehicle online status
             val mileageKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor))
-            //val mileageValue = new DailyTravel(vehicleProvince,vehicleCity,vehicleArea,useNature,date,lat,lon,time,0D,0L)
-            val mileageValue = new DailyTravel("33","01","02",useNature,date,lat,lon,time,
+            val mileageValue = new DailyTravel(vehicleProvince,vehicleCity,vehicleArea,useNature,date,lat,lon,time,
               0D,0L,code._1,code._2,code._3)
             if(!IgniteClient.dailyTravelCache.withKeepBinary().putIfAbsent(mileageKey,mileageValue)){
               IgniteClient.updateMileageCache(mileageKey,lat,lon,time)
@@ -140,7 +137,7 @@ object SyncIgnite extends Sparking with Logging{
             val eventType = infoJson.getString("EVENT_TYPE")
             val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
             val alarmInfoOptional = Tools.getAlarmInfo(warnType,eventType)
             if(alarmInfoOptional.isDefined){ // did the alarm match the alarm info base table?
               val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("没有关联到车辆性质")
               val alarmKey = IgniteClient.getBinaryObject(new PrimaryKey(code._1,code._2,code._3,useNature))
               // Cumulative driving alarm count
@@ -171,11 +168,13 @@ object SyncIgnite extends Sparking with Logging{
             val timestamp = json.getLong("warnTime") * 1000
             val time = new DateTime(timestamp)
             val warnTime = time.toString("yyyy-MM-dd HH:mm:ss")
+            val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
+            val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无")
             val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
-            val value = new DailyAlarmDeal(vehicleNo,vehicleColor,"",superVisionId,warnTime,false)
+            val value = new DailyAlarmDeal(vehicleNo,vehicleColor,useNature,superVisionId,warnTime,false)
             if(!IgniteClient.dailyAlarmDealCache.withKeepBinary().putIfAbsent(key,value)){
               IgniteClient.updateDailyAlarmDealCache(key,Map(
-                "useNature"->"",
+                "useNature"->useNature,
                 "superVisionId"->superVisionId,
                 "warnTime"->warnTime)
               )
@@ -184,7 +183,7 @@ object SyncIgnite extends Sparking with Logging{
           case tableKey if tableKey == TableKey(Some("0x1400"),Some("0x1401")) =>
             val vehicleNo = json.getString("vehicleNo")
             val vehicleColor = json.getString("vehicleColor")
-            val superVisionId = json.getString("superVisionId")
+            val superVisionId = json.getString("supervisionId")
             val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
             val value = new DailyAlarmDeal(true)
             if(!IgniteClient.dailyAlarmDealCache.withKeepBinary().putIfAbsent(key,value)){
@@ -194,6 +193,8 @@ object SyncIgnite extends Sparking with Logging{
           }
       }catch {
         case e:Exception =>
+          println(x)
+          println(e.getMessage)
           e.printStackTrace()
       }
     }
......
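Every cache write in processRow follows the same insert-or-update idiom: IgniteCache.putIfAbsent returns true only when it created the entry, so a false return means the key already existed and the stored entry is merged via an update helper. A condensed sketch of the pattern (names from the diff, exact helper semantics assumed):

// Insert-or-update idiom used throughout processRow (sketch, not new commit code).
if (!IgniteClient.dailyTravelCache.withKeepBinary().putIfAbsent(mileageKey, mileageValue)) {
  // Key already present: fold the new GPS sample into the stored entry.
  IgniteClient.updateMileageCache(mileageKey, lat, lon, time)
}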
@@ -8,8 +8,11 @@ import com.hikcreate.data.common.Logging
 import com.hikcreate.data.constant.Const
 import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
 import org.apache.ignite.cache.query.SqlQuery
+import org.joda.time.{DateTime, Duration}
 import scalaj.http.Http
+import scala.collection.mutable.ArrayBuffer
 object Tools extends Logging{
   def addLocation(json:JSONObject): Array[JSONObject] = {
@@ -25,6 +28,7 @@ object Tools extends Logging{
   }
   def getAddressAndLocationCode(lon:Double,lat:Double):(String,String) = {
+    try{
     val json = new JSONObject()
     val arr = new JSONArray()
     val lonAndLat = new JSONObject()
@@ -32,20 +36,21 @@ object Tools extends Logging{
     lonAndLat.put("latitude",lat)
     arr.add(lonAndLat)
     json.put("locations",arr)
-    val response = Http(Const.areaCodeAndAddressUrl)
-      .postData(json.toJSONString)
+    val starttime = DateTime.now()
+    val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString)
       .header("content-type","application/json")
-      //.charset("ISO-8859-1")
+      //.timeout(connTimeoutMs = 1000,readTimeoutMs = 1000)
+      .timeout(connTimeoutMs = 8000, readTimeoutMs = 8000)
       .asString
-    if(response.code == 200){
+    val endtime = DateTime.now()
+    println("HTTP request time: " + new Duration(starttime,endtime).getMillis)
     val body = JSON.parseObject(response.body)
     val item = body.getJSONObject("result").getJSONArray("regeoItems").getJSONObject(0)
     val address = item.getString("formattedAddress")
     val locationCode = item.getJSONObject("addressComponent").getString("adcode")
     (address,locationCode)
-    }else{
-      throw new RuntimeException("HTTP request for city/area code failed")
+    }catch{
+      case e:Exception =>
+        throw new RuntimeException(e)
     }
   }
@@ -58,6 +63,39 @@ object Tools extends Logging{
     (provinceCode,cityCode,areaCode)
   }
+  def getAddressAndLocationCodes(buffer:List[(Double,Double)]): IndexedSeq[(String,String)] = {
+    val json = new JSONObject()
+    val arr = new JSONArray()
+    buffer.foreach{ x =>
+      val lonAndLat = new JSONObject()
+      lonAndLat.put("longitude",x._1)
+      lonAndLat.put("latitude",x._2)
+      arr.add(lonAndLat)
+    }
+    json.put("locations",arr)
+    val startTime = DateTime.now()
+    val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type","application/json").asString
+    val endTime = DateTime.now()
+    println("HTTP request time: " + new Duration(startTime,endTime).getMillis)
+    val body = JSON.parseObject(response.body)
+    val items = body.getJSONObject("result").getJSONArray("regeoItems")
+    (0 until items.size()).map{ index =>
+      val item = items.getJSONObject(index)
+      val address = item.getString("formattedAddress")
+      val locationCode = item.getJSONObject("addressComponent").getString("adcode")
+      (address,locationCode)
+    }
+  }
+  def getLocationCodes(buffer:List[(Double,Double)]): IndexedSeq[(String,String,String)] = {
+    getAddressAndLocationCodes(buffer).map(x=>x._2).map{ locationCode =>
+      val provinceCode = locationCode.substring(0,2)
+      val cityCode = locationCode.substring(2,4)
+      val areaCode = locationCode.substring(4,6)
+      (provinceCode,cityCode,areaCode)
+    }
+  }
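The two methods added above batch several coordinates into one reverse-geocoding request and split each returned adcode into (province, city, county) segments. A hypothetical usage sketch; the coordinates and returned codes are illustrative assumptions:

// Hypothetical call to the new batch helpers (sample values, not commit data).
val points = List((120.15, 30.28), (116.40, 39.90)) // (longitude, latitude) pairs
val codes = Tools.getLocationCodes(points)
// e.g. Vector(("33","01","02"), ("11","01","01")) -- one triple per input point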
   def getInfoContentJsonobj(infoStr:String):JSONObject = {
     val jsonStr=("{\""+infoStr.replace(":=","\":\"").replace(";","\",\"")+"\"}").replace(",\"\"}","}")
     val jSONObject = JSON.parseObject(jsonStr)
......
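getInfoContentJsonobj rewrites a `key:=value;`-delimited string into JSON purely by string replacement. Tracing the chain on an assumed sample input shows why the final replace is needed to drop the trailing empty pair:

// Worked example on the assumed input "WARN_TYPE:=1;EVENT_TYPE:=2;":
// replace(":=", "\":\"")  => WARN_TYPE":"1;EVENT_TYPE":"2;
// replace(";", "\",\"")   => WARN_TYPE":"1","EVENT_TYPE":"2","
// wrap with {" ... "}     => {"WARN_TYPE":"1","EVENT_TYPE":"2",""}
// replace(",\"\"}", "}")  => {"WARN_TYPE":"1","EVENT_TYPE":"2"}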