Commit 2030ec23 by 杜发飞

1

parent b67aa811
...@@ -123,11 +123,6 @@ ...@@ -123,11 +123,6 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
<version>1.2.54</version> <version>1.2.54</version>
......
package com.hikcreate.ignite.domain1; package com.hikcreate.ignite.domain;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
......
package com.hikcreate.ignite.domain1; package com.hikcreate.ignite.domain;
import java.io.Serializable; import java.io.Serializable;
......
package com.hikcreate.ignite.domain1.alarm; package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.alarm; package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.alarm; package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.alarm; package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.alarm; package com.hikcreate.ignite.domain.alarm;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.alarm.processor; package com.hikcreate.ignite.domain.alarm.processor;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
......
package com.hikcreate.ignite.domain1.alarm.processor; package com.hikcreate.ignite.domain.alarm.processor;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
......
package com.hikcreate.ignite.domain1.basic; package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.basic; package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
...@@ -33,7 +33,8 @@ public class EnterpriseInfo implements Serializable { ...@@ -33,7 +33,8 @@ public class EnterpriseInfo implements Serializable {
@QuerySqlField @QuerySqlField
private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss private String businessTime; //业务发生时间,格式yyyy-MM-dd HH:mm:ss
public EnterpriseInfo(String enterpriseCode, String enterpriseName, String socialCreditCode, Date intoTime, String province, String city, String area, Boolean intoStatus, String status, String address, String businessTime) { public EnterpriseInfo(Long id, String enterpriseCode, String enterpriseName, String socialCreditCode, Date intoTime, String province, String city, String area, Boolean intoStatus, String status, String address, String businessTime) {
this.id = id;
this.enterpriseCode = enterpriseCode; this.enterpriseCode = enterpriseCode;
this.enterpriseName = enterpriseName; this.enterpriseName = enterpriseName;
this.socialCreditCode = socialCreditCode; this.socialCreditCode = socialCreditCode;
......
package com.hikcreate.ignite.domain1.basic; package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
......
package com.hikcreate.ignite.domain1.basic; package com.hikcreate.ignite.domain.basic;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
......
package com.hikcreate.ignite.domain1.vehicles; package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.vehicles; package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.vehicles; package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
......
package com.hikcreate.ignite.domain1.vehicles; package com.hikcreate.ignite.domain.vehicles;
import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable; import java.io.Serializable;
......
package com.hikcreate.ignite.domain1.vehicles.processor; package com.hikcreate.ignite.domain.vehicles.processor;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
......
package com.hikcreate.ignite.domain1.vehicles.processor; package com.hikcreate.ignite.domain.vehicles.processor;
import com.hikcreate.ignite.domain1.vehicles.DailyTravel; import com.hikcreate.ignite.domain.vehicles.DailyTravel;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheEntryProcessor;
......
package com.hikcreate.data.client package com.hikcreate.data.client
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import com.hikcreate.ignite.domain1.{ErrorMsg, PrimaryKey} import com.hikcreate.ignite.domain.{ErrorMsg, PrimaryKey}
import com.hikcreate.ignite.domain1.alarm._ import com.hikcreate.ignite.domain.alarm._
import com.hikcreate.ignite.domain1.alarm.processor.{DailyAlarmDealUpdate, DailyAlarmUpdate} import com.hikcreate.ignite.domain.alarm.processor.{DailyAlarmDealUpdate, DailyAlarmUpdate}
import com.hikcreate.ignite.domain1.basic._ import com.hikcreate.ignite.domain.basic._
import com.hikcreate.ignite.domain1.vehicles.processor.{AlarmNumberUpdate, DailyTravelUpdate} import com.hikcreate.ignite.domain.vehicles.processor.{AlarmNumberUpdate, DailyTravelUpdate}
import com.hikcreate.ignite.domain1.vehicles._ import com.hikcreate.ignite.domain.vehicles._
import javax.cache.expiry.{CreatedExpiryPolicy, Duration} import javax.cache.expiry.{CreatedExpiryPolicy, Duration}
import org.apache.ignite.binary.BinaryObject import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.CacheMode import org.apache.ignite.cache.CacheMode
...@@ -219,5 +219,12 @@ object IgniteClient { ...@@ -219,5 +219,12 @@ object IgniteClient {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
//dailyAlarmDealCache.clear() //dailyAlarmDealCache.clear()
val a = new DailyAlarmDetail("浙F38935","2","A21346","民用爆炸物品运输",
"2019100011","OIFUOSD132475",
"ABCDEFJH123456799","","33","01","04","1000",
"4465","015676002309",120.2884,31.6649,"2019-11-07 13:42:54","江苏省无锡市惠山区无锡惠山经济开发区无锡惠山水处理有限公司",
1L,"车道偏移报警")
dailyAlarmDetailCache.withKeepBinary().put(getBinaryObject(new PrimaryKey("11")),a)
ignite.close()
} }
} }
...@@ -33,7 +33,6 @@ class LifecycleListener(conf:SparkConf) extends SparkListener with Logging { ...@@ -33,7 +33,6 @@ class LifecycleListener(conf:SparkConf) extends SparkListener with Logging {
case e: ExceptionFailure => Some(e.toErrorString) case e: ExceptionFailure => Some(e.toErrorString)
case e: TaskFailedReason => Some(e.toErrorString) case e: TaskFailedReason => Some(e.toErrorString)
case kill:TaskKilled => Some(kill.toErrorString) case kill:TaskKilled => Some(kill.toErrorString)
case UnknownReason => Some(UnknownReason.toErrorString)
case _ => None case _ => None
} }
if (errorMessage.nonEmpty) { if (errorMessage.nonEmpty) {
......
...@@ -3,7 +3,7 @@ package com.hikcreate.data.offline ...@@ -3,7 +3,7 @@ package com.hikcreate.data.offline
import com.hikcreate.data.client.IgniteClient import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.Sparking import com.hikcreate.data.common.Sparking
import com.hikcreate.data.util.Tools import com.hikcreate.data.util.Tools
import com.hikcreate.ignite.domain1.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo} import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession
object FullSync extends Sparking{ object FullSync extends Sparking{
...@@ -33,7 +33,7 @@ object FullSync extends Sparking{ ...@@ -33,7 +33,7 @@ object FullSync extends Sparking{
val into_status = row.getBoolean(row.fieldIndex("into_status")) val into_status = row.getBoolean(row.fieldIndex("into_status"))
val status = row.getString(row.fieldIndex("status")) val status = row.getString(row.fieldIndex("status"))
val address = row.getString(row.fieldIndex("address")) val address = row.getString(row.fieldIndex("address"))
val value = new EnterpriseInfo(enterprise_code,enterprise_name,social_credit_code, val value = new EnterpriseInfo(id,enterprise_code,enterprise_name,social_credit_code,
into_time,province,city,area,into_status,status, address,"") into_time,province,city,area,into_status,status, address,"")
IgniteClient.basicEnterpriseInfo.withKeepBinary().put(id,value) IgniteClient.basicEnterpriseInfo.withKeepBinary().put(id,value)
} }
......
...@@ -4,9 +4,9 @@ import com.alibaba.fastjson.JSON ...@@ -4,9 +4,9 @@ import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature import com.alibaba.fastjson.serializer.SerializerFeature
import com.hikcreate.data.client.IgniteClient import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.{Logging, Sparking} import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const import com.hikcreate.data.constant.ApolloConst
import com.hikcreate.data.util.{Tools, ZkManager} import com.hikcreate.data.util.{Tools, ZkManager}
import com.hikcreate.ignite.domain1.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo} import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
...@@ -14,14 +14,14 @@ import scala.collection.mutable.ArrayBuffer ...@@ -14,14 +14,14 @@ import scala.collection.mutable.ArrayBuffer
object SyncBasic extends Sparking with Logging{ object SyncBasic extends Sparking with Logging{
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val zkManager = ZkManager(Const.zkKafka) val zkManager = ZkManager(ApolloConst.zkKafka)
val kafkaParams = getKafkaParams(Const.bootstrap,Const.basicsGroupId) val kafkaParams = getKafkaParams(ApolloConst.bootstrap,ApolloConst.basicsGroupId)
val offsets = zkManager.getBeginOffset(Const.basicsInfoTopic,Const.basicsGroupId) val offsets = zkManager.getBeginOffset(ApolloConst.basicsInfoTopic,ApolloConst.basicsGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]() val offsetRanges = new ArrayBuffer[OffsetRange]()
val ssc = new StreamingContext(conf,Seconds(1)) val ssc = new StreamingContext(conf,Seconds(1))
val inputStream = KafkaUtils.createDirectStream[String,String](ssc, val inputStream = KafkaUtils.createDirectStream[String,String](ssc,
LocationStrategies.PreferConsistent, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Const.basicsInfoTopic,kafkaParams,offsets)) ConsumerStrategies.Subscribe[String,String](ApolloConst.basicsInfoTopic,kafkaParams,offsets))
inputStream.transform{ rdd => inputStream.transform{ rdd =>
offsetRanges.clear() offsetRanges.clear()
offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges:_*) offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges:_*)
...@@ -29,7 +29,7 @@ object SyncBasic extends Sparking with Logging{ ...@@ -29,7 +29,7 @@ object SyncBasic extends Sparking with Logging{
}.map(x=>x.value()).foreachRDD{ rdd => }.map(x=>x.value()).foreachRDD{ rdd =>
if(!rdd.isEmpty()){ if(!rdd.isEmpty()){
rdd.foreachPartition(iterator=>processRow(iterator)) rdd.foreachPartition(iterator=>processRow(iterator))
zkManager.saveEndOffset(offsetRanges,Const.basicsGroupId) zkManager.saveEndOffset(offsetRanges,ApolloConst.basicsGroupId)
} }
} }
ssc.start() ssc.start()
...@@ -39,12 +39,13 @@ object SyncBasic extends Sparking with Logging{ ...@@ -39,12 +39,13 @@ object SyncBasic extends Sparking with Logging{
def processRow(iterator:Iterator[String]): Unit = { def processRow(iterator:Iterator[String]): Unit = {
iterator.foreach{ x => iterator.foreach{ x =>
val json = JSON.parseObject(x) val json = JSON.parseObject(x)
println(x)
val tableName = json.getString("dataType") val tableName = json.getString("dataType")
val operation = json.getString("operationType") val operation = json.getString("operationType")
json.remove("dataType") json.remove("dataType")
json.remove("operationType") json.remove("operationType")
if(tableName.equals("baseIntoEnterpriseInfo")){ //企业 if(tableName.equals("baseIntoEnterpriseInfo")){ //企业
json.put("city",json.getString("city").substring(2,4))
json.put("area",json.getString("area").substring(4,6))
val str = JSON.toJSONString(json,SerializerFeature.WriteMapNullValue) val str = JSON.toJSONString(json,SerializerFeature.WriteMapNullValue)
operation match { operation match {
case "add" => case "add" =>
......
...@@ -7,9 +7,9 @@ import com.hikcreate.data.constant.ApolloConst ...@@ -7,9 +7,9 @@ import com.hikcreate.data.constant.ApolloConst
import com.hikcreate.data.listener.BatchProcessListener import com.hikcreate.data.listener.BatchProcessListener
import com.hikcreate.data.model.TableKey import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{Tools, ZkManager} import com.hikcreate.data.util.{Tools, ZkManager}
import com.hikcreate.ignite.domain1.PrimaryKey import com.hikcreate.ignite.domain.PrimaryKey
import com.hikcreate.ignite.domain1.alarm._ import com.hikcreate.ignite.domain.alarm._
import com.hikcreate.ignite.domain1.vehicles.{AlarmNumber, DailyTravel, DriverNumber, VehicleNumber} import com.hikcreate.ignite.domain.vehicles.{AlarmNumber, DailyTravel, DriverNumber, VehicleNumber}
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010._
import org.joda.time.DateTime import org.joda.time.DateTime
...@@ -39,8 +39,6 @@ object SyncIgnite extends Sparking with Logging{ ...@@ -39,8 +39,6 @@ object SyncIgnite extends Sparking with Logging{
.foreachPartition( x => .foreachPartition( x =>
x.foreach{ x => x.foreach{ x =>
try{ try{
//System.exit(1)
//x._2.foreach(x=>println(x.toJSONString))
x._1 match { x._1 match {
case TableKey(Some("0x1200"),Some("0x1202")) => //车辆定位消息 case TableKey(Some("0x1200"),Some("0x1202")) => //车辆定位消息
val value = x._2.toList.grouped(20) val value = x._2.toList.grouped(20)
......
...@@ -6,7 +6,7 @@ import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} ...@@ -6,7 +6,7 @@ import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.hikcreate.data.client.IgniteClient import com.hikcreate.data.client.IgniteClient
import com.hikcreate.data.common.Logging import com.hikcreate.data.common.Logging
import com.hikcreate.data.constant.Const import com.hikcreate.data.constant.Const
import com.hikcreate.ignite.domain1.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo} import com.hikcreate.ignite.domain.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
import org.apache.ignite.cache.query.SqlQuery import org.apache.ignite.cache.query.SqlQuery
import org.joda.time.{DateTime, Duration} import org.joda.time.{DateTime, Duration}
import scalaj.http.Http import scalaj.http.Http
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment