Commit d561bbfd by 杜发飞

1 parent c1df6dfe
......@@ -127,6 +127,11 @@
            <artifactId>scalaj-http_${scala.binary.version}</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.play</groupId>
            <artifactId>play-mailer_${scala.binary.version}</artifactId>
            <version>7.0.0</version>
        </dependency>
    </dependencies>
    <build>
......
package com.hikcreate.ignite.domain1.vehicles.processor;

import com.hikcreate.ignite.domain1.vehicles.DailyTravel;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.joda.time.*;
import org.joda.time.format.DateTimeFormat;

import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import java.lang.reflect.Field;

public class DailyTravelUpdate implements CacheEntryProcessor<BinaryObject, BinaryObject, Void> {

    private DailyTravel dailyTravel;

    public DailyTravelUpdate(DailyTravel dailyTravel) {
        this.dailyTravel = dailyTravel;
    }

    @Override
    public Void process(MutableEntry<BinaryObject, BinaryObject> mutableEntry, Object... objects) throws EntryProcessorException {
        BinaryObject value = mutableEntry.getValue();
        BinaryObjectBuilder builder = value.toBuilder();
        LocalDate today = LocalDate.now(); // today's date
        LocalDate statisticalDate = LocalDate.parse(value.<String>field("date"), DateTimeFormat.forPattern("yyyy-MM-dd")); // date the stored statistics refer to
        DateTime eventTime = DateTime.parse(dailyTravel.getTime(), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")); // event time
        LocalDate eventDate = eventTime.toLocalDate();
        if (statisticalDate.isBefore(today)) { // stored statistics are not for the current day and must be reset
            dailyTravel.setDate(today.toString("yyyy-MM-dd"));
            if (eventDate.equals(today)) { // this record belongs to today
                mutableEntry.setValue(build(builder, dailyTravel));
            } else {
                dailyTravel.setLat(null);
                dailyTravel.setLng(null);
                dailyTravel.setTime(null);
                mutableEntry.setValue(build(builder, dailyTravel));
            }
        } else if (eventDate.equals(today)) {
            if (value.<Double>field("lat") == null) {
                mutableEntry.setValue(build(builder, dailyTravel));
            } else {
                DateTime lastTime = DateTime.parse(value.<String>field("time"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"));
                if (lastTime.isBefore(eventTime)) {
                    Double differDistance = getDistance(value.<Double>field("lat"), value.<Double>field("lng"), dailyTravel.getLat(), dailyTravel.getLng());
                    double previousMileage = value.<Double>field("travelMileage") == null ? 0D : value.<Double>field("travelMileage");
                    dailyTravel.setTravelMileage(previousMileage + differDistance);
                    Interval interval = new Interval(lastTime, eventTime);
                    Period period = interval.toPeriod(PeriodType.millis());
                    int differTime = period.getMillis() / 1000; // elapsed time in seconds
                    long previousTravelTime = value.<Long>field("travelTime") == null ? 0L : value.<Long>field("travelTime");
                    dailyTravel.setTravelTime(previousTravelTime + differTime);
                    mutableEntry.setValue(build(builder, dailyTravel));
                }
            }
        }
        return null;
    }

    /**
     * Copies every declared field of the given object into the binary builder.
     */
    private BinaryObject build(BinaryObjectBuilder builder, Object object) {
        Field[] fields = object.getClass().getDeclaredFields();
        for (Field field : fields) {
            try {
                field.setAccessible(true);
                builder.setField(field.getName(), field.get(object));
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
        return builder.build();
    }

    private Double rad(Double d) {
        return d * Math.PI / 180.0;
    }

    /**
     * Great-circle distance between two coordinates, in kilometres rounded to three decimals.
     */
    private Double getDistance(Double lat1, Double lng1, Double lat2, Double lng2) {
        if (lat1 != null && lng1 != null && lat2 != null && lng2 != null) {
            Double EARTH_RADIUS = 6371.830D;
            Double radLat1 = rad(lat1);
            Double radLat2 = rad(lat2);
            Double radLng1 = rad(lng1);
            Double radLng2 = rad(lng2);
            Double s = Math.acos(Math.cos(radLat1) * Math.cos(radLat2) * Math.cos(radLng1 - radLng2) + Math.sin(radLat1) * Math.sin(radLat2)) * EARTH_RADIUS;
            return Math.round(s * 1000D) / 1000D;
        }
        return 0D;
    }
}
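A minimal sketch of how this processor could be applied through IgniteCache.invoke. The cache name, key type, and the way the DailyTravel bean is populated are assumptions for illustration, not part of this commit.

```scala
import com.hikcreate.ignite.domain1.vehicles.DailyTravel
import com.hikcreate.ignite.domain1.vehicles.processor.DailyTravelUpdate
import org.apache.ignite.Ignition
import org.apache.ignite.binary.BinaryObject

object DailyTravelUpdateExample {
  def main(args: Array[String]): Unit = {
    val ignite = Ignition.ignite() // default Ignite instance, assumed to be started elsewhere

    // "DailyTravel" (cache name) and "VehicleKey" (key type) are placeholders.
    val cache = ignite
      .cache[AnyRef, AnyRef]("DailyTravel")
      .withKeepBinary[BinaryObject, BinaryObject]()

    val key: BinaryObject = ignite.binary().builder("VehicleKey")
      .setField("vehicleNo", "TEST-0001")
      .build()

    val dailyTravel: DailyTravel = ??? // populated from the incoming GPS record
    // process() runs atomically on the node that owns the key
    cache.invoke(key, new DailyTravelUpdate(dailyTravel))
  }
}
```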
package com.hikcreate.data.client

import java.io.File

import play.api.libs.mailer._

object EmailClient {

  val host = "smtp.qq.com"
  val port = 587
  val user = Option("272558733@qq.com")
  val password = Option("zjthyvgxtyvlbibh")

  val configuration: SMTPConfiguration = new SMTPConfiguration(host, port, user = user, password = password)
  val mailer: SMTPMailer = new SMTPMailer(configuration)

  /**
   * Builds an AttachmentFile from a local file.
   * @param name     attachment name
   * @param filePath local file path
   * @return
   */
  def generateLocalAttachment(name: String, filePath: String): AttachmentFile = AttachmentFile(name, new File(filePath))

  /**
   * Builds an AttachmentData from in-memory bytes (for non-local files).
   * @param name     attachment name
   * @param data     byte array
   * @param mimetype MIME type of the file
   */
  def generateNotLocalAttachment(name: String, data: Array[Byte], mimetype: String): AttachmentData = AttachmentData(name, data, mimetype)

  /**
   * @param subject     email subject
   * @param to          recipient addresses
   * @param bodyText    plain-text body; if both bodyText and bodyHtml are supplied, only the bodyHtml content is displayed
   * @param bodyHtml    HTML body; normally only one of the two bodies is used and the other is None
   * @param attachments optional sequence of attachments
   */
  def sendEmail(subject: String, to: Seq[String], bodyText: Option[String], bodyHtml: Option[String], attachments: Option[Seq[Attachment]]): Unit = {
    val email = attachments match {
      case Some(x) => Email(subject, user.get, to, bodyText, bodyHtml, attachments = x)
      case None => Email(subject, user.get, to, bodyText, bodyHtml)
    }
    mailer.send(email)
  }
}
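A short usage sketch for the client above; the attachment path and message text are placeholders, and the recipient simply reuses the address configured in the object.

```scala
import com.hikcreate.data.client.EmailClient

object EmailClientExample {
  def main(args: Array[String]): Unit = {
    // "/tmp/report.csv" and the message text are illustrative only.
    val report = EmailClient.generateLocalAttachment("report.csv", "/tmp/report.csv")
    EmailClient.sendEmail(
      subject     = "Daily report",
      to          = Seq("272558733@qq.com"),
      bodyText    = Some("See the attached report."),
      bodyHtml    = None,
      attachments = Some(Seq(report))
    )
  }
}
```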
package com.hikcreate.data.listener

import com.hikcreate.data.client.EmailClient
import com.hikcreate.data.common.Logging
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted}

import scala.collection.mutable.ArrayBuffer

/**
 * Monitors batch processing time.
 */
class BatchProcessListener(ssc: StreamingContext) extends StreamingListener with Logging {

  // Maximum tolerated scheduling delay: one minute, in milliseconds.
  val DELAY_MAX: Int = 1000 * 60 * 1

  val msg = new ArrayBuffer[String]()
  msg.append(ssc.sparkContext.applicationId)
  msg.append(ssc.sparkContext.appName)

  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    // Scheduling delay of the batch, in milliseconds.
    val delayMs = batchStarted.batchInfo.schedulingDelay.get
    if (delayMs > DELAY_MAX) {
      val body = (msg :+ "数据处理存在延迟").mkString("\n")
      EmailClient.sendEmail("Spark程序监控", Seq("272558733@qq.com"), Some(body), None, None)
    }
  }
}
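The listener only takes effect once it is registered on the streaming context. A minimal wiring sketch follows; the application name, master, batch interval, and elided stream definitions are assumptions.

```scala
import com.hikcreate.data.listener.BatchProcessListener
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MonitoredStreamingApp {
  def main(args: Array[String]): Unit = {
    // App name, master, and the 10-second batch interval are placeholders.
    val conf = new SparkConf().setAppName("MonitoredStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.addStreamingListener(new BatchProcessListener(ssc))
    // ... define input streams and output operations here ...
    ssc.start()
    ssc.awaitTermination()
  }
}
```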
package com.hikcreate.data.listener
import com.hikcreate.data.client.EmailClient
import com.hikcreate.data.common.Logging
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
import org.joda.time.DateTime
import scala.collection.mutable.ArrayBuffer
/**
 * Monitors the start and stop of the Spark application.
......@@ -17,11 +17,11 @@ class LifecycleListener(conf:SparkConf) extends SparkListener with Logging {
msg.append("应用程序ID:" + applicationStart.appId.getOrElse(""))
msg.append("应用程序名称:" + applicationStart.appName)
msg.append("应用程序开始时间:" + new DateTime(applicationStart.time).toString("yyyy-MM-dd HH:mm:ss"))
println(msg.toString())
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
msg.append("应用程序结束时间:" + new DateTime(applicationEnd.time).toString("yyyy-MM-dd HH:mm:ss"))
println(msg.mkString("\n"))
EmailClient.sendEmail("Spark程序监控",Seq("272558733@qq.com"),Some(msg.mkString("\n")),None,None)
}
}
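Because LifecycleListener has a single SparkConf constructor, it can be attached either programmatically or declaratively through the spark.extraListeners setting. A sketch of the declarative route; the application name and master are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LifecycleMonitoredApp {
  def main(args: Array[String]): Unit = {
    // Spark instantiates the listener via its SparkConf constructor when the context starts;
    // sc.addSparkListener(new LifecycleListener(conf)) would be the programmatic equivalent.
    val conf = new SparkConf()
      .setAppName("LifecycleMonitoredApp") // placeholder application name
      .setMaster("local[2]")               // placeholder master, for a local test
      .set("spark.extraListeners", "com.hikcreate.data.listener.LifecycleListener")
    val sc = new SparkContext(conf)
    // ... the actual job runs here ...
    sc.stop() // triggers onApplicationEnd and the notification email
  }
}
```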