Commit 655c5792 by 王建成

update

parent 2ae921dd
...@@ -5,8 +5,14 @@ import com.hikcreate.data.model.TableKey ...@@ -5,8 +5,14 @@ import com.hikcreate.data.model.TableKey
object ApolloConst { object ApolloConst {
val config: Config = ConfigService.getConfig("application") val config: Config = ConfigService.getConfig("application")
var hdfsUrl: String = config.getProperty("hdfs.url",null)
var windowTime:Long = config.getLongProperty("window.time",1L)
var compactTopic:Seq[String] = config.getProperty("compact.kafka.topic",null).split(",")
val delayMax: Integer = config.getIntProperty("spark.delay.max",1) val delayMax: Integer = config.getIntProperty("spark.delay.max",1)
val recipients: Seq[String] = config.getProperty("spark.email.receiver",null).split(",") val recipients: Seq[String] = config.getProperty("spark.email.receiver",null).split(",")
......
package com.hikcreate.data.sync package com.hikcreate.data.sync
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.net.URI import java.net.URI
import java.sql.{Connection, ResultSet} import java.sql.{Connection, ResultSet}
import java.util.Locale import java.util.Locale
import com.alibaba.fastjson.{JSON, JSONObject} import com.alibaba.fastjson.{JSON, JSONObject}
import com.hikcreate.data.client.DbClient import com.hikcreate.data.client.DbClient
import com.hikcreate.data.common.{Logging, Sparking} import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const import com.hikcreate.data.constant.ApolloConst
import com.hikcreate.data.listener.BatchProcessListener
import com.hikcreate.data.model.TableKey import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{HDFSHelper, Tools, ZkManager} import com.hikcreate.data.util.{HDFSHelper, Tools, ZkManager}
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.{Seconds, StreamingContext}
...@@ -19,23 +17,23 @@ import org.joda.time.format.DateTimeFormat ...@@ -19,23 +17,23 @@ import org.joda.time.format.DateTimeFormat
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
object SysncHiveBatch extends Sparking with Logging { object SysncHiveBatch extends Sparking with Logging {
val hdfs: DistributedFileSystem = new DistributedFileSystem() val hdfs: DistributedFileSystem = new DistributedFileSystem()
hdfs.initialize(URI.create(Const.hdfsUrl), new Configuration()) hdfs.initialize(URI.create(ApolloConst.hdfsUrl), new Configuration())
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val ssc = new StreamingContext(conf,Seconds(Const.windowTime)) val ssc = new StreamingContext(conf,Seconds(ApolloConst.windowTime))
val zkManager = ZkManager(Const.zkKafka) val zkManager = ZkManager(ApolloConst.zkKafka)
val kafkaParams = getKafkaParams(Const.bootstrap, Const.hiveGroupId) val kafkaParams = getKafkaParams(ApolloConst.bootstrap, ApolloConst.hiveGroupId)
val offsets = zkManager.getBeginOffset(Const.compactTopic, Const.hiveGroupId) val offsets = zkManager.getBeginOffset(ApolloConst.compactTopic, ApolloConst.hiveGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]() val offsetRanges = new ArrayBuffer[OffsetRange]()
ssc.addStreamingListener(new BatchProcessListener(ssc))
val inputStream = KafkaUtils.createDirectStream[String, String]( val inputStream = KafkaUtils.createDirectStream[String, String](
ssc, ssc,
LocationStrategies.PreferConsistent, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Const.compactTopic, kafkaParams, offsets)) ConsumerStrategies.Subscribe[String, String](ApolloConst.compactTopic, kafkaParams, offsets))
inputStream.transform { rdd => inputStream.transform { rdd =>
offsetRanges.clear() offsetRanges.clear()
offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*) offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
...@@ -44,7 +42,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -44,7 +42,7 @@ object SysncHiveBatch extends Sparking with Logging {
if (!rdd.isEmpty()) { if (!rdd.isEmpty()) {
val startTime = DateTime.now() val startTime = DateTime.now()
rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))).foreachPartition(x => processRow3(x)) rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))).foreachPartition(x => processRow3(x))
zkManager.saveEndOffset(offsetRanges, Const.hiveGroupId) zkManager.saveEndOffset(offsetRanges, ApolloConst.hiveGroupId)
offsetRanges.foreach { x => offsetRanges.foreach { x =>
println(x) println(x)
} }
...@@ -57,21 +55,21 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -57,21 +55,21 @@ object SysncHiveBatch extends Sparking with Logging {
} }
def processRow3(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = { def processRow3(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword) DbClient.init(ApolloConst.hivePoolName, ApolloConst.hiveDriver, ApolloConst.hiveUrl, ApolloConst.hiveUsername, ApolloConst.hivePassword)
DbClient.usingDB(Const.hivePoolName) { db => DbClient.usingDB(ApolloConst.hivePoolName) { db =>
x.foreach { x => x.foreach { x =>
try { try {
val tableKey = x._1 val tableKey = x._1
if (!Const.tableMap.contains(tableKey) && tableKey.msgId != None) { if (!ApolloConst.tableMap.contains(tableKey) && tableKey.msgId != None) {
//未知消息 //未知消息
var jsonArr = new ArrayBuffer[JSONObject]() var jsonArr = new ArrayBuffer[JSONObject]()
x._2.foreach { json => x._2.foreach { json =>
jsonArr.append(json) jsonArr.append(json)
} }
writeUnknown(db.conn, Const.unKnownTable, jsonArr) writeUnknown(db.conn, ApolloConst.unKnownTable, jsonArr)
} else if (tableKey.msgId == None && Const.tableMap.contains(tableKey)) { } else if (tableKey.msgId == None && ApolloConst.tableMap.contains(tableKey)) {
//基础信息 //基础信息
writeBaseInfoHive(db.conn, Const.tableMap(tableKey), x._2.toArray) writeBaseInfoHive(db.conn, ApolloConst.tableMap(tableKey), x._2.toArray)
} else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) { } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) {
//定位补报 //定位补报
var jsonArr = new ArrayBuffer[JSONObject]() var jsonArr = new ArrayBuffer[JSONObject]()
...@@ -86,7 +84,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -86,7 +84,7 @@ object SysncHiveBatch extends Sparking with Logging {
jsonArr.append(json) jsonArr.append(json)
} }
} }
writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime") writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "dateTime")
} else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) { } else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) {
//定位消息 //定位消息
var jsonArr = new ArrayBuffer[JSONObject]() var jsonArr = new ArrayBuffer[JSONObject]()
...@@ -100,7 +98,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -100,7 +98,7 @@ object SysncHiveBatch extends Sparking with Logging {
jsonArr.append(json) jsonArr.append(json)
} }
} }
writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime") writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "dateTime")
} else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) { } else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) {
//报警上传 //报警上传
var jsonArr = new ArrayBuffer[JSONObject]() var jsonArr = new ArrayBuffer[JSONObject]()
...@@ -108,13 +106,13 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -108,13 +106,13 @@ object SysncHiveBatch extends Sparking with Logging {
var useLess = new ArrayBuffer[JSONObject]() var useLess = new ArrayBuffer[JSONObject]()
x._2.foreach { json => x._2.foreach { json =>
val warnType = json.getString("warnType") val warnType = json.getString("warnType")
if (Const.warnTypes.contains(warnType)) { if (ApolloConst.warnTypes.contains(warnType)) {
useFul.append(json) useFul.append(json)
} else { } else {
useLess.append(json) useLess.append(json)
} }
} }
val value = useFul.filter(x => Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20) val value = useFul.filter(x => ApolloConst.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
value.foreach { sub => value.foreach { sub =>
val codes = { val codes = {
Tools.getAddressAndLocationCodes(sub.map { t => Tools.getAddressAndLocationCodes(sub.map { t =>
...@@ -141,10 +139,10 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -141,10 +139,10 @@ object SysncHiveBatch extends Sparking with Logging {
} }
} }
if (jsonArr.size > 0 && jsonArr != null) { if (jsonArr.size > 0 && jsonArr != null) {
writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "warnTime") writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "warnTime")
} }
if (useLess.size > 0 && useLess != null) { if (useLess.size > 0 && useLess != null) {
writeUnknown(db.conn, Const.unKnownTable, useLess) writeUnknown(db.conn, ApolloConst.unKnownTable, useLess)
} }
} else { } else {
//除了以上几种情况外的消息 //除了以上几种情况外的消息
...@@ -152,7 +150,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -152,7 +150,7 @@ object SysncHiveBatch extends Sparking with Logging {
x._2.foreach { json => x._2.foreach { json =>
jsonArr.append(json) jsonArr.append(json)
} }
writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "bussinessTime") writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "bussinessTime")
} }
} catch { } catch {
case e: Exception => case e: Exception =>
...@@ -186,7 +184,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -186,7 +184,7 @@ object SysncHiveBatch extends Sparking with Logging {
if(clos.size!=keySet.size){ if(clos.size!=keySet.size){
val unKnownJson = new ArrayBuffer[JSONObject]() val unKnownJson = new ArrayBuffer[JSONObject]()
unKnownJson.append(json) unKnownJson.append(json)
writeUnknown(conn, Const.unKnownTable,unKnownJson) writeUnknown(conn, ApolloConst.unKnownTable,unKnownJson)
info(s"The JSON field does not correspond to the $tableName field ==> "+json.toJSONString) info(s"The JSON field does not correspond to the $tableName field ==> "+json.toJSONString)
info(s" $tableName field ==> "+clos.toList) info(s" $tableName field ==> "+clos.toList)
info(s" JSON field ==> "+keySet.toList) info(s" JSON field ==> "+keySet.toList)
...@@ -282,28 +280,6 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -282,28 +280,6 @@ object SysncHiveBatch extends Sparking with Logging {
def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = { def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = {
println("start insert hive : " + DateTime.now()) println("start insert hive : " + DateTime.now())
//定位消息,增加区县代码字段及值
// if( tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1202")))
// || tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1203")))){
// val addressAndLocation = Tools.getAddressAndLocationCode(json.getDouble("lon")/1000000,json.getDouble("lat")/1000000)
// json.put("districtcode",addressAndLocation._2)
// }
//报警信息 增加区县代码和事件类型,经纬度,详细地址
// if(tableName == Const.tableMap(TableKey(Some("0x1400"),Some("0x1402")))){
// val warnType = json.getString("warnType")
// if (!Const.warnTypes.contains(warnType)){
// writeUnknown(conn,Const.unKnownTable,json)
// }
// val infoStr = json.getString("infoContent")
// val infoJson = Tools.getInfoContentJsonobj(infoStr)
// json.put("longitude",infoJson.get("LONGITUDE"))
// json.put("latitude",infoJson.get("LATITUDE"))
// json.put("eventtype",infoJson.get("EVENT_TYPE"))
// val addressAndLocation = Tools.getAddressAndLocationCode(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
// json.put("fulladdress",addressAndLocation._1)
// println(addressAndLocation._1)
// json.put("districtcode",addressAndLocation._2)
// }
var keys = json.keySet().toArray(Array[String]()) var keys = json.keySet().toArray(Array[String]())
val day = if (keys.contains("dateTime")) { val day = if (keys.contains("dateTime")) {
DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd") DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
...@@ -312,12 +288,10 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -312,12 +288,10 @@ object SysncHiveBatch extends Sparking with Logging {
} else { } else {
DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd") DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
} }
// json.put("partitionday",day)
keys = json.keySet().toArray(Array[String]()) keys = json.keySet().toArray(Array[String]())
val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')" val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
val createPartitionStmt = conn.prepareStatement(createPartitionSql) val createPartitionStmt = conn.prepareStatement(createPartitionSql)
val setStmt = conn.prepareStatement("set set hive.exec.dynamic.partition.mode=nonstrict") val setStmt = conn.prepareStatement("set set hive.exec.dynamic.partition.mode=nonstrict")
val sql = val sql =
s""" s"""
|insert into $tableName |insert into $tableName
...@@ -342,7 +316,6 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -342,7 +316,6 @@ object SysncHiveBatch extends Sparking with Logging {
setStmt.close() setStmt.close()
stmt.close() stmt.close()
} }
println("insert hive end : " + DateTime.now()) println("insert hive end : " + DateTime.now())
} }
...@@ -507,7 +480,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -507,7 +480,7 @@ object SysncHiveBatch extends Sparking with Logging {
if(clos.size!=keySet.size){ if(clos.size!=keySet.size){
val unKnownJson = new ArrayBuffer[JSONObject]() val unKnownJson = new ArrayBuffer[JSONObject]()
unKnownJson.append(json) unKnownJson.append(json)
writeUnknown(conn, Const.unKnownTable,unKnownJson) writeUnknown(conn, ApolloConst.unKnownTable,unKnownJson)
info(s"The JSON field does not correspond to the $tableName field ==> "+json.toJSONString) info(s"The JSON field does not correspond to the $tableName field ==> "+json.toJSONString)
info(s" $tableName field ==> "+clos.toList) info(s" $tableName field ==> "+clos.toList)
info(s" JSON field ==> "+keySet.toList) info(s" JSON field ==> "+keySet.toList)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment