Commit d6c4b79b by 李辅翼

v14

parent 2d761682
......@@ -43,6 +43,8 @@ public class DrvPhotoImpl implements DrvPhoto {
private int roundDay;
@Autowired
private FileService fileService;
@Autowired
private SqlHelp sqlHelp;
private static String startTime;
private static String endTime;
......@@ -85,7 +87,7 @@ public class DrvPhotoImpl implements DrvPhoto {
try {
hbaseConn = ConnectionFactory.createConnection(hbaseConf);
endTime = time;
startTime = SqlHelp.getStartTime("select min(GXSJ) mtime from GYJG.DRV_PHOTO", url, username, password);
startTime = sqlHelp.getStartTime("select min(GXSJ) mtime from GYJG.DRV_PHOTO", url, username, password);
//循环获取图片
String sql;
String lastTime;
......@@ -205,19 +207,19 @@ public class DrvPhotoImpl implements DrvPhoto {
}
}
private synchronized static void saveToHbase(ResultSet resultSet, FileService fileService) throws Exception {
private synchronized void saveToHbase(ResultSet resultSet, FileService fileService) throws Exception {
paramResultSet(resultSet);
//判断图片是否已经存入fastdfs
colValue = new String[2];
String rowkey = sfzmhm;
PicResult picResult = SqlHelp.getFlag(rowkey, driverPhototable, colValue, gxsj);
PicResult picResult = sqlHelp.getFlag(rowkey, driverPhototable, colValue, gxsj);
if (picResult.getI() != 0) {
if (picResult.getI() == 2) {
fileService.deleteFile(picResult.getUrl());
}
Put put = new Put(rowkey.getBytes());
put.addColumn("info".getBytes(), "sfzmhm".getBytes(), sfzmhm.getBytes());
byte[] bytes1 = SqlHelp.blobToByteZp(zp);
byte[] bytes1 = sqlHelp.blobToByteZp(zp);
String picUrl = fileService.uploadFile(bytes1, sfzmhm + ".jpeg");
put.addColumn("info".getBytes(), "sfzmhm".getBytes(), (sfzmhm == null ? "null" : sfzmhm).getBytes());
put.addColumn("info".getBytes(), "picUrl".getBytes(), picUrl.getBytes());
......
......@@ -10,6 +10,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.BufferedInputStream;
......@@ -17,11 +18,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.sql.*;
@Service
public class SqlHelp {
private static Logger logger = LoggerFactory.getLogger(SqlHelp.class);
public static String getStartTime(String sql, String url, String username, String password) {
public String getStartTime(String sql, String url, String username, String password) {
String startTime = null;
try {
Class.forName("oracle.jdbc.OracleDriver");
......@@ -42,7 +44,7 @@ public class SqlHelp {
}
public static PicResult getFlag(String rowkey, Table table, String[] colValue, String gxsj) {
public PicResult getFlag(String rowkey, Table table, String[] colValue, String gxsj) {
//1:写入,2:删除原有并写入,0:不写入
int flag1 = 1;
String url = "";
......@@ -87,7 +89,7 @@ public class SqlHelp {
}
}
public synchronized static byte[] blobToByteZp(BLOB zp) {
public byte[] blobToByteZp(BLOB zp) {
try {
long length = zp.length();
byte[] bytes = new byte[(int) length];
......@@ -101,7 +103,7 @@ public class SqlHelp {
}
}
public static byte[] blobToBytes(Blob blob) {
public byte[] blobToBytes(Blob blob) {
BufferedInputStream is = null;
try {
is = new BufferedInputStream(blob.getBinaryStream());
......
package com.hikcreate.drv_photo_pic.impl;
import com.github.tobato.fastdfs.domain.fdfs.StorePath;
import com.github.tobato.fastdfs.service.FastFileStorageClient;
import com.hikcreate.drv_photo_pic.Vehicle;
import com.hikcreate.entity.PicResult;
import com.hikcreate.service.fdfs.service.FileService;
......@@ -16,7 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.*;
import java.sql.*;
import java.sql.Connection;
import java.util.Iterator;
......@@ -28,6 +30,10 @@ public class VehicleImpl implements Vehicle {
private static Logger logger = LoggerFactory.getLogger(VehicleImpl.class);
@Autowired
private FileService fileService;
@Autowired
private SqlHelp sqlHelp;
@Autowired
private FastFileStorageClient storageClient;
@Value("${hbase.veh_pic.table}")
private String vehPicTableStr;
......@@ -57,16 +63,13 @@ public class VehicleImpl implements Vehicle {
@Override
public void getHisVehPic(String time) {
org.apache.hadoop.hbase.client.Connection hbaseConn = null;
Table vehPicTable = null;
try {
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseAddress);
hbaseConf.set("hbase.zookeeper.quorum", hbaseZkQuorum);
hbaseConn = ConnectionFactory.createConnection(hbaseConf);
TableName vehPicTableName = TableName.valueOf(vehPicTableStr);
vehPicTable = hbaseConn.getTable(vehPicTableName);
endTime = time;
startTime = SqlHelp.getStartTime("select min(GXSJ) mtime from GYJG.VEH_PICTURE", url, username, password);
startTime = sqlHelp.getStartTime("select min(GXSJ) mtime from GYJG.VEH_PICTURE", url, username, password);
String lastTime;
String sql;
while (endTime.compareTo(startTime) > 0 || endTime.compareTo(startTime) == 0) {
......@@ -79,7 +82,7 @@ public class VehicleImpl implements Vehicle {
pstm.setString(2, endTime);
ResultSet resultSet = pstm.executeQuery();
while (resultSet.next()) {
saveToHbase(resultSet, vehPicTable);
saveToHbase(resultSet, vehPicTableStr, hbaseConn);
}
endTime = lastTime;
resultSet.close();
......@@ -95,13 +98,6 @@ public class VehicleImpl implements Vehicle {
e.printStackTrace();
}
}
if (vehPicTable != null) {
try {
vehPicTable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
......@@ -182,16 +178,15 @@ public class VehicleImpl implements Vehicle {
}
} catch (Exception e) {
e.printStackTrace();
}
finally {
if(connection!=null){
} finally {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(table!=null){
if (table != null) {
try {
table.close();
} catch (IOException e) {
......@@ -204,20 +199,17 @@ public class VehicleImpl implements Vehicle {
private void getPic(String sql) {
org.apache.hadoop.hbase.client.Connection hbaseConn = null;
Table vehPicTable = null;
try {
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseAddress);
hbaseConf.set("hbase.zookeeper.quorum", hbaseZkQuorum);
hbaseConn = ConnectionFactory.createConnection(hbaseConf);
TableName vehPicTableName = TableName.valueOf(vehPicTableStr);
vehPicTable = hbaseConn.getTable(vehPicTableName);
Class.forName("oracle.jdbc.OracleDriver");
Connection connection = DriverManager.getConnection(url, username, password);
PreparedStatement pstm = connection.prepareStatement(sql);
ResultSet resultSet = pstm.executeQuery();
while (resultSet.next()) {
saveToHbase(resultSet, vehPicTable);
saveToHbase(resultSet, vehPicTableStr,hbaseConn);
}
resultSet.close();
pstm.close();
......@@ -232,13 +224,6 @@ public class VehicleImpl implements Vehicle {
e.printStackTrace();
}
}
if (vehPicTable != null) {
try {
vehPicTable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
......@@ -247,10 +232,11 @@ public class VehicleImpl implements Vehicle {
* 存储图片
*
* @param resultSet
* @param vehPicTable
* @param hbaseConn
*/
private synchronized void saveToHbase(ResultSet resultSet, Table vehPicTable) {
private synchronized void saveToHbase(ResultSet resultSet, String vehPicTableName, org.apache.hadoop.hbase.client.Connection hbaseConn) {
try {
Table vehPicTable = hbaseConn.getTable(TableName.valueOf(vehPicTableName));
String xh = resultSet.getString("XH");
String hphm = resultSet.getString("HPHM");
String hpzl = resultSet.getString("HPZL");
......@@ -258,7 +244,7 @@ public class VehicleImpl implements Vehicle {
String gxsj = resultSet.getString("GXSJ");
String rowkey = xh;
colValue = new String[2];
PicResult picResult = SqlHelp.getFlag(rowkey, vehPicTable, colValue, gxsj);
PicResult picResult = sqlHelp.getFlag(rowkey, vehPicTable, colValue, gxsj);
if (picResult.getI() != 0) {
if (picResult.getI() == 2) {
try {
......@@ -276,39 +262,56 @@ public class VehicleImpl implements Vehicle {
put.addColumn("info".getBytes(), "hphm".getBytes(), (hphm == null ? "null" : hphm).getBytes());
put.addColumn("info".getBytes(), "hpzl".getBytes(), (hpzl == null ? "null" : hpzl).getBytes());
put.addColumn("info".getBytes(), "gxsj".getBytes(), (gxsj == null ? "null" : gxsj).getBytes());
byte[] bytes = SqlHelp.blobToByteZp(zp);
String picUrl = uploadFile(bytes, xh + ".jpeg", xh);
put.addColumn("info".getBytes(), "picUrl".getBytes(), picUrl.getBytes());
byte[] bytes = sqlHelp.blobToBytes(zp);
logger.info(xh+"---------------------"+bytes.length);
put.addColumn("info".getBytes(),"picLen".getBytes(),String.valueOf(bytes.length/1024).getBytes());
//写入fastdfs
int count = 0;
String picUrl=uploadFileStor(bytes,xh);
while (picUrl==null){
picUrl=uploadFileStor(bytes,xh);
count++;
if(count>10){
logger.info("10fail-----------------"+xh);
return;
}
}
put.addColumn("info".getBytes(), "picUrl".getBytes(), ("/"+picUrl).getBytes());
vehPicTable.put(put);
vehPicTable.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private String uploadFileStor(byte[] bytes, String xh) {
try {
StorePath jpeg = storageClient.uploadFile(new ByteArrayInputStream(bytes), bytes.length, "jpeg", null);
return jpeg.getFullPath();
}catch (Exception e) {
logger.info(xh+"***************************");
return null;
}
private String uploadFile(byte[] file, String fileName, String xh) {
}
String url;
try {
url = fileService.uploadFile(file, fileName);
} catch (Exception e) {
logger.info("---------------" + xh);
e.printStackTrace();
url = uploadFile(file, fileName, xh);
}
return url;
// int count=0;
// while ("false".equals(url)){
// url=fileService.uploadFile(file, fileName);
// count++;
// if(count>10){
// logger.info("---------------" + xh);
// }
// }
private String uploadFile(byte[] file, String fileName, String xh) {
String picUrl = fileService.uploadFile(file, fileName);
int count = 0;
while ("false".equals(picUrl)) {
logger.info("false---------"+xh);
picUrl = fileService.uploadFile(file, fileName);
count++;
if (count > 10) {
logger.info("---------------" + xh);
break;
}
}
return picUrl;
}
......
......@@ -14,7 +14,7 @@ public interface FileService {
* @param fileName
* @return
*/
String uploadFile(byte[] file, String fileName) throws IOException, InterruptedException;
String uploadFile(byte[] file, String fileName) ;
/**
* 下载文件
......
......@@ -9,7 +9,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import java.io.IOException;
/**
* @author 赵东
......@@ -23,15 +22,7 @@ public class FileServiceImpl implements FileService {
@Override
public synchronized String uploadFile(byte[] file, String fileName) throws IOException, InterruptedException {
Thread.sleep(10);
//将文件上装到fast文件服务器上,成功则返回文件保存的路径
String path = fastDFSClient.uploadFile(file, fileName);
if (!StringUtils.hasText(path)) {
logger.error("Upload Img Error");
}
return "/group" + path.split("group")[1];
// try {
public synchronized String uploadFile(byte[] file, String fileName) {
// Thread.sleep(10);
// //将文件上装到fast文件服务器上,成功则返回文件保存的路径
// String path = fastDFSClient.uploadFile(file, fileName);
......@@ -39,10 +30,18 @@ public class FileServiceImpl implements FileService {
// logger.error("Upload Img Error");
// }
// return "/group" + path.split("group")[1];
// } catch (Exception e) {
try {
Thread.sleep(10);
//将文件上装到fast文件服务器上,成功则返回文件保存的路径
String path = fastDFSClient.uploadFile(file, fileName);
if (!StringUtils.hasText(path)) {
logger.error("Upload Img Error");
}
return "/group" + path.split("group")[1];
} catch (Exception e) {
// e.printStackTrace();
// return "false";
// }
return "false";
}
}
@Override
......
......@@ -20,14 +20,14 @@ fdfs.tracker-list[1] = 172.16.25.26:22122
fdfs.pool.max-total = 153
fdfs.pool.max-wait-millis = 102
#˿
server.port=8084
server.port=8099
hbase.zookeeper.property.clientPort=2181
hbase.zookeeper.quorum=172.16.25.25,172.16.25.28,172.16.25.24,172.16.25.26,172.16.25.27
hbase.master=172.16.25.25:60000
hbase.drv_photo.table=pic:drv_photo
hbase.vio.table=pic:vio_violation
hbase.veh_pic.table=pic:veh_pic
hbase.veh_pic.table=pic:veh_pic_new1
#hive----sql
increment.vio.pic.sql=SELECT a.ccarnumber hphm,a.clicensetype hpzl,b.wfsj wfsj,b.wfxw wfxw,a.cpic1path url1,a.cpic2path url2,a.cpic3path url3 from (SELECT * from kakou.vio_violation_pic_his_ods WHERE substr(export_time,0,10)<=? and substr(export_time,0,10)>?) a INNER JOIN (SELECT * from default.vio_surveil_all WHERE clbj='0') b WHERE a.ccarnumber=b.hphm and a.clicensetype=b.hpzl and substr(a.dillegaldate,0,16)=substr(b.wfsj,0,16) and a.coffense=b.wfxw
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment