杜发飞 / operating-vehicle · Commits · 31d52b5c

Commit 31d52b5c authored Nov 05, 2019 by 杜发飞
Merge remote-tracking branch 'origin/master'

parents 9662d60b 34f0acb4
Showing 7 changed files with 879 additions and 42 deletions (+879 / -42)
pom.xml                                                        +11   -1
src/main/resources/conf.properties                             +10  -10
src/main/scala/com/hikcreate/data/constant/Const.scala          +3   -2
src/main/scala/com/hikcreate/data/sync/SyncHive.scala         +140  -26
src/main/scala/com/hikcreate/data/sync/SysncHiveBatch.scala   +546   -0
src/main/scala/com/hikcreate/data/util/HDFSHelper.scala       +166   -0
src/main/scala/com/hikcreate/data/util/Tools.scala              +3   -3
pom.xml  (View file @ 31d52b5c)

@@ -6,7 +6,7 @@
     <groupId>groupId</groupId>
     <artifactId>operating-vehicle</artifactId>
-    <version>1.0-SNAPSHOT</version>
+    <version>1.3-SNAPSHOT</version>

     <repositories>
         <repository>
@@ -27,6 +27,16 @@
     <dependencies>
+        <dependency>
+            <groupId>com.ctrip.framework.apollo</groupId>
+            <artifactId>apollo-client</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ctrip.framework.apollo</groupId>
+            <artifactId>apollo-core</artifactId>
+            <version>1.3.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
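The new apollo-client / apollo-core dependencies suggest the project's configuration is meant to be served from a Ctrip Apollo config center rather than only from conf.properties. As a hedged sketch that is not part of this commit, reading a key through Apollo's client from Scala could look roughly like the following; the namespace registration and the "window.time" key are assumptions mirrored from conf.properties.

import com.ctrip.framework.apollo.{Config, ConfigService}

object ApolloConfigSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: the app is registered in Apollo and uses the default "application" namespace.
    val config: Config = ConfigService.getAppConfig
    // Fall back to a default when the key is absent; "window.time" mirrors conf.properties.
    val windowTime: Int = config.getIntProperty("window.time", 5)
    println(s"window.time from Apollo: $windowTime")
  }
}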
src/main/resources/conf.properties  (View file @ 31d52b5c)

@@ -4,7 +4,9 @@ kafka.bootstrap.servers=39.100.49.76:9092
 kafka.zookerper.servers=10.197.236.154:2181
 #,10.197.236.169:2181,10.197.236.184:2181/kafka
 #kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
-#kafka.zookerper.servers=10.197.236.211:2181
+window.time=5
+compact.kafka.topic=transport_basedata_operation,tbd-transport-data-gathering
 application.kafka.topic=tbd-transport-data-gathering
 basicsInfo.kafka.topic=transport_basedata_operation
 hive.group.id=hive
@@ -16,10 +18,8 @@ hive.url=jdbc:hive2://hadoop02:10000/ods
 hive.username=hive
 hive.password=hive
 areaCodeAndAddress.Url=http://10.197.236.100:40612/bcpbase/geocode/regeo
 warnTypes=0x0064,0x0065,0x0066
-hive.unknown.table=KAFKA_UNKNOWN_I
+hive.unknown.table=unknown_mes
 ### Link management messages (链路管理类)
 hive.UP_CONNECT_REQ.table=KAFKA_UP_CONNECT_I
 hive.UP_DISCONNECT_REQ.table=KAFKA_UP_DISCONNECT_INFORM_I
@@ -49,8 +49,8 @@ hive.UP_PREVENTION_EXG_MSG_DEVICE_PARAM.table=KAFKA_UP_PREVENTION_EXG_MSG_DEVICE
 ### Intelligent video alarm interaction messages (智能视频报警交互消息类)
 hive.UP_PREVENTION_MSG_FILE_COMPLETE.table=KAFKA_UP_PREVENTION_MSG_FILE_COMPLETE_I
 # Basic-information Hive tables (基础信息hive表)
-hive.KAFKA_base_into_platform_info.table=KAFKA_base_into_platform_info_I
+hive.KAFKA_base_into_platform_info.table=kafka_base_into_platform_info_i
-hive.KAFKA_base_into_enterprise_info.table=KAFKA_base_into_enterprise_info_I
+hive.KAFKA_base_into_enterprise_info.table=kafka_base_into_enterprise_info_i
-hive.KAFKA_base_into_vehicle_info.table=KAFKA_base_into_vehicle_info_I
+hive.KAFKA_base_into_vehicle_info.table=kafka_base_into_vehicle_info_i
-hive.KAFKA_base_data_display_config.table=KAFKA_base_data_display_config_I
+hive.KAFKA_base_data_display_config.table=kafka_base_data_display_config_i
-hive.KAFKA_base_warning_type.table=KAFKA_base_warning_type_I
+hive.KAFKA_base_warning_type.table=kafka_base_warning_type_i
\ No newline at end of file
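The project's Config helper that reads these keys is not part of this diff, so as a hedged illustration only, the same properties could be loaded with nothing but the standard library as below. Const.scala's Config.load / getString presumably does something similar; the object and method names here are assumptions, not the project's API.

import java.util.Properties

object ConfPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Load conf.properties from the classpath, as Config.load("conf.properties") appears to do.
    val in = getClass.getClassLoader.getResourceAsStream("conf.properties")
    try props.load(in) finally in.close()
    val windowTime = props.getProperty("window.time", "5").toInt
    // Comma-separated topic lists become arrays, matching Const.compactTopic below.
    val compactTopics = props.getProperty("compact.kafka.topic", "").split(",")
    println(s"window.time=$windowTime, topics=${compactTopics.mkString(", ")}")
  }
}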
src/main/scala/com/hikcreate/data/constant/Const.scala  (View file @ 31d52b5c)

@@ -6,10 +6,10 @@ import com.hikcreate.data.util.Config
 object Const {

   Config.load("conf.properties")

+  val windowTime: Int = Config.getInt("window.time")
   val bootstrap: String = Config.getString("kafka.bootstrap.servers")
   val zkKafka: String = Config.getString("kafka.zookerper.servers")
+  val compactTopic: Array[String] = Config.getString("compact.kafka.topic").split(",")
   val applicationTopic: Array[String] = Config.getString("application.kafka.topic").split(",")
   val basicsInfoTopic: Array[String] = Config.getString("basicsInfo.kafka.topic").split(",")
   val hiveGroupId: String = Config.getString("hive.group.id")
@@ -23,6 +23,7 @@ object Const {
   val hivePassword: String = Config.getString("hive.password")
   val areaCodeAndAddressUrl: String = Config.getString("areaCodeAndAddress.Url")
   val unKnownTable: String = Config.getString("hive.unknown.table")
+  val hdfsUrl: String = Config.getString("hdfs.url")
   val warnTypes: Array[String] = Config.getString("warnTypes").split(",")
src/main/scala/com/hikcreate/data/sync/SyncHive.scala  (View file @ 31d52b5c)

@@ -11,19 +11,21 @@ import com.hikcreate.data.model.TableKey
 import com.hikcreate.data.util.{Tools, ZkManager}
 import org.apache.spark.streaming.kafka010._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.joda.time.DateTime
+import org.joda.time.{DateTime, Duration}
 import org.joda.time.format.DateTimeFormat
 import scala.collection.mutable.ArrayBuffer

 object SyncHive extends Sparking with Logging {

   def main(args: Array[String]): Unit = {
     val zkManager = ZkManager(Const.zkKafka)
     val kafkaParams = getKafkaParams(Const.bootstrap, Const.hiveGroupId)
     val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.hiveGroupId)
     val offsetRanges = new ArrayBuffer[OffsetRange]()
-    val ssc = new StreamingContext(conf, Seconds(1))
+    val ssc = new StreamingContext(conf, Seconds(5))
     val inputStream = KafkaUtils.createDirectStream[String, String](
       ssc,
       LocationStrategies.PreferConsistent,
@@ -34,14 +36,111 @@ object SyncHive extends Sparking with Logging {
       rdd
     }.map(x => x.value()).foreachRDD { rdd =>
       if (!rdd.isEmpty()) {
-        rdd.foreachPartition(iterator => processRow(iterator))
+        val startTime = DateTime.now()
+        rdd.map(JSON.parseObject)
+          .groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))))
+          .foreachPartition(x => processRow2(x))
         zkManager.saveEndOffset(offsetRanges, Const.hiveGroupId)
+        offsetRanges.foreach(x => println(x))
+        // println(offsetRanges(0))
+        val endTime = DateTime.now()
+        println(DateTime.now() + "==============time token: " + new Duration(startTime, endTime).getMillis + "ms==============")
       }
     }
     ssc.start()
     ssc.awaitTermination()
   }

+  def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
+    DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword)
+    DbClient.usingDB(Const.hivePoolName) { db =>
+      x.foreach { x =>
+        try {
+          val tableKey = x._1
+          if (!Const.tableMap.contains(tableKey) && tableKey.msgId != null) { // 未知消息 (unknown message)
+            x._2.foreach(json => writeUnknown(db.conn, Const.unKnownTable, json))
+          } else if (tableKey.msgId == null) { // 基础信息 (basic information)
+            x._2.foreach(json => writeBaseInfoHive(db.conn, Const.tableMap(tableKey), json))
+          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) { // 定位补报 (supplementary location report)
+            var jsonList = new ArrayBuffer[JSONObject]()
+            val flat = x._2.flatMap(x => Tools.addLocation(x))
+            val value = flat.toList.grouped(20)
+            value.foreach { sub =>
+              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
+              sub.zip(codes).foreach { line =>
+                val json = line._1
+                val location = line._2
+                json.put("districtcode", location._2)
+                jsonList.append(json)
+                writeHiveTable(db.conn, Const.tableMap(tableKey), json)
+              }
+            }
+          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) { // 定位消息 (location message)
+            val value = x._2.toList.grouped(20)
+            value.foreach { sub =>
+              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
+              sub.zip(codes).foreach { line =>
+                val json = line._1
+                val location = line._2
+                json.put("districtcode", location._2)
+                writeHiveTable(db.conn, Const.tableMap(tableKey), json)
+              }
+            }
+          } else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) { // 报警上传 (alarm upload)
+            var useFul = new ArrayBuffer[JSONObject]()
+            x._2.foreach { json =>
+              val warnType = json.getString("warnType")
+              if (Const.warnTypes.contains(warnType)) {
+                useFul.append(json)
+              } else {
+                writeUnknown(db.conn, Const.unKnownTable, json)
+              }
+            }
+            val value = useFul.filter(x => Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
+            value.foreach { sub =>
+              val codes = {
+                Tools.getAddressAndLocationCodes(sub.map { t =>
+                  val infoStr = t.getString("infoContent")
+                  val infoJson = Tools.getInfoContentJsonobj(infoStr)
+                  (infoJson.getDouble("LONGITUDE") / 1000000, infoJson.getDouble("LATITUDE") / 1000000)
+                })
+              }
+              sub.zip(codes).foreach { line =>
+                val json = line._1
+                val location = line._2
+                val infoStr = json.getString("infoContent")
+                val infoJson = Tools.getInfoContentJsonobj(infoStr)
+                val longitude = infoJson.get("LONGITUDE")
+                val latitude = infoJson.get("LATITUDE")
+                val eventtype = infoJson.get("EVENT_TYPE")
+                json.put("longitude", infoJson.get("LONGITUDE"))
+                json.put("latitude", infoJson.get("LATITUDE"))
+                json.put("eventtype", infoJson.get("EVENT_TYPE"))
+                json.put("fulladdress", location._1)
+                json.put("districtcode", location._2)
+                writeHiveTable(db.conn, Const.tableMap(tableKey), json)
+              }
+            }
+          } else { // 除了以上几种情况外的消息 (all other message types)
+            x._2.foreach(json => writeHiveTable(db.conn, Const.tableMap(tableKey), json))
+          }
+        } catch {
+          case e: Exception =>
+            println("发生插入错误的消息" + x._2.toString())
+            e.printStackTrace()
+        }
+      }
+    }
+  }
+
   def processRow(iterator: Iterator[String]): Unit = {
     DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword)
     DbClient.usingDB(Const.hivePoolName) { db =>
@@ -79,8 +178,13 @@ object SyncHive extends Sparking with Logging {
       (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
       try {
         stmt.execute()
-        println(s"insert date to HiveTable $tableName SUCCESS")
-      } finally {
+        info(s"insert date to HiveTable $tableName SUCCESS")
+      } catch {
+        case e: Exception => {
+          println("Exception Messages==>" + e)
+          println(s"hive table $tableName insert data failed==>" + json)
+        }
+      } finally {
         stmt.close()
       }
     }
@@ -101,27 +205,27 @@ object SyncHive extends Sparking with Logging {
   def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = {
     // 定位消息,增加区县代码字段及值 (location message: add district code field and value)
-    if (tableName == Const.tableMap(TableKey(Some("0x1200"), Some("0x1202")))
-      || tableName == Const.tableMap(TableKey(Some("0x1200"), Some("0x1203")))) {
-      val addressAndLocation = Tools.getAddressAndLocationCode(json.getDouble("lon") / 1000000, json.getDouble("lat") / 1000000)
-      json.put("districtcode", addressAndLocation._2)
-    }
+    // if( tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1202")))
+    //   || tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1203")))){
+    //   val addressAndLocation = Tools.getAddressAndLocationCode(json.getDouble("lon")/1000000,json.getDouble("lat")/1000000)
+    //   json.put("districtcode",addressAndLocation._2)
+    // }
     // 报警信息 增加区县代码和事件类型,经纬度,详细地址 (alarm message: add district code, event type, lon/lat and full address)
-    if (tableName == Const.tableMap(TableKey(Some("0x1400"), Some("0x1402")))) {
-      val warnType = json.getString("warnType")
-      if (!Const.warnTypes.contains(warnType)) {
-        writeUnknown(conn, Const.unKnownTable, json)
-      }
-      val infoStr = json.getString("infoContent")
-      val infoJson = Tools.getInfoContentJsonobj(infoStr)
-      json.put("longitude", infoJson.get("LONGITUDE"))
-      json.put("latitude", infoJson.get("LATITUDE"))
-      json.put("eventtype", infoJson.get("EVENT_TYPE"))
-      val addressAndLocation = Tools.getAddressAndLocationCode(infoJson.getDouble("LONGITUDE") / 1000000, infoJson.getDouble("LATITUDE") / 1000000)
-      json.put("fulladdress", addressAndLocation._1)
-      println(addressAndLocation._1)
-      json.put("districtcode", addressAndLocation._2)
-    }
+    // if(tableName == Const.tableMap(TableKey(Some("0x1400"),Some("0x1402")))){
+    //   val warnType = json.getString("warnType")
+    //   if (!Const.warnTypes.contains(warnType)){
+    //     writeUnknown(conn,Const.unKnownTable,json)
+    //   }
+    //   val infoStr = json.getString("infoContent")
+    //   val infoJson = Tools.getInfoContentJsonobj(infoStr)
+    //   json.put("longitude",infoJson.get("LONGITUDE"))
+    //   json.put("latitude",infoJson.get("LATITUDE"))
+    //   json.put("eventtype",infoJson.get("EVENT_TYPE"))
+    //   val addressAndLocation = Tools.getAddressAndLocationCode(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
+    //   json.put("fulladdress",addressAndLocation._1)
+    //   println(addressAndLocation._1)
+    //   json.put("districtcode",addressAndLocation._2)
+    // }
     val keys = json.keySet().toArray(Array[String]())
     val day = if (keys.contains("dateTime")) {
       DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
@@ -130,6 +234,8 @@ object SyncHive extends Sparking with Logging {
     } else {
       DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
     }
+    val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
+    val createPartitionStmt = conn.prepareStatement(createPartitionSql)
     val sql =
       s"""
         |insert into $tableName
@@ -140,8 +246,16 @@ object SyncHive extends Sparking with Logging {
     val stmt = conn.prepareStatement(sql)
     (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
     try {
+      createPartitionStmt.execute()
       stmt.execute()
-    } finally {
+      info(s"insert date to HiveTable $tableName SUCCESS")
+    } catch {
+      case e: Exception => {
+        println("Exception Messages==>" + e)
+        println(s"hive table $tableName insert data failed==>" + json)
+      }
+    } finally {
+      createPartitionStmt.close()
       stmt.close()
     }
   }
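The central change in this file is that each micro-batch is now keyed by (msgId, dataType) before being routed to a Hive table. A minimal, self-contained sketch of that grouping idea is below; it assumes a TableKey case class shaped like the one in com.hikcreate.data.model, which this diff does not show, and the sample payloads are invented for illustration.

import com.alibaba.fastjson.{JSON, JSONObject}

// Assumption: the real TableKey is a case class over two Options, as its usage here suggests.
case class TableKey(msgId: Option[String], dataType: Option[String])

object GroupByTableKeySketch {
  def main(args: Array[String]): Unit = {
    val raw = Seq(
      """{"msgId":"0x1200","dataType":"0x1202","lon":106500000,"lat":29500000}""",
      """{"msgId":"0x1400","dataType":"0x1402","warnType":"0x0064"}""")
    // Same shape as the streaming job: parse each message, then group by the routing key.
    val grouped: Map[TableKey, Seq[JSONObject]] =
      raw.map(JSON.parseObject)
         .groupBy(j => TableKey(Option(j.getString("msgId")), Option(j.getString("dataType"))))
    grouped.foreach { case (key, msgs) => println(s"$key -> ${msgs.size} message(s)") }
  }
}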
src/main/scala/com/hikcreate/data/sync/SysncHiveBatch.scala  (new file, mode 100644)  View file @ 31d52b5c

package com.hikcreate.data.sync

import java.io.ByteArrayInputStream
import java.net.URI
import java.sql.{Connection, ResultSet}
import java.util.Locale
import com.alibaba.fastjson.{JSON, JSONObject}
import com.hikcreate.data.client.DbClient
import com.hikcreate.data.common.{Logging, Sparking}
import com.hikcreate.data.constant.Const
import com.hikcreate.data.model.TableKey
import com.hikcreate.data.util.{HDFSHelper, Tools, ZkManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.joda.time.{DateTime, Duration}
import org.joda.time.format.DateTimeFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import scala.collection.mutable.ArrayBuffer

object SysncHiveBatch extends Sparking with Logging {

  val hdfs: DistributedFileSystem = new DistributedFileSystem()
  hdfs.initialize(URI.create(Const.hdfsUrl), new Configuration())

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(conf, Seconds(Const.windowTime))
    val zkManager = ZkManager(Const.zkKafka)
    val kafkaParams = getKafkaParams(Const.bootstrap, Const.hiveGroupId)
    val offsets = zkManager.getBeginOffset(Const.compactTopic, Const.hiveGroupId)
    val offsetRanges = new ArrayBuffer[OffsetRange]()
    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Const.compactTopic, kafkaParams, offsets))
    inputStream.transform { rdd =>
      offsetRanges.clear()
      offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
      rdd
    }.map(x => x.value()).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val startTime = DateTime.now()
        rdd.map(JSON.parseObject)
          .groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))))
          .foreachPartition(x => processRow3(x))
        zkManager.saveEndOffset(offsetRanges, Const.hiveGroupId)
        offsetRanges.foreach(x => println(x))
        val endTime = DateTime.now()
        println(DateTime.now() + "==============This Batch Time Token: " + new Duration(startTime, endTime).getMillis + " ms==============")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  def processRow3(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
    DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword)
    DbClient.usingDB(Const.hivePoolName) { db =>
      x.foreach { x =>
        try {
          val tableKey = x._1
          if (!Const.tableMap.contains(tableKey) && tableKey.msgId != None) { // 未知消息 (unknown message)
            var jsonArr = new ArrayBuffer[JSONObject]()
            x._2.foreach(json => jsonArr.append(json))
            writeUnknown(db.conn, Const.unKnownTable, jsonArr)
          } else if (tableKey.msgId == None && Const.tableMap.contains(tableKey)) { // 基础信息 (basic information)
            writeBaseInfoHive(db.conn, Const.tableMap(tableKey), x._2.toArray)
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) { // 定位补报 (supplementary location report)
            var jsonArr = new ArrayBuffer[JSONObject]()
            val flat = x._2.flatMap(x => Tools.addLocation(x))
            val value = flat.toList.grouped(20)
            value.foreach { sub =>
              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
              sub.zip(codes).foreach { line =>
                val json = line._1
                val location = line._2
                json.put("districtcode", location._2)
                jsonArr.append(json)
              }
            }
            writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime")
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) { // 定位消息 (location message)
            var jsonArr = new ArrayBuffer[JSONObject]()
            val value = x._2.toList.grouped(20)
            value.foreach { sub =>
              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
              sub.zip(codes).foreach { line =>
                val json = line._1
                val location = line._2
                json.put("districtcode", location._2)
                jsonArr.append(json)
              }
            }
            writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime")
          } else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) { // 报警上传 (alarm upload)
            var jsonArr = new ArrayBuffer[JSONObject]()
            var useFul = new ArrayBuffer[JSONObject]()
            var useLess = new ArrayBuffer[JSONObject]()
            x._2.foreach { json =>
              val warnType = json.getString("warnType")
              if (Const.warnTypes.contains(warnType)) {
                useFul.append(json)
              } else {
                useLess.append(json)
              }
            }
            val value = useFul.filter(x => Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
            value.foreach { sub =>
              val codes = {
                Tools.getAddressAndLocationCodes(sub.map { t =>
                  val infoStr = t.getString("infoContent")
                  val infoJson = Tools.getInfoContentJsonobj(infoStr)
                  (infoJson.getDouble("LONGITUDE") / 1000000, infoJson.getDouble("LATITUDE") / 1000000)
                })
              }
              sub.zip(codes).foreach { line =>
                val json = line._1
                val location = line._2
                val infoStr = json.getString("infoContent")
                val infoJson = Tools.getInfoContentJsonobj(infoStr)
                val longitude = infoJson.get("LONGITUDE")
                val latitude = infoJson.get("LATITUDE")
                val eventtype = infoJson.get("EVENT_TYPE")
                json.put("longitude", infoJson.get("LONGITUDE"))
                json.put("latitude", infoJson.get("LATITUDE"))
                json.put("eventtype", infoJson.get("EVENT_TYPE"))
                json.put("fulladdress", location._1)
                json.put("districtcode", location._2)
                jsonArr.append(json)
              }
            }
            if (jsonArr.size > 0 && jsonArr != null) {
              writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "warnTime")
            }
            if (useLess.size > 0 && useLess != null) {
              writeUnknown(db.conn, Const.unKnownTable, useLess)
            }
          } else { // 除了以上几种情况外的消息 (all other message types)
            var jsonArr = new ArrayBuffer[JSONObject]()
            x._2.foreach(json => jsonArr.append(json))
            writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "bussinessTime")
          }
        } catch {
          case e: Exception =>
            println("发生插入错误的消息" + x._2.toString())
            println(e)
            e.printStackTrace()
        }
      }
    }
  }

  def writeBaseInfoHive(conn: Connection, tableName: String, jsonArr: Array[JSONObject]): Unit = {
    val keySet = jsonArr(0).keySet().toArray(Array[String]())
    var results = ""
    val clos = new ArrayBuffer[String]()
    val insetSql =
      s"""
         |select * from ods.$tableName limit 1
      """.stripMargin
    val stmt = conn.prepareStatement(insetSql)
    try {
      val rs = stmt.executeQuery()
      (1 to rs.getMetaData.getColumnCount).foreach { x =>
        val column = rs.getMetaData.getColumnName(x).split("\\.")(1)
        clos.append(column)
      }
      jsonArr.foreach { json =>
        var result = ""
        if (clos.size != keySet.size) {
          val unKnownJson = new ArrayBuffer[JSONObject]()
          unKnownJson.append(json)
          writeUnknown(conn, Const.unKnownTable, unKnownJson)
          info(s"The JSON field does not correspond to the $tableName field ==> " + json.toJSONString)
          info(s" $tableName field ==> " + clos.toList)
          info(s" JSON field ==> " + keySet.toList)
        } else {
          clos.foreach { clo =>
            keySet.foreach { key =>
              if (key.toLowerCase.equals(clo)) {
                result = result + "\t" + json.getString(key)
              }
            }
          }
          result = result.drop(1) + "\n"
          results = results + result
        }
      }
      results = results + "\n"
      if (results.trim.size > 0 && results != null) {
        val fileName = s"/hive/ODS.db/$tableName/000000_0"
        val exist = HDFSHelper.exists(hdfs, fileName)
        if (!exist) {
          hdfs.createNewFile(new Path(fileName))
        }
        val outputStream = hdfs.append(new Path(fileName))
        val inputStream = new ByteArrayInputStream(results.toString().getBytes("UTF-8"))
        HDFSHelper.transport(inputStream, outputStream)
        inputStream.close()
        outputStream.close()
      }
    } finally {
      stmt.close()
    }
  }

  def writeUnknown(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject]): Unit = {
    val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
    val day = new DateTime().toString("yyyy-MM-dd")
    val sourceJsons = new ArrayBuffer[JSONObject]()
    jsonArr.foreach { json =>
      val source = new JSONObject()
      source.put("partitionday", day)
      source.put("dateTime", dateTime)
      source.put("jsondata", json.toJSONString)
      sourceJsons.append(source)
    }
    val keySet = sourceJsons(0).keySet().toArray(Array[String]())
    var results = ""
    val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
    val createPartitionStmt = conn.prepareStatement(createPartitionSql)
    createPartitionStmt.execute()
    val clos = new ArrayBuffer[String]()
    val sql =
      s"""
         |select * from ods.$tableName limit 1
      """.stripMargin
    val stmt = conn.prepareStatement(sql)
    try {
      val rs: ResultSet = stmt.executeQuery()
      (1 to rs.getMetaData.getColumnCount).foreach { x =>
        val column = rs.getMetaData.getColumnName(x).split("\\.")(1)
        clos.append(column)
      }
      sourceJsons.foreach { json =>
        var result = ""
        clos.foreach { clo =>
          keySet.foreach { key =>
            if (key.toLowerCase.equals(clo)) {
              result = result + "\t" + json.getString(key)
            }
          }
        }
        result = result.drop(1) + "\n"
        results = results + result
      }
      results = results + "\n"
      if (results.trim.size > 0 && results != null) {
        val fileName = s"/hive/ODS.db/$tableName/day=$day/000000_0"
        val exist = HDFSHelper.exists(hdfs, fileName)
        if (!exist) {
          hdfs.createNewFile(new Path(fileName))
        }
        val outputStream = hdfs.append(new Path(fileName))
        val inputStream = new ByteArrayInputStream(results.toString().getBytes("UTF-8"))
        HDFSHelper.transport(inputStream, outputStream)
        inputStream.close()
        outputStream.close()
      }
    } finally {
      stmt.close()
      createPartitionStmt.close()
    }
  }

  def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = {
    println("start insert hive : " + DateTime.now())
    // 定位消息,增加区县代码字段及值 (location message: add district code field and value)
    // if( tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1202")))
    //   || tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1203")))){
    //   val addressAndLocation = Tools.getAddressAndLocationCode(json.getDouble("lon")/1000000,json.getDouble("lat")/1000000)
    //   json.put("districtcode",addressAndLocation._2)
    // }
    // 报警信息 增加区县代码和事件类型,经纬度,详细地址 (alarm message: add district code, event type, lon/lat and full address)
    // if(tableName == Const.tableMap(TableKey(Some("0x1400"),Some("0x1402")))){
    //   val warnType = json.getString("warnType")
    //   if (!Const.warnTypes.contains(warnType)){
    //     writeUnknown(conn,Const.unKnownTable,json)
    //   }
    //   val infoStr = json.getString("infoContent")
    //   val infoJson = Tools.getInfoContentJsonobj(infoStr)
    //   json.put("longitude",infoJson.get("LONGITUDE"))
    //   json.put("latitude",infoJson.get("LATITUDE"))
    //   json.put("eventtype",infoJson.get("EVENT_TYPE"))
    //   val addressAndLocation = Tools.getAddressAndLocationCode(infoJson.getDouble("LONGITUDE")/1000000,infoJson.getDouble("LATITUDE")/1000000)
    //   json.put("fulladdress",addressAndLocation._1)
    //   println(addressAndLocation._1)
    //   json.put("districtcode",addressAndLocation._2)
    // }
    var keys = json.keySet().toArray(Array[String]())
    val day = if (keys.contains("dateTime")) {
      DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
    } else if (keys.contains("warnTime")) {
      new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
    } else {
      DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
    }
    // json.put("partitionday",day)
    keys = json.keySet().toArray(Array[String]())
    val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
    val createPartitionStmt = conn.prepareStatement(createPartitionSql)
    val setStmt = conn.prepareStatement("set set hive.exec.dynamic.partition.mode=nonstrict")
    val sql =
      s"""
         |insert into $tableName
         |partition(partitionday)
         |(${keys.map(x => x.toLowerCase()).mkString(",")})
         |values (${keys.map(_ => "?").mkString(",")})
      """.stripMargin
    val stmt = conn.prepareStatement(sql)
    (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
    try {
      createPartitionStmt.execute()
      setStmt.execute()
      stmt.execute()
      info(s"insert date to HiveTable $tableName SUCCESS")
    } catch {
      case e: Exception => {
        println("Exception Messages==>" + e)
        println(s"hive table $tableName insert data failed==>" + json)
      }
    } finally {
      createPartitionStmt.close()
      setStmt.close()
      stmt.close()
    }
    println("insert hive end : " + DateTime.now())
  }

  def writeHiveBatch(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject], dateKey: String): Unit = {
    var map: Map[String, ArrayBuffer[JSONObject]] = Map()
    dateKey match {
      case "dateTime" => {
        jsonArr.foreach { json =>
          val day = DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = map.get(day).get.++:(jsons)
            map.updated(day, jSONObjects)
          } else {
            map.+(day -> json)
          }
        }
      }
      case "warnTime" => {
        jsonArr.foreach { json =>
          val day = new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
          if (map.contains(day)) {
            val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
            val jsons = new ArrayBuffer[JSONObject]()
            jsons.append(json)
            map += (day -> jsons)
          }
        }
      }
      case "bussinessTime" => {
        jsonArr.foreach { json =>
          val day = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = map.get(day).get.++:(jsons)
            map.updated(day, jSONObjects)
          } else {
            map.+(day -> json)
          }
        }
      }
    }
    val partitionDays = map.keySet
    partitionDays.foreach { day =>
      val jsons: ArrayBuffer[JSONObject] = map.get(day).get
      val keys = jsons(0).keySet().toArray(Array[String]())
      val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
      val createPartitionStmt = conn.prepareStatement(createPartitionSql)
      createPartitionStmt.execute()
      val sql =
        s"""
           |insert into $tableName
           |partition(partitionday='$day')
           |(${keys.map(x => x.toLowerCase()).mkString(",")})
           |values (${keys.map(_ => "?").mkString(",")})
        """.stripMargin
      val stmt = conn.prepareStatement(sql)
      jsons.foreach { json =>
        (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
        stmt.addBatch()
      }
      try {
        stmt.executeBatch()
      } catch {
        case e: Exception => {
          println(jsons.toList)
          println(jsons.toList)
          println(e)
        }
      } finally {
        stmt.close()
        createPartitionStmt.close()
      }
    }
  }

  def writeHdfs(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject], dateKey: String): Unit = {
    var map: Map[String, ArrayBuffer[JSONObject]] = Map()
    dateKey match {
      case "dateTime" => {
        jsonArr.foreach { json =>
          val dateTime = DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
          val day = dateTime.split(" ")(0)
          json.put("dateTime", dateTime)
          json.put("partitionday", day)
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
            jsons.append(json)
            map += (day -> jsons)
          }
        }
      }
      case "warnTime" => {
        jsonArr.foreach { json =>
          val day = new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
          val warnTime = new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd HH:mm:ss", Locale.CHINESE)
          json.put("warnTime", warnTime)
          json.put("partitionday", day)
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
            map += (day -> jsons)
          }
        }
      }
      case "bussinessTime" => {
        jsonArr.foreach { json =>
          val day = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
          val bussinessTime = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
          json.put("bussinessTime", bussinessTime)
          json.put("partitionday", day)
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
            jsons.append(json)
            map += (day -> jsons)
          }
        }
      }
    }
    val partitionDays = map.keySet
    partitionDays.foreach { day =>
      val jsons: ArrayBuffer[JSONObject] = map.get(day).get
      val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
      val createPartitionStmt = conn.prepareStatement(createPartitionSql)
      createPartitionStmt.execute()
      // val results = new StringBuilder()
      var results = ""
      val clos = new ArrayBuffer[String]()
      val sql =
        s"""
           |select * from ods.$tableName limit 1
        """.stripMargin
      val stmt = conn.prepareStatement(sql)
      try {
        val rs = stmt.executeQuery()
        (1 to rs.getMetaData.getColumnCount).foreach { x =>
          val column = rs.getMetaData.getColumnName(x).split("\\.")(1)
          clos.append(column)
        }
        jsons.foreach { json =>
          val keySet = json.keySet().toArray(Array[String]())
          var result = ""
          if (clos.size != keySet.size) {
            val unKnownJson = new ArrayBuffer[JSONObject]()
            unKnownJson.append(json)
            writeUnknown(conn, Const.unKnownTable, unKnownJson)
            info(s"The JSON field does not correspond to the $tableName field ==> " + json.toJSONString)
            info(s" $tableName field ==> " + clos.toList)
            info(s" JSON field ==> " + keySet.toList)
          } else {
            clos.foreach { clo =>
              keySet.foreach { key =>
                if (key.toLowerCase.equals(clo)) {
                  result = result + "\t" + json.getString(key)
                }
              }
            }
            result = result.drop(1) + "\n"
            results = results + result
          }
        }
        results = results + "\n"
        if (results.trim.size > 0 && results != null) {
          val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_0"
          val exist = HDFSHelper.exists(hdfs, fileName)
          if (!exist) {
            hdfs.createNewFile(new Path(fileName))
          }
          val outputStream = hdfs.append(new Path(fileName))
          val inputStream = new ByteArrayInputStream(results.getBytes("UTF-8"))
          HDFSHelper.transport(inputStream, outputStream)
          inputStream.close()
          outputStream.close()
        }
      } finally {
        stmt.close()
        createPartitionStmt.close()
      }
    }
  }
}
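writeHdfs above buckets each batch of JSON records by partition day and then appends tab-separated rows under /hive/ODS.db/<table>/partitionday=<day>/. A rough, hedged sketch of just that bucketing step follows; it simplifies column ordering (the real job reads the column list from "select * ... limit 1") and the sample record is invented.

import com.alibaba.fastjson.JSONObject
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

object PartitionDaySketch {
  private val fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

  // Group records by the day extracted from dateKey and render each as one TSV line,
  // columns in the JSON's own key order (an assumption made only for this sketch).
  def bucketByDay(records: Seq[JSONObject], dateKey: String): Map[String, Seq[String]] =
    records.groupBy { json =>
      DateTime.parse(json.getString(dateKey), fmt).toString("yyyy-MM-dd")
    }.map { case (day, jsons) =>
      day -> jsons.map { j =>
        j.keySet().toArray(Array[String]()).map(k => j.getString(k)).mkString("\t")
      }
    }

  def main(args: Array[String]): Unit = {
    val j = new JSONObject()
    j.put("dateTime", "2019-11-05 10:00:00")
    j.put("vehicleNo", "渝A12345")
    bucketByDay(Seq(j), "dateTime").foreach { case (d, rows) => println(s"$d -> $rows") }
  }
}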
src/main/scala/com/hikcreate/data/util/HDFSHelper.scala  (new file, mode 100644)  View file @ 31d52b5c

package com.hikcreate.data.util

import java.io.{FileSystem => _, _}
import org.apache.hadoop.fs._
import scala.collection.mutable.ListBuffer

object HDFSHelper {

  def isDir(hdfs: FileSystem, name: String): Boolean = {
    hdfs.isDirectory(new Path(name))
  }

  def isDir(hdfs: FileSystem, name: Path): Boolean = {
    hdfs.isDirectory(name)
  }

  def isFile(hdfs: FileSystem, name: String): Boolean = {
    hdfs.isFile(new Path(name))
  }

  def isFile(hdfs: FileSystem, name: Path): Boolean = {
    hdfs.isFile(name)
  }

  def createFile(hdfs: FileSystem, name: String): Boolean = {
    hdfs.createNewFile(new Path(name))
  }

  def createFile(hdfs: FileSystem, name: Path): Boolean = {
    hdfs.createNewFile(name)
  }

  def createFolder(hdfs: FileSystem, name: String): Boolean = {
    hdfs.mkdirs(new Path(name))
  }

  def createFolder(hdfs: FileSystem, name: Path): Boolean = {
    hdfs.mkdirs(name)
  }

  def exists(hdfs: FileSystem, name: String): Boolean = {
    hdfs.exists(new Path(name))
  }

  def exists(hdfs: FileSystem, name: Path): Boolean = {
    hdfs.exists(name)
  }

  def transport(inputStream: InputStream, outputStream: OutputStream): Unit = {
    val buffer = new Array[Byte](64 * 1000)
    var len = inputStream.read(buffer)
    while (len != -1) {
      outputStream.write(buffer, 0, len - 1)
      len = inputStream.read(buffer)
    }
    outputStream.flush()
    inputStream.close()
    outputStream.close()
  }

  class MyPathFilter extends PathFilter {
    override def accept(path: Path): Boolean = true
  }

  /**
    * create a target file and provide parent folder if necessary
    */
  def createLocalFile(fullName: String): File = {
    val target: File = new File(fullName)
    if (!target.exists) {
      val index = fullName.lastIndexOf(File.separator)
      val parentFullName = fullName.substring(0, index)
      val parent: File = new File(parentFullName)
      if (!parent.exists) parent.mkdirs
      else if (!parent.isDirectory) parent.mkdir
      target.createNewFile
    }
    target
  }

  /**
    * delete file in hdfs
    * @return true: success, false: failed
    */
  def deleteFile(hdfs: FileSystem, path: String): Boolean = {
    if (isDir(hdfs, path)) hdfs.delete(new Path(path), true) // true: delete files recursively
    else hdfs.delete(new Path(path), false)
  }

  /**
    * get all file children's full name of a hdfs dir, not include dir children
    * @param fullName the hdfs dir's full name
    */
  def listChildren(hdfs: FileSystem, fullName: String, holder: ListBuffer[String]): ListBuffer[String] = {
    val filesStatus = hdfs.listStatus(new Path(fullName), new MyPathFilter)
    for (status <- filesStatus) {
      val filePath: Path = status.getPath
      if (isFile(hdfs, filePath)) holder += filePath.toString
      else listChildren(hdfs, filePath.toString, holder)
    }
    holder
  }

  def copyFile(hdfs: FileSystem, source: String, target: String): Unit = {
    val sourcePath = new Path(source)
    val targetPath = new Path(target)
    if (!exists(hdfs, targetPath)) createFile(hdfs, targetPath)
    val inputStream: FSDataInputStream = hdfs.open(sourcePath)
    val outputStream: FSDataOutputStream = hdfs.create(targetPath)
    transport(inputStream, outputStream)
  }

  def copyFolder(hdfs: FileSystem, sourceFolder: String, targetFolder: String): Unit = {
    val holder: ListBuffer[String] = new ListBuffer[String]
    val children: List[String] = listChildren(hdfs, sourceFolder, holder).toList
    for (child <- children)
      copyFile(hdfs, child, child.replaceFirst(sourceFolder, targetFolder))
  }

  def copyFileFromLocal(hdfs: FileSystem, localSource: String, hdfsTarget: String): Unit = {
    val targetPath = new Path(hdfsTarget)
    if (!exists(hdfs, targetPath)) createFile(hdfs, targetPath)
    val inputStream: FileInputStream = new FileInputStream(localSource)
    val outputStream: FSDataOutputStream = hdfs.create(targetPath)
    transport(inputStream, outputStream)
  }

  def copyFileToLocal(hdfs: FileSystem, hdfsSource: String, localTarget: String): Unit = {
    val localFile: File = createLocalFile(localTarget)
    val inputStream: FSDataInputStream = hdfs.open(new Path(hdfsSource))
    val outputStream: FileOutputStream = new FileOutputStream(localFile)
    transport(inputStream, outputStream)
  }

  def copyFolderFromLocal(hdfs: FileSystem, localSource: String, hdfsTarget: String): Unit = {
    val localFolder: File = new File(localSource)
    val allChildren: Array[File] = localFolder.listFiles
    for (child <- allChildren) {
      val fullName = child.getAbsolutePath
      val nameExcludeSource: String = fullName.substring(localSource.length)
      val targetFileFullName: String = hdfsTarget + Path.SEPARATOR + nameExcludeSource
      if (child.isFile) copyFileFromLocal(hdfs, fullName, targetFileFullName)
      else copyFolderFromLocal(hdfs, fullName, targetFileFullName)
    }
  }

  def copyFolderToLocal(hdfs: FileSystem, hdfsSource: String, localTarget: String): Unit = {
    val holder: ListBuffer[String] = new ListBuffer[String]
    val children: List[String] = listChildren(hdfs, hdfsSource, holder).toList
    val hdfsSourceFullName = hdfs.getFileStatus(new Path(hdfsSource)).getPath.toString
    val index = hdfsSourceFullName.length
    for (child <- children) {
      val nameExcludeSource: String = child.substring(index + 1)
      val targetFileFullName: String = localTarget + File.separator + nameExcludeSource
      copyFileToLocal(hdfs, child, targetFileFullName)
    }
  }
}
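As a hedged usage sketch of the helper above (not part of this commit): open a FileSystem against a placeholder HDFS URI, make sure the target file exists, and append a small UTF-8 payload with HDFSHelper.transport, roughly the way writeHdfs uses it. The namenode address and path are assumptions; Const.hdfsUrl plays that role in SysncHiveBatch, and transport closes both streams itself.

import java.io.ByteArrayInputStream
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HDFSHelperUsageSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder namenode address for illustration only.
    val fs = FileSystem.get(URI.create("hdfs://namenode:8020"), new Configuration())
    val file = "/tmp/hdfs-helper-demo/000000_0"
    if (!HDFSHelper.exists(fs, file)) HDFSHelper.createFile(fs, file)
    // Appending requires append support to be enabled on the cluster.
    val out = fs.append(new Path(file))
    val in = new ByteArrayInputStream("hello\tworld\n".getBytes("UTF-8"))
    HDFSHelper.transport(in, out) // copies the buffer and closes both streams
    fs.close()
  }
}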
src/main/scala/com/hikcreate/data/util/Tools.scala  (View file @ 31d52b5c)

@@ -73,10 +73,10 @@ object Tools extends Logging{
       arr.add(lonAndLat)
     }
     json.put("locations", arr)
-    // val startTime = DateTime.now()
+    val startTime = DateTime.now()
     val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type", "application/json").asString
-    // val endTime = DateTime.now()
+    val endTime = DateTime.now()
-    //println("http请求时间:"+new Duration(startTime,endTime).getMillis/1000)
+    // println("经纬度列表size:"+buffer.size+"===》http response time :"+new Duration(startTime,endTime).getMillis)
     val body = JSON.parseObject(response.body)
     val items = body.getJSONObject("result").getJSONArray("regeoItems")
     (0 until items.size()).map { index =>
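The change above re-enables the request-timing timestamps around the geocoding call. Measuring an HTTP round trip with joda-time, as done here and in the streaming jobs, is just two timestamps and a Duration; a small hedged sketch using the same scalaj-http call style, with a placeholder endpoint and payload:

import org.joda.time.{DateTime, Duration}
import scalaj.http.Http

object HttpTimingSketch {
  def main(args: Array[String]): Unit = {
    val startTime = DateTime.now()
    // Placeholder endpoint and body; areaCodeAndAddress.Url and the locations JSON play this role in the real job.
    val response = Http("http://example.com/bcpbase/geocode/regeo")
      .postData("""{"locations":[]}""")
      .header("content-type", "application/json")
      .asString
    val endTime = DateTime.now()
    println(s"status=${response.code}, took ${new Duration(startTime, endTime).getMillis} ms")
  }
}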