Commit ce2a82c6 by 李辅翼

v3

parent 59759779
@@ -16,6 +16,7 @@
    <properties>
        <java.version>1.8</java.version>
        <hbase.version>1.2.0-cdh5.15.1</hbase.version>
    </properties>
@@ -28,6 +29,29 @@
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-annotation_1.0_spec</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jaspic_1.0_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
@@ -35,12 +59,18 @@
            <scope>test</scope>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>oracle</groupId>-->
            <!--<artifactId>ojdbc6</artifactId>-->
            <!--<version>11.2.0.3</version>-->
            <!--<scope>system</scope>-->
            <!--<systemPath>${project.basedir}/src/main/resources/lib/ojdbc6-11.2.0.3.jar</systemPath>-->
        <!--</dependency>-->
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>11.2.0.3</version>
        </dependency>
        <dependency>
@@ -56,6 +86,14 @@
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-annotation_1.0_spec</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jaspic_1.0_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
@@ -63,6 +101,14 @@
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
@@ -80,6 +126,9 @@
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
@@ -91,11 +140,6 @@
            <version>1.26.5</version>
        </dependency>
    </dependencies>
    <build>
......
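The slf4j-log4j12 and servlet-api exclusions above keep hbase-client's transitive dependencies from clashing with Spring Boot's Logback binding and embedded servlet container. A quick way to confirm which SLF4J binding actually won (a sketch, not part of the project):

import org.slf4j.LoggerFactory;

// Prints the concrete SLF4J binding on the classpath; if slf4j-log4j12 still
// leaks in transitively, this shows a Log4j factory instead of Logback's.
public class Slf4jBindingCheck {
    public static void main(String[] args) {
        System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
    }
}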
@@ -2,16 +2,25 @@ package com.hikcreate.controller;
import com.alibaba.fastjson.JSONObject;
import com.hikcreate.drv_photo_pic.DrvPhoto;
import com.hikcreate.drv_photo_pic.VioPic;
import com.hikcreate.entity.PicByte;
import com.hikcreate.service.fdfs.service.FileService;
import com.hikcreate.utils.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

import javax.imageio.stream.FileImageOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.util.Date;
import java.util.Map;
/**
@@ -21,60 +30,69 @@
 */
@RestController
@RequestMapping("/pic")
@Validated
public class PicController {

    @Autowired
    private VioPic vioPic;
    @Autowired
    private DrvPhoto drvPhoto;
    @Autowired
    private FileService fileService;
    @Value("${ftpUrl}")
    private String ftpUrl;
@GetMapping("/drvPhotoHis") @GetMapping("/drvPhotoHis")
public void getHisDrvPhoto(@RequestParam("time") String time, HttpServletResponse response){ public void getHisDrvPhoto(@RequestParam("time") String time, HttpServletResponse response) {
try { try {
boolean result=drvPhoto.getHisDrvPhoto(time); boolean result = drvPhoto.getHisDrvPhoto(time);
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("result",result); jsonObject.put("result", result);
response.setContentType("application/json;charset=UTF-8"); response.setContentType("application/json;charset=UTF-8");
response.getWriter().write(jsonObject.toJSONString()); response.getWriter().write(jsonObject.toJSONString());
}catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@GetMapping("/testHttp")
@GetMapping("/testFastDfs") public void testHttp(@RequestParam("param") String param, HttpServletResponse response) {
public void testFastDfs(@RequestParam("url") String url, HttpServletResponse response){
try { try {
File file=new File("C:\\Users\\lifuyi5\\Downloads\\2019-05-15"); RestTemplate restTemplate = new RestTemplate();
FileInputStream fis = new FileInputStream(file); MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
ByteArrayOutputStream bos = new ByteArrayOutputStream(fis.available()); params.add("urls", param);
byte[] b = new byte[1024]; HttpHeaders headers = new HttpHeaders();
int len = -1; HttpMethod method = HttpMethod.POST;
while((len = fis.read(b)) != -1) { headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
bos.write(b, 0, len); HttpEntity<MultiValueMap<String, String>> requestEntity = new HttpEntity<>(params, headers);
ResponseEntity<PicByte> resp = restTemplate.exchange(ftpUrl, method, requestEntity, PicByte.class);
Map<String, byte[]> map = resp.getBody().getMap();
//遍历Map
String path = "/home/pic/";
File file;
for (Map.Entry<String, byte[]> entry : map.entrySet()) {
String[] split = entry.getKey().split("/");
file = new File(path + split[split.length - 1]);
if (file.exists()) {
file.delete();
}
FileImageOutputStream imageOutput = new FileImageOutputStream(file);
imageOutput.write(entry.getValue(), 0, entry.getValue().length);
imageOutput.close();
} }
byte[] fileByte = bos.toByteArray();
String urlFast = fileService.uploadFile(fileByte, "2019-05-15");
response.setContentType("application/json;charset=UTF-8"); response.setContentType("application/json;charset=UTF-8");
response.getWriter().write(urlFast); response.getWriter().write(path);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@GetMapping("/testUrl") @GetMapping("/testIncVio")
public void testUrl(@RequestParam("url") String url, HttpServletResponse response){ public void testFastDfs(@RequestParam("past") int past) {
String date = DateUtil.getDate();
try { vioPic.getIncrementVioPic(DateUtil.formatDate(DateUtil.getPastDate(new Date(),-past)),date);
System.out.println("dddddddddddddd");
response.setContentType("application/json;charset=UTF-8");
response.getWriter().write("cjdncvjf");
} catch (IOException e) {
e.printStackTrace();
}
} }
} }
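For manual testing the controller can be driven with a plain RestTemplate client; a minimal sketch, assuming the service runs locally on the configured server.port=8084:

import org.springframework.web.client.RestTemplate;

// Hypothetical smoke test for the /pic endpoints; host and the sample time
// value are assumptions, the port matches server.port=8084 in the config.
public class PicControllerSmokeTest {
    public static void main(String[] args) {
        RestTemplate rest = new RestTemplate();
        String body = rest.getForObject(
                "http://localhost:8084/pic/drvPhotoHis?time={time}",
                String.class, "2019-01-01 00:00:00");
        System.out.println(body); // expected: {"result":true} on success
    }
}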
@@ -12,4 +12,6 @@ public interface DrvPhoto {
    boolean getHisDrvPhoto(String time);

    void getIncrementDrvPhoto();
}
package com.hikcreate.drv_photo_pic;

public interface VioPic {

    void getIncrementVioPic(String startDay, String endDay);
}
package com.hikcreate.drv_photo_pic.impl;

import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.hikcreate.drv_photo_pic.DrvPhoto;
import com.hikcreate.service.fdfs.service.FileService;
import com.hikcreate.utils.DateUtil;
import com.hikcreate.utils.redis.RedisClientUtil;
import oracle.sql.BLOB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.sql.*;
import java.util.Date;
import java.util.zip.CRC32;
/**
@@ -22,15 +34,20 @@
@Service("drvPhotoImpl")
public class DrvPhotoImpl implements DrvPhoto {

    @Autowired
    private RedisClientUtil redisClientUtil;

    private static Logger logger = LoggerFactory.getLogger(DrvPhotoImpl.class);

    @Value("${url}")
    private String url;
    @Value("${username}")
    private String username;
    @Value("${password}")
    private String password;
    @Value("${roundDay}")
    private int roundDay;

    @Autowired
    private FileService fileService;

    private static String startTime;
    private static String endTime;
@@ -46,6 +63,25 @@
    private static String mqzp;
    private static String bjcsbj;
private static Configuration hbaseConf;
private static org.apache.hadoop.hbase.client.Connection hbaseConn;
private static TableName guizhouTablename;
private static TableName guiyanTablename;
private static Table guizhoutable;
private static Table guiyantable;
@Value("${hbase.zookeeper.property.clientPort}")
private String hbaseAddress;
@Value("${hbase.zookeeper.quorum}")
private String hbaseZkQuorum;
@Value("${hbase.master}")
private String hbaseMaster;
@Value("${hbase.drv_photo.table}")
private String driverPhotoTable;
@Value("${hbase.driverlicense.table}")
private String driverTable;
private static CRC32 crc = new CRC32();
    /**
     * Sync historical photos; photos are synced back from the given time.
     *
@@ -53,7 +89,12 @@
     * @return
     */
    public boolean getHisDrvPhoto(String time) {
        hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseAddress);
        hbaseConf.set("hbase.zookeeper.quorum", hbaseZkQuorum);
        hbaseConf.set("hbase.master", hbaseMaster);
        try {
            hbaseConn = ConnectionFactory.createConnection(hbaseConf);
            endTime = time;
            Class.forName("oracle.jdbc.OracleDriver");
            Connection connection1 = DriverManager.getConnection(url, username, password);
@@ -70,6 +111,7 @@
            // fetch photos batch by batch, walking backwards from endTime
            String sql;
            String lastTime;
            String rowkey;
            while (endTime.compareTo(startTime) >= 0) {
                lastTime = DateUtil.formatDate(DateUtil.getPastDate(DateUtil.parseDate(endTime), roundDay));
                Connection connection = DriverManager.getConnection(url, username, password);
@@ -77,41 +119,181 @@
                        + endTime + "' and ZP is not null and fzjg is not null and sfzmhm is not null";
                PreparedStatement pstm = connection.prepareStatement(sql);
                ResultSet resultSet = pstm.executeQuery();
                guizhouTablename = TableName.valueOf(driverPhotoTable);
                guizhoutable = hbaseConn.getTable(guizhouTablename);
                guiyanTablename = TableName.valueOf(driverTable);
                guiyantable = hbaseConn.getTable(guiyanTablename);
                while (resultSet.next()) {
                    saveToHbase(resultSet, fileService);
                }
                endTime = lastTime;
                resultSet.close();
                pstm.close();
                connection.close();
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (guizhoutable != null) {
                try {
                    guizhoutable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (guiyantable != null) {
                try {
                    guiyantable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (hbaseConn != null) {
                try {
                    hbaseConn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return true;
    }
    /**
     * Daily incremental sync.
     */
    @Override
    public void getIncrementDrvPhoto() {
hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseAddress);
hbaseConf.set("hbase.zookeeper.quorum",hbaseZkQuorum);
hbaseConf.set("hbase.master",hbaseMaster);
try {
Class.forName("oracle.jdbc.OracleDriver");
Connection connection = DriverManager.getConnection(url, username, password);
String sql="select * from GYJG.DRV_PHOTO where GXSJ between sysdate-1 and sysdate and ZP is not null and fzjg is not null and sfzmhm is not null";
PreparedStatement pstm = connection.prepareStatement(sql);
ResultSet resultSet = pstm.executeQuery();
hbaseConn = ConnectionFactory.createConnection(hbaseConf);
guizhouTablename=TableName.valueOf(driverPhotoTable);
guizhoutable=hbaseConn.getTable(guizhouTablename);
guiyanTablename=TableName.valueOf(driverTable);
guiyantable=hbaseConn.getTable(guiyanTablename);
while (resultSet.next()) {
saveToHbase(resultSet,fileService);
}
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (guizhoutable != null) {
                try {
                    guizhoutable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (guiyantable != null) {
                try {
                    guiyantable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (hbaseConn != null) {
                try {
                    hbaseConn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private static void saveToHbase(ResultSet resultSet, FileService fileService) throws Exception {
        paramResultSet(resultSet);
        // upload the photo to FastDFS
        String fastDfsUrl = uploadFastDfs(fileService);
        // write to HBase: province-wide (Guizhou) table
        saveGuiZhouHbase(fastDfsUrl);
        // Guiyang city table: only records issued by the 贵A authority
        if ("贵A".equals(fzjg)) {
            String rowkey = sfzmhm;
            Put put = new Put(rowkey.getBytes());
            put.addColumn("info".getBytes(), "picUrl".getBytes(), fastDfsUrl.getBytes());
            guiyantable.put(put);
        }
    }
public static String getPrefixString(long prefixLong) {
if (prefixLong < 10L) {
return "00" + prefixLong;
} else if (prefixLong >= 10L && prefixLong < 100L) {
return "0" + prefixLong;
} else {
return "" + prefixLong;
}
}
    private static void saveGuiZhouHbase(String fastDfsUrl) throws IOException {
        // salted rowkey: CRC32(sfzmhm) % 300 prefix, then update time, then the id
        String rowkey;
        crc.reset(); // CRC32 accumulates across update() calls; reset so each id hashes independently
        crc.update(sfzmhm.getBytes());
        long prefixLong = crc.getValue() % 300; // getValue() is already non-negative
        String prefixString = getPrefixString(prefixLong);
        if (gxsj.length() >= 19) {
            rowkey = prefixString + "-" + gxsj.substring(0, 19) + "-" + sfzmhm;
        } else {
            rowkey = prefixString + "-" + gxsj + "-" + sfzmhm;
        }
        Put put = new Put(rowkey.getBytes());
        put.addColumn("info".getBytes(), "idCard".getBytes(), sfzmhm.getBytes());
        put.addColumn("info".getBytes(), "picUrl".getBytes(), fastDfsUrl.getBytes());
        put.addColumn("info".getBytes(), "updateTime".getBytes(), (gxsj == null ? "null" : gxsj).getBytes());
        put.addColumn("info".getBytes(), "administrativeDivision".getBytes(), (xzqh == null ? "null" : xzqh).getBytes());
        put.addColumn("info".getBytes(), "flag".getBytes(), (flag == null ? "null" : flag).getBytes());
        put.addColumn("info".getBytes(), "serialNumber".getBytes(), (xh == null ? "null" : xh).getBytes());
        put.addColumn("info".getBytes(), "insertTime".getBytes(), (rksj == null ? "null" : rksj).getBytes());
        put.addColumn("info".getBytes(), "licensingAuthority".getBytes(), (fzjg == null ? "null" : fzjg).getBytes());
        guizhoutable.put(put);
    }
    private synchronized static String uploadFastDfs(FileService fileService) throws Exception {
        // read the photo BLOB into a byte array
        BufferedInputStream is = new BufferedInputStream(zp.getBinaryStream());
        byte[] bytes = new byte[(int) zp.length()];
        int len = bytes.length;
        int offset = 0;
        int read = 0;
        while (offset < len && (read = is.read(bytes, offset, len - offset)) >= 0) {
            offset += read;
        }
        // upload to FastDFS and return the stored file's url
        String fastDfsUrl = fileService.uploadFile(bytes, sfzmhm + gxsj + ".jpeg");
        is.close();
        return fastDfsUrl;
    }
private synchronized static void paramResultSet(ResultSet resultSet) throws SQLException {
sfzmhm=resultSet.getString("SFZMHM");
zp= (BLOB) resultSet.getBlob("ZP");
gxsj=resultSet.getString("GXSJ");
xzqh=resultSet.getString("XZQH");
flag=resultSet.getString("FLAG");
xh=resultSet.getString("XH");
rksj=resultSet.getString("RKSJ");
fzjg=resultSet.getString("FZJG");
}
public static void main(String[] args) {
String s = "{\"captureType\":\"1\",\"carColor\":\"11\",\"carLogo\":\"0\",\"carSpeed\":\"38.00\",\"conLevel\":\"?\",\"dataSource\":\"1\",\"departmentCode\":\"\",\"devCode\":\"520100000900015905\",\"directionCode\":\"4\",\"firmCode\":\"02\",\"jyw\":\"-1\",\"kafkaSource\":\"2\",\"locationDesc\":\"朝阳洞路与望城路交叉口\",\"monitorTime1\":\"2019-07-11 13:49:53.000\",\"monitorTime2\":\"?\",\"monitorTime3\":\"?\",\"monitorTime4\":\"?\",\"picFeatures\":\"\",\"picUrl1\":\"ftp://vion9:vion9@52.1.123.218:21/52.3.120.233/kk/2019-07-11/13/2019071113495366000000060.jpg\",\"picUrl2\":\"\",\"picUrl3\":\"\",\"picsNum\":\"1\",\"plateColor\":\"4\",\"plateNum\":\"无牌\",\"plateType\":\"00\",\"roadNumCode\":\"3\",\"watchTime\":\"2019-07-11 13:49:53.000\"}";
JSONObject jsonObject = JSONObject.parseObject(new String(s));
jsonObject.put("type", "oc");
System.out.println(jsonObject.toJSONString());
    }
}
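The CRC32 salting above spreads driver-photo rows across 300 rowkey prefixes so sequential ids do not pile onto a single HBase region. A standalone sketch of the same prefix computation (the id value is made up for illustration):

import java.util.zip.CRC32;

// Illustration of the 300-bucket rowkey salting used in saveGuiZhouHbase.
public class RowkeySaltDemo {
    static String prefix(String idCard) {
        CRC32 crc = new CRC32();
        crc.update(idCard.getBytes());
        long bucket = crc.getValue() % 300;   // getValue() is unsigned, 0..2^32-1
        return String.format("%03d", bucket); // zero-pad to three digits
    }

    public static void main(String[] args) {
        // hypothetical id; real keys append the update time and the id itself
        System.out.println(prefix("520102199001011234") + "-2019-07-02 00:00:00-520102199001011234");
    }
}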
package com.hikcreate.drv_photo_pic.impl;
import com.hikcreate.drv_photo_pic.VioPic;
import com.hikcreate.entity.PicByte;
import com.hikcreate.service.fdfs.service.FileService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.sql.*;
import java.util.Arrays;
import java.util.Map;
@Service("vioPicImpl")
public class VioPicImpl implements VioPic {
private static Logger logger = LoggerFactory.getLogger(VioPicImpl.class);
@Value("${hive.url}")
private String hiveUrl;
@Value("${hive.user}")
private String hiveUser;
@Value("${hive.password}")
private String hivePassWord;
@Value("${increment.vio.pic.sql}")
private String incrementVioPicSql;
@Value("${hbase.zookeeper.property.clientPort}")
private String hbaseAddress;
@Value("${hbase.zookeeper.quorum}")
private String hbaseZkQuorum;
@Value("${hbase.master}")
private String hbaseMaster;
@Value("${hbase.vio.table}")
private String vioTableStr;
@Value("${ftpUrl}")
private String ftpUrl;
    @Autowired
    private FileService fileService; // not static: Spring does not inject into static fields
private String rowkey;
private static TableName vioTableName;
private static Table vioTable;
private static Configuration hbaseConf;
private static org.apache.hadoop.hbase.client.Connection hbaseConn;
private static String hphm;
private static String hpzl;
private static String wfsj;
private static String url1;
private static String url2;
private static String url3;
private static Put put;
private static MultiValueMap<String, String> params;
private static HttpEntity<MultiValueMap<String, String>> requestEntity;
private static HttpHeaders headers;
private static HttpMethod method;
private static RestTemplate restTemplate;
private static String[] colValue;
    /**
     * Incrementally sync violation pictures for the given day window.
     */
    @Override
    public synchronized void getIncrementVioPic(String startDay, String endDay) {
        try {
            String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
            Class.forName(JDBC_DRIVER);
            logger.info("hive url: {}, user: {}", hiveUrl, hiveUser);
            Connection connection = DriverManager.getConnection(hiveUrl, hiveUser, hivePassWord);
            PreparedStatement pstm = connection.prepareStatement(incrementVioPicSql);
            pstm.setString(1, endDay);
            pstm.setString(2, startDay);
            // incrementVioPicSql="SELECT a.ccarnumber hphm,a.clicensetype hpzl,b.wfsj wfsj,a.cpic1path url1,a.cpic2path url2,a.cpic3path url3 from " +
            // "(SELECT * from kakou.vio_violation_pic_his_ods WHERE substr(export_time,0,10)<='"+endDay+"' and substr(export_time,0,10)>'"+startDay+"') a " +
            // "INNER JOIN (SELECT * from default.vio_surveil_all WHERE clsj='null' or clsj='' or clsj is null) b WHERE a.ccarnumber=b.hphm " +
            // "and a.clicensetype=b.hpzl and substr(a.dillegaldate,0,16)=substr(b.wfsj,0,16)";
            // PreparedStatement pstm = connection.prepareStatement(incrementVioPicSql);
            logger.info(pstm.toString());
            logger.info(startDay);
            logger.info(endDay);
            ResultSet resultSet = pstm.executeQuery();
            hbaseConf = HBaseConfiguration.create();
            hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseAddress);
            hbaseConf.set("hbase.zookeeper.quorum", hbaseZkQuorum);
            hbaseConf.set("hbase.master", hbaseMaster);
            hbaseConn = ConnectionFactory.createConnection(hbaseConf);
            logger.info("hbase connection established");
            vioTableName = TableName.valueOf(vioTableStr);
            vioTable = hbaseConn.getTable(vioTableName);
            restTemplate = new RestTemplate();
            headers = new HttpHeaders();
            method = HttpMethod.POST;
            headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
            loop1:
            while (resultSet.next()) {
                logger.info("got one violation row from hive");
                hphm = resultSet.getString("hphm");
                hpzl = resultSet.getString("hpzl");
                wfsj = resultSet.getString("wfsj");
                url1 = resultSet.getString("url1");
                url2 = resultSet.getString("url2");
                url3 = resultSet.getString("url3");
                rowkey = hphm + "#" + hpzl + "#" + wfsj;
                Get get = new Get(rowkey.getBytes());
                put = new Put(rowkey.getBytes());
                get.addColumn(Bytes.toBytes("useful"), Bytes.toBytes("ftpUrl1"));
                get.addColumn(Bytes.toBytes("useful"), Bytes.toBytes("ftpUrl2"));
                get.addColumn(Bytes.toBytes("useful"), Bytes.toBytes("ftpUrl3"));
                get.addColumn(Bytes.toBytes("useful"), Bytes.toBytes("ftpUrl4"));
                get.addColumn(Bytes.toBytes("useful"), Bytes.toBytes("ftpUrl5"));
                get.addColumn(Bytes.toBytes("useful"), Bytes.toBytes("ftpUrl6"));
                Result result = vioTable.get(get);
                colValue = new String[6];
                for (Cell cell : result.rawCells()) {
                    String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    // remember the existing value of each url column;
                    // every case needs its own break or the switch falls through
                    switch (colName) {
                        case "ftpUrl1":
                            colValue[0] = value;
                            break;
                        case "ftpUrl2":
                            colValue[1] = value;
                            break;
                        case "ftpUrl3":
                            colValue[2] = value;
                            break;
                        case "ftpUrl4":
                            colValue[3] = value;
                            break;
                        case "ftpUrl5":
                            colValue[4] = value;
                            break;
                        case "ftpUrl6":
                            colValue[5] = value;
                            break;
                    }
                }
                logger.info("existing url columns: " + Arrays.toString(colValue));
                // url1
                if (url1 != null && !"".equals(url1)) {
                    // skip if this picture was stored before, to avoid duplicate inserts
                    if (url1.equals(colValue[0]) || url1.equals(colValue[3])) {
                        continue loop1;
                    }
                    if (colValue[0] == null || "".equals(colValue[0])) {
                        // store url1 in ftpUrl1
                        putData("ftpUrl1", "fastDfsUrl1", url1, ftpUrl);
                    } else {
                        if (colValue[3] == null || "".equals(colValue[3])) {
                            // ftpUrl1 is taken: store url1 in ftpUrl4
                            putData("ftpUrl4", "fastDfsUrl4", url1, ftpUrl);
                        }
                    }
                }
                if (url2 != null && !"".equals(url2)) {
                    if (url2.equals(colValue[1]) || url2.equals(colValue[4])) {
                        continue loop1;
                    }
                    if (colValue[1] == null || "".equals(colValue[1])) {
                        // store url2 in ftpUrl2
                        putData("ftpUrl2", "fastDfsUrl2", url2, ftpUrl);
                    } else {
                        if (colValue[4] == null || "".equals(colValue[4])) {
                            // ftpUrl2 is taken: store url2 in ftpUrl5
                            putData("ftpUrl5", "fastDfsUrl5", url2, ftpUrl);
                        }
                    }
                }
                if (url3 != null && !"".equals(url3)) {
                    if (url3.equals(colValue[2]) || url3.equals(colValue[5])) {
                        continue loop1;
                    }
                    if (colValue[2] == null || "".equals(colValue[2])) {
                        // store url3 in ftpUrl3
                        putData("ftpUrl3", "fastDfsUrl3", url3, ftpUrl);
                    } else {
                        if (colValue[5] == null || "".equals(colValue[5])) {
                            // ftpUrl3 is taken: store url3 in ftpUrl6
                            putData("ftpUrl6", "fastDfsUrl6", url3, ftpUrl);
                        }
                    }
                }
                vioTable.put(put);
            }
            resultSet.close();
            pstm.close();
            connection.close();
            logger.info("incremental violation picture sync finished");
        } catch (Exception e) {
            e.printStackTrace();
            Writer writer = new StringWriter();
            e.printStackTrace(new PrintWriter(writer));
            logger.error("sync failed: " + writer);
        } finally {
            if (vioTable != null) {
                try {
                    vioTable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (hbaseConn != null) {
                try {
                    hbaseConn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private void putData(String hbaseFtpCol, String hbaseFastCol, String url, String ftpUrl) {
        put.addColumn("useful".getBytes(), hbaseFtpCol.getBytes(), url.getBytes());
        // fetch the picture bytes through the ftp proxy and store them in FastDFS
        params = new LinkedMultiValueMap<String, String>();
        params.add("urls", url);
        requestEntity = new HttpEntity<>(params, headers);
        ResponseEntity<PicByte> resp = restTemplate.exchange(ftpUrl, method, requestEntity, PicByte.class);
        Map<String, byte[]> map = resp.getBody().getMap();
        if (map.size() > 0) {
            for (Map.Entry<String, byte[]> entry : map.entrySet()) {
                String[] arr = url.split("/");
                String fastDfsUrl = fileService.uploadFile(entry.getValue(), arr[arr.length - 1]);
                logger.info("fastdfs url: " + fastDfsUrl);
                put.addColumn("useful".getBytes(), hbaseFastCol.getBytes(), fastDfsUrl.getBytes());
            }
        } else {
            logger.info("ftp url returned no data: " + url);
        }
    }
}
package com.hikcreate.entity;

import lombok.Data;

import java.util.Map;

@Data
public class PicByte {
    Map<String, byte[]> map;
}
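PicByte is the body the ftp proxy endpoint returns: one map entry per requested url. A round-trip sketch, assuming fastjson's default base64 encoding of byte[] on the wire (an assumption about the proxy's format):

import com.alibaba.fastjson.JSON;
import com.hikcreate.entity.PicByte;

import java.util.HashMap;
import java.util.Map;

// Illustrative round-trip only; assumes the proxy serializes byte[] as
// base64, which is fastjson's default handling for byte arrays.
public class PicByteDemo {
    public static void main(String[] args) {
        PicByte pic = new PicByte();
        Map<String, byte[]> map = new HashMap<>();
        map.put("ftp://host/a.jpg", new byte[]{1, 2, 3});
        pic.setMap(map); // setter generated by Lombok's @Data
        String json = JSON.toJSONString(pic);
        PicByte back = JSON.parseObject(json, PicByte.class);
        System.out.println(back.getMap().size()); // 1
    }
}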
package com.hikcreate.schedul;
import com.hikcreate.drv_photo_pic.DrvPhoto;
import com.hikcreate.drv_photo_pic.VioPic;
import com.hikcreate.utils.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class PicSchedule {
@Autowired
private DrvPhoto drvPhoto;
@Autowired
private VioPic vioPic;
@Value("${pastDay}")
private int pastDay;
    /**
     * Sync incremental driver photos at 10:00 every morning.
     * (Spring cron fields are second/minute/hour/day/month/weekday;
     * "0 10 * * * *" would fire at minute 10 of every hour instead.)
     */
    @Scheduled(cron = "0 0 10 * * ?")
    public void getIncrementDrvPhoto() {
        drvPhoto.getIncrementDrvPhoto();
    }
    // @Scheduled(cron = "0 0 10 * * ?")
    public void getIncrementVioPic() {
        String date = DateUtil.getDate();
        // the interface takes (startDay, endDay), so the past date comes first
        vioPic.getIncrementVioPic(DateUtil.formatDate(DateUtil.getPastDate(new Date(), -pastDay)), date);
    }
}
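The incremental window handed to getIncrementVioPic is [today - pastDay, today]. A cross-check of that window in plain JDK time, assuming DateUtil formats dates as yyyy-MM-dd (inferred from its usage above, not confirmed):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Cross-check of the sync window; the yyyy-MM-dd pattern is an assumption.
public class WindowDemo {
    public static void main(String[] args) {
        int pastDay = 10; // matches pastDay=10 in the properties file
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        String endDay = LocalDate.now().format(fmt);
        String startDay = LocalDate.now().minusDays(pastDay).format(fmt);
        // hive predicate: export_time <= endDay and export_time > startDay
        System.out.println(startDay + " < export_time <= " + endDay);
    }
}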
package com.hikcreate.service.fdfs.service.impl;

import com.github.tobato.fastdfs.service.FastFileStorageClient;
import com.hikcreate.service.fdfs.FastDFSClient;
import com.hikcreate.service.fdfs.service.FileService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Resource;
@@ -39,6 +35,7 @@ public class FileServiceImpl implements FileService {
            return "/group" + path.split("group")[1];
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("Upload Img Error, msg ={}", e);
            throw new RuntimeException("failed to upload image");
        }
......
package com.hikcreate.utils;

import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;

public class HttpUtils {

    public static String sendPost(String url, String param) {
        PrintWriter out = null;
        BufferedReader in = null;
        StringBuilder result = new StringBuilder();
        try {
            URL realUrl = new URL(url);
            // open a connection to the URL
            URLConnection conn = realUrl.openConnection();
            // set generic request properties
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // both lines are required for a POST request
            conn.setDoOutput(true);
            conn.setDoInput(true);
            // get the output stream of the URLConnection
            out = new PrintWriter(conn.getOutputStream());
            // send the request parameters
            out.print(param);
            // flush the buffered output
            out.flush();
            // read the response line by line
            in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result.append(line);
            }
        } catch (Exception e) {
            System.out.println("exception while sending POST request: " + e);
            e.printStackTrace();
        }
        // close the streams in a finally block
        finally {
            try {
                if (out != null) {
                    out.close();
                }
                if (in != null) {
                    in.close();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        return result.toString();
    }

    public static String HttpPostWithJson(String url, String json) {
        String returnValue = "default return value: the call failed";
        // step 1: create the HttpClient (once; the original created it twice)
        CloseableHttpClient httpClient = HttpClients.createDefault();
        ResponseHandler<String> responseHandler = new BasicResponseHandler();
        try {
            // step 2: create the HttpPost for the target url
            HttpPost httpPost = new HttpPost(url);
            // step 3: attach the JSON body
            StringEntity requestEntity = new StringEntity(json, "utf-8");
            requestEntity.setContentEncoding("UTF-8");
            httpPost.setHeader("Content-type", "application/json");
            httpPost.setEntity(requestEntity);
            // step 4: execute the request and capture the response body
            returnValue = httpClient.execute(httpPost, responseHandler);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // step 5: return the response body
        return returnValue;
    }
}
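A short sketch of calling the two helpers; the endpoint url is a placeholder, not a real service:

// Hypothetical call sites for HttpUtils (same package assumed).
public class HttpUtilsDemo {
    public static void main(String[] args) {
        String form = HttpUtils.sendPost("http://example.com/api", "a=1&b=2");
        String json = HttpUtils.HttpPostWithJson("http://example.com/api", "{\"a\":1}");
        System.out.println(form);
        System.out.println(json);
    }
}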
package com.hikcreate.utils.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * @author: lifuyi
 * @date: 2019/1/31 10:55
 * @description:
 */
public class PropertyUtil {
private static final Logger logger = LoggerFactory.getLogger(PropertyUtil.class);
private static Properties props;
static {
loadProps("common-config.properties");
}
public static String getProperty(String key) {
if (null == props) {
throw new NullPointerException("props is null");
}
return props.getProperty(key);
}
public static String getProperty(String key, String defaultValue) {
if (null == props) {
throw new NullPointerException("props is null");
}
return props.getProperty(key, defaultValue);
}
private Properties getProps(String path) {
if (props == null) {
loadProps(path);
}
return props;
}
    private synchronized static void loadProps(String path) {
        props = new Properties();
        InputStream in = null;
        try {
            in = PropertyUtil.class.getClassLoader().getResourceAsStream(path);
            props.load(in);
        } catch (FileNotFoundException e) {
            logger.error("loadProps error", e);
        } catch (IOException e) {
            logger.error("loadProps error", e);
        } finally {
            try {
                if (null != in) {
                    in.close();
                }
            } catch (IOException e) {
                // ignore failures while closing the stream
            }
        }
    }
}
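Usage is a static lookup against common-config.properties on the classpath; a minimal sketch using the redis.cluster key that appears in the config below:

// Reads redis.cluster from common-config.properties on the classpath
// (same package assumed; the default value is a made-up fallback).
public class PropertyUtilDemo {
    public static void main(String[] args) {
        String hosts = PropertyUtil.getProperty("redis.cluster", "localhost:7000");
        System.out.println(hosts);
    }
}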
package com.hikcreate.utils.redis;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCluster;

import java.util.List;
@@ -10,8 +13,12 @@ import java.util.Set;
 * @date: 2019/1/31 10:53
 * @description:
 */
@Component
public class RedisClient {

    @Value("${redis.server}")
    private String redisServer;

    public final static String VIRTUAL_COURSE_PREX = "_lc_vc_";

    private JedisCluster jedisCluster;
@@ -520,7 +527,7 @@
    private <R> R jedisCall(JedisFunction<R> function) {
        if (jedisCluster == null) {
            String hosts = redisServer;
            jedisCluster = RedisUtils.getClusterClient(hosts);
        }
        try {
......
package com.hikcreate.utils.redis;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author: lifuyi
 * @date: 2019/1/31 10:53
 * @description:
 */
@Component
public class RedisClientUtil {

    // instance fields: Spring does not inject @Value into static fields
    @Value("${redis.server}")
    private String redisServer;

    private RedisClient redisClient;

    public RedisClient getRedisClient() {
        if (redisClient == null) {
            synchronized (RedisClientUtil.class) {
                redisClient = RedisBuilder.getRedisClient(redisServer);
            }
        }
        return redisClient;
......
@@ -4,17 +4,32 @@ password=zckj2018
roundDay=-5
redis.cluster=172.16.25.23:7000,172.16.25.23:7001,172.16.25.23:7002,172.16.25.24:7003,172.16.25.24:7004,172.16.25.24:7005
redis.server=172.16.25.23:7000,172.16.25.23:7001,172.16.25.23:7002,172.16.25.24:7003,172.16.25.24:7004,172.16.25.24:7005
ftpUrl=http://193.5.103.5:80/ftp/testFtpUtil
#hive
hive.url=jdbc:hive2://172.16.25.25:10000/kakou
hive.user=hdfs
hive.password=hdfs
#fdfs
fdfs.so-timeout = 1500
fdfs.connect-timeout = 600
fdfs.thumb-image.width = 150
fdfs.thumb-image.height = 150
fdfs.tracker-list[0] = 172.16.25.23:22122
fdfs.tracker-list[1] = 172.16.25.26:22122
fdfs.pool.max-total = 153
fdfs.pool.max-wait-millis = 102
# server port
server.port=8084
hbase.zookeeper.property.clientPort=2181
hbase.zookeeper.quorum=172.16.25.25,172.16.25.28,172.16.25.24,172.16.25.26,172.16.25.27
hbase.master=172.16.25.25:60000
hbase.drv_photo.table=drv:drv_photo
hbase.driverlicense.table=drv:drivinglicense
hbase.vio.table=vio:vio_violation
#hive----sql
increment.vio.pic.sql=SELECT a.ccarnumber hphm,a.clicensetype hpzl,b.wfsj wfsj,a.cpic1path url1,a.cpic2path url2,a.cpic3path url3 from (SELECT * from kakou.vio_violation_pic_his_ods WHERE substr(export_time,0,10)<=? and substr(export_time,0,10)>?) a INNER JOIN (SELECT * from default.vio_surveil_all WHERE clsj='null' or clsj='' or clsj is null) b WHERE a.ccarnumber=b.hphm and a.clicensetype=b.hpzl and substr(a.dillegaldate,0,16)=substr(b.wfsj,0,16)
pastDay=10
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.nameservices</name>
<value>nameservice1</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.nameservice1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.nameservice1</name>
<value>true</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>172.16.25.24:2181,172.16.25.25:2181,172.16.25.26:2181,172.16.25.27:2181,172.16.25.28:2181</value>
</property>
<property>
<name>dfs.ha.namenodes.nameservice1</name>
<value>namenode237,namenode293</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode237</name>
<value>172.16.25.25:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode237</name>
<value>172.16.25.25:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.nameservice1.namenode237</name>
<value>172.16.25.25:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.nameservice1.namenode237</name>
<value>172.16.25.25:50470</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode293</name>
<value>172.16.25.28:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode293</name>
<value>172.16.25.28:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.nameservice1.namenode293</name>
<value>172.16.25.28:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.nameservice1.namenode293</name>
<value>172.16.25.28:50470</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
</property>
<property>
<name>fs.permissions.umask-mode</name>
<value>022</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.client.use.legacy.blockreader</name>
<value>false</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>false</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hdfs-sockets/dn</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.skip.checksum</name>
<value>false</value>
</property>
<property>
<name>dfs.client.domain.socket.data.traffic</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>
</configuration>
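This hdfs-site.xml describes the HA nameservice the HBase and Hive clients run against. A minimal sketch of resolving it from a plain Hadoop client; both the resource sitting on the classpath root and the fs.defaultFS value are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: resolve the HA nameservice defined in hdfs-site.xml.
public class HdfsClientDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource("hdfs-site.xml");            // picks up nameservice1 and its two namenodes
        conf.set("fs.defaultFS", "hdfs://nameservice1"); // assumed default filesystem
        try (FileSystem fs = FileSystem.get(conf)) {
            System.out.println(fs.exists(new Path("/")));
        }
    }
}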