package com.hikcreate.data.sync

import java.sql.Connection
import java.sql.PreparedStatement
import java.util.Locale

import com.alibaba.fastjson.{JSON, JSONObject}
import com.hikcreate.data.client.DbClient
import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const
import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{Tools, ZkManager}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.{DateTime, Duration}
import org.joda.time.format.DateTimeFormat

import scala.collection.mutable.ArrayBuffer

/**
  * Spark Streaming job that reads JSON messages from the Kafka topic(s) in
  * `Const.applicationTopic`, groups them by `(msgId, dataType)` and inserts them
  * into Hive tables of the `ods` schema. Consumed offsets are loaded from and
  * saved to ZooKeeper through [[ZkManager]].
  */
object SyncHive extends Sparking with Logging {

  /** Number of messages whose coordinates are resolved in one address-lookup call. */
  private final val GeoBatchSize = 20

  /**
    * Entry point: builds a direct Kafka stream starting at the offsets stored in
    * ZooKeeper, processes every non-empty micro-batch (5 second interval) and then
    * stores the batch's end offsets.
    *
    * Insert failures inside [[processRow2]] are caught and printed there, so the
    * offsets are saved even when individual messages could not be written.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val zkManager = ZkManager(Const.zkKafka)
    val kafkaParams = getKafkaParams(Const.bootstrap, Const.hiveGroupId)
    val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.hiveGroupId)
    // Filled by the transform step for the current batch and read again when saving offsets.
    val offsetRanges = new ArrayBuffer[OffsetRange]()
    val ssc = new StreamingContext(conf, Seconds(5))
    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Const.applicationTopic, kafkaParams, offsets))

    inputStream.transform { rdd =>
      // Offset ranges are only available on the RDD produced directly by the Kafka stream.
      offsetRanges.clear()
      offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
      rdd
    }.map(record => record.value()).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val startTime = DateTime.now()
        rdd
          .map(text => JSON.parseObject(text))
          .groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))))
          .foreachPartition(part => processRow2(part))
        zkManager.saveEndOffset(offsetRanges, Const.hiveGroupId)
        offsetRanges.foreach(range => println(range))
        val endTime = DateTime.now()
        println(s"${DateTime.now()}==============time token: ${new Duration(startTime, endTime).getMillis}ms==============")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Writes one partition of grouped messages to Hive.
    *
    * Dispatch per group key:
    *  - key not present in `Const.tableMap`: every message goes to the unknown table;
    *  - key without `msgId`: base-info insert via [[writeBaseInfoHive]];
    *  - `0x1200/0x1203` (location re-report): messages are expanded with
    *    `Tools.addLocation`, enriched with `districtcode` and written;
    *  - `0x1200/0x1202` (location): enriched with `districtcode` and written;
    *  - `0x1400/0x1402` (alarm upload): messages whose `warnType` is not in
    *    `Const.warnTypes` go to the unknown table, the rest are enriched from
    *    their `infoContent` and written;
    *  - anything else: written unchanged via [[writeHiveTable]].
    *
    * Any exception raised while handling one group is printed and the next group
    * is processed.
    *
    * @param x iterator over `(key, messages)` groups of this partition
    */
  def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
    DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl,
      Const.hiveUsername, Const.hivePassword)
    DbClient.usingDB(Const.hivePoolName) { db =>
      x.foreach { group =>
        val tableKey = group._1
        val messages = group._2
        try {
          if (!Const.tableMap.contains(tableKey)) {
            // Unknown message type.
            messages.foreach(json => writeUnknown(db.conn, Const.unKnownTable, json))
          } else if (tableKey.msgId.isEmpty) {
            // Base info. msgId is an Option, so absence must be tested with isEmpty;
            // comparing it with null never matches.
            messages.foreach(json => writeBaseInfoHive(db.conn, Const.tableMap(tableKey), json))
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) {
            // Location re-report: each message is first expanded by Tools.addLocation.
            val expanded = messages.flatMap(json => Tools.addLocation(json))
            writeWithDistrictCode(db.conn, Const.tableMap(tableKey), expanded)
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) {
            // Location message.
            writeWithDistrictCode(db.conn, Const.tableMap(tableKey), messages)
          } else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) {
            // Alarm upload.
            writeAlarms(db.conn, Const.tableMap(tableKey), messages)
          } else {
            // Every other mapped message type.
            messages.foreach(json => writeHiveTable(db.conn, Const.tableMap(tableKey), json))
          }
        } catch {
          case e: Exception =>
            println("发生插入错误的消息" + messages.toString())
            e.printStackTrace()
        }
      }
    }
  }

  /**
    * Adds `districtcode` to every message and writes it to `tableName`.
    * Coordinates (`lon`/`lat` divided by 1,000,000) are resolved in batches of
    * [[GeoBatchSize]] through `Tools.getAddressAndLocationCodes`.
    */
  private def writeWithDistrictCode(conn: Connection, tableName: String, messages: Iterable[JSONObject]): Unit = {
    messages.toList.grouped(GeoBatchSize).foreach { batch =>
      val codes = Tools.getAddressAndLocationCodes(
        batch.map(json => (json.getDouble("lon") / 1000000, json.getDouble("lat") / 1000000)))
      batch.zip(codes).foreach { pair =>
        val json = pair._1
        val location = pair._2
        json.put("districtcode", location._2)
        writeHiveTable(conn, tableName, json)
      }
    }
  }

  /**
    * Handles alarm-upload messages: messages with a `warnType` outside
    * `Const.warnTypes` are written to the unknown table; the others get
    * `longitude`, `latitude`, `eventtype`, `fulladdress` and `districtcode`
    * (taken from their parsed `infoContent` and the address lookup) and are
    * written to `tableName`.
    */
  private def writeAlarms(conn: Connection, tableName: String, messages: Iterable[JSONObject]): Unit = {
    val (accepted, rejected) =
      messages.partition(json => Const.warnTypes.contains(json.getString("warnType")))
    rejected.foreach(json => writeUnknown(conn, Const.unKnownTable, json))

    accepted.toList.grouped(GeoBatchSize).foreach { batch =>
      // Parse infoContent once per message; it feeds both the lookup and the enrichment.
      val infos = batch.map(json => Tools.getInfoContentJsonobj(json.getString("infoContent")))
      val codes = Tools.getAddressAndLocationCodes(
        infos.map(info => (info.getDouble("LONGITUDE") / 1000000, info.getDouble("LATITUDE") / 1000000)))
      batch.zip(infos).zip(codes).foreach { entry =>
        val json = entry._1._1
        val info = entry._1._2
        val location = entry._2
        json.put("longitude", info.get("LONGITUDE"))
        json.put("latitude", info.get("LATITUDE"))
        json.put("eventtype", info.get("EVENT_TYPE"))
        json.put("fulladdress", location._1)
        json.put("districtcode", location._2)
        writeHiveTable(conn, tableName, json)
      }
    }
  }

  /**
    * Per-message variant of [[processRow2]] working on raw JSON strings.
    *
    * Messages with an unmapped key go to the unknown table, messages without
    * `msgId` are written as base info, `0x1200/0x1203` messages are expanded with
    * `Tools.addLocation` before being written, and everything else is written
    * unchanged. A failing message is logged and skipped.
    *
    * @param iterator raw JSON message strings of one partition
    */
  def processRow(iterator: Iterator[String]): Unit = {
    DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl,
      Const.hiveUsername, Const.hivePassword)
    DbClient.usingDB(Const.hivePoolName) { db =>
      iterator.foreach { x =>
        try {
          val json = JSON.parseObject(x)
          val tableKey = TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))
          if (!Const.tableMap.contains(tableKey)) {
            writeUnknown(db.conn, Const.unKnownTable, json)
          } else if (!json.containsKey("msgId")) {
            writeBaseInfoHive(db.conn, Const.tableMap(tableKey), json)
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) {
            Tools.addLocation(json).foreach(item => writeHiveTable(db.conn, Const.tableMap(tableKey), item))
          } else {
            writeHiveTable(db.conn, Const.tableMap(tableKey), json)
          }
        } catch {
          case e: Exception =>
            error(x)
            e.printStackTrace()
        }
      }
    }
  }

  /** Prepares `sql`, hands the statement to `body` and always closes it afterwards. */
  private def withStatement[A](conn: Connection, sql: String)(body: PreparedStatement => A): A = {
    val stmt = conn.prepareStatement(sql)
    try body(stmt) finally stmt.close()
  }

  /** Binds the JSON value of every key, in key order, to the 1-based statement parameters. */
  private def bindValues(stmt: PreparedStatement, keys: Array[String], json: JSONObject): Unit =
    keys.zipWithIndex.foreach { case (key, index) => stmt.setObject(index + 1, json.get(key)) }

  /**
    * Inserts a base-info message into the unpartitioned table `ods.<tableName>`.
    * Every JSON key becomes a (lower-cased) column; values are bound as parameters.
    * A failing insert is printed and swallowed.
    *
    * NOTE(review): column names are taken from the message keys and interpolated
    * into the SQL text; identifiers cannot be bound as parameters.
    *
    * @param conn      open Hive connection
    * @param tableName target table name without schema
    * @param json      message to insert
    */
  def writeBaseInfoHive(conn: Connection, tableName: String, json: JSONObject): Unit = {
    val keys = json.keySet().toArray(Array[String]())
    val insetSql =
      s"""
         |insert into ods.$tableName
         |(${keys.map(_.toLowerCase()).mkString(",")})
         |values (${keys.map(_ => "?").mkString(",")})
       """.stripMargin
    withStatement(conn, insetSql) { stmt =>
      bindValues(stmt, keys, json)
      try {
        stmt.execute()
        info(s"insert date to HiveTable $tableName SUCCESS")
      } catch {
        case e: Exception =>
          println("Exception Messages==>" + e)
          println(s"hive table $tableName insert data failed==>" + json)
      }
    }
  }

  /**
    * Stores a message that could not be mapped to a table in `ods.<tableName>`,
    * partitioned by the current day, together with the current timestamp.
    *
    * The timestamp and the raw JSON are bound as statement parameters so that
    * quotes inside the payload cannot break or alter the SQL. Exceptions from the
    * insert propagate to the caller.
    *
    * @param conn      open Hive connection
    * @param tableName target table name without schema
    * @param json      message to store
    */
  def writeUnknown(conn: Connection, tableName: String, json: JSONObject): Unit = {
    // One instant for both values so day and timestamp always agree.
    val now = new DateTime()
    val dateTime = now.toString("yyyy-MM-dd HH:mm:ss")
    val day = now.toString("yyyy-MM-dd")
    val insertSql = s"insert into ods.$tableName partition( day = '$day' ) (datetime,jsondata) values(?,?)"
    withStatement(conn, insertSql) { insertStmt =>
      insertStmt.setString(1, dateTime)
      insertStmt.setString(2, json.toJSONString)
      insertStmt.execute()
      info(s"insert UnknownData date to HiveTable $tableName SUCCESS")
    }
  }

  /**
    * Inserts a message into the day-partitioned table `ods.<tableName>`.
    *
    * The partition day is derived from `dateTime` (pattern `yyyy-MM-dd HH:mm:ss`),
    * else from `warnTime` (multiplied by 1000 before conversion), else from
    * `businessTime` (same pattern as `dateTime`). The partition is created if it
    * does not exist, then the row is inserted with every JSON key as a
    * (lower-cased) column. A failing partition creation or insert is printed and
    * swallowed; errors while deriving the day or preparing statements propagate.
    *
    * Location and alarm enrichment (district code, address, event type) is done by
    * the caller in [[processRow2]] before this method is invoked.
    *
    * NOTE(review): column names are taken from the message keys and interpolated
    * into the SQL text; identifiers cannot be bound as parameters.
    *
    * @param conn      open Hive connection
    * @param tableName target table name without schema
    * @param json      message to insert
    */
  def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = {
    val keys = json.keySet().toArray(Array[String]())
    val day =
      if (keys.contains("dateTime")) {
        DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
      } else if (keys.contains("warnTime")) {
        new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
      } else {
        DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
      }
    val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
    // Schema-qualified like the partition statement so both target the same table.
    val sql =
      s"""
         |insert into ods.$tableName
         |partition( partitionday = '$day' )
         |(${keys.map(_.toLowerCase()).mkString(",")})
         |values (${keys.map(_ => "?").mkString(",")})
       """.stripMargin
    withStatement(conn, createPartitionSql) { createPartitionStmt =>
      withStatement(conn, sql) { stmt =>
        bindValues(stmt, keys, json)
        try {
          createPartitionStmt.execute()
          stmt.execute()
          info(s"insert date to HiveTable $tableName SUCCESS")
        } catch {
          case e: Exception =>
            println("Exception Messages==>" + e)
            println(s"hive table $tableName insert data failed==>" + json)
        }
      }
    }
  }
}