杜发飞 / operating-vehicle · Commits

Commit da710836, authored Nov 05, 2019 by 王建成
Commit message: update
Parent: c8e64ee4

Showing 5 changed files with 348 additions and 389 deletions:
pom.xml                                                       +11   -1
src/main/resources/conf.properties                             +9  -10
src/main/scala/com/hikcreate/data/constant/Const.scala         +2   -1
src/main/scala/com/hikcreate/data/sync/SysncHiveBatch.scala  +325 -376
src/main/scala/com/hikcreate/data/util/Tools.scala             +1   -1
pom.xml
@@ -6,7 +6,7 @@
     <groupId>groupId</groupId>
     <artifactId>operating-vehicle</artifactId>
-    <version>1.2-SNAPSHOT</version>
+    <version>1.3-SNAPSHOT</version>
     <repositories>
         <repository>
@@ -27,6 +27,16 @@
     <dependencies>
         <dependency>
+            <groupId>com.ctrip.framework.apollo</groupId>
+            <artifactId>apollo-client</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ctrip.framework.apollo</groupId>
+            <artifactId>apollo-core</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
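Note: this commit only adds the Apollo client/core artifacts; nothing in the diff wires them up yet. A minimal sketch of how the standard Apollo Java client is typically consumed from Scala, assuming the default application namespace (the key shown mirrors conf.properties and is an illustrative choice, not code from this repository):

    import com.ctrip.framework.apollo.{Config, ConfigService}

    object ApolloConfigSketch {
      def main(args: Array[String]): Unit = {
        // Fetch the application namespace from the Apollo config service
        // and read one property with a fallback default.
        val config: Config = ConfigService.getAppConfig()
        val bootstrap = config.getProperty("kafka.bootstrap.servers", "39.100.49.76:9092")
        println(s"kafka.bootstrap.servers = $bootstrap")
      }
    }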
src/main/resources/conf.properties
@@ -2,21 +2,20 @@ kafka.bootstrap.servers=39.100.49.76:9092
 #kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
 kafka.zookerper.servers=10.197.236.211:2181
 window.time=5
+compact.kafka.topic=transport_basedata_operation,tbd-transport-data-gathering
 application.kafka.topic=tbd-transport-data-gathering
 basicsInfo.kafka.topic=transport_basedata_operation
 hive.group.id=hive
 ignite.group.id=ignite3
 basics.group.id=basics2
+hdfs.url=hdfs://10.197.236.211:8020
 hive.driver=org.apache.hive.jdbc.HiveDriver
 hive.url=jdbc:hive2://hadoop02:10000/ods
 hive.username=hive
 hive.password=hive
 areaCodeAndAddress.Url=http://10.197.236.100:40612/bcpbase/geocode/regeo
 warnTypes=0x0064,0x0065,0x0066
-hive.unknown.table=KAFKA_UNKNOWN_I
+hive.unknown.table=unknown_mes
 ###链路管理类
 hive.UP_CONNECT_REQ.table=KAFKA_UP_CONNECT_I
 hive.UP_DISCONNECT_REQ.table=KAFKA_UP_DISCONNECT_INFORM_I
@@ -46,8 +45,8 @@ hive.UP_PREVENTION_EXG_MSG_DEVICE_PARAM.table=KAFKA_UP_PREVENTION_EXG_MSG_DEVICE
 ###智能视频报警交互消息类
 hive.UP_PREVENTION_MSG_FILE_COMPLETE.table=KAFKA_UP_PREVENTION_MSG_FILE_COMPLETE_I
 #基础信息hive表
-hive.KAFKA_base_into_platform_info.table=KAFKA_base_into_platform_info_I
+hive.KAFKA_base_into_platform_info.table=kafka_base_into_platform_info_i
-hive.KAFKA_base_into_enterprise_info.table=KAFKA_base_into_enterprise_info_I
+hive.KAFKA_base_into_enterprise_info.table=kafka_base_into_enterprise_info_i
-hive.KAFKA_base_into_vehicle_info.table=KAFKA_base_into_vehicle_info_I
+hive.KAFKA_base_into_vehicle_info.table=kafka_base_into_vehicle_info_i
-hive.KAFKA_base_data_display_config.table=KAFKA_base_data_display_config_I
+hive.KAFKA_base_data_display_config.table=kafka_base_data_display_config_i
-hive.KAFKA_base_warning_type.table=KAFKA_base_warning_type_I
+hive.KAFKA_base_warning_type.table=kafka_base_warning_type_i
\ No newline at end of file
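These keys are read through the project's Config helper (see Const.scala below). That helper is not part of this commit, so the following is only a hypothetical stand-in showing the usual classpath-properties pattern behind Config.getString / Config.getInt:

    import java.util.Properties

    object ConfigSketch {
      // Hypothetical loader: reads conf.properties from the classpath
      // and exposes the typed getters used by Const.
      private val props = new Properties()
      props.load(getClass.getResourceAsStream("/conf.properties"))

      def getString(key: String): String = props.getProperty(key)
      def getInt(key: String): Int = props.getProperty(key).toInt
    }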
src/main/scala/com/hikcreate/data/constant/Const.scala
@@ -9,7 +9,7 @@ object Const {
   val windowTime: Int = Config.getInt("window.time")
   val bootstrap: String = Config.getString("kafka.bootstrap.servers")
   val zkKafka: String = Config.getString("kafka.zookerper.servers")
+  val compactTopic: Array[String] = Config.getString("compact.kafka.topic").split(",")
   val applicationTopic: Array[String] = Config.getString("application.kafka.topic").split(",")
   val basicsInfoTopic: Array[String] = Config.getString("basicsInfo.kafka.topic").split(",")
   val hiveGroupId: String = Config.getString("hive.group.id")
@@ -23,6 +23,7 @@ object Const {
   val hivePassword: String = Config.getString("hive.password")
   val areaCodeAndAddressUrl: String = Config.getString("areaCodeAndAddress.Url")
   val unKnownTable: String = Config.getString("hive.unknown.table")
+  val hdfsUrl: String = Config.getString("hdfs.url")
   val warnTypes: Array[String] = Config.getString("warnTypes").split(",")
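Both constants added here are referenced in SysncHiveBatch.scala (next file), for example:

    val offsets = zkManager.getBeginOffset(Const.compactTopic, Const.hiveGroupId)
    hdfs.initialize(URI.create(Const.hdfsUrl), new Configuration())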
src/main/scala/com/hikcreate/data/sync/SysncHiveBatch.scala
@@ -2,18 +2,16 @@ package com.hikcreate.data.sync
 import java.io.ByteArrayInputStream
 import java.net.URI
-import java.sql.Connection
+import java.sql.{Connection, ResultSet}
 import java.util.Locale
 import com.alibaba.fastjson.{JSON, JSONObject}
 import com.hikcreate.data.client.DbClient
 import com.hikcreate.data.common.{Logging, Sparking}
 import com.hikcreate.data.constant.Const
 import com.hikcreate.data.model.TableKey
 import com.hikcreate.data.util.{HDFSHelper, Tools, ZkManager}
-import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka010._
 import org.joda.time.{DateTime, Duration}
@@ -21,121 +19,112 @@ import org.joda.time.format.DateTimeFormat
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
-import org.apache.hadoop.io.IOUtils
-import org.apache.spark.sql.SparkSession
 import scala.collection.mutable.ArrayBuffer

 object SysncHiveBatch extends Sparking with Logging {
-  //val sc = new SparkContext(conf)
-  private val sparkSesson = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-  private val sc = sparkSesson.sparkContext
-  val ssc = new StreamingContext(sc, Seconds(Const.windowTime))
   val hdfs: DistributedFileSystem = new DistributedFileSystem()
-  hdfs.initialize(URI.create("hdfs://10.197.236.211:8020"), new Configuration())
+  hdfs.initialize(URI.create(Const.hdfsUrl), new Configuration())

   def main(args: Array[String]): Unit = {
+    val ssc = new StreamingContext(conf, Seconds(Const.windowTime))
     val zkManager = ZkManager(Const.zkKafka)
     val kafkaParams = getKafkaParams(Const.bootstrap, Const.hiveGroupId)
-    val offsets = zkManager.getBeginOffset(Const.applicationTopic, Const.hiveGroupId)
+    val offsets = zkManager.getBeginOffset(Const.compactTopic, Const.hiveGroupId)
     val offsetRanges = new ArrayBuffer[OffsetRange]()
     val inputStream = KafkaUtils.createDirectStream[String, String](
       ssc,
       LocationStrategies.PreferConsistent,
-      ConsumerStrategies.Subscribe[String, String](Const.applicationTopic, kafkaParams, offsets))
+      ConsumerStrategies.Subscribe[String, String](Const.compactTopic, kafkaParams, offsets))
     inputStream.transform { rdd =>
       offsetRanges.clear()
       offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
       rdd
     }.map(x => x.value()).foreachRDD { rdd =>
       if (!rdd.isEmpty()) {
         val startTime = DateTime.now()
-        rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))).foreachPartition(x => processRow3(x))
+        val groupRdd = rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))))
+        processRow3(groupRdd, ssc)
         zkManager.saveEndOffset(offsetRanges, Const.hiveGroupId)
         offsetRanges.foreach { x =>
           println(x)
         }
         val endTime = DateTime.now()
-        println(DateTime.now() + "==============time token: " + new Duration(startTime, endTime).getMillis + " ms==============")
+        println(DateTime.now() + "==============This Batch Time Token: " + new Duration(startTime, endTime).getMillis + " ms==============")
       }
     }
     ssc.start()
     ssc.awaitTermination()
   }

-  def processRow3(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
+  def processRow3(x: RDD[(TableKey, Iterable[JSONObject])], ssc: StreamingContext): Unit = {
+    println("start process data: " + DateTime.now())
     DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword)
     DbClient.usingDB(Const.hivePoolName) { db =>
-      x.foreach { x =>
-        try {
+      x.foreachPartition { x =>
+        x.foreach { x =>
+          try {
            val tableKey = x._1
-           if (!Const.tableMap.contains(tableKey) && tableKey.msgId != None) {
+           if (!Const.tableMap.contains(tableKey) && tableKey.msgId != null) {
             //未知消息
             var jsonArr = new ArrayBuffer[JSONObject]()
             x._2.foreach { json =>
               jsonArr.append(json)
             }
             writeUnknown(db.conn, Const.unKnownTable, jsonArr)
-           } else if (tableKey.msgId == null) {
+           } else if (tableKey.msgId == None && Const.tableMap.contains(tableKey)) {
             //基础信息
             //x._2.foreach{json=>
             //  writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
             // }
             writeBaseInfoHive(db.conn, Const.tableMap(tableKey), x._2.toArray)
           } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) {
+            // writeBaseInfoHive(db.conn,Const.tableMap(tableKey),x._2.toArray)
            //定位补报
            var jsonArr = new ArrayBuffer[JSONObject]()
            val flat = x._2.flatMap(x => Tools.addLocation(x))
            val value = flat.toList.grouped(20)
            value.foreach { sub =>
              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
              sub.zip(codes).foreach { line =>
                val json = line._1
                val location = line._2
                json.put("districtcode", location._2)
                jsonArr.append(json)
                //writeHiveTable(db.conn,Const.tableMap(tableKey),json)
              }
            }
+           // writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime")
+           writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime")
          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) {
            //定位消息
            var jsonArr = new ArrayBuffer[JSONObject]()
            val value = x._2.toList.grouped(20)
            value.foreach { sub =>
              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
              sub.zip(codes).foreach { line =>
                val json = line._1
                val location = line._2
                json.put("districtcode", location._2)
                jsonArr.append(json)
                // writeHiveTable(db.conn,Const.tableMap(tableKey),json)
              }
            }
+           //writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"dateTime")
+           writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime")
          } else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) {
            //报警上传
            var jsonArr = new ArrayBuffer[JSONObject]()
            var useFul = new ArrayBuffer[JSONObject]()
            var useLess = new ArrayBuffer[JSONObject]()
            x._2.foreach { json =>
              val warnType = json.getString("warnType")
              if (Const.warnTypes.contains(warnType)) {
                useFul.append(json)
              } else {
                useLess.append(json)
                //writeUnknown(db.conn,Const.unKnownTable,json)
              }
            }
            val value = useFul.filter(x => Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
            value.foreach { sub =>
              val codes = {
                Tools.getAddressAndLocationCodes(sub.map { t =>
                  val infoStr = t.getString("infoContent")
                  val infoJson = Tools.getInfoContentJsonobj(infoStr)
                  (infoJson.getDouble("LONGITUDE") / 1000000, infoJson.getDouble("LATITUDE") / 1000000)
                })
              }
              sub.zip(codes).foreach { line =>
                val json = line._1
                val location = line._2
                val infoStr = json.getString("infoContent")
@@ -143,232 +132,156 @@ object SysncHiveBatch extends Sparking with Logging {
                val infoJson = Tools.getInfoContentJsonobj(infoStr)
                val longitude = infoJson.get("LONGITUDE")
                val latitude = infoJson.get("LATITUDE")
                val eventtype = infoJson.get("EVENT_TYPE")
                json.put("longitude", infoJson.get("LONGITUDE"))
                json.put("latitude", infoJson.get("LATITUDE"))
                json.put("eventtype", infoJson.get("EVENT_TYPE"))
                json.put("fulladdress", location._1)
                json.put("districtcode", location._2)
                jsonArr.append(json)
                //writeHiveTable(db.conn,Const.tableMap(tableKey),json)
              }
            }
            if (jsonArr.size > 0 && jsonArr != null) {
+             //writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"warnTime")
+             writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "warnTime")
            }
            if (useLess.size > 0 && useLess != null) {
              writeUnknown(db.conn, Const.unKnownTable, useLess)
            }
          } else {
            //除了以上几种情况外的消息
            var jsonArr = new ArrayBuffer[JSONObject]()
            x._2.foreach { json =>
              jsonArr.append(json)
              // writeHiveTable(db.conn,Const.tableMap(tableKey),json)
            }
+           //writeHiveBatch(db.conn,Const.tableMap(tableKey),jsonArr,"bussinessTime")
+           writeHdfs(db.conn, Const.tableMap(tableKey), jsonArr, "bussinessTime")
          }
         } catch {
           case e: Exception =>
             println("发生插入错误的消息" + x._2.toString())
             println(e)
             e.printStackTrace()
         }
+        }
       }
     }
   }

-  def processRow2(x: Iterator[(TableKey, Iterable[JSONObject])]): Unit = {
-    println("start process data: " + DateTime.now())
-    DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword)
-    DbClient.usingDB(Const.hivePoolName) { db =>
-      x.foreach { x =>
-        try {
-          val tableKey = x._1
-          if (!Const.tableMap.contains(tableKey) && tableKey.msgId != null) {
-            //未知消息
-            var jsonArr = new ArrayBuffer[JSONObject]()
-            x._2.foreach { json =>
-              jsonArr.append(json)
-            }
-            // writeUnknown(ssc,Const.unKnownTable,jsonArr)
-          } else if (tableKey.msgId == null) {
-            //基础信息
-            //x._2.foreach{json=>
-            //  writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
-            // }
-            writeBaseInfoHive(db.conn, Const.tableMap(tableKey), x._2.toArray)
-          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) {
-            //定位补报
-            var jsonArr = new ArrayBuffer[JSONObject]()
-            val flat = x._2.flatMap(x => Tools.addLocation(x))
-            val value = flat.toList.grouped(20)
-            value.foreach { sub =>
-              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
-              sub.zip(codes).foreach { line =>
-                val json = line._1
-                val location = line._2
-                json.put("districtcode", location._2)
-                jsonArr.append(json)
-                //writeHiveTable(db.conn,Const.tableMap(tableKey),json)
-              }
-            }
-            writeHiveBatch(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime")
-          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1202"))) {
-            //定位消息
-            var jsonArr = new ArrayBuffer[JSONObject]()
-            val value = x._2.toList.grouped(20)
-            value.foreach { sub =>
-              val codes = Tools.getAddressAndLocationCodes(sub.map(x => (x.getDouble("lon") / 1000000, x.getDouble("lat") / 1000000)))
-              sub.zip(codes).foreach { line =>
-                val json = line._1
-                val location = line._2
-                json.put("districtcode", location._2)
-                jsonArr.append(json)
-                // writeHiveTable(db.conn,Const.tableMap(tableKey),json)
-              }
-            }
-            writeHiveBatch(db.conn, Const.tableMap(tableKey), jsonArr, "dateTime")
-          } else if (tableKey == TableKey(Some("0x1400"), Some("0x1402"))) {
-            //报警上传
-            var jsonArr = new ArrayBuffer[JSONObject]()
-            var useFul = new ArrayBuffer[JSONObject]()
-            var useLess = new ArrayBuffer[JSONObject]()
-            x._2.foreach { json =>
-              val warnType = json.getString("warnType")
-              if (Const.warnTypes.contains(warnType)) {
-                useFul.append(json)
-              } else {
-                useLess.append(json)
-                //writeUnknown(db.conn,Const.unKnownTable,json)
-              }
-            }
-            val value = useFul.filter(x => Const.warnTypes.contains(x.getString("warnType"))).toList.grouped(20)
-            value.foreach { sub =>
-              val codes = {
-                Tools.getAddressAndLocationCodes(sub.map { t =>
-                  val infoStr = t.getString("infoContent")
-                  val infoJson = Tools.getInfoContentJsonobj(infoStr)
-                  (infoJson.getDouble("LONGITUDE") / 1000000, infoJson.getDouble("LATITUDE") / 1000000)
-                })
-              }
-              sub.zip(codes).foreach { line =>
-                val json = line._1
-                val location = line._2
-                val infoStr = json.getString("infoContent")
-                val infoJson = Tools.getInfoContentJsonobj(infoStr)
-                json.put("longitude", infoJson.get("LONGITUDE"))
-                json.put("latitude", infoJson.get("LATITUDE"))
-                json.put("eventtype", infoJson.get("EVENT_TYPE"))
-                json.put("fulladdress", location._1)
-                json.put("districtcode", location._2)
-                jsonArr.append(json)
-                //writeHiveTable(db.conn,Const.tableMap(tableKey),json)
-              }
-            }
-            if (jsonArr.size > 0 && jsonArr != null) {
-              writeHiveBatch(db.conn, Const.tableMap(tableKey), jsonArr, "warnTime")
-            }
-            if (useLess.size > 0 && useLess != null) {
-              //writeUnknown(db.conn,Const.unKnownTable,useLess)
-            }
-          } else {
-            //除了以上几种情况外的消息
-            var jsonArr = new ArrayBuffer[JSONObject]()
-            x._2.foreach { json =>
-              jsonArr.append(json)
-              // writeHiveTable(db.conn,Const.tableMap(tableKey),json)
-            }
-            writeHiveBatch(db.conn, Const.tableMap(tableKey), jsonArr, "bussinessTime")
-          }
-        } catch {
-          case e: Exception =>
-            println("发生插入错误的消息" + x._2.toString())
-            println(e)
-            e.printStackTrace()
-        }
-      }
-    }
-  }

-  def processRow(iterator: Iterator[String]): Unit = {
-    DbClient.init(Const.hivePoolName, Const.hiveDriver, Const.hiveUrl, Const.hiveUsername, Const.hivePassword)
-    DbClient.usingDB(Const.hivePoolName) { db =>
-      iterator.foreach { x =>
-        try {
-          val json = JSON.parseObject(x)
-          val tableKey = TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))
-          if (!Const.tableMap.contains(tableKey)) {
-            //writeUnknown(db.conn,Const.unKnownTable,json)
-          } else if (!json.containsKey("msgId")) {
-            //writeBaseInfoHive(db.conn,Const.tableMap(tableKey),json)
-          } else if (tableKey == TableKey(Some("0x1200"), Some("0x1203"))) {
-            Tools.addLocation(json).foreach(x => writeHiveTable(db.conn, Const.tableMap(tableKey), x))
-          } else {
-            writeHiveTable(db.conn, Const.tableMap(tableKey), json)
-          }
-        } catch {
-          case e: Exception =>
-            error(x)
-            e.printStackTrace()
-        }
-      }
-    }
-  }

-  def writeBaseInfoHive(conn: Connection, tableName: String, jsonArr: Array[JSONObject]): Unit = {
-    val keys = jsonArr(0).keySet().toArray(Array[String]())
-    val insetSql =
-      s"""
-         |insert into ods.$tableName
-         |(${keys.map(x => x.toLowerCase()).mkString(",")})
-         |values (${keys.map(_ => "?").mkString(",")})
-      """.stripMargin
-    val stmt = conn.prepareStatement(insetSql)
-    jsonArr.foreach { json =>
-      (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
-      stmt.addBatch()
-    }
-    try {
-      stmt.executeBatch()
-      info(s"insert date to HiveTable $tableName SUCCESS")
-    } catch {
-      case e: Exception => {
-        println("Exception Messages==>" + e)
-        println(s"hive table $tableName insert data failed==>" + jsonArr.toList)
-      }
-    } finally {
-      stmt.close()
-    }
-  }

+  def writeBaseInfoHive(conn: Connection, tableName: String, jsonArr: Array[JSONObject]): Unit = {
+    val keySet = jsonArr(0).keySet().toArray(Array[String]())
+    var results = ""
+    val clos = new ArrayBuffer[String]()
+    val insetSql =
+      s"""
+         |select * from ods.$tableName limit 1
+      """.stripMargin
+    val stmt = conn.prepareStatement(insetSql)
+    try {
+      val rs = stmt.executeQuery()
+      (1 to rs.getMetaData.getColumnCount).foreach { x =>
+        val column = rs.getMetaData.getColumnName(x).split("\\.")(1)
+        clos.append(column)
+      }
+      jsonArr.foreach { json =>
+        var result = ""
+        if (clos.size != keySet.size) {
+          val unKnownJson = new ArrayBuffer[JSONObject]()
+          unKnownJson.append(json)
+          writeUnknown(conn, Const.unKnownTable, unKnownJson)
+          info(s"The JSON field does not correspond to the $tableName field ==> " + json.toJSONString)
+          info(s" $tableName field ==> " + clos.toList)
+          info(s" JSON field ==> " + keySet.toList)
+        } else {
+          clos.foreach { clo =>
+            keySet.foreach { key =>
+              if (key.toLowerCase.equals(clo)) {
+                result = result + "\t" + json.getString(key)
+              }
+            }
+          }
+          result = result.drop(1) + "\n"
+          results = results + result
+        }
+      }
+      results = results + "\n"
+      if (results.trim.size > 0 && results != null) {
+        val fileName = s"/hive/ODS.db/$tableName/000000_0"
+        val exist = HDFSHelper.exists(hdfs, fileName)
+        if (!exist) {
+          hdfs.createNewFile(new Path(fileName))
+        }
+        val outputStream = hdfs.append(new Path(fileName))
+        val inputStream = new ByteArrayInputStream(results.toString().getBytes("UTF-8"))
+        HDFSHelper.transport(inputStream, outputStream)
+        inputStream.close()
+        outputStream.close()
+      }
+    } finally {
+      stmt.close()
+    }
+  }

-  def writeUnknown(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject]): Unit = {
-    val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
-    val day = new DateTime().toString("yyyy-MM-dd")
-    val results = new StringBuilder()
-    val sql =
-      """
-        |select * from
-      """.stripMargin
-    // val = conn.prepareStatement()
-    jsonArr.foreach { json =>
-      val result = dateTime + "\t" + json.toJSONString + "\t" + day + "\n"
-      results.append(result)
-    }
-    if (results.size > 0 && results != null) {
-      val fileName = s"/hive/ODS.db/$tableName/day=$day/000000_0"
-      val exist = HDFSHelper.exists(hdfs, fileName)
-      if (!exist) {
-        hdfs.createNewFile(new Path(fileName))
-      }
-      val outputStream = hdfs.append(new Path(fileName))
-      val inputStream = new ByteArrayInputStream(results.toString().getBytes("UTF-8"))
-      HDFSHelper.transport(inputStream, outputStream)
-      inputStream.close()
-      outputStream.close()
-    }
-  }
+  def writeUnknown(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject]): Unit = {
+    val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
+    val day = new DateTime().toString("yyyy-MM-dd")
+    val sourceJsons = new ArrayBuffer[JSONObject]()
+    jsonArr.foreach { json =>
+      val source = new JSONObject()
+      source.put("partitionday", day)
+      source.put("dateTime", dateTime)
+      source.put("jsondata", json.toJSONString)
+      sourceJsons.append(source)
+    }
+    val keySet = sourceJsons(0).keySet().toArray(Array[String]())
+    var results = ""
+    val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
+    val createPartitionStmt = conn.prepareStatement(createPartitionSql)
+    createPartitionStmt.execute()
+    val clos = new ArrayBuffer[String]()
+    val sql =
+      s"""
+         |select * from ods.$tableName limit 1
+      """.stripMargin
+    val stmt = conn.prepareStatement(sql)
+    try {
+      val rs: ResultSet = stmt.executeQuery()
+      (1 to rs.getMetaData.getColumnCount).foreach { x =>
+        val column = rs.getMetaData.getColumnName(x).split("\\.")(1)
+        clos.append(column)
+      }
+      sourceJsons.foreach { json =>
+        var result = ""
+        clos.foreach { clo =>
+          keySet.foreach { key =>
+            if (key.toLowerCase.equals(clo)) {
+              result = result + "\t" + json.getString(key)
+            }
+          }
+        }
+        result = result.drop(1) + "\n"
+        results = results + result
+      }
+      results = results + "\n"
+      if (results.trim.size > 0 && results != null) {
+        val fileName = s"/hive/ODS.db/$tableName/day=$day/000000_0"
+        val exist = HDFSHelper.exists(hdfs, fileName)
+        if (!exist) {
+          hdfs.createNewFile(new Path(fileName))
+        }
+        val outputStream = hdfs.append(new Path(fileName))
+        val inputStream = new ByteArrayInputStream(results.toString().getBytes())
+        HDFSHelper.transport(inputStream, outputStream)
+        inputStream.close()
+        outputStream.close()
+        println("保存到本地")
+      }
+    } finally {
+      stmt.close()
+      createPartitionStmt.close()
+    }
+  }

  def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = {
    println("start insert hive : " + DateTime.now())
    //定位消息,增加区县代码字段及值
    // if( tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1202")))
    //   || tableName == Const.tableMap(TableKey(Some("0x1200"),Some("0x1203")))){
@@ -393,15 +306,15 @@ object SysncHiveBatch extends Sparking with Logging {
    // }
    var keys = json.keySet().toArray(Array[String]())
    val day = if (keys.contains("dateTime")) {
      DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
    } else if (keys.contains("warnTime")) {
      new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
    } else {
      DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
    }
    // json.put("partitionday",day)
    keys = json.keySet().toArray(Array[String]())
    val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
    val createPartitionStmt = conn.prepareStatement(createPartitionSql)
    val setStmt = conn.prepareStatement("set set hive.exec.dynamic.partition.mode=nonstrict")
@@ -409,7 +322,7 @@ object SysncHiveBatch extends Sparking with Logging {
    val sql =
      s"""
         |insert into $tableName
         |partition(partitionday)
         |(${keys.map(x => x.toLowerCase()).mkString(",")})
         |values (${keys.map(_ => "?").mkString(",")})
      """.stripMargin
    val stmt = conn.prepareStatement(sql)
@@ -419,94 +332,93 @@ object SysncHiveBatch extends Sparking with Logging {
      setStmt.execute()
      stmt.execute()
      info(s"insert date to HiveTable $tableName SUCCESS")
    } catch {
      case e: Exception => {
        println("Exception Messages==>" + e)
        println(s"hive table $tableName insert data failed==>" + json)
      }
    } finally {
      createPartitionStmt.close()
      setStmt.close()
      stmt.close()
    }
    println("insert hive end : " + DateTime.now())
  }

  def writeHiveBatch(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject], dateKey: String): Unit = {
    var map: Map[String, ArrayBuffer[JSONObject]] = Map()
    dateKey match {
      case "dateTime" => {
        jsonArr.foreach { json =>
          val day = DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = map.get(day).get.++:(jsons)
            map.updated(day, jSONObjects)
          } else {
            map.+(day -> json)
          }
        }
      }
      case "warnTime" => {
        jsonArr.foreach { json =>
          val day = new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
          if (map.contains(day)) {
            val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
            val jsons = new ArrayBuffer[JSONObject]()
            jsons.append(json)
            map += (day -> jsons)
          }
        }
      }
      case "bussinessTime" => {
        jsonArr.foreach { json =>
          val day = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = map.get(day).get.++:(jsons)
            map.updated(day, jSONObjects)
          } else {
            map.+(day -> json)
          }
        }
      }
    }
    val partitionDays = map.keySet
    partitionDays.foreach { day =>
      val jsons: ArrayBuffer[JSONObject] = map.get(day).get
      val keys = jsons(0).keySet().toArray(Array[String]())
      val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
      val createPartitionStmt = conn.prepareStatement(createPartitionSql)
      createPartitionStmt.execute()
      val sql =
        s"""
           |insert into $tableName
           |partition(partitionday='$day')
           |(${keys.map(x => x.toLowerCase()).mkString(",")})
           |values (${keys.map(_ => "?").mkString(",")})
        """.stripMargin
      val stmt = conn.prepareStatement(sql)
      jsons.foreach { json =>
        (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
        stmt.addBatch()
      }
      try {
        stmt.executeBatch()
      } catch {
        case e: Exception => {
          println(jsons.toList)
          println(jsons.toList)
          println(e)
        }
      } finally {
        stmt.close()
        createPartitionStmt.close()
      }
@@ -514,83 +426,120 @@ object SysncHiveBatch extends Sparking with Logging {
    }
  }

  def writeHdfs(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject], dateKey: String): Unit = {
    var map: Map[String, ArrayBuffer[JSONObject]] = Map()
    dateKey match {
      case "dateTime" => {
        jsonArr.foreach { json =>
-         val day = DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
+         val dateTime = DateTime.parse(json.getString("dateTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
+         val day = dateTime.split(" ")(0)
+         json.put("dateTime", dateTime)
+         json.put("partitionday", day)
          val jsons = new ArrayBuffer[JSONObject]()
-         jsons.append(json)
          if (map.contains(day)) {
-           val jSONObjects = map.get(day).get.++:(jsons)
+           val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
-           map.+(day -> json)
+           jsons.append(json)
+           map += (day -> jsons)
          }
        }
      }
      case "warnTime" => {
        jsonArr.foreach { json =>
          val day = new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd", Locale.CHINESE)
+         val warnTime = new DateTime(json.getLong("warnTime") * 1000).toString("yyyy-MM-dd HH:mm:ss", Locale.CHINESE)
+         json.put("warnTime", warnTime)
+         json.put("partitionday", day)
+         val jsons = new ArrayBuffer[JSONObject]()
+         jsons.append(json)
          if (map.contains(day)) {
            val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
-           val jsons = new ArrayBuffer[JSONObject]()
-           jsons.append(json)
            map += (day -> jsons)
          }
        }
      }
      case "bussinessTime" => {
        jsonArr.foreach { json =>
          val day = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
+         val bussinessTime = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
+         json.put("bussinessTime", bussinessTime)
+         json.put("partitionday", day)
          val jsons = new ArrayBuffer[JSONObject]()
          jsons.append(json)
          if (map.contains(day)) {
-           val jSONObjects = map.get(day).get.++:(jsons)
+           val jSONObjects = json +: map.get(day).get
            map.updated(day, jSONObjects)
          } else {
-           map.+(day -> json)
+           map += (day -> jsons)
          }
        }
      }
    }
    val partitionDays = map.keySet
    partitionDays.foreach { day =>
      val jsons: ArrayBuffer[JSONObject] = map.get(day).get
      val keys = jsons(0).keySet().toArray(Array[String]())
      val createPartitionSql = s"alter table ods.$tableName ADD IF NOT EXISTS PARTITION(partitionday='$day')"
      val createPartitionStmt = conn.prepareStatement(createPartitionSql)
      createPartitionStmt.execute()
-     val sql =
-       s"""
-          |insert into $tableName
-          |partition(partitionday='$day')
-          |(${keys.map(x => x.toLowerCase()).mkString(",")})
-          |values (${keys.map(_ => "?").mkString(",")})
-       """.stripMargin
-     val stmt = conn.prepareStatement(sql)
-     jsons.foreach { json =>
-       (1 to keys.length).foreach { index => stmt.setObject(index, json.get(keys(index - 1))) }
-       stmt.addBatch()
-     }
-     try {
-       stmt.executeBatch()
-     } catch {
-       case e: Exception => {
-         println(jsons.toList)
-         println(jsons.toList)
-         println(e)
-       }
-     } finally {
-       stmt.close()
-       createPartitionStmt.close()
-     }
+     //val results = new StringBuilder()
+     var results = ""
+     val clos = new ArrayBuffer[String]()
+     val sql =
+       s"""
+          |select * from ods.$tableName limit 1
+       """.stripMargin
+     val stmt = conn.prepareStatement(sql)
+     try {
+       val rs = stmt.executeQuery()
+       (1 to rs.getMetaData.getColumnCount).foreach { x =>
+         val column = rs.getMetaData.getColumnName(x).split("\\.")(1)
+         clos.append(column)
+       }
+       jsons.foreach { json =>
+         val keySet = json.keySet().toArray(Array[String]())
+         var result = ""
+         if (clos.size != keySet.size) {
+           val unKnownJson = new ArrayBuffer[JSONObject]()
+           unKnownJson.append(json)
+           writeUnknown(conn, Const.unKnownTable, unKnownJson)
+           info(s"The JSON field does not correspond to the $tableName field ==> " + json.toJSONString)
+           info(s" $tableName field ==> " + clos.toList)
+           info(s" JSON field ==> " + keySet.toList)
+         } else {
+           clos.foreach { clo =>
+             keySet.foreach { key =>
+               if (key.toLowerCase.equals(clo)) {
+                 result = result + "\t" + json.getString(key)
+               }
+             }
+           }
+           result = result.drop(1) + "\n"
+           results = results + result
+         }
+       }
+       results = results + "\n"
+       if (results.trim.size > 0 && results != null) {
+         val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_0"
+         val exist = HDFSHelper.exists(hdfs, fileName)
+         if (!exist) {
+           hdfs.createNewFile(new Path(fileName))
+         }
+         val outputStream = hdfs.append(new Path(fileName))
+         val inputStream = new ByteArrayInputStream(results.getBytes("UTF-8"))
+         HDFSHelper.transport(inputStream, outputStream)
+         inputStream.close()
+         outputStream.close()
+       }
+     } finally {
+       stmt.close()
+       createPartitionStmt.close()
+     }
    }
  }
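The core of the rewritten write path above is: group records by partition day, render them as tab-separated rows in the Hive table's column order, and append the block to the partition's data file under /hive/ODS.db. A minimal self-contained sketch of that pattern (a hypothetical helper, not the project's HDFSHelper; it assumes an already-initialized Hadoop FileSystem and that append is enabled on the cluster):

    import java.io.ByteArrayInputStream
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils

    def appendRows(fs: FileSystem, tableName: String, day: String, rows: Seq[Seq[String]]): Unit = {
      // One delimited file per partition directory in the ODS warehouse.
      val file = new Path(s"/hive/ODS.db/$tableName/partitionday=$day/000000_0")
      if (!fs.exists(file)) fs.createNewFile(file)
      // Tab-separated columns, one record per line, in the table's column order.
      val data = rows.map(_.mkString("\t")).mkString("", "\n", "\n")
      val out = fs.append(file)
      val in = new ByteArrayInputStream(data.getBytes("UTF-8"))
      try IOUtils.copyBytes(in, out, 4096, false)
      finally { in.close(); out.close() }
    }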
src/main/scala/com/hikcreate/data/util/Tools.scala
@@ -77,7 +77,7 @@ object Tools extends Logging{
   val startTime = DateTime.now()
   val response = Http(Const.areaCodeAndAddressUrl).postData(json.toJSONString).header("content-type", "application/json").asString
   val endTime = DateTime.now()
-  println("经纬度列表size:" + buffer.size + "===》http response time :" + new Duration(startTime, endTime).getMillis)
+  //println("经纬度列表size:"+buffer.size+"===》http response time :"+new Duration(startTime,endTime).getMillis)
   val body = JSON.parseObject(response.body)
   val items = body.getJSONObject("result").getJSONArray("regeoItems")
   (0 until items.size()).map { index =>
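For context, getAddressAndLocationCodes posts one batch of coordinates (callers group them 20 at a time) to areaCodeAndAddress.Url and reads result.regeoItems from the response. A reduced sketch of that call, assuming scalaj-http as implied by the Http(...) usage above and leaving the request payload opaque:

    import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
    import scalaj.http.Http

    def regeo(url: String, payload: JSONObject): JSONArray = {
      // POST the batch as JSON and return the regeoItems array
      // for the caller to zip back with its input coordinates.
      val response = Http(url).postData(payload.toJSONString).header("content-type", "application/json").asString
      JSON.parseObject(response.body).getJSONObject("result").getJSONArray("regeoItems")
    }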