Commit 30f220c7 by 杜发飞

1 parent d561bbfd
@@ -111,12 +111,17 @@
     <artifactId>curator-recipes</artifactId>
     <version>${curator.version}</version>
 </dependency>
+<dependency>
+    <groupId>com.ctrip.framework.apollo</groupId>
+    <artifactId>apollo-client</artifactId>
+    <version>1.1.0</version>
+</dependency>
 <dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>fastjson</artifactId>
     <version>1.2.54</version>
 </dependency>
 <!--<dependency>
     <groupId>org.scala-lang.modules</groupId>
     <artifactId>scala-xml_${scala.binary.version}</artifactId>
...
app.id=hangzhou-dataprocess
apollo.meta=http://10.197.236.187:7070/
\ No newline at end of file
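
The two keys above are the standard bootstrap settings of the Apollo client added in the pom: app.id identifies the application in the Apollo portal, and apollo.meta points at the Apollo config service. They typically live in META-INF/app.properties on the classpath, or are passed as -D JVM options. A minimal smoke-test sketch of the wiring (the object name is hypothetical; the key is taken from the config used below):

import com.ctrip.framework.apollo.{Config, ConfigService}

// Hypothetical smoke test: resolves the default "application" namespace via
// the app.id/apollo.meta bootstrap settings and prints one managed key.
object ApolloSmokeTest {
  def main(args: Array[String]): Unit = {
    val config: Config = ConfigService.getAppConfig
    println(config.getProperty("kafka.bootstrap.servers", "<not set>"))
  }
}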
@@ -8,8 +8,8 @@ kafka.zookerper.servers=10.197.236.154:2181
 application.kafka.topic=tbd-transport-data-gathering
 basicsInfo.kafka.topic=transport_basedata_operation
 hive.group.id=hive
-ignite.group.id=ignite3
-basics.group.id=basics2
+ignite.group.id=ignite
+basics.group.id=basics
 hive.driver=org.apache.hive.jdbc.HiveDriver
 hive.url=jdbc:hive2://hadoop02:10000/ods
...
package com.hikcreate.data.constant

import com.ctrip.framework.apollo.{Config, ConfigService}
import com.hikcreate.data.model.TableKey

object ApolloConst {

  val config: Config = ConfigService.getConfig("application")

  val bootstrap: String = config.getProperty("kafka.bootstrap.servers", null)
  val zkKafka: String = config.getProperty("kafka.zookerper.servers", null)
  val applicationTopic: Array[String] = config.getProperty("application.kafka.topic", null).split(",")
  val basicsInfoTopic: Array[String] = config.getProperty("basicsInfo.kafka.topic", "").split(",")
  val hiveGroupId: String = config.getProperty("hive.group.id", null)
  val igniteGroupId: String = config.getProperty("ignite.group.id", null)
  val basicsGroupId: String = config.getProperty("basics.group.id", null)
  val hivePoolName = "hive"
  val hiveDriver: String = config.getProperty("hive.driver", null)
  val hiveUrl: String = config.getProperty("hive.url", null)
  val hiveUsername: String = config.getProperty("hive.username", null)
  val hivePassword: String = config.getProperty("hive.password", null)
  val areaCodeAndAddressUrl: String = config.getProperty("areaCodeAndAddress.Url", null)
  val unKnownTable: String = config.getProperty("hive.unknown.table", null)
  val warnTypes: Array[String] = config.getProperty("warnTypes", null).split(",")

  val tableMap: Map[TableKey, String] = Map(
    // Link management
    TableKey(Some("0x1001"), None) -> config.getProperty("hive.UP_CONNECT_REQ.table", null), // main link login
    TableKey(Some("0x1003"), None) -> config.getProperty("hive.UP_DISCONNECT_REQ.table", null), // main link logout
    TableKey(Some("0x1005"), None) -> config.getProperty("hive.UP_LINKTEST_REQ.table", null), // main link keep-alive
    TableKey(Some("0x1007"), None) -> config.getProperty("hive.UP_DISCONNECT_INFORM.table", null), // main link disconnect notification
    TableKey(Some("0x9001"), None) -> config.getProperty("hive.DOWN_CONNECT.table", null), // slave link connected
    TableKey(Some("0x9006"), None) -> config.getProperty("hive.DOWN_LINKTEST.table", null), // slave link keep-alive response
    // Vehicle dynamic information exchange
    TableKey(Some("0x1200"), Some("0x1201")) -> config.getProperty("hive.UP_EXG_MSG_REGISTER.table", null), // vehicle registration upload
    TableKey(Some("0x1200"), Some("0x1202")) -> config.getProperty("hive.UP_EXG_MSG_REAL_LOCATION.table", null), // real-time vehicle location upload
    TableKey(Some("0x1200"), Some("0x1203")) -> config.getProperty("hive.KAFKA_UP_EXG_MSG_HISTORY_LOCATION.table", null), // supplementary vehicle location report
    TableKey(Some("0x1200"), Some("0x120B")) -> config.getProperty("hive.UP_EXG_MSG_TAKE_EWAYBILL_ACK.table", null), // vehicle e-waybill upload
    // Vehicle alarm information exchange
    TableKey(Some("0x1400"), Some("0x1402")) -> config.getProperty("hive.UP_WARN_MSG_ADPT_INFO.table", null), // alarm info report
    TableKey(Some("0x1400"), Some("0x1401")) -> config.getProperty("hive.KAFKA_UP_WARN_MSG_URGE_TODO.table", null), // alarm supervision request
    TableKey(Some("0x9400"), Some("0x9401")) -> config.getProperty("hive.KAFKA_DOWN_WARN_MSG_URGE_TODO.table", null), // alarm supervision response
    // Vehicle supervision
    //TableKey(Some("0x1500"), Some("0x1504")) -> config.getProperty("hive.UP_CTRL_MSG_TAKE_TRAVEL.table"), // vehicle driving record upload
    // Vehicle static information exchange
    TableKey(Some("0x1600"), Some("0x1601")) -> config.getProperty("hive.UP_BASE_MSG_VEHICLE_ADDED.table", null), // supplementary vehicle static info
    // Intelligent video dynamic information exchange
    TableKey(Some("0x1D00"), Some("0x1D01")) -> config.getProperty("hive.UP_PREVENTION_EXG_MSG_DEVICE_PARAM.table", null), // video alarm device parameter query request
    TableKey(Some("0x1D00"), Some("0x1d02")) -> config.getProperty("hive.UP_PREVENTION_EXG_MSG_REPORT_DRIVER.table", null), // periodic driver identity upload
    TableKey(Some("0x1C00"), Some("0x1c02")) -> config.getProperty("hive.UP_PREVENTION_MSG_FILE_COMPLETE.table", null), // video alarm attachment upload result
    // Basic information
    TableKey(None, Some("baseIntoPlatformInfo")) -> config.getProperty("hive.KAFKA_base_into_platform_info.table", null), // connected platform
    TableKey(None, Some("baseIntoEnterpriseInfo")) -> config.getProperty("hive.KAFKA_base_into_enterprise_info.table", null), // enterprise info
    TableKey(None, Some("baseIntoVehicleInfo")) -> config.getProperty("hive.KAFKA_base_into_vehicle_info.table", null), // connected vehicle
    TableKey(None, Some("baseDataDisplayConfig")) -> config.getProperty("hive.KAFKA_base_data_display_config.table", null), // data display config
    TableKey(None, Some("baseWarningType")) -> config.getProperty("hive.KAFKA_base_warning_type.table", null) // alarm type config
  )
}
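
TableKey is imported from com.hikcreate.data.model but its source is not part of this commit. From the way it is constructed and pattern-matched above (two positional Option[String] fields), it is presumably a simple case class along these lines — a hypothetical reconstruction, not the committed code:

package com.hikcreate.data.model

// Hypothetical reconstruction (not in this diff): the first field is the
// message id (e.g. "0x1200"), the second the sub-business type (e.g. "0x1202").
// Basic-info records carry no message id, hence the Option types.
case class TableKey(msgId: Option[String], dataType: Option[String])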
@@ -17,6 +17,7 @@ class LifecycleListener(conf:SparkConf) extends SparkListener with Logging {
     msg.append("Application ID: " + applicationStart.appId.getOrElse(""))
     msg.append("Application name: " + applicationStart.appName)
     msg.append("Application start time: " + new DateTime(applicationStart.time).toString("yyyy-MM-dd HH:mm:ss"))
+    println(msg)
   }
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
...
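
Besides this SparkListener, the SyncIgnite diff below registers a new BatchProcessListener on the StreamingContext. Its source is not included in the commit; a minimal sketch of what such a StreamingListener could look like (the class shape follows the usage below; the logged fields are assumptions):

package com.hikcreate.data.listener

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical sketch: report per-batch throughput and delays after each batch.
class BatchProcessListener(ssc: StreamingContext) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch=${info.batchTime} records=${info.numRecords} " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)}ms " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)}ms")
  }
}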
@@ -5,7 +5,6 @@ import com.hikcreate.data.common.Sparking
 import com.hikcreate.data.util.Tools
 import com.hikcreate.ignite.domain1.basic.{AlarmTypeInfo, EnterpriseInfo, VehicleInfo}
 import org.apache.spark.sql.SparkSession
-import scala.collection.JavaConverters._
 object FullSync extends Sparking{
...
@@ -3,7 +3,8 @@ package com.hikcreate.data.sync
 import com.alibaba.fastjson.JSON
 import com.hikcreate.data.client.IgniteClient
 import com.hikcreate.data.common.{Logging, Sparking}
-import com.hikcreate.data.constant.Const
+import com.hikcreate.data.constant.ApolloConst
+import com.hikcreate.data.listener.BatchProcessListener
 import com.hikcreate.data.model.TableKey
 import com.hikcreate.data.util.{Tools, ZkManager}
 import com.hikcreate.ignite.domain1.PrimaryKey
@@ -13,19 +14,21 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka010._
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import scala.collection.mutable.ArrayBuffer
 object SyncIgnite extends Sparking with Logging{
   def main(args: Array[String]): Unit = {
-    val zkManager = ZkManager(Const.zkKafka)
-    val kafkaParams = getKafkaParams(Const.bootstrap, Const.igniteGroupId)
-    val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.igniteGroupId)
+    val zkManager = ZkManager(ApolloConst.zkKafka)
+    val kafkaParams = getKafkaParams(ApolloConst.bootstrap, ApolloConst.igniteGroupId)
+    val offsets = zkManager.getBeginOffset(ApolloConst.applicationTopic, ApolloConst.igniteGroupId)
     val offsetRanges = new ArrayBuffer[OffsetRange]()
     val ssc = new StreamingContext(conf, Seconds(1))
+    ssc.addStreamingListener(new BatchProcessListener(ssc))
     val inputStream = KafkaUtils.createDirectStream[String, String](ssc,
       LocationStrategies.PreferConsistent,
-      ConsumerStrategies.Subscribe[String, String](Const.applicationTopic,kafkaParams,offsets))
+      ConsumerStrategies.Subscribe[String, String](ApolloConst.applicationTopic,kafkaParams,offsets))
     inputStream.transform { rdd =>
       offsetRanges.clear()
       offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
@@ -37,6 +40,7 @@ object SyncIgnite extends Sparking with Logging{
       .foreachPartition( x =>
         x.foreach{ x =>
           try{
+            //System.exit(1)
             x._2.foreach(x=>println(x.toJSONString))
             x._1 match {
               case TableKey(Some("0x1200"),Some("0x1202")) => // vehicle location message
@@ -126,7 +130,7 @@ object SyncIgnite extends Sparking with Logging{
                 IgniteClient.attachmentCache.withKeepBinary().put(key,value)
               }
             case TableKey(Some("0x1400"), Some("0x1402")) => // alarm info report
-              val value = x._2.filter(x=>Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
+              val value = x._2.filter(x=>ApolloConst.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
               value.foreach{ sub=>
                 val codes = Tools.getLocationCodes(sub.map{ x=>
                   val infoStr = x.getString("infoContent")
@@ -206,7 +210,7 @@ object SyncIgnite extends Sparking with Logging{
         }
       }
     )
-    zkManager.saveEndOffset(offsetRanges,Const.igniteGroupId)
+    zkManager.saveEndOffset(offsetRanges,ApolloConst.igniteGroupId)
   }
 }
 ssc.start()
...
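
The getBeginOffset/saveEndOffset pair on ZkManager implements the usual manual offset checkpointing for direct Kafka streams: read the last committed offsets before creating the stream, process each micro-batch, then persist the end offsets. ZkManager itself is not in this diff; a hedged sketch of that pattern using Curator (which the pom already pulls in via curator-recipes), assuming a /kafka-offsets/<group>/<topic>/<partition> znode layout:

package com.hikcreate.data.util

import java.nio.charset.StandardCharsets
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.JavaConverters._

// Hypothetical reconstruction of ZkManager (not in this diff): checkpoints
// Kafka offsets under /kafka-offsets/<group>/<topic>/<partition> in ZooKeeper.
class ZkManager(zkServers: String) {
  private val client =
    CuratorFrameworkFactory.newClient(zkServers, new ExponentialBackoffRetry(1000, 3))
  client.start()

  private def path(group: String, topic: String, partition: Int): String =
    s"/kafka-offsets/$group/$topic/$partition"

  // Offsets to resume from; empty on first run, in which case the consumer
  // falls back to its auto.offset.reset policy.
  def getBeginOffset(topics: Seq[String], group: String): Map[TopicPartition, Long] =
    topics.flatMap { topic =>
      val base = s"/kafka-offsets/$group/$topic"
      if (client.checkExists().forPath(base) == null) Nil
      else client.getChildren.forPath(base).asScala.map { part =>
        val data = client.getData.forPath(s"$base/$part")
        new TopicPartition(topic, part.toInt) -> new String(data, StandardCharsets.UTF_8).toLong
      }
    }.toMap

  // Persist the end of each processed range so the next run resumes there.
  def saveEndOffset(ranges: Seq[OffsetRange], group: String): Unit =
    ranges.foreach { r =>
      val p = path(group, r.topic, r.partition)
      if (client.checkExists().forPath(p) == null)
        client.create().creatingParentsIfNeeded().forPath(p)
      client.setData().forPath(p, r.untilOffset.toString.getBytes(StandardCharsets.UTF_8))
    }
}

object ZkManager {
  def apply(zkServers: String): ZkManager = new ZkManager(zkServers)
}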