杜发飞 / operating-vehicle / Commits / b67aa811

Commit b67aa811 authored Nov 18, 2019 by 杜发飞
Merge remote-tracking branch 'origin/master'

Parents: 0347cfbd, 5a2d7476
Showing 5 changed files with 64 additions and 19 deletions (+64 −19):

  src/main/resources/conf.properties                              +1   −1
  src/main/scala/com/hikcreate/data/common/Sparking.scala         +2   −2
  src/main/scala/com/hikcreate/data/constant/ApolloConst.scala    +1   −1
  src/main/scala/com/hikcreate/data/sync/SysncHiveBatch.scala     +37  −13
  src/test/scala/Test1.scala                                      +23  −2
src/main/resources/conf.properties

    kafka.bootstrap.servers=39.100.49.76:9092
    #kafka.bootstrap.servers=10.197.236.154:9092
-   kafka.zookerper.servers=10.197.236.154:2181
+   kafka.zookerper.servers=10.197.236.211:2181
    #,10.197.236.169:2181,10.197.236.184:2181/kafka
    #kafka.zookerper.servers=172.26.111.183:2181,172.26.111.178:2181,172.26.111.186:2181/tbd_kafka
    #kafka.zookerper.servers=10.197.236.211:2181
  ...
src/main/scala/com/hikcreate/data/common/Sparking.scala

  @@ -18,8 +18,8 @@ trait Sparking
        .set("spark.extraListeners", classOf[LifecycleListener].getName)
        .set("hive.exec.dynamici.partition", "true")
        .set("hive.exec.dynamic.partition.mode", "nonstrict")
-       //.setAppName("test")
-       //.setMaster("local[*]")
+       .setAppName("test")
+       .setMaster("local[*]")

      def getKafkaParams(servers: String, groupId: String): Map[String, Object] = {
        Map[String, Object](
  ...
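The body of getKafkaParams is collapsed in this view. For orientation only, here is a minimal sketch of the consumer-parameter map such a helper typically returns for use with KafkaUtils.createDirectStream; the specific keys and defaults below are assumptions, not the repository's code.

    // Sketch only: assumed consumer settings, the real method body is collapsed above.
    import org.apache.kafka.common.serialization.StringDeserializer

    def getKafkaParams(servers: String, groupId: String): Map[String, Object] =
      Map[String, Object](
        "bootstrap.servers"  -> servers,
        "key.deserializer"   -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id"           -> groupId,
        "auto.offset.reset"  -> "earliest",                 // assumption
        "enable.auto.commit" -> (false: java.lang.Boolean)  // offsets are tracked in ZooKeeper here
      )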
src/main/scala/com/hikcreate/data/constant/ApolloConst.scala

  @@ -8,7 +8,7 @@ object ApolloConst
      val config: Config = ConfigService.getConfig("application")
      val queueSize: Int = config.getIntProperty("queue.size", 20)
      var hdfsUrl: String = config.getProperty("hdfs.url", null)
      var windowTime: Long = config.getLongProperty("window.time", 1L)
      var compactTopic: Seq[String] = config.getProperty("compact.kafka.topic", null).split(",")
  ...
src/main/scala/com/hikcreate/data/sync/SysncHiveBatch.scala

  @@ -3,6 +3,7 @@ import java.io.ByteArrayInputStream
    import java.net.URI
    import java.sql.{Connection, ResultSet}
    import java.util.Locale
    import com.alibaba.fastjson.{JSON, JSONObject}
    import com.hikcreate.data.client.DbClient
    import com.hikcreate.data.common.{Logging, Sparking}
  ...
  @@ -17,22 +18,33 @@ import org.joda.time.format.DateTimeFormat
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.hdfs.DistributedFileSystem
    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    object SysncHiveBatch extends Sparking with Logging {

      val hdfs: DistributedFileSystem = new DistributedFileSystem()
-     hdfs.initialize(URI.create(ApolloConst.hdfsUrl), new Configuration())
+     val hdfsConf = new Configuration()
+     hdfsConf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true)
+     hdfs.initialize(URI.create(ApolloConst.hdfsUrl), hdfsConf)
+     val queue = mutable.Queue[Int]()
+     (1 to (ApolloConst.queueSize)).map(i => queue.enqueue(i))

      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(conf, Seconds(ApolloConst.windowTime))
        val zkManager = ZkManager(ApolloConst.zkKafka)
        val kafkaParams = getKafkaParams(ApolloConst.bootstrap, ApolloConst.hiveGroupId)
        // val kafkaParams = getKafkaParams("10.197.236.154:9092,10.197.236.169:9092,10.197.236.184:9092", ApolloConst.hiveGroupId)
        val offsets = zkManager.getBeginOffset(ApolloConst.compactTopic, ApolloConst.hiveGroupId)
        // val offsets = zkManager.getBeginOffset("relay_hangzhou_test".split(","), ApolloConst.hiveGroupId)
        val offsetRanges = new ArrayBuffer[OffsetRange]()
        ssc.addStreamingListener(new BatchProcessListener(ssc))
        val inputStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          //ApolloConst.compactTopic
          //"relay_hangzhou_test".split(",")
          ConsumerStrategies.Subscribe[String, String](ApolloConst.compactTopic, kafkaParams, offsets))
        inputStream.transform { rdd =>
          offsetRanges.clear()
  ...
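The transform body is collapsed after offsetRanges.clear(). The surrounding code (getBeginOffset before the stream, saveEndOffset after processing) follows the standard direct-stream offset-tracking pattern; a minimal sketch of the step that presumably fills offsetRanges is shown below, as an assumption rather than the repository's exact lines.

    // Sketch (assumption): capture this batch's offset ranges inside transform,
    // process the records, and only then persist the end offsets via ZkManager.
    import org.apache.spark.streaming.kafka010.HasOffsetRanges

    inputStream.transform { rdd =>
      offsetRanges.clear()
      offsetRanges ++= rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }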
  @@ -41,7 +53,9 @@ object SysncHiveBatch extends Sparking with Logging {
      }.map(x => x.value()).foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          val startTime = DateTime.now()
-         rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType")))).foreachPartition(x => processRow3(x))
+         val rdd1 = rdd.map(JSON.parseObject).groupBy(json => TableKey(Option(json.getString("msgId")), Option(json.getString("dataType"))))
+         rdd1.foreachPartition(x => processRow3(x))
          zkManager.saveEndOffset(offsetRanges, ApolloConst.hiveGroupId)
          offsetRanges.foreach { x =>
            println(x)
  ...
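Grouping by TableKey collects records with the same msgId/dataType pair, i.e. records destined for the same Hive table, into one group per partition. The key's definition is not part of this diff; a plausible shape, stated as an assumption, is:

    // Assumed shape of the grouping key used above (defined elsewhere in the repository).
    case class TableKey(msgId: Option[String], dataType: Option[String])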
  @@ -129,7 +143,10 @@ object SysncHiveBatch extends Sparking with Logging {
          val infoJson = Tools.getInfoContentJsonobj(infoStr)
          val longitude = infoJson.get("LONGITUDE")
          val latitude = infoJson.get("LATITUDE")
-         val eventtype = infoJson.get("EVENT_TYPE")
+         var eventtype = infoJson.getString("EVENT_TYPE")
+         if (eventtype.length == 3) {
+           eventtype = eventtype.replace("x", "x0")
+         }
          json.put("longitude", infoJson.get("LONGITUDE"))
          json.put("latitude", infoJson.get("LATITUDE"))
          json.put("eventtype", infoJson.get("EVENT_TYPE"))
  ...
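The new branch normalizes three-character event codes by inserting a zero after the "x", so codes keep a fixed width. A quick illustration of the effect, using the sample value "0x2" that also appears in the commented-out Test1 code further down:

    // "0x2".length == 3, so it is rewritten; "0x12" has length 4 and is not touched.
    assert("0x2".replace("x", "x0") == "0x02")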
  @@ -150,12 +167,11 @@ object SysncHiveBatch extends Sparking with Logging {
            x._2.foreach { json =>
              jsonArr.append(json)
            }
-           writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "bussinessTime")
+           writeHdfs(db.conn, ApolloConst.tableMap(tableKey), jsonArr, "businessTime")
          }
        } catch {
          case e: Exception =>
            println("发生插入错误的消息" + x._2.toString())
            println(e)
            info("发生插入错误的消息" + x._2.toString())
            e.printStackTrace()
        }
      }
  ...
  @@ -219,6 +235,7 @@ object SysncHiveBatch extends Sparking with Logging {
      }

      def writeUnknown(conn: Connection, tableName: String, jsonArr: ArrayBuffer[JSONObject]): Unit = {
        // this.synchronized{
        val dateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
        val day = new DateTime().toString("yyyy-MM-dd")
        val sourceJsons = new ArrayBuffer[JSONObject]()
  ...
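writeUnknown stamps each batch with two Joda-Time renderings of the current instant, a full timestamp for the record and a day string for the partition. For reference (example values only):

    // Joda-Time: the same instant formatted two ways, as in writeUnknown above.
    import org.joda.time.DateTime
    val now      = new DateTime()
    val dateTime = now.toString("yyyy-MM-dd HH:mm:ss") // e.g. "2019-11-18 09:15:00"
    val day      = now.toString("yyyy-MM-dd")          // e.g. "2019-11-18", used as the partition value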
  @@ -260,22 +277,27 @@ object SysncHiveBatch extends Sparking with Logging {
            results = results + result
          }
          results = results + "\n"
-         if (results.trim.size > 0 && results != null) {
-           val fileName = s"/hive/ODS.db/$tableName/day=$day/000000_0"
+         if (results.trim().size > 0 && results != null) {
+           val i = queue.dequeue()
+           val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_$i"
            val exist = HDFSHelper.exists(hdfs, fileName)
            if (!exist) {
              hdfs.createNewFile(new Path(fileName))
            }
            val outputStream = hdfs.append(new Path(fileName))
-           val inputStream = new ByteArrayInputStream(results.toString().getBytes("UTF-8"))
+           val inputStream = new ByteArrayInputStream(results.getBytes("UTF-8"))
            HDFSHelper.transport(inputStream, outputStream)
            inputStream.close()
            outputStream.close()
+           queue.enqueue(i)
          }
        } finally {
          stmt.close()
          createPartitionStmt.close()
        }
        // }
      }

      def writeHiveTable(conn: Connection, tableName: String, json: JSONObject): Unit = {
  ...
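The queue of integer suffixes, seeded from the new ApolloConst.queueSize, gives each in-flight writer its own 000000_$i file under the partition directory and recycles the suffix after the append, so concurrent tasks no longer append to a single shared file; the directory also moves from day=$day to partitionday=$day, which appears to line up with the partitionday field set on each record below. A condensed sketch of the check-out/check-in pattern (the try/finally framing is an added assumption for clarity, not the repository's exact code):

    // Sketch (assumption): borrow a suffix, append to the private file, return it.
    val i = queue.dequeue() // one suffix per in-flight writer
    val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_$i"
    try {
      if (!HDFSHelper.exists(hdfs, fileName)) hdfs.createNewFile(new Path(fileName))
      val outputStream = hdfs.append(new Path(fileName))
      val inputStream  = new ByteArrayInputStream(results.getBytes("UTF-8"))
      HDFSHelper.transport(inputStream, outputStream)
      inputStream.close()
      outputStream.close()
    } finally {
      queue.enqueue(i) // recycle the suffix even if the write throws
    }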
  @@ -435,11 +457,11 @@ object SysncHiveBatch extends Sparking with Logging {
            }
          }
        }
-       case "bussinessTime" => {
+       case "businessTime" => {
          jsonArr.foreach { json =>
            val day = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
-           val bussinessTime = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
-           json.put("bussinessTime", bussinessTime)
+           val businessTime = DateTime.parse(json.getString("businessTime"), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
+           json.put("businessTime", businessTime)
            json.put("partitionday", day)
            val jsons = new ArrayBuffer[JSONObject]()
            jsons.append(json)
  ...
  @@ -498,7 +520,8 @@ object SysncHiveBatch extends Sparking with Logging {
          }
          results = results + "\n"
          if (results.trim.size > 0 && results != null) {
-           val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_0"
+           val i = queue.dequeue()
+           val fileName = s"/hive/ODS.db/$tableName/partitionday=$day/000000_$i"
            val exist = HDFSHelper.exists(hdfs, fileName)
            if (!exist) {
              hdfs.createNewFile(new Path(fileName))
  ...
  @@ -508,6 +531,7 @@ object SysncHiveBatch extends Sparking with Logging {
            HDFSHelper.transport(inputStream, outputStream)
            inputStream.close()
            outputStream.close()
+           queue.enqueue(i)
          }
        } finally {
          stmt.close()
  ...
src/test/scala/Test1.scala

    import com.hikcreate.data.util.Tools
    import scala.collection.mutable

    object Test1 {

      def main(args: Array[String]): Unit = {
        val a = Tools.getAddressAndLocationCode(120.485443, 30.183996)
        println(a._1 + "----" + a._2)
        // val a = Tools.getAddressAndLocationCode(120.485443,30.183996)
        // println(a._1+"----"+a._2)
        // var eventtype="0x2"
        // eventtype = eventtype.replace("x","x0")
        // println(eventtype)
        val list = (1 to (20)).toList
        val queue = mutable.Queue[Int]()
        (1 to (20)).foreach(i => queue.enqueue(i))
        val i = queue.dequeue()
        println(i)
        println(queue)
        queue.enqueue(i)
        println(queue)
        // val dequeue = queue
        // println(dequeue._1)
        // println(queue)
        //val queue1 = queue.enqueue(dequeue._1)
        //println(queue1)
      }
  ...
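Test1 exercises the same FIFO queue idea in isolation. Condensed into assertions, the behaviour the println calls above should show (a small sketch, not part of the commit):

    // FIFO round trip: the first value enqueued is the first dequeued, and a
    // recycled value re-enters at the back of the queue.
    import scala.collection.mutable
    val q = mutable.Queue[Int]()
    (1 to 3).foreach(x => q.enqueue(x))
    val i = q.dequeue()
    assert(i == 1)
    q.enqueue(i)
    assert(q.toList == List(2, 3, 1))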