Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
O
operating-vehicle
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
杜发飞
operating-vehicle
Commits
109b0e80
Commit
109b0e80
authored
Oct 18, 2019
by
杜发飞
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1
parent
d4b37168
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
290 additions
and
109 deletions
+290
-109
src/main/java/com/hikcreate/ignite/pojo/alarm/DailyAlarmDeal.java
+7
-14
src/main/java/com/hikcreate/ignite/pojo/alarm/DailyAlarmProcess.java
+39
-0
src/main/java/com/hikcreate/ignite/pojo/basic/VehicleInfo.java
+152
-0
src/main/java/com/hikcreate/ignite/processor/DailyAlarmProcessUpdate.java
+12
-5
src/main/scala/com/hikcreate/data/SyncHive.scala
+1
-3
src/main/scala/com/hikcreate/data/SyncIgnite.scala
+50
-61
src/main/scala/com/hikcreate/data/client/IgniteClient.scala
+28
-25
src/test/scala/Test1.scala
+1
-1
No files found.
src/main/java/com/hikcreate/ignite/pojo/alarm/DailyAlarmDeal.java
View file @
109b0e80
...
@@ -8,28 +8,21 @@ public class DailyAlarmDeal implements Serializable {
...
@@ -8,28 +8,21 @@ public class DailyAlarmDeal implements Serializable {
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
long
serialVersionUID
=
1L
;
@QuerySqlField
@QuerySqlField
private
String
province
;
//省 浙江省
private
String
useNature
;
//使用性质 公交
@QuerySqlField
private
String
city
;
//市 杭州市
@QuerySqlField
private
String
area
;
//区 萧山区
@QuerySqlField
@QuerySqlField
private
String
useNature
;
//使用性质 公交
private
String
supervisionId
;
//报警督办 ID
@QuerySqlField
@QuerySqlField
private
String
date
;
//报警日期
private
String
warnTime
;
//报警时间
@QuerySqlField
@QuerySqlField
private
Boolean
isDeal
=
false
;
//是否处理
private
Boolean
isDeal
=
false
;
//是否处理
public
DailyAlarmDeal
(
String
province
,
String
city
,
String
area
,
String
useNature
,
String
date
)
{
public
DailyAlarmDeal
(
String
useNature
,
String
supervisionId
,
String
warnTime
,
Boolean
isDeal
)
{
this
.
province
=
province
;
this
.
city
=
city
;
this
.
area
=
area
;
this
.
useNature
=
useNature
;
this
.
useNature
=
useNature
;
this
.
date
=
date
;
this
.
supervisionId
=
supervisionId
;
this
.
warnTime
=
warnTime
;
this
.
isDeal
=
isDeal
;
}
}
}
}
src/main/java/com/hikcreate/ignite/pojo/alarm/DailyAlarmProcess.java
0 → 100644
View file @
109b0e80
package
com
.
hikcreate
.
ignite
.
pojo
.
alarm
;
import
org.apache.ignite.cache.query.annotations.QuerySqlField
;
import
java.io.Serializable
;
public
class
DailyAlarmProcess
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
@QuerySqlField
private
String
vehicleNo
;
//车牌号
@QuerySqlField
private
String
vehicleColor
;
//车牌颜色
@QuerySqlField
private
String
useNature
;
//使用性质 公交
@QuerySqlField
private
String
supervisionId
;
//报警督办 ID
@QuerySqlField
private
String
warnTime
;
//报警时间
@QuerySqlField
private
Boolean
isDeal
;
//是否处理
public
DailyAlarmProcess
(
String
useNature
,
String
supervisionId
,
String
warnTime
,
Boolean
isDeal
)
{
this
.
useNature
=
useNature
;
this
.
supervisionId
=
supervisionId
;
this
.
warnTime
=
warnTime
;
this
.
isDeal
=
isDeal
;
}
public
DailyAlarmProcess
(
Boolean
isDeal
)
{
this
.
isDeal
=
isDeal
;
}
}
src/main/java/com/hikcreate/ignite/pojo/basic/VehicleInfo.java
View file @
109b0e80
...
@@ -79,4 +79,156 @@ public class VehicleInfo implements Serializable{
...
@@ -79,4 +79,156 @@ public class VehicleInfo implements Serializable{
public
void
setUseNature
(
String
useNature
)
{
public
void
setUseNature
(
String
useNature
)
{
this
.
useNature
=
useNature
;
this
.
useNature
=
useNature
;
}
}
public
String
getId
()
{
return
id
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
String
getVehicleCode
()
{
return
vehicleCode
;
}
public
void
setVehicleCode
(
String
vehicleCode
)
{
this
.
vehicleCode
=
vehicleCode
;
}
public
String
getPlateNum
()
{
return
plateNum
;
}
public
void
setPlateNum
(
String
plateNum
)
{
this
.
plateNum
=
plateNum
;
}
public
String
getPlateColor
()
{
return
plateColor
;
}
public
void
setPlateColor
(
String
plateColor
)
{
this
.
plateColor
=
plateColor
;
}
public
Date
getIntoTime
()
{
return
intoTime
;
}
public
void
setIntoTime
(
Date
intoTime
)
{
this
.
intoTime
=
intoTime
;
}
public
String
getBusinessScope
()
{
return
businessScope
;
}
public
void
setBusinessScope
(
String
businessScope
)
{
this
.
businessScope
=
businessScope
;
}
public
String
getBusinessScopeDetail
()
{
return
businessScopeDetail
;
}
public
void
setBusinessScopeDetail
(
String
businessScopeDetail
)
{
this
.
businessScopeDetail
=
businessScopeDetail
;
}
public
String
getIndentifier
()
{
return
indentifier
;
}
public
void
setIndentifier
(
String
indentifier
)
{
this
.
indentifier
=
indentifier
;
}
public
Boolean
getIntoStatus
()
{
return
intoStatus
;
}
public
void
setIntoStatus
(
Boolean
intoStatus
)
{
this
.
intoStatus
=
intoStatus
;
}
public
String
getStatus
()
{
return
status
;
}
public
void
setStatus
(
String
status
)
{
this
.
status
=
status
;
}
public
String
getOperatingCertificateNo
()
{
return
operatingCertificateNo
;
}
public
void
setOperatingCertificateNo
(
String
operatingCertificateNo
)
{
this
.
operatingCertificateNo
=
operatingCertificateNo
;
}
public
String
getClassLine
()
{
return
classLine
;
}
public
void
setClassLine
(
String
classLine
)
{
this
.
classLine
=
classLine
;
}
public
String
getEnterpriseCode
()
{
return
enterpriseCode
;
}
public
void
setEnterpriseCode
(
String
enterpriseCode
)
{
this
.
enterpriseCode
=
enterpriseCode
;
}
public
String
getDrivingPermitNo
()
{
return
drivingPermitNo
;
}
public
void
setDrivingPermitNo
(
String
drivingPermitNo
)
{
this
.
drivingPermitNo
=
drivingPermitNo
;
}
public
String
getVehicleBrand
()
{
return
vehicleBrand
;
}
public
void
setVehicleBrand
(
String
vehicleBrand
)
{
this
.
vehicleBrand
=
vehicleBrand
;
}
public
String
getBusinessTime
()
{
return
businessTime
;
}
public
void
setBusinessTime
(
String
businessTime
)
{
this
.
businessTime
=
businessTime
;
}
public
String
getProvince
()
{
return
province
;
}
public
void
setProvince
(
String
province
)
{
this
.
province
=
province
;
}
public
String
getCity
()
{
return
city
;
}
public
void
setCity
(
String
city
)
{
this
.
city
=
city
;
}
public
String
getArea
()
{
return
area
;
}
public
void
setArea
(
String
area
)
{
this
.
area
=
area
;
}
}
}
src/main/java/com/hikcreate/ignite/processor/DailyAlarm
Deal
Update.java
→
src/main/java/com/hikcreate/ignite/processor/DailyAlarm
Process
Update.java
View file @
109b0e80
...
@@ -5,17 +5,24 @@ import org.apache.ignite.binary.BinaryObjectBuilder;
...
@@ -5,17 +5,24 @@ import org.apache.ignite.binary.BinaryObjectBuilder;
import
org.apache.ignite.cache.CacheEntryProcessor
;
import
org.apache.ignite.cache.CacheEntryProcessor
;
import
javax.cache.processor.EntryProcessorException
;
import
javax.cache.processor.EntryProcessorException
;
import
javax.cache.processor.MutableEntry
;
import
javax.cache.processor.MutableEntry
;
import
java.util.Map
;
public
class
DailyAlarmDealUpdate
implements
CacheEntryProcessor
<
BinaryObject
,
BinaryObject
,
Void
>
{
public
class
DailyAlarmProcessUpdate
implements
CacheEntryProcessor
<
BinaryObject
,
BinaryObject
,
Void
>
{
private
Map
<
String
,
Object
>
parameter
;
public
DailyAlarmProcessUpdate
(
Map
<
String
,
Object
>
parameter
){
this
.
parameter
=
parameter
;
}
@Override
@Override
public
Void
process
(
MutableEntry
<
BinaryObject
,
BinaryObject
>
mutableEntry
,
Object
...
objects
)
throws
EntryProcessorException
{
public
Void
process
(
MutableEntry
<
BinaryObject
,
BinaryObject
>
mutableEntry
,
Object
...
objects
)
throws
EntryProcessorException
{
BinaryObject
value
=
mutableEntry
.
getValue
();
BinaryObject
value
=
mutableEntry
.
getValue
();
if
(
value
!=
null
){
BinaryObjectBuilder
builder
=
value
.
toBuilder
();
BinaryObjectBuilder
builder
=
value
.
toBuilder
();
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
parameter
.
entrySet
())
{
builder
.
setField
(
"isDeal"
,
true
);
builder
.
setField
(
entry
.
getKey
(),
entry
.
getValue
());
mutableEntry
.
setValue
(
builder
.
build
());
}
}
mutableEntry
.
setValue
(
builder
.
build
());
return
null
;
return
null
;
}
}
}
}
src/main/scala/com/hikcreate/data/SyncHive.scala
View file @
109b0e80
...
@@ -102,8 +102,6 @@ object SyncHive extends Sparking with Logging {
...
@@ -102,8 +102,6 @@ object SyncHive extends Sparking with Logging {
if
(
tableName
==
Const
.
tableMap
(
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1202"
)))
if
(
tableName
==
Const
.
tableMap
(
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1202"
)))
||
tableName
==
Const
.
tableMap
(
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1203"
)))){
||
tableName
==
Const
.
tableMap
(
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1203"
)))){
val
addressAndLocation
=
Tools
.
getAddressAndLocationCode
(
json
.
getDouble
(
"lon"
)/
1000000
,
json
.
getDouble
(
"lat"
)/
1000000
)
val
addressAndLocation
=
Tools
.
getAddressAndLocationCode
(
json
.
getDouble
(
"lon"
)/
1000000
,
json
.
getDouble
(
"lat"
)/
1000000
)
println
(
addressAndLocation
.
_1
)
println
(
addressAndLocation
.
_2
)
json
.
put
(
"districtcode"
,
addressAndLocation
.
_2
)
json
.
put
(
"districtcode"
,
addressAndLocation
.
_2
)
}
}
//报警信息 增加区县代码和事件类型,经纬度,详细地址
//报警信息 增加区县代码和事件类型,经纬度,详细地址
...
@@ -117,7 +115,7 @@ object SyncHive extends Sparking with Logging {
...
@@ -117,7 +115,7 @@ object SyncHive extends Sparking with Logging {
json
.
put
(
"longitude"
,
infoJson
.
get
(
"LONGITUDE"
))
json
.
put
(
"longitude"
,
infoJson
.
get
(
"LONGITUDE"
))
json
.
put
(
"latitude"
,
infoJson
.
get
(
"LATITUDE"
))
json
.
put
(
"latitude"
,
infoJson
.
get
(
"LATITUDE"
))
json
.
put
(
"eventtype"
,
infoJson
.
get
(
"EVENT_TYPE"
))
json
.
put
(
"eventtype"
,
infoJson
.
get
(
"EVENT_TYPE"
))
val
addressAndLocation
=
Tools
.
getAddressAndLocationCode
(
infoJson
.
getDouble
(
"LONGITUDE"
)
,
infoJson
.
getDouble
(
"LATITUDE"
)
)
val
addressAndLocation
=
Tools
.
getAddressAndLocationCode
(
infoJson
.
getDouble
(
"LONGITUDE"
)
/
1000000
,
infoJson
.
getDouble
(
"LATITUDE"
)/
1000000
)
json
.
put
(
"fulladdress"
,
addressAndLocation
.
_1
)
json
.
put
(
"fulladdress"
,
addressAndLocation
.
_1
)
println
(
addressAndLocation
.
_1
)
println
(
addressAndLocation
.
_1
)
json
.
put
(
"districtcode"
,
addressAndLocation
.
_2
)
json
.
put
(
"districtcode"
,
addressAndLocation
.
_2
)
...
...
src/main/scala/com/hikcreate/data/SyncIgnite.scala
View file @
109b0e80
...
@@ -3,19 +3,20 @@ package com.hikcreate.data
...
@@ -3,19 +3,20 @@ package com.hikcreate.data
import
com.hikcreate.data.common.
{
Logging
,
Sparking
}
import
com.hikcreate.data.common.
{
Logging
,
Sparking
}
import
com.hikcreate.data.constant.Const
import
com.hikcreate.data.constant.Const
import
com.hikcreate.data.util.
{
Tools
,
ZkManager
}
import
com.hikcreate.data.util.
{
Tools
,
ZkManager
}
import
com.hikcreate.data.client.IgniteClient
import
com.alibaba.fastjson.JSON
import
com.alibaba.fastjson.JSON
import
com.hikcreate.data.client.IgniteClient
import
com.hikcreate.data.client.IgniteClient.updateDailyAlarmDealCache
import
com.hikcreate.data.model.TableKey
import
com.hikcreate.data.model.TableKey
import
com.hikcreate.ignite.pojo.PrimaryKey
import
com.hikcreate.ignite.pojo.PrimaryKey
import
com.hikcreate.ignite.pojo.alarm.
{
DailyAlarm
Detail
,
DailyAlarm
,
DailyAlarmDeal
}
import
com.hikcreate.ignite.pojo.alarm.
{
DailyAlarm
,
DailyAlarmDetail
,
DailyAlarmProcess
}
import
com.hikcreate.ignite.pojo.basic.
{
AlarmInfo
,
AttachmentInfo
,
VehicleInfo
}
import
com.hikcreate.ignite.pojo.basic.
{
AlarmInfo
,
AttachmentInfo
,
VehicleInfo
}
import
com.hikcreate.ignite.pojo.vehicles.
{
AlarmNumber
,
DailyMileage
,
DriverNumber
,
VehicleNumber
}
import
com.hikcreate.ignite.pojo.vehicles.
{
AlarmNumber
,
DailyMileage
,
DriverNumber
,
VehicleNumber
}
import
org.apache.ignite.binary.BinaryObject
import
org.apache.ignite.cache.query.SqlQuery
import
org.apache.ignite.cache.query.SqlQuery
import
org.apache.spark.streaming.
{
Seconds
,
StreamingContext
}
import
org.apache.spark.streaming.
{
Seconds
,
StreamingContext
}
import
org.apache.spark.streaming.kafka010._
import
org.apache.spark.streaming.kafka010._
import
org.joda.time.DateTime
import
org.joda.time.DateTime
import
org.joda.time.format.DateTimeFormat
import
org.joda.time.format.DateTimeFormat
import
scala.collection.mutable.ArrayBuffer
import
scala.collection.mutable.ArrayBuffer
object
SyncIgnite
extends
Sparking
with
Logging
{
object
SyncIgnite
extends
Sparking
with
Logging
{
...
@@ -26,7 +27,6 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -26,7 +27,6 @@ object SyncIgnite extends Sparking with Logging{
val
offsets
=
zkManager
.
getBeginOffset
(
Const
.
applicationTopic
,
Const
.
igniteGroupId
)
val
offsets
=
zkManager
.
getBeginOffset
(
Const
.
applicationTopic
,
Const
.
igniteGroupId
)
val
offsetRanges
=
new
ArrayBuffer
[
OffsetRange
]()
val
offsetRanges
=
new
ArrayBuffer
[
OffsetRange
]()
val
ssc
=
new
StreamingContext
(
conf
,
Seconds
(
1
))
val
ssc
=
new
StreamingContext
(
conf
,
Seconds
(
1
))
ssc
.
sparkContext
.
setLogLevel
(
"WARN"
)
val
inputStream
=
KafkaUtils
.
createDirectStream
[
String
,
String
](
val
inputStream
=
KafkaUtils
.
createDirectStream
[
String
,
String
](
ssc
,
ssc
,
LocationStrategies
.
PreferConsistent
,
LocationStrategies
.
PreferConsistent
,
...
@@ -35,7 +35,7 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -35,7 +35,7 @@ object SyncIgnite extends Sparking with Logging{
offsetRanges
.
clear
()
offsetRanges
.
clear
()
offsetRanges
.
append
(
rdd
.
asInstanceOf
[
HasOffsetRanges
].
offsetRanges
:
_
*
)
offsetRanges
.
append
(
rdd
.
asInstanceOf
[
HasOffsetRanges
].
offsetRanges
:
_
*
)
rdd
rdd
}.
map
(
x
=>
x
.
value
()).
foreachRDD
{
rdd
=>
}.
map
(
x
=>
x
.
value
()).
foreachRDD
{
rdd
=>
if
(!
rdd
.
isEmpty
())
{
if
(!
rdd
.
isEmpty
())
{
rdd
.
foreachPartition
(
iterator
=>
processRow
(
iterator
))
rdd
.
foreachPartition
(
iterator
=>
processRow
(
iterator
))
zkManager
.
saveEndOffset
(
offsetRanges
,
Const
.
igniteGroupId
)
zkManager
.
saveEndOffset
(
offsetRanges
,
Const
.
igniteGroupId
)
...
@@ -46,31 +46,30 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -46,31 +46,30 @@ object SyncIgnite extends Sparking with Logging{
}
}
def
processRow
(
iterator
:
Iterator
[
String
])
:
Unit
=
{
def
processRow
(
iterator
:
Iterator
[
String
])
:
Unit
=
{
iterator
.
foreach
{
x
=>
iterator
.
foreach
{
x
=>
try
{
try
{
println
(
x
)
val
json
=
JSON
.
parseObject
(
x
)
val
json
=
JSON
.
parseObject
(
x
)
TableKey
(
Option
(
json
.
getString
(
"msgId"
)),
Option
(
json
.
getString
(
"dataType"
)))
match
{
TableKey
(
Option
(
json
.
getString
(
"msgId"
)),
Option
(
json
.
getString
(
"dataType"
)))
match
{
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1202"
))
=>
//车辆定位消息
//车辆定位消息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1202"
))
=>
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
lon
=
json
.
getDouble
(
"lon"
)/
1000000
val
lon
=
json
.
getDouble
(
"lon"
)/
1000000
val
lat
=
json
.
getDouble
(
"lat"
)/
1000000
val
lat
=
json
.
getDouble
(
"lat"
)/
1000000
val
code
=
Tools
.
getLocationCode
(
lon
,
lat
)
val
code
=
Tools
.
getLocationCode
(
lon
,
lat
)
val
time
=
json
.
getString
(
"dateTime"
)
val
time
=
json
.
getString
(
"dateTime"
)
//定位时间
val
date
=
DateTime
.
parse
(
time
,
DateTimeFormat
.
forPattern
(
"yyyy-MM-dd HH:mm:ss"
)).
toLocalDate
.
toString
(
"yyyy-MM-dd"
)
val
date
=
DateTime
.
parse
(
time
,
DateTimeFormat
.
forPattern
(
"yyyy-MM-dd HH:mm:ss"
)).
toLocalDate
.
toString
(
"yyyy-MM-dd"
)
val
vehicleNumberKey
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
vehicleNo
,
vehicleColor
))
val
vehicleNumberKey
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
vehicleNo
,
vehicleColor
))
val
vehicleNumberValue
=
new
VehicleNumber
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
)
val
vehicleNumberValue
=
new
VehicleNumber
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
)
IgniteClient
.
vehicleNumberCache
.
withKeepBinary
().
put
(
vehicleNumberKey
,
vehicleNumberValue
)
IgniteClient
.
vehicleNumberCache
.
withKeepBinary
().
put
(
vehicleNumberKey
,
vehicleNumberValue
)
val
mileageKey
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
vehicleNo
,
vehicleColor
))
val
mileageKey
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
vehicleNo
,
vehicleColor
))
if
(
IgniteClient
.
mileageCache
.
withKeepBinary
().
containsKey
(
mileageKey
)){
if
(
IgniteClient
.
mileageCache
.
withKeepBinary
().
containsKey
(
mileageKey
)){
IgniteClient
.
updateMileageCache
(
mileageKey
,
lat
,
lon
,
time
)
IgniteClient
.
updateMileageCache
(
mileageKey
,
lat
,
lon
,
time
)
}
else
{
}
else
{
val
value
=
new
DailyMileage
(
"33"
,
"0
9
"
,
"22"
,
""
,
date
,
lat
,
lon
,
time
,
0D
,
0L
)
val
value
=
new
DailyMileage
(
"33"
,
"0
1
"
,
"22"
,
""
,
date
,
lat
,
lon
,
time
,
0D
,
0L
)
IgniteClient
.
mileageCache
.
withKeepBinary
().
put
(
mileageKey
,
value
)
IgniteClient
.
mileageCache
.
withKeepBinary
().
put
(
mileageKey
,
value
)
}
}
/*val vehicleInfoOptional = getVehicleInfo(vehicleNo,vehicleColor)
/*val vehicleInfoOptional = getVehicleInfo(vehicleNo,vehicleColor)
if(vehicleInfoOptional.isDefined){
if(vehicleInfoOptional.isDefined){
val vehicleInfo = vehicleInfoOptional.get
val vehicleInfo = vehicleInfoOptional.get
...
@@ -90,19 +89,14 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -90,19 +89,14 @@ object SyncIgnite extends Sparking with Logging{
IgniteClient.updateMileageCache(mileageKey,lat,lon,time)//今日车辆在线情况 累计行驶 累计安全行驶里程 当前在线车辆
IgniteClient.updateMileageCache(mileageKey,lat,lon,time)//今日车辆在线情况 累计行驶 累计安全行驶里程 当前在线车辆
}
}
}*/
}*/
//车辆定位消息补报
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1203"
))
=>
//车辆定位消息补报
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1200"
),
Some
(
"0x1203"
))
=>
//定时上传驾驶员身份识别信息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1D00"
),
Some
(
"0x1d02"
))
=>
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1D00"
),
Some
(
"0x1d02"
))
=>
//定时上传驾驶员身份识别信息
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
driverId
=
json
.
getString
(
"driverId"
)
val
driverId
=
json
.
getString
(
"driverId"
)
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
driverId
))
val
vehicleInfoOptional
=
getVehicleInfo
(
vehicleNo
,
vehicleColor
)
val
value
=
new
DriverNumber
(
"33"
,
"09"
,
"22"
,
"班线车"
)
IgniteClient
.
driverNumberCache
.
withKeepBinary
().
put
(
key
,
value
)
/*val vehicleInfoOptional = getVehicleInfo(vehicleNo,vehicleColor)
if
(
vehicleInfoOptional
.
isDefined
){
if
(
vehicleInfoOptional
.
isDefined
){
val
vehicleInfo
=
vehicleInfoOptional
.
get
val
vehicleInfo
=
vehicleInfoOptional
.
get
val
vehicleProvince
=
vehicleInfo
.
getProvince
val
vehicleProvince
=
vehicleInfo
.
getProvince
...
@@ -112,13 +106,13 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -112,13 +106,13 @@ object SyncIgnite extends Sparking with Logging{
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
driverId
))
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
driverId
))
val
driverNumber
=
new
DriverNumber
(
vehicleProvince
,
vehicleCity
,
vehicleArea
,
useNature
)
val
driverNumber
=
new
DriverNumber
(
vehicleProvince
,
vehicleCity
,
vehicleArea
,
useNature
)
IgniteClient
.
driverNumberCache
.
withKeepBinary
().
put
(
key
,
driverNumber
)
//接入驾驶人数
IgniteClient
.
driverNumberCache
.
withKeepBinary
().
put
(
key
,
driverNumber
)
//接入驾驶人数
}
*/
}
//上报报警信息消息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1400"
),
Some
(
"0x1402"
))
=>
//上报报警信息消息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1400"
),
Some
(
"0x1402"
))
=>
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
timestamp
=
json
.
getString
(
"warnTime"
)
*
1000
val
warnType
=
json
.
getString
(
"warnType"
)
val
warnType
=
json
.
getString
(
"warnType"
)
val
timestamp
=
json
.
getLong
(
"warnTime"
)
*
1000
if
(
Const
.
warnTypes
.
contains
(
warnType
)){
if
(
Const
.
warnTypes
.
contains
(
warnType
)){
val
infoStr
=
json
.
getString
(
"infoContent"
)
val
infoStr
=
json
.
getString
(
"infoContent"
)
val
infoJson
=
Tools
.
getInfoContentJsonobj
(
infoStr
)
val
infoJson
=
Tools
.
getInfoContentJsonobj
(
infoStr
)
...
@@ -126,7 +120,6 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -126,7 +120,6 @@ object SyncIgnite extends Sparking with Logging{
val
lon
=
infoJson
.
getDouble
(
"LONGITUDE"
)/
1000000
val
lon
=
infoJson
.
getDouble
(
"LONGITUDE"
)/
1000000
val
lat
=
infoJson
.
getDouble
(
"LATITUDE"
)/
1000000
val
lat
=
infoJson
.
getDouble
(
"LATITUDE"
)/
1000000
val
code
=
Tools
.
getLocationCode
(
lon
,
lat
)
val
code
=
Tools
.
getLocationCode
(
lon
,
lat
)
//val timestamp = infoJson.getLong("WARN_TIME") * 1000
val
time
=
new
DateTime
(
timestamp
)
//com.alibaba.fastjson.JSONException: can not cast to long, value 0x000000005DA83E33
val
time
=
new
DateTime
(
timestamp
)
//com.alibaba.fastjson.JSONException: can not cast to long, value 0x000000005DA83E33
val
warnTime
=
time
.
toString
(
"yyyy-MM-dd HH:mm:ss"
)
val
warnTime
=
time
.
toString
(
"yyyy-MM-dd HH:mm:ss"
)
val
date
=
time
.
toString
(
"yyyy-MM-dd"
)
val
date
=
time
.
toString
(
"yyyy-MM-dd"
)
...
@@ -135,16 +128,12 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -135,16 +128,12 @@ object SyncIgnite extends Sparking with Logging{
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
))
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
))
//累计行驶报警数
//累计行驶报警数
val
alarmNumber
=
new
AlarmNumber
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
,
1L
)
val
alarmNumber
=
new
AlarmNumber
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
,
1L
)
if
(!
IgniteClient
.
alarmNumberCache
.
withKeepBinary
().
containsKey
(
key
)){
if
(!
IgniteClient
.
alarmNumberCache
.
withKeepBinary
().
putIfAbsent
(
key
,
alarmNumber
)){
IgniteClient
.
alarmNumberCache
.
withKeepBinary
().
put
(
key
,
alarmNumber
)
}
else
{
IgniteClient
.
updateAlarmNumberCache
(
key
)
IgniteClient
.
updateAlarmNumberCache
(
key
)
}
}
//今日报警数
//今日报警数
val
dailyAlarmNumber
=
new
DailyAlarm
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
,
date
,
1L
)
val
dailyAlarmNumber
=
new
DailyAlarm
(
code
.
_1
,
code
.
_2
,
code
.
_3
,
""
,
date
,
1L
)
if
(!
IgniteClient
.
dailyAlarmNumberCache
.
withKeepBinary
().
containsKey
(
key
)){
if
(!
IgniteClient
.
dailyAlarmNumberCache
.
withKeepBinary
().
putIfAbsent
(
key
,
dailyAlarmNumber
)){
IgniteClient
.
dailyAlarmNumberCache
.
withKeepBinary
().
put
(
key
,
dailyAlarmNumber
)
}
else
{
IgniteClient
.
updateDailyAlarmCache
(
key
,
warnTime
)
IgniteClient
.
updateDailyAlarmCache
(
key
,
warnTime
)
}
}
//今日报警详情
//今日报警详情
...
@@ -158,50 +147,50 @@ object SyncIgnite extends Sparking with Logging{
...
@@ -158,50 +147,50 @@ object SyncIgnite extends Sparking with Logging{
IgniteClient
.
dailyAlarmDetailCache
.
withKeepBinary
().
put
(
alarmDetailKey
,
dailyAlarmDetail
)
IgniteClient
.
dailyAlarmDetailCache
.
withKeepBinary
().
put
(
alarmDetailKey
,
dailyAlarmDetail
)
}
}
}
}
//智能视频报警附件上传结果上报
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1C00"
),
Some
(
"0x1c02"
))
=>
//智能视频报警附件上传结果上报
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x1C00"
),
Some
(
"0x1c02"
))
=>
json
.
remove
(
"msgId"
)
json
.
remove
(
"msgId"
)
json
.
remove
(
"dataType"
)
json
.
remove
(
"dataType"
)
val
timestamp
=
json
.
getLong
(
"warnTime"
)
*
1000
val
timestamp
=
json
.
getLong
(
"warnTime"
)
*
1000
val
t
ime
=
new
DateTime
(
timestamp
)
val
dateT
ime
=
new
DateTime
(
timestamp
)
val
warnTime
=
t
ime
.
toString
(
"yyyy-MM-dd HH:mm:ss"
)
val
warnTime
=
dateT
ime
.
toString
(
"yyyy-MM-dd HH:mm:ss"
)
json
.
put
(
"warnTime"
,
warnTime
)
json
.
put
(
"warnTime"
,
warnTime
)
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
json
.
getString
(
"vehicleNo"
),
json
.
getString
(
"vehicleColor"
),
val
key
=
IgniteClient
.
getBinaryObject
(
json
.
getString
(
"deviceId"
),
json
.
getString
(
"warnTime"
),
json
.
getString
(
"fileIndex"
)))
new
PrimaryKey
(
json
.
getString
(
"vehicleNo"
),
json
.
getString
(
"vehicleColor"
),
json
.
getString
(
"deviceId"
),
json
.
getString
(
"warnTime"
),
json
.
getString
(
"fileIndex"
))
)
val
value
=
JSON
.
parseObject
(
json
.
toJSONString
,
classOf
[
AttachmentInfo
])
val
value
=
JSON
.
parseObject
(
json
.
toJSONString
,
classOf
[
AttachmentInfo
])
IgniteClient
.
attachmentCache
.
withKeepBinary
().
put
(
key
,
value
)
IgniteClient
.
attachmentCache
.
withKeepBinary
().
put
(
key
,
value
)
//报警督办请求信息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x
1400"
),
Some
(
"0x1401"
))
=>
//报警督办请求信息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x
9400"
),
Some
(
"0x9401"
))
=>
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
supervisionId
=
json
.
getString
(
"supervisionId"
)
val
superVisionId
=
json
.
getString
(
"superVisionId"
)
val
warnTime
=
json
.
getString
(
"warnTime"
)
//UTC时间格式
val
timestamp
=
json
.
getLong
(
"warnTime"
)
*
1000
//关联上报报警信息消息得到省市区
val
time
=
new
DateTime
(
timestamp
)
val
alarmDetail
:
BinaryObject
=
IgniteClient
.
dailyAlarmDetailCache
.
withKeepBinary
().
get
(
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
vehicleNo
,
vehicleColor
,
warnTime
)))
val
warnTime
=
time
.
toString
(
"yyyy-MM-dd HH:mm:ss"
)
if
(
alarmDetail
!=
null
){
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
vehicleNo
,
vehicleColor
,
superVisionId
))
val
province
:
String
=
alarmDetail
.
field
(
"province"
)
val
value
=
new
DailyAlarmProcess
(
""
,
superVisionId
,
warnTime
,
false
)
val
city
:
String
=
alarmDetail
.
field
(
"city"
)
if
(!
IgniteClient
.
dailyAlarmProcessCache
.
withKeepBinary
().
putIfAbsent
(
key
,
value
)){
val
area
:
String
=
alarmDetail
.
field
(
"area"
)
updateDailyAlarmDealCache
(
key
,
Map
(
val
useNature
:
String
=
alarmDetail
.
field
(
"useNature"
)
"useNature"
->
""
,
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
vehicleNo
,
vehicleColor
,
supervisionId
))
"superVisionId"
->
superVisionId
,
val
value
=
new
DailyAlarmDeal
(
province
,
city
,
area
,
useNature
,
DateTime
.
parse
(
warnTime
,
DateTimeFormat
.
forPattern
(
"yyyy-MM-dd'T'HH:mm:ss'Z'"
)).
toString
(
"yyyy-MM-dd"
))
"warnTime"
->
warnTime
))
IgniteClient
.
dailyAlarmDealCache
.
withKeepBinary
().
put
(
key
,
value
)
}
}
//报警督办应答消息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x
9400"
),
Some
(
"0x9401"
))
=>
//报警督办应答消息
case
tableKey
if
tableKey
==
TableKey
(
Some
(
"0x
1400"
),
Some
(
"0x1401"
))
=>
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleNo
=
json
.
getString
(
"vehicleNo"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
vehicleColor
=
json
.
getString
(
"vehicleColor"
)
val
supervisionId
=
json
.
getString
(
"supervisionId"
)
val
superVisionId
=
json
.
getString
(
"superVisionId"
)
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
vehicleNo
,
vehicleColor
,
supervisionId
))
val
key
=
IgniteClient
.
getBinaryObject
(
new
PrimaryKey
(
vehicleNo
,
vehicleColor
,
superVisionId
))
if
(
IgniteClient
.
dailyAlarmDealCache
.
withKeepBinary
().
containsKey
(
key
)){
val
value
=
new
DailyAlarmProcess
(
true
)
IgniteClient
.
updateDailyAlarmDealCache
(
key
)
// 更新dailyAlarmDealCache为处理
if
(!
IgniteClient
.
dailyAlarmProcessCache
.
withKeepBinary
().
putIfAbsent
(
key
,
value
)){
updateDailyAlarmDealCache
(
key
,
Map
(
"isDeal"
->
(
true
:
java.lang.Boolean
)))
}
}
case
_
=>
case
_
=>
}
}
}
catch
{
}
catch
{
case
e
:
Exception
=>
case
e
:
Exception
=>
println
(
x
)
e
.
printStackTrace
()
e
.
printStackTrace
()
}
}
}
}
...
...
src/main/scala/com/hikcreate/data/client/IgniteClient.scala
View file @
109b0e80
...
@@ -2,8 +2,10 @@ package com.hikcreate.data.client
...
@@ -2,8 +2,10 @@ package com.hikcreate.data.client
import
java.util
import
java.util
import
java.util.concurrent.TimeUnit
import
java.util.concurrent.TimeUnit
import
com.alibaba.fastjson.JSON
import
com.hikcreate.ignite.pojo.PrimaryKey
import
com.hikcreate.ignite.pojo.PrimaryKey
import
com.hikcreate.ignite.pojo.alarm.
{
DailyAlarm
,
DailyAlarmDeal
,
DailyAlarmDetail
}
import
com.hikcreate.ignite.pojo.alarm.
{
DailyAlarm
,
DailyAlarmDeal
,
DailyAlarmDetail
,
DailyAlarmProcess
}
import
com.hikcreate.ignite.pojo.basic._
import
com.hikcreate.ignite.pojo.basic._
import
com.hikcreate.ignite.pojo.vehicles.
{
AlarmNumber
,
DailyMileage
,
DriverNumber
,
VehicleNumber
}
import
com.hikcreate.ignite.pojo.vehicles.
{
AlarmNumber
,
DailyMileage
,
DriverNumber
,
VehicleNumber
}
import
com.hikcreate.ignite.processor._
import
com.hikcreate.ignite.processor._
...
@@ -13,6 +15,8 @@ import org.apache.ignite.cache.{CacheMode, QueryEntity}
...
@@ -13,6 +15,8 @@ import org.apache.ignite.cache.{CacheMode, QueryEntity}
import
org.apache.ignite.cache.query.SqlFieldsQuery
import
org.apache.ignite.cache.query.SqlFieldsQuery
import
org.apache.ignite.configuration.
{
CacheConfiguration
,
IgniteConfiguration
}
import
org.apache.ignite.configuration.
{
CacheConfiguration
,
IgniteConfiguration
}
import
org.apache.ignite.
{
Ignite
,
IgniteCache
,
Ignition
}
import
org.apache.ignite.
{
Ignite
,
IgniteCache
,
Ignition
}
import
org.joda.time.DateTime
import
scala.collection.JavaConverters._
import
scala.collection.JavaConverters._
/**
/**
* binary type无法自动更改
* binary type无法自动更改
...
@@ -182,6 +186,27 @@ object IgniteClient {
...
@@ -182,6 +186,27 @@ object IgniteClient {
}
}
/**
/**
* 今日报警处理数
* 主键:车牌号 车牌颜色 报警督办ID
* 数据来源: 报警督办请求消息 报警督办应答消息
*/
lazy
val
dailyAlarmProcessCache
:
IgniteCache
[
PrimaryKey
,
DailyAlarmProcess
]
=
ignite
.
getOrCreateCache
(
new
CacheConfiguration
[
PrimaryKey
,
DailyAlarmProcess
]()
.
setSqlSchema
(
"Alarm"
)
.
setName
(
"DailyAlarmProcess"
)
.
setIndexedTypes
(
classOf
[
PrimaryKey
],
classOf
[
DailyAlarmProcess
])
.
setDataRegionName
(
"500MB_Region"
)
.
setCacheMode
(
CacheMode
.
REPLICATED
)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
def
updateDailyAlarmDealCache
(
key
:
BinaryObject
,
map
:
Map
[
String
,
AnyRef
])
:
Unit
=
{
dailyAlarmProcessCache
.
withKeepBinary
()
.
invoke
(
key
,
new
DailyAlarmProcessUpdate
(
map
.
asJava
))
}
/**
* 今日报警详情
* 今日报警详情
* 主键:车牌号 车牌颜色 报警时间 信息 ID
* 主键:车牌号 车牌颜色 报警时间 信息 ID
* 数据来源:上报报警信息消息,智能视频报警附件上传结果上报
* 数据来源:上报报警信息消息,智能视频报警附件上传结果上报
...
@@ -197,33 +222,11 @@ object IgniteClient {
...
@@ -197,33 +222,11 @@ object IgniteClient {
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
)
/**
* 今日报警处理量
* 主键:车牌号 车牌颜色 报警督办ID
* 数据来源: 报警督办请求消息 报警督办应答消息
*/
lazy
val
dailyAlarmDealCache
:
IgniteCache
[
PrimaryKey
,
DailyAlarmDeal
]
=
ignite
.
getOrCreateCache
(
new
CacheConfiguration
[
PrimaryKey
,
DailyAlarmDeal
]()
.
setSqlSchema
(
"Alarm"
)
.
setName
(
"DailyAlarmDeal"
)
.
setIndexedTypes
(
classOf
[
PrimaryKey
],
classOf
[
DailyAlarmDeal
])
.
setDataRegionName
(
"500MB_Region"
)
.
setCacheMode
(
CacheMode
.
REPLICATED
)
//.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.HOURS,24)))
)
def
updateDailyAlarmDealCache
(
key
:
BinaryObject
)
:
Unit
=
{
dailyAlarmDealCache
.
withKeepBinary
()
.
invoke
(
key
,
new
DailyAlarmDealUpdate
())
}
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
val
a
=
ignite
.
binary
.
`type`
(
"com.hikcreate.ignite.pojo.alarm.AlarmDetail"
)
val
a
=
ignite
.
binary
.
`type`
(
"com.hikcreate.ignite.pojo.alarm.AlarmDetail"
)
val
builder
=
ignite
.
binary
().
builder
(
"com.hikcreate.ignite.pojo.alarm.AlarmDetail"
)
ignite
.
binary
()
val
build
=
builder
.
setField
(
"warnType"
,
1L
,
classOf
[
Long
]).
build
()
println
(
a
.
fieldTypeName
(
"warnType"
))
//无法自动更新修改后的字段类型
println
(
a
.
fieldTypeName
(
"warnType"
))
//无法自动更新修改后的字段类型
//alarmDetailCache.destroy()
//alarmDetailCache.destroy()
//basicCompanyInfo.clear()
//basicCompanyInfo.clear()
...
...
src/test/scala/Test1.scala
View file @
109b0e80
...
@@ -5,7 +5,7 @@ object Test1 {
...
@@ -5,7 +5,7 @@ object Test1 {
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
val
a
=
Tools
.
getAddressAndLocationCode
(
1
15.0031
,
27.2773
)
val
a
=
Tools
.
getAddressAndLocationCode
(
1
20.485443
,
30.183996
)
println
(
a
.
_1
+
"----"
+
a
.
_2
)
println
(
a
.
_1
+
"----"
+
a
.
_2
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment