Commit c1df6dfe by 杜发飞

1 parent ef83fb80
package com.hikcreate.data.common
import com.hikcreate.data.listener.LifecycleListener
import org.apache.log4j.{Level, Logger}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
......
@@ -13,6 +14,7 @@ trait Sparking {
val conf: SparkConf = new SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("spark.extraListeners",classOf[LifecycleListener].getName)
.set("hive.exec.dynamici.partition","true")
.set("hive.exec.dynamic.partition.mode","nonstrict")
.setAppName("test")
......
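For context on how this trait is consumed: a job only has to mix Sparking in and build its session from conf. The sketch below uses a hypothetical Main object; Spark constructs LifecycleListener itself at SparkContext startup, because a class named in spark.extraListeners is instantiated through its (SparkConf) constructor when it has one.

object Main extends Sparking {
  def main(args: Array[String]): Unit = {
    // conf already carries the Kryo serializer, the lifecycle listener and
    // the Hive dynamic-partition settings configured in the trait
    val spark = org.apache.spark.sql.SparkSession.builder().config(conf).getOrCreate()
    // ... job logic ...
    spark.stop()
  }
}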
package com.hikcreate.data.util
package com.hikcreate.data.listener
import com.hikcreate.data.common.Logging
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted}
class AppListener (ssc:StreamingContext) extends StreamingListener with Logging{
/**
* Monitor batch processing time
*/
class BatchProcessListener(ssc:StreamingContext) extends StreamingListener with Logging{
val DELAY_MAX = 20
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
// Scheduling delay, in milliseconds
val Delay_ts = batchStarted.batchInfo.schedulingDelay.get
/*if(Delay_ts > DELAY_MAX ){
sendEmail(...)
......
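The alert branch above is commented out and cut off by the diff. A minimal self-contained version of the same check might look like the sketch below; BatchDelaySketch is a hypothetical name, treating DELAY_MAX as seconds is an assumption, and the elided sendEmail(...) is stood in for by a log line.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted}

class BatchDelaySketch extends StreamingListener {
  val DELAY_MAX = 20 // threshold in seconds (assumption)
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    // schedulingDelay is an Option[Long] in milliseconds; getOrElse avoids
    // the bare .get used in the commit
    val delayMs = batchStarted.batchInfo.schedulingDelay.getOrElse(0L)
    if (delayMs / 1000 > DELAY_MAX) {
      // alerting (e.g. the elided sendEmail) would go here
      println(s"batch ${batchStarted.batchInfo.batchTime} delayed ${delayMs} ms, over ${DELAY_MAX}s")
    }
  }
}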
package com.hikcreate.data.listener
import com.hikcreate.data.common.Logging
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
import org.joda.time.DateTime
import scala.collection.mutable.ArrayBuffer
/**
* Monitor the start and stop of the Spark application
*/
class LifecycleListener(conf:SparkConf) extends SparkListener with Logging {
val msg = new ArrayBuffer[String]()
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
msg.append("应用程序ID:" + applicationStart.appId.getOrElse(""))
msg.append("应用程序名称:" + applicationStart.appName)
msg.append("应用程序开始时间:" + new DateTime(applicationStart.time).toString("yyyy-MM-dd HH:mm:ss"))
println(msg.toString())
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
msg.append("应用程序结束时间:" + new DateTime(applicationEnd.time).toString("yyyy-MM-dd HH:mm:ss"))
}
}
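A note on wiring: Spark instantiates each class listed in spark.extraListeners at SparkContext startup, preferring a constructor that takes a single SparkConf argument, which is why LifecycleListener can take conf as its only parameter. The same listener can also be attached by hand; a one-line sketch assuming an already-built SparkSession named spark:

spark.sparkContext.addSparkListener(new LifecycleListener(spark.sparkContext.getConf))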
......
@@ -10,7 +10,10 @@ import scala.collection.JavaConverters._
object FullSync extends Sparking{
def main(args: Array[String]): Unit = {
//IgniteClient.ignite.cacheNames().asScala.foreach(x=>IgniteClient.ignite.destroyCache(x))
/*IgniteClient.basicEnterpriseInfo.destroy()
IgniteClient.basicVehicleInfo.destroy()
IgniteClient.basicAlarmTypeInfo.destroy()*/
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
// Basic enterprise info table
sparkSession.sqlContext.read.format("jdbc").options(Map(
......
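The options map is truncated by the diff. For reference, a Spark JDBC read needs at least url, dbtable and credentials (plus driver for MySQL); every value below is a placeholder sketch, not the project's real connection settings.

val basicEnterpriseInfo = sparkSession.sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://db-host:3306/basedata", // placeholder URL
  "driver"   -> "com.mysql.jdbc.Driver",              // placeholder driver class
  "dbtable"  -> "basic_enterprise_info",              // placeholder table name
  "user"     -> "user",
  "password" -> "password"
)).load()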