Commit 0b820297 by 杜发飞

1

parent 27c4f5c8
......@@ -87,6 +87,11 @@
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j2</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>${ignite.version}</version>
</dependency>
......
package com.hikcreate.ignite.domain;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
public class ErrorMsg implements Serializable {
private static final long serialVersionUID = 1L;
@QuerySqlField
private String data; //数据
@QuerySqlField
private String error; //错误原因
public ErrorMsg(String data, String error) {
this.data = data;
this.error = error;
}
}
......@@ -4,7 +4,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
/**
* 每日报警处理
* 每日报警处理
*/
public class DailyAlarmDeal implements Serializable {
......@@ -17,9 +17,6 @@ public class DailyAlarmDeal implements Serializable {
private String vehicleColor; //车牌颜色
@QuerySqlField
private String useNature; //使用性质 公交
@QuerySqlField
private String supervisionId; //报警督办 ID
@QuerySqlField
......@@ -28,10 +25,9 @@ public class DailyAlarmDeal implements Serializable {
@QuerySqlField
private Boolean isDeal; //是否处理
public DailyAlarmDeal(String vehicleNo, String vehicleColor, String useNature, String supervisionId, String warnTime, Boolean isDeal) {
public DailyAlarmDeal(String vehicleNo, String vehicleColor, String supervisionId, String warnTime, Boolean isDeal) {
this.vehicleNo = vehicleNo;
this.vehicleColor = vehicleColor;
this.useNature = useNature;
this.supervisionId = supervisionId;
this.warnTime = warnTime;
this.isDeal = isDeal;
......
kafka.bootstrap.servers=39.100.49.76:9092
#kafka.bootstrap.servers=10.197.236.154:9092
kafka.zookerper.servers=10.197.236.154:2181
#,10.197.236.169:2181,10.197.236.184:2181/kafka
#kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
kafka.zookerper.servers=10.197.236.211:2181
application.kafka.topic=tbd-transport-data-gathering
basicsInfo.kafka.topic=transport_basedata_operation
hive.group.id=hive
ignite.group.id=ignite
basics.group.id=basics
ignite.group.id=ignite3
basics.group.id=basics2
hive.driver=org.apache.hive.jdbc.HiveDriver
hive.url=jdbc:hive2://hadoop02:10000/ods
......
package com.hikcreate.data.client
import java.util.concurrent.TimeUnit
import com.hikcreate.ignite.domain.PrimaryKey
import com.hikcreate.ignite.domain.{ErrorMsg, PrimaryKey}
import com.hikcreate.ignite.domain.alarm._
import com.hikcreate.ignite.domain.alarm.processor.{DailyAlarmDealUpdate, DailyAlarmUpdate}
import com.hikcreate.ignite.domain.basic._
......@@ -11,6 +12,7 @@ import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
/**
......@@ -31,15 +33,6 @@ object IgniteClient {
binary.build()
}
/*********************************基础信息表****************************************************************/
//平台基础信息表
lazy val basicPlatformInfo: IgniteCache[Long, PlatformInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,PlatformInfo]()
.setSqlSchema("BasicInfo")
.setName("PlatformInfo")
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[Long],classOf[PlatformInfo])
)
//企业基础信息表--企业接入情况
lazy val basicEnterpriseInfo: IgniteCache[Long, EnterpriseInfo] = ignite.getOrCreateCache(
new CacheConfiguration[Long,EnterpriseInfo]()
......@@ -67,7 +60,6 @@ object IgniteClient {
.setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[Long],classOf[AlarmTypeInfo])
)
/*********************************营运车辆监测****************************************************************/
/**
* 今日车辆在线情况 累计行驶 累计安全行驶里程
......@@ -207,8 +199,18 @@ object IgniteClient {
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
/*********************************错误数据****************************************************************/
lazy val errorMsgInfo: IgniteCache[String,ErrorMsg] = ignite.getOrCreateCache(
new CacheConfiguration[String,ErrorMsg]()
.setSqlSchema("Error")
.setName("ErrorMsg")
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
.setIndexedTypes(classOf[String],classOf[ErrorMsg])
)
def main(args: Array[String]): Unit = {
ignite.cacheNames().asScala.foreach(x=>ignite.getOrCreateCache(x).clear())
ignite.cacheNames().asScala.foreach(println(_))
ignite.close()
}
}
......@@ -15,8 +15,8 @@ trait Sparking {
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("hive.exec.dynamici.partition","true")
.set("hive.exec.dynamic.partition.mode","nonstrict")
//.setAppName("test")
//.setMaster("local[*]")
.setAppName("test")
.setMaster("local[*]")
def getKafkaParams(servers:String,groupId: String):Map[String,Object] = {
Map[String,Object](
......
......@@ -6,10 +6,9 @@ import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const
import com.hikcreate.data.util.{Tools, ZkManager}
import com.hikcreate.ignite.domain.basic.{EnterpriseInfo, VehicleInfo, PlatformInfo}
import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ArrayBuffer
object SyncBasic extends Sparking with Logging{
......@@ -20,8 +19,7 @@ object SyncBasic extends Sparking with Logging{
val offsets = zkManager.getBeginOffset(Const.basicsInfoTopic,Const.basicsGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]()
val ssc = new StreamingContext(conf,Seconds(1))
val inputStream = KafkaUtils.createDirectStream[String,String](
ssc,
val inputStream = KafkaUtils.createDirectStream[String,String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Const.basicsInfoTopic,kafkaParams,offsets))
inputStream.transform{ rdd =>
......@@ -41,52 +39,50 @@ object SyncBasic extends Sparking with Logging{
def processRow(iterator:Iterator[String]): Unit = {
iterator.foreach{ x =>
val json = JSON.parseObject(x)
println(x)
val tableName = json.getString("dataType")
val operation = json.getString("operationType")
json.remove("dataType")
json.remove("operationType")
val str = JSON.toJSONString(json,SerializerFeature.WriteMapNullValue)
if(tableName.equals("baseIntoPlatformInfo")){ //平台
operation match {
case "add" =>
IgniteClient.basicPlatformInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[PlatformInfo]))
case "update" =>
IgniteClient.basicPlatformInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[PlatformInfo]))
case "delete" =>
IgniteClient.basicPlatformInfo.withKeepBinary().remove(json.getLong("id"))
}
}
if(tableName.equals("baseIntoEnterpriseInfo")){ //企业
val str = JSON.toJSONString(json,SerializerFeature.WriteMapNullValue)
operation match {
case "add" =>
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[EnterpriseInfo]))
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(json.getLong("id"),
JSON.parseObject(str,classOf[EnterpriseInfo]))
case "update" =>
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[EnterpriseInfo]))
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(json.getLong("id"),
JSON.parseObject(str,classOf[EnterpriseInfo]))
case "delete" =>
IgniteClient.basicEnterpriseInfo.withKeepBinary().remove(json.getLong("id"))
}
}
if(tableName.equals("baseIntoVehicleInfo")){ //车辆
val companyInfo = Tools.getEnterpriseInfo(json.getString("enterpriseCode"))
if(companyInfo.isDefined){
json.put("province",companyInfo.get.getProvince)
json.put("city",companyInfo.get.getCity)
json.put("area",companyInfo.get.getArea)
}
val str = JSON.toJSONString(json,SerializerFeature.WriteMapNullValue)
operation match {
case "add" =>
val companyInfo = Tools.getEnterpriseInfo(json.getString("enterpriseCode"))
if(companyInfo.isDefined){
//json.put("province",companyInfo.get.getProvince)
//json.put("city",companyInfo.get.getCity)
}
IgniteClient.basicVehicleInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
IgniteClient.basicVehicleInfo.withKeepBinary().put(json.getLong("id"),
JSON.parseObject(str,classOf[VehicleInfo]))
case "update" =>
IgniteClient.basicVehicleInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
IgniteClient.basicVehicleInfo.withKeepBinary().put(json.getLong("id"),
JSON.parseObject(str,classOf[VehicleInfo]))
case "delete" =>
IgniteClient.basicVehicleInfo.withKeepBinary().remove(json.getLong("id"))
}
}
if(tableName.equals("baseWarningType")){
if(tableName.equals("baseWarningType")){ //报警类型
val str = JSON.toJSONString(json,SerializerFeature.WriteMapNullValue)
operation match {
case "add" =>
IgniteClient.basicAlarmTypeInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
IgniteClient.basicAlarmTypeInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[AlarmTypeInfo]))
case "update" =>
IgniteClient.basicAlarmTypeInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[VehicleInfo]))
IgniteClient.basicAlarmTypeInfo.withKeepBinary().put(json.getLong("id"),JSON.parseObject(str,classOf[AlarmTypeInfo]))
case "delete" =>
IgniteClient.basicAlarmTypeInfo.withKeepBinary().remove(json.getLong("id"))
}
......
......@@ -36,13 +36,14 @@ object Tools extends Logging{
lonAndLat.put("latitude",lat)
arr.add(lonAndLat)
json.put("locations",arr)
val starttime = DateTime.now()
println(json.toJSONString)
//val starttime = DateTime.now()
val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString)
.header("content-type","application/json")
//.timeout(connTimeoutMs = 1000,readTimeoutMs = 1000)
.asString
val endtime = DateTime.now()
println("http请求时间:"+new Duration(starttime,endtime).getMillis)
//println("http请求时间:"+new Duration(starttime,endtime).getMillis)
val body = JSON.parseObject(response.body)
val item = body.getJSONObject("result").getJSONArray("regeoItems").getJSONObject(0)
val address = item.getString("formattedAddress")
......@@ -78,6 +79,7 @@ object Tools extends Logging{
//val endTime = DateTime.now()
//println("http请求时间:"+new Duration(startTime,endTime).getMillis/1000)
val body = JSON.parseObject(response.body)
println(response.body)
val items = body.getJSONObject("result").getJSONArray("regeoItems")
(0 until items.size()).map{ index =>
val item = items.getJSONObject(index)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment