Commit f86c96dc by 王建成

update

parent 26a16293
......@@ -6,7 +6,7 @@
<groupId>groupId</groupId>
<artifactId>operating-vehicle</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.2-SNAPSHOT</version>
<repositories>
<repository>
......
kafka.bootstrap.servers=39.100.49.76:9092
#kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
kafka.zookerper.servers=10.197.236.211:2181
window.time=5
application.kafka.topic=tbd-transport-data-gathering
basicsInfo.kafka.topic=transport_basedata_operation
hive.group.id=hive
......
......@@ -15,8 +15,8 @@ trait Sparking {
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("hive.exec.dynamici.partition","true")
.set("hive.exec.dynamic.partition.mode","nonstrict")
//.setAppName("test")
//.setMaster("local[*]")
.setAppName("syshive_local")
.setMaster("local[*]")
def getKafkaParams(servers:String,groupId: String):Map[String,Object] = {
Map[String,Object](
......
......@@ -6,7 +6,7 @@ import com.hikcreate.data.util.Config
object Const {
Config.load("conf.properties")
val windowTime:Int=Config.getInt("window.time")
val bootstrap: String = Config.getString("kafka.bootstrap.servers")
val zkKafka: String = Config.getString("kafka.zookerper.servers")
......
......@@ -11,19 +11,21 @@ import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{Tools, ZkManager}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import org.joda.time.{DateTime, Duration}
import org.joda.time.format.DateTimeFormat
import scala.collection.mutable.ArrayBuffer
object SyncHive extends Sparking with Logging {
def main(args: Array[String]): Unit = {
val zkManager = ZkManager(Const.zkKafka)
val kafkaParams = getKafkaParams(Const.bootstrap,Const.hiveGroupId)
val offsets = zkManager.getBeginOffset(Const.applicationTopic,Const.hiveGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]()
val ssc = new StreamingContext(conf,Seconds(1))
val ssc = new StreamingContext(conf,Seconds(5))
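// 5-second micro-batches: each batch below is parsed, grouped by TableKey and written to Hive before offsets are committed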
val inputStream = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
......@@ -34,14 +36,111 @@ object SyncHive extends Sparking with Logging {
rdd
}.map(x=>x.value()).foreachRDD{ rdd =>
if(!rdd.isEmpty()){
rdd.foreachPartition(iterator=>processRow(iterator))
val startTime = DateTime.now()
rdd.map(JSON.parseObject).groupBy(json=>TableKey(Option(json.getString("msgId")),Option(json.getString("dataType")))).foreachPartition(x=>processRow2(x))
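// offsets are saved to ZooKeeper only after the batch has been processed, i.e. at-least-once delivery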
zkManager.saveEndOffset(offsetRanges,Const.hiveGroupId)
offsetRanges.foreach{x=>
println(x)
}
// println(offsetRanges(0))
val endTime = DateTime.now()
println(DateTime.now()+"==============time token: "+new Duration(startTime,endTime).getMillis+"ms==============")
}
}
ssc.start()
ssc.awaitTermination()
}
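// Routes one partition of (TableKey, messages) groups to Hive over JDBC: unknown msgIds go to the
// unknown table, base info is inserted row by row, and location/alarm messages are reverse-geocoded
// via Tools.getAddressAndLocationCodes in batches of 20 before being written with writeHiveTable.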
def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
DbClient.init(Const.hivePoolName,Const.hiveDriver,Const.hiveUrl, Const.hiveUsername,Const.hivePassword)
DbClient.usingDB(Const.hivePoolName) { db =>
x.foreach{
x=>try{
val tableKey=x._1
if(!Const.tableMap.contains(tableKey) && tableKey.msgId !=null) {//unknown message
x._2.foreach{ json =>
writeUnknown(db.conn,Const.unKnownTable,json)
}
}else if ( tableKey.msgId == null){//base info
x._2.foreach{json=>
writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
}
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1203"))){//定位补报
var jsonList=new ArrayBuffer[JSONObject]()
val flat = x._2.flatMap(x=>Tools.addLocation(x))
val value = flat.toList.grouped(20)
value.foreach{sub=>
val codes = Tools.getAddressAndLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
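// lon/lat appear to be stored as 1e-6 degrees, hence the division before reverse geocoding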
sub.zip(codes).foreach{line=>
val json = line._1
val location=line._2
json.put("districtcode",location._2)
jsonList.append(json)
writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1202"))){//定位消息
val value = x._2.toList.grouped(20)
value.foreach{ sub=>
val codes = Tools.getAddressAndLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
sub.zip(codes).foreach{line=>
val json = line._1
val location=line._2
json.put("districtcode",location._2)
writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
}else if (tableKey== TableKey(Some("0x1400"),Some("0x1402"))){//报警上传
var useFul= new ArrayBuffer[JSONObject]()
x._2.foreach{json=>
val warnType=json.getString("warnType")
if (Const.warnTypes.contains(warnType)){
useFul.append(json)
} else{
writeUnknown(db.conn,Const.unKnownTable,json)
}
}
val value = useFul.filter(x=>Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
value.foreach{ sub=>
val codes={
Tools.getAddressAndLocationCodes(sub.map{t=>
val infoStr = t.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
}
)
}
sub.zip(codes).foreach{line=>
val json = line._1
val location = line._2
val infoStr = json.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
val longitude = infoJson.get("LONGITUDE")
val latitude = infoJson.get("LATITUDE")
val eventtype = infoJson.get("EVENT_TYPE")
json.put("longitude",infoJson.get("LONGITUDE"))
json.put("latitude",infoJson.get("LATITUDE"))
json.put("eventtype",infoJson.get("EVENT_TYPE"))
json.put("fulladdress",location._1)
json.put("districtcode",location._2)
writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
}else{//messages not covered by the cases above
x._2.foreach{json=>
writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
}catch {
case e:Exception=>
println("发生插入错误的消息"+x._2.toString())
e.printStackTrace()
}
}
}
}
def processRow(iterator:Iterator[String]): Unit = {
DbClient.init(Const.hivePoolName,Const.hiveDriver,Const.hiveUrl, Const.hiveUsername,Const.hivePassword)
DbClient.usingDB(Const.hivePoolName){ db =>
......@@ -79,8 +178,13 @@ object SyncHive extends Sparking with Logging {
(1 to keys.length).foreach{ index => stmt.setObject(index,json.get(keys(index-1)))}
try{
stmt.execute()
println(s"insert date to HiveTable $tableName SUCCESS")
} finally {
info(s"insert date to HiveTable $tableName SUCCESS")
}catch {
case e:Exception=>{
println("Exception Messages==>"+e)
println(s"hive table $tableName insert data failed==>"+json)
}
}finally {
stmt.close()
}
}
......@@ -101,27 +205,27 @@ object SyncHive extends Sparking with Logging {
def writeHiveTable(conn:Connection,tableName:String,json:JSONObject): Unit = {
//location message: add district code field and value
if( tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1202")))
|| tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1203")))){
val addressAndLocation = Tools.getAddressAndLocationCode(json.getDouble("lon")/1000000,json.getDouble("lat")/1000000)
json.put("districtcode",addressAndLocation._2)
}
// if( tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1202")))
// || tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1203")))){
// val addressAndLocation = Tools.getAddressAndLocationCode(json.getDouble("lon")/1000000,json.getDouble("lat")/1000000)
// json.put("districtcode",addressAndLocation._2)
// }
//alarm message: add district code, event type, longitude/latitude and full address
if(tableName == Const.tableMap(TableKey(Some("0x1400"),Some("0x1402")))){
val warnType = json.getString("warnType")
if (!Const.warnTypes.contains(warnType)){
writeUnknown(conn,Const.unKnownTable,json)
}
val infoStr = json.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
json.put("longitude",infoJson.get("LONGITUDE"))
json.put("latitude",infoJson.get("LATITUDE"))
json.put("eventtype",infoJson.get("EVENT_TYPE"))
val addressAndLocation = Tools.getAddressAndLocationCode(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
json.put("fulladdress",addressAndLocation._1)
println(addressAndLocation._1)
json.put("districtcode",addressAndLocation._2)
}
// if(tableName == Const.tableMap(TableKey(Some("0x1400"),Some("0x1402")))){
// val warnType = json.getString("warnType")
// if (!Const.warnTypes.contains(warnType)){
// writeUnknown(conn,Const.unKnownTable,json)
// }
// val infoStr = json.getString("infoContent")
// val infoJson = Tools.getInfoContentJsonobj(infoStr)
// json.put("longitude",infoJson.get("LONGITUDE"))
// json.put("latitude",infoJson.get("LATITUDE"))
// json.put("eventtype",infoJson.get("EVENT_TYPE"))
// val addressAndLocation = Tools.getAddressAndLocationCode(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
// json.put("fulladdress",addressAndLocation._1)
// println(addressAndLocation._1)
// json.put("districtcode",addressAndLocation._2)
// }
val keys = json.keySet().toArray(Array[String]())
val day = if (keys.contains("dateTime")) {
DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
......@@ -130,6 +234,8 @@ object SyncHive extends Sparking with Logging {
} else {
DateTime.parse(json.getString("businessTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
}
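// make sure the daily partition exists before inserting into it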
val createPartitionSql =s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
val createPartitionStmt = conn.prepareStatement(createPartitionSql)
val sql =
s"""
|insert into $tableName
......@@ -140,8 +246,16 @@ object SyncHive extends Sparking with Logging {
val stmt = conn.prepareStatement(sql)
(1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
try {
createPartitionStmt.execute()
stmt.execute()
} finally{
info(s"insert date to HiveTable $tableName SUCCESS")
}catch {
case e:Exception=>{
println("Exception Messages==>"+e)
println(s"hive table $tableName insert data failed==>"+json)
}
}finally{
createPartitionStmt.close()
stmt.close()
}
}
......
package com.hikcreate.data.sync
import java.io.ByteArrayInputStream
import java.net.URI
import java.sql.Connection
import java.util.Locale
import com.alibaba.fastjson.{JSON, JSONObject}
import com.hikcreate.data.client.DbClient
import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const
import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{HDFSHelper, Tools, ZkManager}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.joda.time.{DateTime, Duration}
import org.joda.time.format.DateTimeFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.io.IOUtils
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
object SysncHiveBatch extends Sparking with Logging {
//val sc = new SparkContext(conf)
private val sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
private val sc = sparkSession.sparkContext
val ssc = new StreamingContext(sc,Seconds(Const.windowTime))
val hdfs : DistributedFileSystem = new DistributedFileSystem()
hdfs.initialize(URI.create("hdfs://10.197.236.211:8020"),new Configuration())
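// direct HDFS client used by writeUnknown to append raw records under /hive/ODS.db/<table>/day=<day>/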
def main(args: Array[String]): Unit = {
val zkManager = ZkManager(Const.zkKafka)
val kafkaParams = getKafkaParams(Const.bootstrap,Const.hiveGroupId)
val offsets = zkManager.getBeginOffset(Const.applicationTopic,Const.hiveGroupId)
val offsetRanges = new ArrayBuffer[OffsetRange]()
val inputStream = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](Const.applicationTopic,kafkaParams,offsets))
inputStream.transform{ rdd =>
offsetRanges.clear()
offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges:_*)
rdd
}.map(x=>x.value()).foreachRDD{ rdd =>
if(!rdd.isEmpty()){
val startTime = DateTime.now()
val groupRdd = rdd.map(JSON.parseObject).groupBy(json=>TableKey(Option(json.getString("msgId")),Option(json.getString("dataType"))))
processRow3(groupRdd,ssc)
zkManager.saveEndOffset(offsetRanges,Const.hiveGroupId)
offsetRanges.foreach{x=>
println(x)
}
val endTime = DateTime.now()
println(DateTime.now()+"==============time token: "+new Duration(startTime,endTime).getMillis+"ms==============")
}
}
ssc.start()
ssc.awaitTermination()
}
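// Same routing as processRow2, but applied to the grouped RDD directly; the Hive inserts are still
// commented out here, so only unknown/unrecognised messages are actually persisted (via writeUnknown).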
def processRow3(x: RDD[(TableKey, Iterable[JSONObject])],ssc: StreamingContext): Unit = {
println("start process data: "+DateTime.now())
x.foreachPartition{x=>
x.foreach{
x=>try{
val tableKey=x._1
if(!Const.tableMap.contains(tableKey) && tableKey.msgId !=null) {//unknown message
var jsonArr= new ArrayBuffer[JSONObject]()
x._2.foreach{ json =>
jsonArr.append(json)
}
writeUnknown(Const.unKnownTable,jsonArr)
}else if ( tableKey.msgId == null){//base info
//x._2.foreach{json=>
// writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
// }
// writeBaseInfoHive(db.conn,Const.tableMap(tableKey),x._2.toArray)
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1203"))){//定位补报
var jsonArr= new ArrayBuffer[JSONObject]()
val flat = x._2.flatMap(x=>Tools.addLocation(x))
val value = flat.toList.grouped(20)
value.foreach{sub=>
val codes = Tools.getAddressAndLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
sub.zip(codes).foreach{line=>
val json = line._1
val location=line._2
json.put("districtcode",location._2)
jsonArr.append(json)
//writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
// writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime")
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1202"))){//定位消息
var jsonArr= new ArrayBuffer[JSONObject]()
val value = x._2.toList.grouped(20)
value.foreach{ sub=>
val codes = Tools.getAddressAndLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
sub.zip(codes).foreach{line=>
val json = line._1
val location=line._2
json.put("districtcode",location._2)
jsonArr.append(json)
// writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
//writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime")
}else if (tableKey== TableKey(Some("0x1400"),Some("0x1402"))){//报警上传
var jsonArr= new ArrayBuffer[JSONObject]()
var useFul= new ArrayBuffer[JSONObject]()
var useLess= new ArrayBuffer[JSONObject]()
x._2.foreach{json=>
val warnType=json.getString("warnType")
if (Const.warnTypes.contains(warnType)){
useFul.append(json)
} else{
useLess.append(json)
//writeUnknown(db.conn,Const.unKnownTable,json)
}
}
val value = useFul.filter(x=>Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
value.foreach{ sub=>
val codes={
Tools.getAddressAndLocationCodes(sub.map{t=>
val infoStr = t.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
}
)
}
sub.zip(codes).foreach{line=>
val json = line._1
val location = line._2
val infoStr = json.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
val longitude = infoJson.get("LONGITUDE")
val latitude = infoJson.get("LATITUDE")
val eventtype = infoJson.get("EVENT_TYPE")
json.put("longitude",infoJson.get("LONGITUDE"))
json.put("latitude",infoJson.get("LATITUDE"))
json.put("eventtype",infoJson.get("EVENT_TYPE"))
json.put("fulladdress",location._1)
json.put("districtcode",location._2)
jsonArr.append(json)
//writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
if (jsonArr.nonEmpty){
//writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"warnTime")
}
if (useLess.nonEmpty){
writeUnknown(Const.unKnownTable,useLess)
}
}else{//messages not covered by the cases above
var jsonArr= new ArrayBuffer[JSONObject]()
x._2.foreach{json=>
jsonArr.append(json)
// writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
//writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"bussinessTime")
}
}catch {
case e:Exception=>
println("发生插入错误的消息"+x._2.toString())
println(e)
e.printStackTrace()
}
}
}
}
def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
println("start process data: "+DateTime.now())
DbClient.init(Const.hivePoolName,Const.hiveDriver,Const.hiveUrl, Const.hiveUsername,Const.hivePassword)
DbClient.usingDB(Const.hivePoolName) { db =>
x.foreach{
x=>try{
val tableKey=x._1
if(!Const.tableMap.contains(tableKey) && tableKey.msgId !=null) {//unknown message
var jsonArr= new ArrayBuffer[JSONObject]()
x._2.foreach{ json =>
jsonArr.append(json)
}
// writeUnknown(ssc,Const.unKnownTable,jsonArr)
}else if ( tableKey.msgId == null){//base info
//x._2.foreach{json=>
// writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
// }
writeBaseInfoHive(db.conn,Const.tableMap(tableKey),x._2.toArray)
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1203"))){//定位补报
var jsonArr= new ArrayBuffer[JSONObject]()
val flat = x._2.flatMap(x=>Tools.addLocation(x))
val value = flat.toList.grouped(20)
value.foreach{sub=>
val codes = Tools.getAddressAndLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
sub.zip(codes).foreach{line=>
val json = line._1
val location=line._2
json.put("districtcode",location._2)
jsonArr.append(json)
//writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime")
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1202"))){//定位消息
var jsonArr= new ArrayBuffer[JSONObject]()
val value = x._2.toList.grouped(20)
value.foreach{ sub=>
val codes = Tools.getAddressAndLocationCodes(sub.map(x=>(x.getDouble("lon")/1000000,x.getDouble("lat")/1000000)))
sub.zip(codes).foreach{line=>
val json = line._1
val location=line._2
json.put("districtcode",location._2)
jsonArr.append(json)
// writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime")
}else if (tableKey== TableKey(Some("0x1400"),Some("0x1402"))){//报警上传
var jsonArr= new ArrayBuffer[JSONObject]()
var useFul= new ArrayBuffer[JSONObject]()
var useLess= new ArrayBuffer[JSONObject]()
x._2.foreach{json=>
val warnType=json.getString("warnType")
if (Const.warnTypes.contains(warnType)){
useFul.append(json)
} else{
useLess.append(json)
//writeUnknown(db.conn,Const.unKnownTable,json)
}
}
val value = useFul.filter(x=>Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
value.foreach{ sub=>
val codes={
Tools.getAddressAndLocationCodes(sub.map{t=>
val infoStr = t.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
}
)
}
sub.zip(codes).foreach{line=>
val json = line._1
val location = line._2
val infoStr = json.getString("infoContent")
val infoJson = Tools.getInfoContentJsonobj(infoStr)
val longitude = infoJson.get("LONGITUDE")
val latitude = infoJson.get("LATITUDE")
val eventtype = infoJson.get("EVENT_TYPE")
json.put("longitude",infoJson.get("LONGITUDE"))
json.put("latitude",infoJson.get("LATITUDE"))
json.put("eventtype",infoJson.get("EVENT_TYPE"))
json.put("fulladdress",location._1)
json.put("districtcode",location._2)
jsonArr.append(json)
//writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}
if (jsonArr.nonEmpty){
writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"warnTime")
}
if (useLess.nonEmpty){
//writeUnknown(db.conn,Const.unKnownTable,useLess)
}
}else{//messages not covered by the cases above
var jsonArr= new ArrayBuffer[JSONObject]()
x._2.foreach{json=>
jsonArr.append(json)
// writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"bussinessTime")
}
}catch {
case e:Exception=>
println("发生插入错误的消息"+x._2.toString())
println(e)
e.printStackTrace()
}
}
}
}
def processRow(iterator:Iterator[String]): Unit = {
DbClient.init(Const.hivePoolName,Const.hiveDriver,Const.hiveUrl, Const.hiveUsername,Const.hivePassword)
DbClient.usingDB(Const.hivePoolName){ db =>
iterator.foreach{ x=>
try {
val json = JSON.parseObject(x)
val tableKey = TableKey(Option(json.getString("msgId")),Option(json.getString("dataType")))
if(!Const.tableMap.contains(tableKey)) {
//writeUnknown(db.conn,Const.unKnownTable,json)
}else if ( !json.containsKey("msgId")){
//writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1203"))){
Tools.addLocation(json).foreach(x=>writeHiveTable(db.conn,Const.tableMap(tableKey),x))
}else{
writeHiveTable(db.conn,Const.tableMap(tableKey),json)
}
}catch {
case e:Exception =>
error(x)
e.printStackTrace()
}
}
}
}
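// batch-inserts base-info records into ods.<table> through one prepared statement (addBatch/executeBatch)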
def writeBaseInfoHive(conn:Connection,tableName:String,jsonArr:Array[JSONObject]): Unit = {
val keys = jsonArr(0).keySet().toArray(Array[String]())
val insertSql =
s"""
|insert into ods.$tableName
|(${keys.map(x=>x.toLowerCase()).mkString(",")})
|values (${keys.map(_ => "?").mkString(",")})
""".stripMargin
val stmt = conn.prepareStatement(insertSql)
jsonArr.foreach{json=>
(1 to keys.length).foreach{ index => stmt.setObject(index,json.get(keys(index-1)))}
stmt.addBatch()
}
try{
stmt.executeBatch()
info(s"insert date to HiveTable $tableName SUCCESS")
}catch {
case e:Exception=>{
println("Exception Messages==>"+e)
println(s"hive table $tableName insert data failed==>"+jsonArr.toList)
}
}finally {
stmt.close()
}
}
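// appends unrecognised messages as tab-separated text (timestamp, raw JSON, day) directly to the
// table's HDFS partition file instead of going through Hive JDBC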
def writeUnknown(tableName:String,jsonArr:ArrayBuffer[JSONObject]): Unit = {
val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
val day = new DateTime().toString("yyyy-MM-dd")
val results = new StringBuilder()
sparkSession.sql(s"desc table ods.$tableName").registerTempTable("descTable")
// collect the column names on the driver instead of mutating a broadcast buffer on the executors
val columnNames = sparkSession.sql("select col_name from descTable").rdd.map(_.getString(0)).collect().to[ArrayBuffer]
columnNames.trimEnd(3)
columnNames.foreach(println(_))
jsonArr.foreach{json=>
val result = dateTime+"\t"+json.toJSONString+"\t"+day+"\n"
results.append(result)
}
if (results.nonEmpty){
val fileName = s"/hive/ODS.db/$tableName/day=$day/000000_0"
val exist = HDFSHelper.exists(hdfs,fileName)
if (!exist){
hdfs.createNewFile(new Path(fileName))
}
val outputStream = hdfs.append(new Path(fileName))
val inputStream = new ByteArrayInputStream(results.toString().getBytes())
HDFSHelper.transport(inputStream,outputStream)
inputStream.close()
outputStream.close()
println("保存到本地")
}
}
def writeHiveTable(conn:Connection,tableName:String,json:JSONObject): Unit = {
println("start insert hive : "+DateTime.now())
//location message: add district code field and value
// if( tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1202")))
// || tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1203")))){
// val addressAndLocation = Tools.getAddressAndLocationCode(json.getDouble("lon")/1000000,json.getDouble("lat")/1000000)
// json.put("districtcode",addressAndLocation._2)
// }
//alarm message: add district code, event type, longitude/latitude and full address
// if(tableName == Const.tableMap(TableKey(Some("0x1400"),Some("0x1402")))){
// val warnType = json.getString("warnType")
// if (!Const.warnTypes.contains(warnType)){
// writeUnknown(conn,Const.unKnownTable,json)
// }
// val infoStr = json.getString("infoContent")
// val infoJson = Tools.getInfoContentJsonobj(infoStr)
// json.put("longitude",infoJson.get("LONGITUDE"))
// json.put("latitude",infoJson.get("LATITUDE"))
// json.put("eventtype",infoJson.get("EVENT_TYPE"))
// val addressAndLocation = Tools.getAddressAndLocationCode(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
// json.put("fulladdress",addressAndLocation._1)
// println(addressAndLocation._1)
// json.put("districtcode",addressAndLocation._2)
// }
var keys = json.keySet().toArray(Array[String]())
val day = if (keys.contains("dateTime")) {
DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
} else if (keys.contains("warnTime")) {
new DateTime(json.getLong("warnTime")*1000).toString("yyyy-MM-dd",Locale.CHINESE)
} else {
DateTime.parse(json.getString("businessTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
}
// json.put("partitionday",day)
keys=json.keySet().toArray(Array[String]())
val createPartitionSql =s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
val createPartitionStmt = conn.prepareStatement(createPartitionSql)
val setStmt = conn.prepareStatement("set set hive.exec.dynamic.partition.mode=nonstrict")
val sql =
s"""
|insert into $tableName
|partition(partitionday)
|(${keys.map(x=>x.toLowerCase()).mkString(",")})
|values (${keys.map(_ => "?").mkString(",")})
""".stripMargin
val stmt = conn.prepareStatement(sql)
(1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
try {
createPartitionStmt.execute()
setStmt.execute()
stmt.execute()
info(s"insert date to HiveTable $tableName SUCCESS")
}catch {
case e:Exception=>{
println("Exception Messages==>"+e)
println(s"hive table $tableName insert data failed==>"+json)
}
}finally{
createPartitionStmt.close()
setStmt.close()
stmt.close()
}
println("insert hive end : "+DateTime.now())
}
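// groups records by partition day (derived from dateKey), pre-creates each partition,
// then inserts every group with a single JDBC batch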
def writeHiveBatch(conn:Connection,tableName:String,jsonArr:ArrayBuffer[JSONObject],dateKey:String):Unit={
var map:Map[String,ArrayBuffer[JSONObject]]=Map()
dateKey match {
case "dateTime"=>{
jsonArr.foreach{json=>
val day=DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
if(map.contains(day)){
map(day).append(json)
}else{
val jsons = new ArrayBuffer[JSONObject]()
jsons.append(json)
map+=(day->jsons)
}
}
}
case "warnTime"=>{
jsonArr.foreach { json =>
val day= new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
if(map.contains(day)){
map(day).append(json)
}else{
val jsons = new ArrayBuffer[JSONObject]()
jsons.append(json)
map+=(day->jsons)
}
}
}
case "bussinessTime"=>{
jsonArr.foreach{json=>
val day=DateTime.parse(json.getString("businessTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
if(map.contains(day)){
map(day).append(json)
}else{
val jsons = new ArrayBuffer[JSONObject]()
jsons.append(json)
map+=(day->jsons)
}
}
}
}
val partitionDays = map.keySet
partitionDays.foreach{day=>
val jsons:ArrayBuffer[JSONObject] = map.get(day).get
val keys = jsons(0).keySet().toArray(Array[String]())
val createPartitionSql =s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
val createPartitionStmt = conn.prepareStatement(createPartitionSql)
createPartitionStmt.execute()
val sql =
s"""
|insert into $tableName
|partition(partitionday='$day')
|(${keys.map(x=>x.toLowerCase()).mkString(",")})
|values (${keys.map(_ => "?").mkString(",")})
""".stripMargin
val stmt = conn.prepareStatement(sql)
jsons.foreach{json=>
(1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
stmt.addBatch()
}
try{
stmt.executeBatch()
}catch {
case e:Exception=>{
println("Exception Messages==>"+e)
println(s"hive table $tableName insert data failed==>"+jsons.toList)
}
}finally {
stmt.close()
createPartitionStmt.close()
}
}
}
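// despite its name this currently mirrors writeHiveBatch (per-day JDBC batch insert)
// rather than writing files to HDFS directly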
def writeHdfs(conn:Connection,tableName:String,jsonArr:ArrayBuffer[JSONObject],dateKey:String):Unit={
var map:Map[String,ArrayBuffer[JSONObject]]=Map()
dateKey match {
case "dateTime"=>{
jsonArr.foreach{json=>
val day=DateTime.parse(json.getString("dateTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
if(map.contains(day)){
map(day).append(json)
}else{
val jsons = new ArrayBuffer[JSONObject]()
jsons.append(json)
map+=(day->jsons)
}
}
}
case "warnTime"=>{
jsonArr.foreach { json =>
val day= new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
if(map.contains(day)){
map(day).append(json)
}else{
val jsons = new ArrayBuffer[JSONObject]()
jsons.append(json)
map+=(day->jsons)
}
}
}
case "bussinessTime"=>{
jsonArr.foreach{json=>
val day=DateTime.parse(json.getString("businessTime"),DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
if(map.contains(day)){
map(day).append(json)
}else{
val jsons = new ArrayBuffer[JSONObject]()
jsons.append(json)
map+=(day->jsons)
}
}
}
}
val partitionDays = map.keySet
partitionDays.foreach{day=>
val jsons:ArrayBuffer[JSONObject] = map.get(day).get
val keys = jsons(0).keySet().toArray(Array[String]())
val createPartitionSql =s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
val createPartitionStmt = conn.prepareStatement(createPartitionSql)
createPartitionStmt.execute()
val sql =
s"""
|insert into $tableName
|partition(partitionday='$day')
|(${keys.map(x=>x.toLowerCase()).mkString(",")})
|values (${keys.map(_ => "?").mkString(",")})
""".stripMargin
val stmt = conn.prepareStatement(sql)
jsons.foreach{json=>
(1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
stmt.addBatch()
}
try{
stmt.executeBatch()
}catch {
case e:Exception=>{
println("Exception Messages==>"+e)
println(s"hive table $tableName insert data failed==>"+jsons.toList)
}
}finally {
stmt.close()
createPartitionStmt.close()
}
}
}
}
package com.hikcreate.data.util
import java.io.{FileSystem => _, _}
import org.apache.hadoop.fs._
import scala.collection.mutable.ListBuffer
object HDFSHelper {
def isDir(hdfs : FileSystem, name : String) : Boolean = {
hdfs.isDirectory(new Path(name))
}
def isDir(hdfs : FileSystem, name : Path) : Boolean = {
hdfs.isDirectory(name)
}
def isFile(hdfs : FileSystem, name : String) : Boolean = {
hdfs.isFile(new Path(name))
}
def isFile(hdfs : FileSystem, name : Path) : Boolean = {
hdfs.isFile(name)
}
def createFile(hdfs : FileSystem, name : String) : Boolean = {
hdfs.createNewFile(new Path(name))
}
def createFile(hdfs : FileSystem, name : Path) : Boolean = {
hdfs.createNewFile(name)
}
def createFolder(hdfs : FileSystem, name : String) : Boolean = {
hdfs.mkdirs(new Path(name))
}
def createFolder(hdfs : FileSystem, name : Path) : Boolean = {
hdfs.mkdirs(name)
}
def exists(hdfs : FileSystem, name : String) : Boolean = {
hdfs.exists(new Path(name))
}
def exists(hdfs : FileSystem, name : Path) : Boolean = {
hdfs.exists(name)
}
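// copies an input stream to an output stream in ~64 KB chunks and closes both streams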
def transport(inputStream : InputStream, outputStream : OutputStream): Unit ={
val buffer = new Array[Byte](64 * 1000)
var len = inputStream.read(buffer)
while (len != -1) {
outputStream.write(buffer, 0, len)
len = inputStream.read(buffer)
}
outputStream.flush()
inputStream.close()
outputStream.close()
}
class MyPathFilter extends PathFilter {
override def accept(path: Path): Boolean = true
}
/**
* create a target file and provide parent folder if necessary
*/
def createLocalFile(fullName : String) : File = {
val target : File = new File(fullName)
if(!target.exists){
val index = fullName.lastIndexOf(File.separator)
val parentFullName = fullName.substring(0, index)
val parent : File = new File(parentFullName)
if(!parent.exists)
parent.mkdirs
else if(!parent.isDirectory)
parent.mkdir
target.createNewFile
}
target
}
/**
* delete file in hdfs
* @return true: success, false: failed
*/
def deleteFile(hdfs : FileSystem, path: String) : Boolean = {
if (isDir(hdfs, path))
hdfs.delete(new Path(path), true)//true: delete files recursively
else
hdfs.delete(new Path(path), false)
}
/**
* get all file children's full name of a hdfs dir, not include dir children
* @param fullName the hdfs dir's full name
*/
def listChildren(hdfs : FileSystem, fullName : String, holder : ListBuffer[String]) : ListBuffer[String] = {
val filesStatus = hdfs.listStatus(new Path(fullName), new MyPathFilter)
for(status <- filesStatus){
val filePath : Path = status.getPath
if(isFile(hdfs,filePath))
holder += filePath.toString
else
listChildren(hdfs, filePath.toString, holder)
}
holder
}
def copyFile(hdfs : FileSystem, source: String, target: String): Unit = {
val sourcePath = new Path(source)
val targetPath = new Path(target)
if(!exists(hdfs, targetPath))
createFile(hdfs, targetPath)
val inputStream : FSDataInputStream = hdfs.open(sourcePath)
val outputStream : FSDataOutputStream = hdfs.create(targetPath)
transport(inputStream, outputStream)
}
def copyFolder(hdfs : FileSystem, sourceFolder: String, targetFolder: String): Unit = {
val holder : ListBuffer[String] = new ListBuffer[String]
val children : List[String] = listChildren(hdfs, sourceFolder, holder).toList
for(child <- children)
copyFile(hdfs, child, child.replaceFirst(sourceFolder, targetFolder))
}
def copyFileFromLocal(hdfs : FileSystem, localSource: String, hdfsTarget: String): Unit = {
val targetPath = new Path(hdfsTarget)
if(!exists(hdfs, targetPath))
createFile(hdfs, targetPath)
val inputStream : FileInputStream = new FileInputStream(localSource)
val outputStream : FSDataOutputStream = hdfs.create(targetPath)
transport(inputStream, outputStream)
}
def copyFileToLocal(hdfs : FileSystem, hdfsSource: String, localTarget: String): Unit = {
val localFile : File = createLocalFile(localTarget)
val inputStream : FSDataInputStream = hdfs.open(new Path(hdfsSource))
val outputStream : FileOutputStream = new FileOutputStream(localFile)
transport(inputStream, outputStream)
}
def copyFolderFromLocal(hdfs : FileSystem, localSource: String, hdfsTarget: String): Unit = {
val localFolder : File = new File(localSource)
val allChildren : Array[File] = localFolder.listFiles
for(child <- allChildren){
val fullName = child.getAbsolutePath
val nameExcludeSource : String = fullName.substring(localSource.length)
val targetFileFullName : String = hdfsTarget + Path.SEPARATOR + nameExcludeSource
if(child.isFile)
copyFileFromLocal(hdfs, fullName, targetFileFullName)
else
copyFolderFromLocal(hdfs, fullName, targetFileFullName)
}
}
def copyFolderToLocal(hdfs : FileSystem, hdfsSource: String, localTarget: String): Unit = {
val holder : ListBuffer[String] = new ListBuffer[String]
val children : List[String] = listChildren(hdfs, hdfsSource, holder).toList
val hdfsSourceFullName = hdfs.getFileStatus(new Path(hdfsSource)).getPath.toString
val index = hdfsSourceFullName.length
for(child <- children){
val nameExcludeSource : String = child.substring(index + 1)
val targetFileFullName : String = localTarget + File.separator + nameExcludeSource
copyFileToLocal(hdfs, child, targetFileFullName)
}
}
}
......@@ -76,7 +76,7 @@ object Tools extends Logging{
val startTime = DateTime.now()
val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type","application/json").asString
val endTime = DateTime.now()
println("http请求时间:"+new Duration(startTime,endTime).getMillis)
println("经纬度列表size:"+buffer.size+"===》http response time :"+new Duration(startTime,endTime).getMillis)
val body = JSON.parseObject(response.body)
val items = body.getJSONObject("result").getJSONArray("regeoItems")
(0 until items.size()).map{ index =>
......