Commit aa830172 by 杜发飞

1

parent dac9c849
kafka.bootstrap.servers=39.100.49.76:9092 kafka.bootstrap.servers=39.100.49.76:9092
#kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
kafka.zookerper.servers=10.197.236.211:2181 kafka.zookerper.servers=10.197.236.211:2181
application.kafka.topic=tbd-transport-data-gathering application.kafka.topic=tbd-transport-data-gathering
basicsInfo.kafka.topic=transport_basedata_operation basicsInfo.kafka.topic=transport_basedata_operation
hive.group.id=hive hive.group.id=hive
ignite.group.id=ignite1 ignite.group.id=ignite
basics.group.id=basics basics.group.id=basics
hive.driver=org.apache.hive.jdbc.HiveDriver hive.driver=org.apache.hive.jdbc.HiveDriver
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
<property name="zkConnectionString" value="10.197.236.211:2181,10.197.236.212:2181,10.197.236.213:2181"/> <property name="zkConnectionString" value="10.197.236.211:2181,10.197.236.212:2181,10.197.236.213:2181"/>
<property name="sessionTimeout" value="30000"/> <property name="sessionTimeout" value="30000"/>
<property name="zkRootPath" value="/apacheIgnite"/> <property name="zkRootPath" value="/apacheIgnite"/>
<!--<property name="zkRootPath" value="/Ignite"/>-->
<property name="joinTimeout" value="0"/> <property name="joinTimeout" value="0"/>
</bean> </bean>
</property> </property>
......
package com.hikcreate.data.client package com.hikcreate.data.client
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import com.hikcreate.ignite.domain.PrimaryKey import com.hikcreate.ignite.domain.PrimaryKey
import com.hikcreate.ignite.domain.alarm._ import com.hikcreate.ignite.domain.alarm._
import com.hikcreate.ignite.domain.alarm.processor.{DailyAlarmDealUpdate, DailyAlarmUpdate} import com.hikcreate.ignite.domain.alarm.processor.{DailyAlarmDealUpdate, DailyAlarmUpdate}
...@@ -12,7 +11,6 @@ import org.apache.ignite.binary.BinaryObject ...@@ -12,7 +11,6 @@ import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.CacheMode import org.apache.ignite.cache.CacheMode
import org.apache.ignite.configuration.CacheConfiguration import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.{Ignite, IgniteCache, Ignition} import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
/** /**
...@@ -210,9 +208,7 @@ object IgniteClient { ...@@ -210,9 +208,7 @@ object IgniteClient {
) )
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
//ignite.cacheNames().asScala.foreach(x=>ignite.destroyCache(x))
dailyAlarmNumberCache.clear()
ignite.close() ignite.close()
} }
} }
...@@ -27,8 +27,8 @@ object FullSync extends Sparking{ ...@@ -27,8 +27,8 @@ object FullSync extends Sparking{
val social_credit_code = row.getString(row.fieldIndex("social_credit_code")) val social_credit_code = row.getString(row.fieldIndex("social_credit_code"))
val into_time = row.getDate(row.fieldIndex("into_time")) val into_time = row.getDate(row.fieldIndex("into_time"))
val province = row.getString(row.fieldIndex("province")) val province = row.getString(row.fieldIndex("province"))
val city = row.getString(row.fieldIndex("city")) val city = row.getString(row.fieldIndex("city")).substring(2,4)
val area = row.getString(row.fieldIndex("area")) val area = row.getString(row.fieldIndex("area")).substring(4,6)
val into_status = row.getBoolean(row.fieldIndex("into_status")) val into_status = row.getBoolean(row.fieldIndex("into_status"))
val status = row.getString(row.fieldIndex("status")) val status = row.getString(row.fieldIndex("status"))
val address = row.getString(row.fieldIndex("address")) val address = row.getString(row.fieldIndex("address"))
...@@ -65,10 +65,9 @@ object FullSync extends Sparking{ ...@@ -65,10 +65,9 @@ object FullSync extends Sparking{
val city = enterpriseInfo.map(x => x.getCity).getOrElse("") val city = enterpriseInfo.map(x => x.getCity).getOrElse("")
val area = enterpriseInfo.map(x => x.getArea).getOrElse("") val area = enterpriseInfo.map(x => x.getArea).getOrElse("")
val value = new VehicleInfo( val value = new VehicleInfo(
id,vehicle_code,plate_num,plate_color,into_time,business_scope, id,vehicle_code,plate_num,plate_color,into_time,business_scope,business_scope_detail,
business_scope_detail,indentifier,into_status,status,operating_certificate_no, indentifier,into_status,status,operating_certificate_no,class_line,use_nature,
class_line,use_nature,enterprise_code,driving_permit_no,vehicle_brand, enterprise_code,driving_permit_no,vehicle_brand, "",province,city,area)
"",province,city,area)
IgniteClient.basicVehicleInfo.withKeepBinary().put(id,value) IgniteClient.basicVehicleInfo.withKeepBinary().put(id,value)
} }
//报警类型配置基础表 //报警类型配置基础表
......
...@@ -13,7 +13,6 @@ import org.apache.spark.streaming.kafka010._ ...@@ -13,7 +13,6 @@ import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat import org.joda.time.format.DateTimeFormat
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
object SyncIgnite extends Sparking with Logging{ object SyncIgnite extends Sparking with Logging{
...@@ -24,8 +23,7 @@ object SyncIgnite extends Sparking with Logging{ ...@@ -24,8 +23,7 @@ object SyncIgnite extends Sparking with Logging{
val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.igniteGroupId) val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.igniteGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]() val offsetRanges = new ArrayBuffer[OffsetRange]()
val ssc = new StreamingContext(conf, Seconds(1)) val ssc = new StreamingContext(conf, Seconds(1))
val inputStream = KafkaUtils.createDirectStream[String, String]( val inputStream = KafkaUtils.createDirectStream[String, String](ssc,
ssc,
LocationStrategies.PreferConsistent, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Const.applicationTopic,kafkaParams,offsets)) ConsumerStrategies.Subscribe[String, String](Const.applicationTopic,kafkaParams,offsets))
inputStream.transform { rdd => inputStream.transform { rdd =>
...@@ -69,8 +67,7 @@ object SyncIgnite extends Sparking with Logging{ ...@@ -69,8 +67,7 @@ object SyncIgnite extends Sparking with Logging{
IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue) IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue)
//累计行驶 累计安全行驶里程 今日车辆在线情况 //累计行驶 累计安全行驶里程 今日车辆在线情况
val mileageKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor)) val mileageKey = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor))
//val mileageValue = new DailyTravel(vehicleProvince,vehicleCity,vehicleArea,useNature,date,lat,lon,time,0D,0L) val mileageValue = new DailyTravel(vehicleProvince,vehicleCity,vehicleArea,useNature,date,lat,lon,time,
val mileageValue = new DailyTravel("33","01","02",useNature,date,lat,lon,time,
0D,0L,code._1,code._2,code._3) 0D,0L,code._1,code._2,code._3)
if(!IgniteClient.dailyTravelCache.withKeepBinary().putIfAbsent(mileageKey,mileageValue)){ if(!IgniteClient.dailyTravelCache.withKeepBinary().putIfAbsent(mileageKey,mileageValue)){
IgniteClient.updateMileageCache(mileageKey,lat,lon,time) IgniteClient.updateMileageCache(mileageKey,lat,lon,time)
...@@ -171,11 +168,13 @@ object SyncIgnite extends Sparking with Logging{ ...@@ -171,11 +168,13 @@ object SyncIgnite extends Sparking with Logging{
val timestamp = json.getLong("warnTime") * 1000 val timestamp = json.getLong("warnTime") * 1000
val time = new DateTime(timestamp) val time = new DateTime(timestamp)
val warnTime = time.toString("yyyy-MM-dd HH:mm:ss") val warnTime = time.toString("yyyy-MM-dd HH:mm:ss")
val vehicleInfoOptional = Tools.getVehicleInfo(vehicleNo,vehicleColor)
val useNature = vehicleInfoOptional.map(x=>x.getUseNature).getOrElse("无")
val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId)) val key = IgniteClient.getBinaryObject(new PrimaryKey(vehicleNo,vehicleColor,superVisionId))
val value = new DailyAlarmDeal(vehicleNo,vehicleColor,"",superVisionId,warnTime,false) val value = new DailyAlarmDeal(vehicleNo,vehicleColor,useNature,superVisionId,warnTime,false)
if(!IgniteClient.dailyAlarmDealCache.withKeepBinary().putIfAbsent(key,value)){ if(!IgniteClient.dailyAlarmDealCache.withKeepBinary().putIfAbsent(key,value)){
IgniteClient.updateDailyAlarmDealCache(key,Map( IgniteClient.updateDailyAlarmDealCache(key,Map(
"useNature"->"", "useNature"->useNature,
"superVisionId"->superVisionId, "superVisionId"->superVisionId,
"warnTime"->warnTime) "warnTime"->warnTime)
) )
......
...@@ -32,10 +32,8 @@ object Tools extends Logging{ ...@@ -32,10 +32,8 @@ object Tools extends Logging{
lonAndLat.put("latitude",lat) lonAndLat.put("latitude",lat)
arr.add(lonAndLat) arr.add(lonAndLat)
json.put("locations",arr) json.put("locations",arr)
val response = Http(Const.areaCodeAndAddressUrl) val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString)
.postData(json.toJSONString) .header("content-type","application/json")//.charset("ISO-8859-1")
.header("content-type","application/json")
//.charset("ISO-8859-1")
.timeout(connTimeoutMs = 8000, readTimeoutMs = 8000) .timeout(connTimeoutMs = 8000, readTimeoutMs = 8000)
.asString .asString
if(response.code == 200){ if(response.code == 200){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment