Commit 9662d60b by 杜发飞

1 parent 30f220c7
@@ -12,7 +12,6 @@ import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.joda.time.DateTime
import scala.collection.JavaConversions.mapAsJavaMap
object IgniteClient {
@@ -166,7 +165,7 @@ object IgniteClient {
.setIndexedTypes(classOf[PrimaryKey],classOf[DailyAlarmDeal])
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
def updateDailyAlarmDealCache(key:BinaryObject,map:Map[String,AnyRef]): Unit = {
@@ -187,7 +186,7 @@ object IgniteClient {
.setIndexedTypes(classOf[PrimaryKey],classOf[DailyAlarmDetail])
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
/**
@@ -201,7 +200,7 @@ object IgniteClient {
.setIndexedTypes(classOf[PrimaryKey],classOf[AttachmentInfo])
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
/**
@@ -215,7 +214,7 @@ object IgniteClient {
.setIndexedTypes(classOf[PrimaryKey],classOf[IdentificationInfo])
.setDataRegionName("500MB_Region")
.setCacheMode(CacheMode.REPLICATED)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
def main(args: Array[String]): Unit = {
......
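The cache configurations above all receive the same change: the previously commented-out setExpiryPolicyFactory call is now active, so entries in these replicated caches expire 24 hours after they are created. A minimal standalone sketch of the pattern, using a placeholder cache name and value types that are not part of this repository:

import java.util.concurrent.TimeUnit
import javax.cache.expiry.{CreatedExpiryPolicy, Duration}
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.configuration.CacheConfiguration

// Hypothetical cache: each entry is removed 24 hours after it is first written.
val exampleCfg = new CacheConfiguration[String, String]("exampleCache")
  .setCacheMode(CacheMode.REPLICATED)
  .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS, 24)))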
@@ -7,6 +7,9 @@ object ApolloConst {
val config: Config = ConfigService.getConfig("application")
val delayMax: Integer = config.getIntProperty("spark.delay.max",1)
val recipients: Seq[String] = config.getProperty("spark.email.receiver",null).split(",")
val bootstrap: String = config.getProperty("kafka.bootstrap.servers",null)
val zkKafka: String = config.getProperty("kafka.zookerper.servers",null)
......
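The new Apollo constants externalize the alert threshold, the e-mail recipients and the Kafka addresses. Note that getProperty(key, null).split(",") throws a NullPointerException if the key is absent in Apollo; a defensive variant (an illustration only, not part of the commit) could be:

val recipientsSafe: Seq[String] =
  Option(config.getProperty("spark.email.receiver", null)) // Apollo returns the default (null) when the key is missing
    .map(_.split(",").toSeq)
    .getOrElse(Seq.empty)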
package com.hikcreate.data.listener
import java.util.concurrent.atomic.AtomicBoolean
import com.hikcreate.data.client.EmailClient
import com.hikcreate.data.common.Logging
import com.hikcreate.data.constant.ApolloConst
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted}
import org.joda.time.DateTime
import scala.collection.mutable.ArrayBuffer
/**
* Monitors batch processing time (streaming scheduling delay)
*/
class BatchProcessListener(ssc:StreamingContext) extends StreamingListener with Logging{
val DELAY_MAX: Int = 1000 * 60 * 1
val msg = new ArrayBuffer[String]()
msg.append(ssc.sparkContext.applicationId)
msg.append(ssc.sparkContext.appName)
val isSend = new AtomicBoolean(true)
msg.append("应用程序ID:" + ssc.sparkContext.applicationId)
msg.append("应用程序名称:" + ssc.sparkContext.appName)
msg.append("应用程序开始时间:" + new DateTime(ssc.sparkContext.startTime).toString("yyyy-MM-dd HH:mm:ss"))
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
val Delay_ts = batchStarted.batchInfo.schedulingDelay.get // scheduling delay, in milliseconds
if( Delay_ts > DELAY_MAX ){
msg.append("数据处理存在延迟")
EmailClient.sendEmail("Spark程序监控",Seq("272558733@qq.com"),Some(msg.mkString("\n")),None,None)
val Delay_ts = batchStarted.batchInfo.schedulingDelay.get / (1000 * 60D) // scheduling delay converted to minutes
if( Delay_ts > ApolloConst.delayMax && isSend.get()){
msg.append("当前调度等待时间:"+Delay_ts)
EmailClient.sendEmail("Spark程序监控",ApolloConst.recipients,Some(msg.mkString("\n")),None,None)
isSend.set(false)
}
}
}
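BatchProcessListener keeps a one-shot AtomicBoolean so the delay alert is mailed at most once per application run, and it only takes effect when registered on the StreamingContext before the job starts. That registration is not shown in this diff; a sketch under that assumption, with a placeholder application name and batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("sync-ignite") // placeholder app name
val ssc = new StreamingContext(sparkConf, Seconds(10))    // placeholder batch interval
ssc.addStreamingListener(new BatchProcessListener(ssc))   // attach before start()
ssc.start()
ssc.awaitTermination()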
@@ -2,9 +2,11 @@ package com.hikcreate.data.listener
import com.hikcreate.data.client.EmailClient
import com.hikcreate.data.common.Logging
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
import com.hikcreate.data.constant.ApolloConst
import org.apache.spark.{ExceptionFailure, SparkConf, TaskFailedReason, TaskKilled}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerTaskEnd}
import org.joda.time.DateTime
import scala.collection.mutable.ArrayBuffer
/**
* Monitors the start and stop of the Spark application
@@ -17,12 +19,27 @@ class LifecycleListener(conf:SparkConf) extends SparkListener with Logging {
msg.append("应用程序ID:" + applicationStart.appId.getOrElse(""))
msg.append("应用程序名称:" + applicationStart.appName)
msg.append("应用程序开始时间:" + new DateTime(applicationStart.time).toString("yyyy-MM-dd HH:mm:ss"))
println(msg)
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
msg.append("应用程序结束时间:" + new DateTime(applicationEnd.time).toString("yyyy-MM-dd HH:mm:ss"))
println(msg.mkString("\n"))
EmailClient.sendEmail("Spark程序监控",Seq("272558733@qq.com"),Some(msg.mkString("\n")),None,None)
EmailClient.sendEmail("Spark程序监控",ApolloConst.recipients,Some(msg.mkString("\n")),None,None)
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val info = taskEnd.taskInfo
if (info != null && taskEnd.stageAttemptId != -1) {
val errorMessage: Option[String] = taskEnd.reason match {
case kill:TaskKilled => Some(kill.toErrorString)
case e: ExceptionFailure => Some(e.toErrorString)
case e: TaskFailedReason => Some(e.toErrorString)
case _ => None
}
if (errorMessage.nonEmpty) {
msg.append("异常信息:")
msg.append(errorMessage.get)
EmailClient.sendEmail("spark任务监控",ApolloConst.recipients,Option(msg.mkString("\n")),None,None)
}
}
}
}
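LifecycleListener takes a single SparkConf constructor argument, which is exactly the constructor shape Spark looks for when instantiating classes listed in spark.extraListeners; it could also be attached programmatically via SparkContext.addSparkListener. How it is registered is not shown in this diff; a sketch assuming the configuration route:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("sync-ignite") // placeholder app name
  .set("spark.extraListeners", "com.hikcreate.data.listener.LifecycleListener") // Spark builds it with this SparkConf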
@@ -14,7 +14,6 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.mutable.ArrayBuffer
object SyncIgnite extends Sparking with Logging{
@@ -92,7 +91,7 @@ object SyncIgnite extends Sparking with Logging{
IgniteClient.vehicleNumberCache.withKeepBinary().put(vehicleNumberKey,vehicleNumberValue)
}
}
case tableKey if tableKey == TableKey(Some("0x1D00"),Some("0x1d02")) => // periodic upload of driver identification info
case TableKey(Some("0x1D00"),Some("0x1d02")) => // periodic upload of driver identification info
x._2.foreach{ x=>
val vehicleNo = x.getString("vehicleNo")
val vehicleColor = x.getString("vehicleColor")
@@ -119,13 +118,11 @@ object SyncIgnite extends Sparking with Logging{
val dateTime = new DateTime(timestamp)
val warnTime = dateTime.toString("yyyy-MM-dd HH:mm:ss")
x.put("warnTime",warnTime)
val key = IgniteClient.getBinaryObject(
new PrimaryKey(x.getString("vehicleNo"),
x.getString("vehicleColor"),
x.getString("deviceId"),
x.getString("warnTime"),
x.getString("fileIndex"))
)
val key = IgniteClient.getBinaryObject(new PrimaryKey(x.getString("vehicleNo"),
x.getString("vehicleColor"),
x.getString("deviceId"),
x.getString("warnTime"),
x.getString("fileIndex")))
val value = JSON.parseObject(x.toJSONString,classOf[AttachmentInfo])
IgniteClient.attachmentCache.withKeepBinary().put(key,value)
}
@@ -192,7 +189,7 @@ object SyncIgnite extends Sparking with Logging{
"superVisionId"->superVisionId,
"warnTime"->warnTime))}
}
case tableKey if tableKey == TableKey(Some("0x1400"),Some("0x1401")) => // alarm supervision response message
case TableKey(Some("0x1400"),Some("0x1401")) => // alarm supervision response message
x._2.foreach{ x=>
val vehicleNo = x.getString("vehicleNo")
val vehicleColor = x.getString("vehicleColor")
......
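The pattern-match changes above drop the redundant guard form case tableKey if tableKey == TableKey(...) in favour of a direct constructor pattern, which is equivalent and more idiomatic when TableKey is a case class. A minimal sketch of the idiom (TableKey's real definition is not part of this diff; a two-field case class over Option[String] is assumed here):

case class TableKey(msgId: Option[String], dataType: Option[String]) // assumed shape

def describe(key: TableKey): String = key match {
  case TableKey(Some("0x1D00"), Some("0x1d02")) => "periodic driver identification upload"
  case TableKey(Some("0x1400"), Some("0x1401")) => "alarm supervision response"
  case _ => "other message type"
}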