Commit 9be954d2 by 王建成

update

parent cb8a603c
...@@ -66,6 +66,8 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -66,6 +66,8 @@ object SysncHiveBatch extends Sparking with Logging {
} }
def processRow3(x: RDD[(TableKey, Iterable[JSONObject])],ssc: StreamingContext): Unit = { def processRow3(x: RDD[(TableKey, Iterable[JSONObject])],ssc: StreamingContext): Unit = {
println("start process data: "+DateTime.now()) println("start process data: "+DateTime.now())
DbClient.init(Const.hivePoolName,Const.hiveDriver,Const.hiveUrl, Const.hiveUsername,Const.hivePassword)
DbClient.usingDB(Const.hivePoolName) { db =>
x.foreachPartition{x=> x.foreachPartition{x=>
x.foreach{ x.foreach{
x=>try{ x=>try{
...@@ -75,12 +77,12 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -75,12 +77,12 @@ object SysncHiveBatch extends Sparking with Logging {
x._2.foreach{ json => x._2.foreach{ json =>
jsonArr.append(json) jsonArr.append(json)
} }
writeUnknown(Const.unKnownTable,jsonArr) writeUnknown(db.conn,Const.unKnownTable,jsonArr)
}else if ( tableKey.msgId == null){//基础信息 }else if ( tableKey.msgId == null){//基础信息
//x._2.foreach{json=> //x._2.foreach{json=>
// writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json) // writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
// } // }
// writeBaseInfoHive(db.conn,Const.tableMap(tableKey),x._2.toArray) // writeBaseInfoHive(db.conn,Const.tableMap(tableKey),x._2.toArray)
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1203"))){//定位补报 }else if (tableKey== TableKey(Some("0x1200"),Some("0x1203"))){//定位补报
var jsonArr= new ArrayBuffer[JSONObject]() var jsonArr= new ArrayBuffer[JSONObject]()
val flat = x._2.flatMap(x=>Tools.addLocation(x)) val flat = x._2.flatMap(x=>Tools.addLocation(x))
...@@ -95,7 +97,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -95,7 +97,7 @@ object SysncHiveBatch extends Sparking with Logging {
//writeHiveTable(db.conn,Const.tableMap(tableKey),json) //writeHiveTable(db.conn,Const.tableMap(tableKey),json)
} }
} }
// writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime") // writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime")
}else if (tableKey== TableKey(Some("0x1200"),Some("0x1202"))){//定位消息 }else if (tableKey== TableKey(Some("0x1200"),Some("0x1202"))){//定位消息
var jsonArr= new ArrayBuffer[JSONObject]() var jsonArr= new ArrayBuffer[JSONObject]()
val value = x._2.toList.grouped(20) val value = x._2.toList.grouped(20)
...@@ -154,7 +156,7 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -154,7 +156,7 @@ object SysncHiveBatch extends Sparking with Logging {
//writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"warnTime") //writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"warnTime")
} }
if (useLess.size>0 && useLess !=null){ if (useLess.size>0 && useLess !=null){
writeUnknown(Const.unKnownTable,useLess) writeUnknown(db.conn,Const.unKnownTable,useLess)
} }
}else{//除了以上几种情况外的消息 }else{//除了以上几种情况外的消息
var jsonArr= new ArrayBuffer[JSONObject]() var jsonArr= new ArrayBuffer[JSONObject]()
...@@ -173,6 +175,9 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -173,6 +175,9 @@ object SysncHiveBatch extends Sparking with Logging {
} }
} }
}
} }
def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = { def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
println("start process data: "+DateTime.now()) println("start process data: "+DateTime.now())
...@@ -333,19 +338,16 @@ object SysncHiveBatch extends Sparking with Logging { ...@@ -333,19 +338,16 @@ object SysncHiveBatch extends Sparking with Logging {
stmt.close() stmt.close()
} }
} }
def writeUnknown(tableName:String,jsonArr:ArrayBuffer[JSONObject]): Unit = { def writeUnknown(conn:Connection,tableName:String,jsonArr:ArrayBuffer[JSONObject]): Unit = {
val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss") val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
val day = new DateTime().toString("yyyy-MM-dd") val day = new DateTime().toString("yyyy-MM-dd")
val results = new StringBuilder() val results = new StringBuilder()
val descTable = sparkSesson.sql(s"desc table ods.$tableName").registerTempTable("descTable") val sql =
val rdd = sparkSesson.sql("select col_name from descTable").rdd """
val strings = new ArrayBuffer[String]() |select * from
val broadcast = sc.broadcast(strings) """.stripMargin
rdd.coalesce(1,false).foreach{x=> val = conn.prepareStatement()
broadcast.value.append(x.getString(0))
}
broadcast.value.trimEnd(3)
broadcast.value.foreach(println(_))
jsonArr.foreach{json=> jsonArr.foreach{json=>
val result = dateTime+"\t"+json.toJSONString+"\t"+day+"\n" val result = dateTime+"\t"+json.toJSONString+"\t"+day+"\n"
results.append(result) results.append(result)
......
...@@ -74,7 +74,7 @@ object Tools extends Logging{ ...@@ -74,7 +74,7 @@ object Tools extends Logging{
arr.add(lonAndLat) arr.add(lonAndLat)
} }
json.put("locations",arr) json.put("locations",arr)
//val startTime = DateTime.now() val startTime = DateTime.now()
val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type","application/json").asString val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type","application/json").asString
val endTime = DateTime.now() val endTime = DateTime.now()
println("经纬度列表size:"+buffer.size+"===》http response time :"+new Duration(startTime,endTime).getMillis) println("经纬度列表size:"+buffer.size+"===》http response time :"+new Duration(startTime,endTime).getMillis)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment