Commit 5a2d7476 by 王建成

update

parent 655c5792
kafka.bootstrap.servers=39.100.49.76:9092
#kafka.bootstrap.servers=10.197.236.154:9092
-kafka.zookerper.servers=10.197.236.154:2181
+kafka.zookerper.servers=10.197.236.211:2181
#,10.197.236.169:2181,10.197.236.184:2181/kafka
#kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
#kafka.zookerper.servers=10.197.236.211:2181
...
@@ -18,8 +18,8 @@ trait Sparking {
    .set("spark.extraListeners",classOf[LifecycleListener].getName)
    .set("hive.exec.dynamici.partition","true")
    .set("hive.exec.dynamic.partition.mode","nonstrict")
-    //.setAppName("test")
-    //.setMaster("local[*]")
+    .setAppName("test")
+    .setMaster("local[*]")
  def getKafkaParams(servers:String,groupId: String):Map[String,Object] = {
    Map[String,Object](
...
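Note: the body of getKafkaParams is collapsed by the diff viewer above. For context, a minimal sketch of what such a consumer-parameter map typically contains, built only from standard Kafka consumer settings; the concrete keys and values used by this project are an assumption, not part of the commit:

import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical illustration only; the project's real map is hidden behind the "..." above.
def getKafkaParams(servers: String, groupId: String): Map[String, Object] =
  Map[String, Object](
    "bootstrap.servers"  -> servers,                    // e.g. ApolloConst.bootstrap
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> groupId,
    "auto.offset.reset"  -> "earliest",                 // assumed; could equally be "latest"
    "enable.auto.commit" -> (false: java.lang.Boolean)  // offsets are saved to ZooKeeper by ZkManager instead
  )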
@@ -8,7 +8,7 @@ object ApolloConst {
  val config: Config = ConfigService.getConfig("application")
+  val queueSize:Int=config.getIntProperty("queue.size",20)
  var hdfsUrl: String = config.getProperty("hdfs.url",null)
  var windowTime:Long = config.getLongProperty("window.time",1L)
  var compactTopic:Seq[String] = config.getProperty("compact.kafka.topic",null).split(",")
...
@@ -3,6 +3,7 @@ import java.io.ByteArrayInputStream
import java.net.URI
import java.sql.{Connection, ResultSet}
import java.util.Locale
import com.alibaba.fastjson.{JSON, JSONObject}
import com.hikcreate.data.client.DbClient
import com.hikcreate.data.common.{Logging, Sparking}
@@ -17,22 +18,33 @@ import org.joda.time.format.DateTimeFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
object SysncHiveBatch extends Sparking with Logging {
  val hdfs: DistributedFileSystem = new DistributedFileSystem()
-  hdfs.initialize(URI.create(ApolloConst.hdfsUrl), new Configuration())
+  val hdfsConf = new Configuration()
+  hdfsConf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable",true)
+  hdfs.initialize(URI.create(ApolloConst.hdfsUrl), hdfsConf)
+  val queue=mutable.Queue[Int]()
+  (1 to(ApolloConst.queueSize)).map(i=>queue.enqueue(i))
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(conf,Seconds(ApolloConst.windowTime))
    val zkManager = ZkManager(ApolloConst.zkKafka)
    val kafkaParams = getKafkaParams(ApolloConst.bootstrap, ApolloConst.hiveGroupId)
+    // val kafkaParams = getKafkaParams("10.197.236.154:9092,10.197.236.169:9092,10.197.236.184:9092", ApolloConst.hiveGroupId)
    val offsets = zkManager.getBeginOffset(ApolloConst.compactTopic, ApolloConst.hiveGroupId)
+    // val offsets = zkManager.getBeginOffset("relay_hangzhou_test".split(","), ApolloConst.hiveGroupId)
    val offsetRanges = new ArrayBuffer[OffsetRange]()
    ssc.addStreamingListener(new BatchProcessListener(ssc))
    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
+      //ApolloConst.compactTopic
+      //"relay_hangzhou_test".split(",")
      ConsumerStrategies.Subscribe[String, String](ApolloConst.compactTopic, kafkaParams, offsets))
    inputStream.transform { rdd =>
      offsetRanges.clear()
@@ -41,7 +53,9 @@ object SysncHiveBatch extends Sparking with Logging {
    }.map(x => x.value()).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val startTime = DateTime.now()
-        rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))).foreachPartition(x => processRow3(x))
+        val rdd1 = rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))))
+        rdd1.foreachPartition(x => processRow3(x))
        zkManager.saveEndOffset(offsetRanges, ApolloConst.hiveGroupId)
        offsetRanges.foreach { x =>
          println(x)
@@ -129,7 +143,10 @@ object SysncHiveBatch extends Sparking with Logging {
        val infoJson = Tools.getInfoContentJsonobj(infoStr)
        val longitude = infoJson.get("LONGITUDE")
        val latitude = infoJson.get("LATITUDE")
-        val eventtype = infoJson.get("EVENT_TYPE")
+        var eventtype = infoJson.getString("EVENT_TYPE")
+        if (eventtype.length==3){
+          eventtype = eventtype.replace("x","x0")
+        }
        json.put("longitude", infoJson.get("LONGITUDE"))
        json.put("latitude", infoJson.get("LATITUDE"))
        json.put("eventtype", infoJson.get("EVENT_TYPE"))
@@ -150,12 +167,11 @@ object SysncHiveBatch extends Sparking with Logging {
          x._2.foreach { json =>
            jsonArr.append(json)
          }
-          writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "bussinessTime")
+          writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "businessTime")
        }
      } catch {
        case e: Exception =>
-          println("发生插入错误的消息" + x._2.toString())
-          println(e)
+          info("发生插入错误的消息" + x._2.toString())
          e.printStackTrace()
      }
    }
@@ -219,6 +235,7 @@ object SysncHiveBatch extends Sparking with Logging {
  }
  def writeUnknown(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject]): Unit = {
+    // this.synchronized{
    val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
    val day = new DateTime().toString("yyyy-MM-dd")
    val sourceJsons = new ArrayBuffer[JSONObject]()
@@ -260,22 +277,27 @@ object SysncHiveBatch extends Sparking with Logging {
        results=results+result
      }
      results=results+"\n"
-      if (results.trim.size > 0 && results != null) {
-        val fileName = s"/hive/ODS.db/$tableName/day=$day/000000_0"
+      if (results.trim().size > 0 && results != null) {
+        val i = queue.dequeue()
+        val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_$i"
        val exist = HDFSHelper.exists(hdfs, fileName)
        if (!exist) {
          hdfs.createNewFile(new Path(fileName))
        }
        val outputStream = hdfs.append(new Path(fileName))
-        val inputStream = new ByteArrayInputStream(results.toString().getBytes("UTF-8"))
+        val inputStream = new ByteArrayInputStream(results.getBytes("UTF-8"))
        HDFSHelper.transport(inputStream, outputStream)
        inputStream.close()
        outputStream.close()
+        queue.enqueue(i)
      }
    }finally {
      stmt.close()
      createPartitionStmt.close()
    }
+    // }
  }
  def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = {
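In both writeUnknown above and the writeHdfs path further down, the commit replaces the fixed 000000_0 file with 000000_$i, where i is taken from the shared queue before the append and put back afterwards, so concurrent partitions append to different HDFS files. A standalone sketch of that check-out/check-in pattern (the withFileSlot helper, the synchronization, and the finally block are additions of the sketch, not of the commit):

import scala.collection.mutable

// One slot per concurrent writer; ApolloConst.queueSize bounds the parallelism (20 assumed here).
val slots = mutable.Queue[Int]()
(1 to 20).foreach(slots.enqueue(_))

def withFileSlot[T](body: Int => T): T = {
  val i = slots.synchronized(slots.dequeue())   // check out a suffix for .../000000_$i
  try body(i)
  finally slots.synchronized(slots.enqueue(i))  // return the slot even if the write fails
}

// Usage: withFileSlot { i => /* append results to .../partitionday=$day/000000_$i */ }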
@@ -435,11 +457,11 @@ object SysncHiveBatch extends Sparking with Logging {
          }
        }
      }
-      case "bussinessTime" => {
+      case "businessTime" => {
        jsonArr.foreach { json =>
          val day = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
-          val bussinessTime = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
-          json.put("bussinessTime",bussinessTime)
+          val businessTime = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
+          json.put("businessTime",businessTime)
          json.put("partitionday", day)
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
@@ -498,7 +520,8 @@ object SysncHiveBatch extends Sparking with Logging {
      }
      results=results+"\n"
      if (results.trim.size > 0 && results != null) {
-        val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_0"
+        val i = queue.dequeue()
+        val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_$i"
        val exist = HDFSHelper.exists(hdfs, fileName)
        if (!exist) {
          hdfs.createNewFile(new Path(fileName))
@@ -508,6 +531,7 @@ object SysncHiveBatch extends Sparking with Logging {
        HDFSHelper.transport(inputStream, outputStream)
        inputStream.close()
        outputStream.close()
+        queue.enqueue(i)
      }
    } finally {
      stmt.close()
...
import com.hikcreate.data.util.Tools
+import scala.collection.mutable
object Test1 {
  def main(args: Array[String]): Unit = {
-    val a = Tools.getAddressAndLocationCode(120.485443,30.183996)
-    println(a._1+"----"+a._2)
+    // val a = Tools.getAddressAndLocationCode(120.485443,30.183996)
+    // println(a._1+"----"+a._2)
+    // var eventtype="0x2"
+    // eventtype = eventtype.replace("x","x0")
+    // println(eventtype)
+    val list = (1 to(20)).toList
+    val queue=mutable.Queue[Int]()
+    (1 to(20)).foreach(i=>queue.enqueue(i))
+    val i = queue.dequeue()
+    println(i)
+    println(queue)
+    queue.enqueue(i)
+    println(queue)
+    // val dequeue = queue
+    // println(dequeue._1)
+    // println(queue)
+    //val queue1 = queue.enqueue(dequeue._1)
+    //println(queue1)
  }
...