Commit 2ae921dd by 杜发飞

1

parent 31d52b5c
...@@ -4,6 +4,7 @@ import com.hikcreate.data.listener.LifecycleListener ...@@ -4,6 +4,7 @@ import com.hikcreate.data.listener.LifecycleListener
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
trait Sparking { trait Sparking {
...@@ -13,12 +14,12 @@ trait Sparking { ...@@ -13,12 +14,12 @@ trait Sparking {
Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN) Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .set("spark.serializer",classOf[KryoSerializer].getName)
.set("spark.extraListeners",classOf[LifecycleListener].getName) .set("spark.extraListeners",classOf[LifecycleListener].getName)
.set("hive.exec.dynamici.partition","true") .set("hive.exec.dynamici.partition","true")
.set("hive.exec.dynamic.partition.mode","nonstrict") .set("hive.exec.dynamic.partition.mode","nonstrict")
.setAppName("test") //.setAppName("test")
.setMaster("local[*]") //.setMaster("local[*]")
def getKafkaParams(servers:String,groupId: String):Map[String,Object] = { def getKafkaParams(servers:String,groupId: String):Map[String,Object] = {
Map[String,Object]( Map[String,Object](
......
...@@ -20,7 +20,7 @@ class BatchProcessListener(ssc:StreamingContext) extends StreamingListener with ...@@ -20,7 +20,7 @@ class BatchProcessListener(ssc:StreamingContext) extends StreamingListener with
msg.append("应用程序开始时间:" + new DateTime(ssc.sparkContext.startTime).toString("yyyy-MM-dd HH:mm:ss")) msg.append("应用程序开始时间:" + new DateTime(ssc.sparkContext.startTime).toString("yyyy-MM-dd HH:mm:ss"))
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = { override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
val Delay_ts = batchStarted.batchInfo.schedulingDelay.get / 1000 * 60D val Delay_ts = batchStarted.batchInfo.schedulingDelay.get / (1000 * 60D)
if( Delay_ts > ApolloConst.delayMax && isSend.get()){ if( Delay_ts > ApolloConst.delayMax && isSend.get()){
msg.append("当前调度等待时间:"+Delay_ts) msg.append("当前调度等待时间:"+Delay_ts)
EmailClient.sendEmail("Spark程序监控",ApolloConst.recipients,Some(msg.mkString("\n")),None,None) EmailClient.sendEmail("Spark程序监控",ApolloConst.recipients,Some(msg.mkString("\n")),None,None)
......
...@@ -40,7 +40,7 @@ object SyncIgnite extends Sparking with Logging{ ...@@ -40,7 +40,7 @@ object SyncIgnite extends Sparking with Logging{
x.foreach{ x => x.foreach{ x =>
try{ try{
//System.exit(1) //System.exit(1)
x._2.foreach(x=>println(x.toJSONString)) //x._2.foreach(x=>println(x.toJSONString))
x._1 match { x._1 match {
case TableKey(Some("0x1200"),Some("0x1202")) => //车辆定位消息 case TableKey(Some("0x1200"),Some("0x1202")) => //车辆定位消息
val value = x._2.toList.grouped(20) val value = x._2.toList.grouped(20)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment