Commit 88d5b83f by chenwenjun8

Initial commit

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hikcreate</groupId>
<artifactId>oracleexport</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.18</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.1.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.29</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5-20081211</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.quartz-scheduler</groupId>-->
<!-- <artifactId>quartz</artifactId>-->
<!-- <version>2.2.1</version>-->
<!-- </dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<!-- Compile everything against JDK 1.8 -->
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
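Note: the shade plugin above produces a fat jar at package time but sets no Main-Class manifest entry, so the server is launched with an explicit class name. A typical invocation (the port 8080 is an illustrative choice, not from the project): run mvn package, then java -cp target/oracleexport-1.0-SNAPSHOT.jar com.hikcreate.bigdata.hdip.source.ApiServer 8080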
package com.hikcreate.bigdata.hdip.source;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
 * @author chencong13
 * @Description Exposes control of the service through an HTTP API
 * @Time 2020/8/20 14:43
 */
public class ApiServer {
public static final Logger logger = LoggerFactory.getLogger(ApiServer.class);
private OracleSourceService service;
private volatile Server srv;
private volatile String host = "127.0.0.1";
/**
 * Description: used for local testing.
 *
 * @param:
 * @author chencong13
 * @Time 2020/10/20 9:55
 **/
public void testStart() {
service = new OracleSourceService();
service.start("C:\\Users\\chencong13\\Desktop\\test\\queryConfig3.json");
}
public void start(int port) {
logger.info("JettySource start...");
srv = new Server();
service = new OracleSourceService();
// Connector Array
Connector[] connectors = new Connector[1];
SelectChannelConnector connector = new SelectChannelConnector();
connector.setReuseAddress(true);
connectors[0] = connector;
connectors[0].setHost(host);
connectors[0].setPort(port);
srv.setConnectors(connectors);
try {
org.mortbay.jetty.servlet.Context root =
new org.mortbay.jetty.servlet.Context(srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
srv.start();
srv.join();
} catch (Exception ex) {
logger.error("Error while starting HTTPSource. Exception follows.", ex);
}
logger.info("JettySource start end...");
}
/**
 * Description: shuts down the Jetty server.
 *
 * @param
 * @return
 * @author chencong13
 * @Time 2020/8/20 15:37
 */
public void shutdown() {
logger.info("JettySource shutdown...");
try {
srv.stop();
srv.join();
srv = null;
} catch (Exception ex) {
logger.error("Error while stopping HTTPSource. Exception follows.", ex);
}
}
private class FlumeHTTPServlet extends HttpServlet {
private static final long serialVersionUID = 4891924863218790344L;
@Override
public void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException {
String characterEncoding = request.getCharacterEncoding();
String urlPath = request.getServletPath();
String msg = "";
if (urlPath.contains("/querycontrol")) {
String command = request.getParameter("command");
logger.info("command = {}", command);
if (command == null) {
command = ""; // guard against requests without a command parameter
}
switch (command) {
case "start":
String configPath = request.getParameter("configPath");
logger.info("configPath is {}", configPath);
msg = service.start(configPath);
break;
case "shutdown":
msg = service.shuntDown();
break;
case "status":
msg = service.getStatus();
break;
case "saveoffset":
msg = service.saveOffset();
break;
case "checkoffset":
msg = service.getOffset();
break;
case "kill":
service.shuntDown();
shutdown();
break;
default:
break;
}
}
msg = "result:" + msg;
responseIt(response, msg, characterEncoding, urlPath);
}
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException {
doPost(request, response);
}
private void responseIt(HttpServletResponse response, String info, String encoding, String header) throws IOException {
if (encoding != null) {
response.setCharacterEncoding(encoding);
}
response.setStatus(HttpServletResponse.SC_OK);
// headers must be set before the body is written/committed
response.setHeader("header", header);
response.getWriter().write(info);
response.flushBuffer();
}
}
/**
 * Description: entry point; expects the server port as the first argument.
 * @param: args
 * @author chencong13
 * @Time 2020/10/20 9:56
 **/
public static void main(String[] args) {
if (args.length == 0) {
logger.error("please input your server port!");
return;
}
String port = args[0];
ApiServer apiServer = new ApiServer();
apiServer.start(Integer.parseInt(port));
//apiServer.testStart();
}
}
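A minimal, hypothetical client sketch for the control servlet above (not part of this commit; the port 8080 is whatever ApiServer was started with, and the server binds to 127.0.0.1). The servlet only reads the command (and, for start, configPath) query parameters under the /querycontrol path:
package com.hikcreate.bigdata.hdip.source;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class ApiClientExample {
public static void main(String[] args) throws Exception {
// hypothetical: assumes ApiServer was started with port 8080
URL url = new URL("http://127.0.0.1:8080/querycontrol?command=status");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET"); // doGet simply delegates to doPost
try (BufferedReader in = new BufferedReader(
new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
System.out.println(in.readLine()); // e.g. "result:shutdown"
}
conn.disconnect();
}
}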
package com.hikcreate.bigdata.hdip.source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Common utility helpers.
 *
 * @author Administrator
 * @Time 2020-08-20 12:12:12
 */
public class CommonUtils {
public static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
/**
 * Default: yyyy-MM-dd HH:mm:ss
 */
static public String df1 = "yyyy-MM-dd HH:mm:ss";
/**
 * yyyy/MM/dd HH:mm:ss
 */
static public String df2 = "yyyy/MM/dd HH:mm:ss";
/**
 * yyyy年MM月dd日 HH时mm分ss秒 (Chinese-style date-time)
 */
static public String df3 = "yyyy年MM月dd日 HH时mm分ss秒";
/**
 * Date only: yyyy-MM-dd
 */
static public String df4 = "yyyy-MM-dd";
/**
 * yyyy/MM/dd
 */
static public String df5 = "yyyy/MM/dd";
/**
 * yyyy年MM月dd日 (Chinese-style date)
 */
static public String df6 = "yyyy年MM月dd日";
/**
 * yyyy-MM-dd HH:mm:ss.SSS
 */
static public String df7 = "yyyy-MM-dd HH:mm:ss.SSS";
/**
 * yyyy/MM/dd HH:mm:ss.SSS
 */
static public String df8 = "yyyy/MM/dd HH:mm:ss.SSS";
static public String df9 = "yyyy-MM-dd HH:mm:ss.SSSSSS";
/**
 * Description: checks whether a string looks like a date.
 * Logic: regular-expression match (leap-year aware).
 *
 * @param strDate date string
 * @return true if it matches a date format
 * @author chencong13
 * @Time 2020/8/20 9:59
 */
public static boolean isDate(String strDate) {
Pattern pattern = Pattern
.compile("^((\\d{2}(([02468][048])|([13579][26]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])))))|(\\d{2}(([02468][1235679])|([13579][01345789]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|(1[0-9])|(2[0-8]))))))(\\s(((0?[0-9])|([1-2][0-3]))\\:([0-5]?[0-9])((\\s)|(\\:([0-5]?[0-9])))))?$");
Matcher m = pattern.matcher(strDate);
return m.matches();
}
/**
 * Description: checks whether a string looks like a date-time.
 * Logic: simple structural rules, no regex.
 *
 * @param strDate the date string
 * @return true if it looks like a date-time
 * @author chencong13
 * @Time 2020/8/20 10:00
 */
public static boolean isDate2(String strDate) {
if (strDate == null || "".equals(strDate)) {
return false;
}
return strDate.contains(":") && strDate.contains(" ") && strDate.length() >= 10;
}
/**
 * Description: parses a string into a Date.
 *
 * @param date the date string
 * @param df   the pattern to use; falls back to df1 when blank
 * @return the parsed Date
 * @throws ParseException on parse failure
 * @author chencong13
 * @Time 2020/8/20 10:01
 */
public static Date stringParseDate(String date, String df) throws ParseException {
SimpleDateFormat formatter = null;
if (isBlank(df)) {
formatter = new SimpleDateFormat(df1);
} else {
formatter = new SimpleDateFormat(df);
}
Date parseDate = formatter.parse(date);
return parseDate;
}
/**
 * Description: checks whether a string is null or blank.
 *
 * @param source the string to test
 * @return true if blank
 * @author chencong13
 * @Time 2020/8/20 10:03
 */
public static final boolean isBlank(String source) {
return (source == null || "".equals(source.trim()));
}
/**
 * Description: determines the date pattern of a time string.
 * Logic: regular-expression match against the known patterns.
 *
 * @param timeStr the time string
 * @return the matching pattern, or null if none matches
 * @author chencong13
 * @Time 2020/8/20 10:04
 */
public static String getDateFormat(String timeStr) {
if (timeStr.contains(".")) {
// strip the fractional-second part before matching
timeStr = timeStr.split("\\.")[0];
}
String regex1 = "\\d{4}-\\d{2}-\\d{2}\\s{1}\\d{2}:\\d{2}:\\d{2}";
String regex2 = "\\d{4}/\\d{2}/\\d{2}\\s{1}\\d{2}:\\d{2}:\\d{2}";
String regex3 = "\\d{4}年\\d{2}月\\d{2}日\\s{1}\\d{2}时\\d{2}分\\d{2}秒";
String regex4 = "\\d{4}-\\d{2}-\\d{2}";
String regex5 = "\\d{4}/\\d{2}/\\d{2}";
String regex6 = "\\d{4}年\\d{2}月\\d{2}日";
String regex7 = "\\d{4}-\\d{2}-\\d{2}\\s{1}\\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
String regex8 = "\\d{4}/\\d{2}/\\d{2}\\s{1}\\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
String regex9 = "\\d{4}-\\d{2}-\\d{2}\\s{1}\\d{2}:\\d{2}:\\d{2}\\.\\d{6}";
// compile the patterns
Pattern pattern1 = Pattern.compile(regex1);
Pattern pattern2 = Pattern.compile(regex2);
Pattern pattern3 = Pattern.compile(regex3);
Pattern pattern4 = Pattern.compile(regex4);
Pattern pattern5 = Pattern.compile(regex5);
Pattern pattern6 = Pattern.compile(regex6);
Pattern pattern7 = Pattern.compile(regex7);
Pattern pattern8 = Pattern.compile(regex8);
Pattern pattern9 = Pattern.compile(regex9);
// case-insensitive variant:
// Pattern pat = Pattern.compile(regEx, Pattern.CASE_INSENSITIVE);
Matcher matcher1 = pattern1.matcher(timeStr);
Matcher matcher2 = pattern2.matcher(timeStr);
Matcher matcher3 = pattern3.matcher(timeStr);
Matcher matcher4 = pattern4.matcher(timeStr);
Matcher matcher5 = pattern5.matcher(timeStr);
Matcher matcher6 = pattern6.matcher(timeStr);
Matcher matcher7 = pattern7.matcher(timeStr);
Matcher matcher8 = pattern8.matcher(timeStr);
Matcher matcher9 = pattern9.matcher(timeStr);
if (matcher1.matches()) {
return df1;
}
if (matcher2.matches()) {
return df2;
}
if (matcher3.matches()) {
return df3;
}
if (matcher4.matches()) {
return df4;
}
if (matcher5.matches()) {
return df5;
}
if (matcher6.matches()) {
return df6;
}
if (matcher7.matches()) {
return df7;
}
if (matcher8.matches()) {
return df8;
}
if (matcher9.matches()) {
return df9;
}
return null;
}
/**
 * Description: creates a directory (including parents) if it does not exist.
 *
 * @param path the directory path
 * @return none
 * @author chencong13
 * @Time 2020/8/20 10:05
 */
public static void createDir(String path) {
File file = new File(path);
if (!file.exists()) {
file.mkdirs();
}
}
/**
 * Description: returns the current date-time.
 * Logic: formatted as yyyy-MM-dd HH:mm:ss (df1).
 *
 * @param
 * @return the formatted current time
 * @author chencong13
 * @Time 2020/8/20 10:06
 */
public static String getCurrentDate() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(df1);
return simpleDateFormat.format(new Date());
}
/**
 * Description: renames/moves a file.
 * Logic: renames file "one" to file "two".
 *
 * @return
 * @author chencong13
 * @Time 2020/8/20 10:07
 */
public static void renameFile(String one, String two) {
File s = new File(one);
File d = new File(two);
boolean b = s.renameTo(d);
if (!b) {
// renameTo already moved the file on success; only a failure needs attention
logger.warn("rename {} to {} failed", one, two);
}
}
/**
 * Description: clears a file's content.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 11:39
 */
public static void clearFileContent(String path) {
try (FileWriter fw = new FileWriter(path)) {
// opening in overwrite mode and writing nothing truncates the file
fw.write("");
fw.flush();
} catch (IOException e) {
logger.error("clearFileContent error", e);
}
}
/**
 * Description: parses a date string into epoch milliseconds.
 * Logic: detects the pattern first via getDateFormat.
 * @param: date
 * @author chencong13
 * @Time 2020/10/20 9:58
 **/
public static long parseDate(String date) {
String dateFormat = getDateFormat(date);
if (dateFormat == null) {
logger.warn("unrecognized date format: {}", date);
return -1;
}
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
try {
Date parse = simpleDateFormat.parse(date);
return parse.getTime();
} catch (ParseException e) {
logger.error("parseDate error", e);
}
return -1;
}
/**
 * Description: formats the given epoch milliseconds as a date-time string (df1).
 * @param: time
 * @author chencong13
 * @Time 2020/10/20 9:57
 **/
public static String getTargetDate(long time) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(df1);
return simpleDateFormat.format(new Date(time));
}
public static void main(String[] args) {
// throwaway smoke test of the logging output
for (int i = 0; i < 1000; i++) {
logger.info("..a...fsfjlsjflsifhisfjlsfjslfjslfhslfhsnkfnhskfhslfjslf" + i);
}
}
/*
public static void testIn() {
while (true) {
testInsert("A");
testInsert("B");
testInsert("C");
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
//System.out.println(parseDate("2020-09-23 10:09:01")+" "+System.currentTimeMillis());
//testIn();
//testInsert("A");
//checkIt("C:\\\\Users\\\\chencong13\\\\Desktop\\\\test\\\\ftp");
}
public static void checkIt(String path) {
File dir = new File(path);
File[] files = dir.listFiles();
Map<String, Integer> result = new HashMap<String, Integer>();
for (File f : files) {
String[] split = f.getName().split("#");
Integer integer = result.get(split[2]);
if (integer == null) {
result.put(split[2], checkIt2(f));
} else {
result.put(split[2], integer + checkIt2(f));
}
}
for (String k : result.keySet()) {
System.out.println(k + ":" + result.get(k));
}
}
public static int checkIt2(File f) {
try {
BufferedReader br = new BufferedReader(new FileReader(f));
String line = "";
int count = 0;
while ((line = br.readLine()) != null) {
if (line.length() > 38) {
count++;
}
}
br.close();
return count;
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
public static void testInsert(String tableName) {
Connection connection = DruidUtils.getConnection();
String sql = "insert into test." + tableName + " (A,B,C,D) values (?,?,?,?)";
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
for (int i = 0; i < 2; i++) {
Thread.sleep(1000);
preparedStatement.setString(1, UUID.randomUUID().toString());
preparedStatement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
preparedStatement.setString(3, 101 + "");
preparedStatement.setString(4, tableName + i);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
preparedStatement.close();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}*/
}
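A short, hypothetical usage sketch for the date helpers above (class name and values are illustrative, not part of this commit):
package com.hikcreate.bigdata.hdip.source;
public class DateHelperExample {
public static void main(String[] args) {
String ts = "2020-08-27 00:19:50";
// pattern detection: the string matches regex1, so df1 is returned
System.out.println(CommonUtils.getDateFormat(ts)); // yyyy-MM-dd HH:mm:ss
long millis = CommonUtils.parseDate(ts); // epoch millis, or -1 on failure
System.out.println(CommonUtils.getTargetDate(millis)); // formats back via df1
}
}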
package com.hikcreate.bigdata.hdip.source;
import java.util.List;
/**
 * Entity class the JSON config is parsed into.
 *
 * @author Administrator
 * @Time 2020-08-20 12:12:12
 */
public class ConfigBean {
private String tempPath; // temp directory
private String ftpPath; // FTP directory
private String offsetPath; // path of the offset file
private List<TableBean> queryTables; // incremental-query table definitions
private String jdbcType; // database type
private String jdbcIp; // database IP
private String jdbcPort; // database port
private String sid; // SID / database name (for Oracle and PostgreSQL)
private String jdbcUser;
private String jdbcPwd;
public String getOffsetPath() {
return offsetPath;
}
public void setOffsetPath(String offsetPath) {
this.offsetPath = offsetPath;
}
public String getTempPath() {
return tempPath;
}
public void setTempPath(String tempPath) {
this.tempPath = tempPath;
}
public String getFtpPath() {
return ftpPath;
}
public void setFtpPath(String ftpPath) {
this.ftpPath = ftpPath;
}
public List<TableBean> getQueryTables() {
return queryTables;
}
public void setQueryTables(List<TableBean> queryTables) {
this.queryTables = queryTables;
}
public String getJdbcType() {
return jdbcType;
}
public void setJdbcType(String jdbcType) {
this.jdbcType = jdbcType;
}
public String getJdbcIp() {
return jdbcIp;
}
public void setJdbcIp(String jdbcIp) {
this.jdbcIp = jdbcIp;
}
public String getJdbcPort() {
return jdbcPort;
}
public void setJdbcPort(String jdbcPort) {
this.jdbcPort = jdbcPort;
}
public String getSid() {
return sid;
}
public void setSid(String sid) {
this.sid = sid;
}
public String getJdbcPwd() {
return jdbcPwd;
}
public void setJdbcPwd(String jdbcPwd) {
this.jdbcPwd = jdbcPwd;
}
public String getJdbcUser() {
return jdbcUser;
}
public void setJdbcUser(String jdbcUser) {
this.jdbcUser = jdbcUser;
}
}
package com.hikcreate.bigdata.hdip.source;
/**
 * Enum of supported database drivers and their URL templates.
 *
 * @author Administrator
 * @Time 2020-08-20 12:12:12
 */
public enum DBDriver {
Mysql_5("com.mysql.jdbc.Driver","jdbc:mysql://${ip}:${port}/${dataBase}?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8"),
Mysql_6("com.mysql.cj.jdbc.Driver","jdbc:mysql://${ip}:${port}/${dataBase}?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8"),
Oracle("oracle.jdbc.driver.OracleDriver","jdbc:oracle:thin:@${ip}:${port}/${dataBase}"),
PostgreSql("org.postgresql.Driver","jdbc:postgresql://${ip}:${port}/${dataBase}"),
SQLserver("com.microsoft.sqlserver.jdbc.SQLServerDriver","jdbc:sqlserver://${ip}:${port};DatabaseName=${dataBase}"),
Hive("org.apache.hive.jdbc.HiveDriver","jdbc:hive2://${ip}:${port}/${dataBase}"),
Phoenix("org.apache.phoenix.jdbc.PhoenixDriver","jdbc:phoenix:${ip}:${port}/${dataBase}"),
Impala("com.cloudera.impala.jdbc41.Driver","jdbc:impala://${ip}:${port}/${dataBase}");
private String driver;
private String url;
DBDriver(String driver, String url) {
this.driver=driver;
this.url=url;
}
public String getUrl(String ip,String port,String dataBase) {
return url.replace("${ip}",ip).replace("${port}",port).replace("${dataBase}",dataBase);
}
public void setUrl(String url) {
this.url = url;
}
public String getDriver() {
return driver;
}
public void setDriver(String driver) {
this.driver = driver;
}
@Override
public String toString() {
return "DBDriver{" +
"driver='" + driver + '\'' +
", url='" + url + '\'' +
'}';
}
}
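For reference, this is how OracleSourceService.init resolves a driver/URL pair from the enum; the concrete host/port/SID values here are illustrative:
// DBDriver.valueOf(jdbcType) selects the constant named in the config,
// then the ${ip}/${port}/${dataBase} placeholders are substituted:
String driver = DBDriver.Oracle.getDriver(); // oracle.jdbc.driver.OracleDriver
String url = DBDriver.Oracle.getUrl("10.0.0.1", "1521", "orcl");
// -> jdbc:oracle:thin:@10.0.0.1:1521/orcl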
package com.hikcreate.bigdata.hdip.source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
/**
 * @author chencong13
 * @Description Plain JDBC helper class
 * @Time 2020/8/20 17:35
 */
public class DBUtils {
private static final Logger log = LoggerFactory.getLogger(DBUtils.class);
private static String url; // JDBC URL
private static String driver; // driver class
private static String userName;
private static String password;
private DBUtils() {
}
public static void initDataSource(String hurl, String hdriver, String huserName, String hpassword) {
url = hurl;
driver = hdriver;
userName = huserName;
password = hpassword;
}
public static Connection getConnection() {
Connection connection = null;
try {
Class.forName(driver);
// obtain a connection
connection = DriverManager.getConnection(url, userName, password);
} catch (Exception e) {
log.error("error", e);
}
return connection;
}
}
package com.hikcreate.bigdata.hdip.source;
import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
/**
 * @author chencong13
 * @Description No longer in use
 * @Time 2020/8/20 17:35
 */
public class DruidUtils {
private static final Logger log = LoggerFactory.getLogger(DruidUtils.class);
private static DruidDataSource dataSource;
private DruidUtils() {
}
public static void initDataSource(String url, String driver, String userName, String password) {
if (dataSource == null) {
dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setDriverClassName(driver);
dataSource.setUsername(userName);
dataSource.setPassword(password);
dataSource.setInitialSize(10); // initial pool size, default 0
dataSource.setMaxActive(10); // max active connections, default 8
dataSource.setMinIdle(10); // minimum idle connections
dataSource.setMaxWait(3000); // max wait for a connection, in milliseconds
dataSource.setPoolPreparedStatements(true); // cache PreparedStatements, default false
dataSource.setMaxOpenPreparedStatements(20); // max cached PreparedStatements, default -1 (no cache); any value > 0 enables the cache by itself, so the previous line could be omitted
}
}
public static Connection getConnection() {
try {
return dataSource.getConnection();
} catch (Exception e) {
log.error("getConnection error ", e);
}
return null;
}
public static void closeDruid() {
if (dataSource != null) {
dataSource.close();
}
}
}
package com.hikcreate.bigdata.hdip.source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author chencong13
 * @Description Executor that drains the task queue into a thread pool
 * @Time 2020/9/18 10:58
 */
public class Executor implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(Executor.class.getName());
private ConcurrentLinkedQueue<QueryTask> tasks; // task queue
private volatile boolean flag = true; // stop flag, written from another thread
private ExecutorService executors; // thread pool
public Executor(ConcurrentLinkedQueue<QueryTask> tasks) {
this.tasks = tasks;
flag = true;
executors = Executors.newCachedThreadPool();
}
/**
 * Description: polls tasks off the queue and hands them to the pool.
 * @param:
 * @author chencong13
 * @Time 2020/10/20 10:03
 **/
@Override
public void run() {
logger.info("start execute");
while (flag) {
QueryTask poll = tasks.poll();
if (poll != null && !poll.isRunning()) {
logger.info("execute - " + poll.toString() + " task size = " + tasks.size());
executors.execute(poll);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("Executor interrupted", e);
}
}
}
/**
 * Description: stops the executor loop and the thread pool.
 * @param:
 * @author chencong13
 * @Time 2020/10/20 10:04
 **/
public void stop() {
flag = false;
executors.shutdownNow();
}
}
package com.hikcreate.bigdata.hdip.source;
import com.google.gson.Gson;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Manager class for collecting data from Oracle (and other JDBC) tables.
 *
 * @author Administrator
 * @Time 2020-08-20 12:12:12
 */
public class OracleSourceService implements QueryCallback {
/**
 * Logger.
 */
private static final Logger log = LoggerFactory.getLogger(OracleSourceService.class);
Map<String, QueryTask> queryTaskMap; // task map (runnables keyed by database.table)
private String offsetPath; // path of the offset file
private String status = "shutdown";
private ConcurrentLinkedQueue<QueryTask> queryTasks;
private Scheduler scheduler;
private Executor executor;
/**
 * Description: initializes the table-collection info.
 * Logic: parses the JSON config, sets up the datasource, restores saved offsets.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 10:30
 */
private void init(String configPath) throws IOException, SQLException {
log.info("init ...");
queryTaskMap = new HashMap<>();
queryTasks = new ConcurrentLinkedQueue<>();
String configJson = IOUtils.toString(new FileInputStream(configPath), "utf-8");
Gson gson = new Gson();
ConfigBean queryConfig = gson.fromJson(configJson, ConfigBean.class);
String jdbcType = queryConfig.getJdbcType();
DBDriver dbDriver = DBDriver.valueOf(jdbcType);
String driver = dbDriver.getDriver();
String url = dbDriver.getUrl(queryConfig.getJdbcIp(), queryConfig.getJdbcPort(), queryConfig.getSid());
DBUtils.initDataSource(url, driver, queryConfig.getJdbcUser(), queryConfig.getJdbcPwd());
List<TableBean> queryTables = queryConfig.getQueryTables();
offsetPath = queryConfig.getOffsetPath();
Map<String, String> offsetMap = new HashMap<>();
Map<String, Long> fileNoMap = new HashMap<>();
readOffset(offsetPath, offsetMap, fileNoMap);
for (TableBean tb : queryTables) {
String key = tb.getDatabase() + "." + tb.getTable();
String offset = offsetMap.get(key);
Long fileNo = fileNoMap.get(key);
if (offset == null && fileNo == null) {
offset = tb.getInitOffset();
fileNo = 0L;
}
QueryTask queryTask = new QueryTask(tb, queryConfig.getTempPath(), offset, fileNo, queryConfig.getFtpPath(), jdbcType);
queryTask.addLister(this);
queryTaskMap.put(key, queryTask);
}
log.info("query table size is {}", queryTaskMap.size());
}
/**
 * Description: starts collection.
 * Logic: spins up the scheduler and executor threads.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 10:30
 */
public String start(String configPath) {
if ("started".equals(status)) {
return "already started";
}
try {
if (queryTaskMap == null) {
init(configPath);
}
} catch (IOException | SQLException e) {
log.error("oracleSourceService init failed", e);
return "init failed: " + e.getMessage();
}
log.info("start query...");
status = "started";
scheduler = new Scheduler(queryTaskMap, queryTasks);
executor = new Executor(queryTasks);
new Thread(scheduler).start();
new Thread(executor).start();
return "success";
}
/**
 * Description: reads offsets from the offset file.
 * Logic: each line is database.table=offset#fileNo.
 *
 * @param path   path of the offset file
 * @param offset map to fill with each table's offset
 * @param fileNo map to fill with each table's file number
 * @author chencong13
 * @Time 2020/8/20 10:31
 */
private void readOffset(String path, Map<String, String> offset, Map<String, Long> fileNo) {
try {
List<String> strings = IOUtils.readLines(new FileReader(path));
for (String s : strings) {
if (CommonUtils.isBlank(s)) {
continue;
}
String[] split = s.split("=");
String[] offsets = split[1].split("#");
offset.put(split[0], offsets[0]);
fileNo.put(split[0], Long.parseLong(offsets[1]));
}
} catch (IOException e) {
log.error("readOffset error", e);
}
}
/**
 * Description: persists each table's offset.
 * Logic: rewrites the offset file with one line per table.
 *
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 14:34
 */
public String saveOffset() {
String msg = "success";
String path = offsetPath;
CommonUtils.clearFileContent(offsetPath);
List<String> temp = new ArrayList<>();
for (String k : queryTaskMap.keySet()) {
QueryTask queryTask = queryTaskMap.get(k);
String offset = queryTask.getOffset();
if ("-1".equals(offset)) {
continue;
}
long fileNo = queryTask.getFileNo();
temp.add(k + "=" + queryTask.getOffset() + "#" + fileNo + "\n");
}
if (temp.size() == 0) {
return msg;
}
FileWriter fileWriter = null;
try {
fileWriter = new FileWriter(path);
for (String line : temp) {
fileWriter.write(line);
}
fileWriter.flush();
} catch (IOException e) {
log.error("error", e);
throw new IllegalArgumentException("save offset failed");
} finally {
try {
fileWriter.close();
} catch (IOException e) {
log.error("error", e);
}
}
return msg;
}
/**
 * Description: reports every task's current offset.
 *
 * @param
 * @return
 * @author chencong13
 * @Time 2020/8/20 15:31
 */
public String getOffset() {
StringBuffer sb = new StringBuffer();
for (String k : queryTaskMap.keySet()) {
QueryTask queryTask = queryTaskMap.get(k);
sb.append(k + " = " + queryTask.toString() + " , ");
}
sb.append("count = " + queryTaskMap.size());
String temp = sb.toString();
sb.setLength(0);
return temp;
}
/**
 * Description: stops all tasks.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 10:33
 */
public String shuntDown() {
log.info("shunt down all ... ");
if ("shutdown".equals(status)) {
return "already shutdown";
}
saveOffset();
scheduler.stop();
executor.stop();
status = "shutdown";
for (String key : queryTaskMap.keySet()) {
QueryTask queryTask = queryTaskMap.get(key);
queryTask.stop();
}
queryTaskMap.clear();
DruidUtils.closeDruid();
return "sucess";
}
/**
 * Description: callback hook.
 * Logic: saves the offsets; triggered whenever a task errors and periodically as files are flushed.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 11:49
 */
@Override
synchronized public void handlerMsg(int what, String msg) {
saveOffset();
switch (what) {
case QueryTask.OFFSET_50:
log.info("50 count trigger save offset :{}", msg);
break;
case QueryTask.OFFSET_ERR:
log.info("error trigger save offset :{},need check", msg);
queryTaskMap.remove(msg);
break;
}
}
/**
 * Description: returns the service's current status.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/21 15:42
 */
public String getStatus() {
return status;
}
}
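For reference, readOffset and saveOffset above share a one-line-per-table text format: database.table=offset#fileNo. An illustrative offset file line (values hypothetical):
trff_app.vio_surveil=2020-08-27 00:19:50#12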
package com.hikcreate.bigdata.hdip.source;
/**
 * Callback interface.
 *
 * @author Administrator
 * @Time 2020-08-20 12:12:12
 */
public interface QueryCallback {
/**
 * Description: handles a message reported by a task.
 * @param: what
 * @param: msg
 * @author chencong13
 * @Time 2020/10/20 10:05
 **/
void handlerMsg(int what, String msg);
}
package com.hikcreate.bigdata.hdip.source;
import com.google.gson.Gson;
import oracle.sql.BLOB;
import oracle.sql.CLOB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.sql.*;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Task that collects data from one Oracle (or other JDBC) table.
 *
 * @author Administrator
 * @Time 2020-08-20 12:12:12
 */
public class QueryTask implements Runnable {
/**
 * Logger.
 */
private static final Logger log = LoggerFactory.getLogger(QueryTask.class);
/**
 * Suffix of a file that is still being written.
 */
public static final String NOT_COMPELTE = ".temp";
/**
 * Suffix of a completed file.
 */
public static final String COMPELTE = ".hik";
public static final String HIKNULL = "fromjdbcsource-null";
/**
 * Marker: save offsets because a task failed.
 */
public static final int OFFSET_ERR = 2;
/**
 * Marker: routine offset save.
 */
public static final int OFFSET_50 = 1;
private TableBean tableBean; // details of the table being collected
private String tempPath; // temp directory for files being written
private String offset; // current offset
private volatile boolean stop; // exit flag, set from other threads
private volatile boolean isRunning; // running flag, read by the scheduler
private String sqlPre; // cached SQL prefix
private long no; // file number
private QueryCallback queryCallback; // callback
private int columnCount; // number of columns
private String ftpPath; // FTP directory
private String jdbcType; // database type
/**
 * Description: constructor.
 *
 * @return
 * @author chencong13
 * @Time 2020/8/20 15:39
 */
public QueryTask(TableBean tableBean, String tempPath, String offset, long no, String ftpPath, String jdbcType) {
this.tableBean = tableBean;
this.tempPath = tempPath;
stop = false;
this.offset = offset;
this.no = no;
this.columnCount = -1;
this.ftpPath = ftpPath;
this.jdbcType = jdbcType;
this.isRunning = false;
}
public TableBean getTableBean() {
return this.tableBean;
}
/**
 * Description: registers the listener/callback.
 *
 * @param queryCallback the listener
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:40
 */
public void addLister(QueryCallback queryCallback) {
this.queryCallback = queryCallback;
}
/**
 * Description: run method; builds the SQL (or uses the configured genSql) and executes it.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:41
 */
@Override
public void run() {
log.info("start query {},offset is {}", tableBean.getDatabase() + "." + tableBean.getTable(), offset);
String executeSql = genSql();
String genSql = tableBean.getGenSql();
if (genSql != null) {
executeSql = genSql;
}
executeSql(executeSql);
}
/**
 * Description: builds the incremental query SQL.
 *
 * @param
 * @return the generated SQL
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:42
 */
private String genSql() {
StringBuilder sql = new StringBuilder();
String incrField = tableBean.getIncrField();
if (sqlPre == null) {
String database = tableBean.getDatabase();
String table = tableBean.getTable();
List<String> fields = tableBean.getFields();
sql.append("select ");
for (String str : fields) {
sql.append(str + ",");
}
sql.setLength(sql.length() - 1);
sql.append(" from ");
sql.append(database + "." + table + " where 1=1 ");
sqlPre = sql.toString();
sql.setLength(0);
}
sql.append(sqlPre);
switch (tableBean.getIncrType()) {
case TableBean.NUM_TYPE:
sql.append("and " + incrField + ">" + offset);
break;
case TableBean.TIME_TYPE:
if ("oracle".equalsIgnoreCase(jdbcType)) {
sql.append("and " + incrField + ">to_timestamp('" + offset + "','yyyy-mm-dd hh24:mi:ss.ff') and " + incrField + "<=sysdate-30/24/60/60");
} else if ("Mysql_5".equalsIgnoreCase(jdbcType)) {
sql.append("and " + incrField + ">'" + offset + "' and " + incrField + "<=DATE_SUB(SYSDATE(),INTERVAL 30 SECOND)");
} else if ("PostgreSql".equalsIgnoreCase(jdbcType)) {
sql.append("and " + incrField + ">'" + offset + "' and " + incrField + "<=(now()::TIMESTAMP+'-30sec')");
} else {
log.info("other sql type");
}
break;
case TableBean.STR_TYPE:
sql.append("and " + incrField + ">'" + offset + "'");
break;
case TableBean.WHOLE_TYPE:
break;
default:
break;
}
String executeSql = sql.toString();
return executeSql;
}
/**
 * Description: executes the SQL and writes the result batches to files.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:43
 */
private void executeSql(String executeSql) {
List<String> tempData = new ArrayList<>();
ResultSet resultSet = null;
String tableName = tableBean.getDatabase() + "." + tableBean.getTable();
PreparedStatement preparedStatement = null;
Connection connection = DBUtils.getConnection();
if (connection == null) {
log.warn("DruidUtils.getConnection() is null");
return;
}
try {
isRunning = true;
preparedStatement = connection.prepareStatement(executeSql);
resultSet = preparedStatement.executeQuery();
resultSet.setFetchSize(10000);
String timeSys = String.valueOf(System.currentTimeMillis());
StringBuffer sb = new StringBuffer();
boolean goon = true;
String newField = "";
while (resultSet.next()) {
if (stop) {
break;
}
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
if (goon) {
String incrColum = "";
if (this.columnCount == -1) {
this.columnCount = columnCount;
} else {
if (this.columnCount != columnCount) {
incrColum = genIncrColum(metaData, columnCount - this.columnCount);
this.columnCount = columnCount;
}
}
newField = incrColum;
log.info("new field = " + newField);
goon = false;
}
// for incremental loads, advance the offset from the incremental column
if (!TableBean.WHOLE_TYPE.equals(tableBean.getIncrType())) {
int incrIndex = resultSet.findColumn(tableBean.getIncrField());
String incrStr = resultSet.getString(incrIndex);
updateOffset(incrStr);
}
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
int columnType = metaData.getColumnType(i);
String value = "";
if (Types.BLOB == columnType) {
BLOB imageCol = (BLOB) resultSet.getBlob(columnName);
value = handleBlob(imageCol);
} else if (Types.CLOB == columnType) {
CLOB contentCol = (CLOB) resultSet.getClob(columnName);
value = handleClob(contentCol);
} else {
Object object = resultSet.getObject(resultSet.findColumn(columnName));
if (object == null) {
value = HIKNULL;
} else {
value = String.valueOf(object);
value = value.replace("\n", ",");
}
}
sb.append(value + "\001");
}
sb.setLength(sb.length() - 1);
tempData.add(sb.toString());
sb.setLength(0);
String sinkSize = tableBean.getSinkSize();
int intSinkSize = 10000;
if (sinkSize != null) {
intSinkSize = Integer.parseInt(sinkSize);
}
if (tempData.size() >= intSinkSize) {
if (no % 20 == 0) {
queryCallback.handlerMsg(QueryTask.OFFSET_50, tableName);
}
if (!TableBean.WHOLE_TYPE.equals(tableBean.getIncrType())) {
log.info("update offset is {}", offset);
flushFile(getHeader(newField), tempData, null, null, null);
} else {
flushFile(getHeader(newField), tempData, "all", null, timeSys);
}
tempData.clear();
if (!CommonUtils.isBlank(newField)) {
newField = "";
}
}
}
if (tempData.size() != 0) {
if (!TableBean.WHOLE_TYPE.equals(tableBean.getIncrType())) {
log.info("update offset is {}", offset);
flushFile(getHeader(newField), tempData, null, null, null);
} else {
flushFile(getHeader(newField), tempData, "all", "end", timeSys);
}
log.info("table is {},update offset is {}", tableName, offset);
tempData.clear();
}
} catch (Exception e) {
log.error(tableName, e);
stop();
queryCallback.handlerMsg(OFFSET_ERR, tableName);
} finally {
isRunning = false;
log.info("close connection");
try {
if (resultSet != null) {
resultSet.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
connection.close();
} catch (SQLException e) {
log.error(tableName, e);
}
}
}
/**
 * Description: handles a BLOB (e.g. image) column.
 * Logic: reads the bytes and returns them Base64-encoded.
 * @param: imageCol
 * @author chencong13
 * @Time 2020/10/20 10:06
 **/
private String handleBlob(BLOB imageCol) {
InputStream input = null;
if (imageCol == null) {
return HIKNULL;
}
String result = "";
try {
input = imageCol.getBinaryStream();
int len = (int) imageCol.length();
if (len == 0) {
result = "blob-null";
} else {
byte[] buffer = new byte[len];
// note: assumes a single read() fills the whole buffer
input.read(buffer);
result = Base64.getEncoder().encodeToString(buffer);
}
} catch (Exception e) {
log.error("handleBlob error", e);
} finally {
try {
if (input != null) {
input.close();
}
} catch (IOException e) {
log.error("handleBlob catch error", e);
}
}
return result;
}
/**
 * Description: returns the running flag.
 * @param:
 * @author chencong13
 * @Time 2020/10/20 10:06
 **/
public boolean isRunning() {
return isRunning;
}
/**
 * Description: handles a CLOB (large text) column; returns its bytes Base64-encoded.
 * @param: contentCol
 * @author chencong13
 * @Time 2020/10/20 10:07
 **/
private String handleClob(CLOB contentCol) {
if (contentCol == null) {
return HIKNULL;
}
InputStream input = null;
String result = "";
try {
input = contentCol.getAsciiStream();
int len = (int) contentCol.length();
if (len == 0) {
result = "clob-null";
} else {
byte[] buffer = new byte[len];
// note: assumes a single read() fills the whole buffer
input.read(buffer);
result = Base64.getEncoder().encodeToString(buffer);
}
} catch (Exception e) {
log.error("handleClob error", e);
} finally {
try {
if (input != null) {
input.close();
}
} catch (IOException e) {
log.error("handleClob catch error", e);
}
}
return result;
}
/**
 * Description: when the table structure changed, describes the newly added columns.
 * Logic: serializes name/type/precision/scale of the trailing columns to JSON.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/25 14:56
 */
private String genIncrColum(ResultSetMetaData rd, int tag) throws SQLException {
if (tag <= 0) {
return "";
}
int count = rd.getColumnCount();
List<Map<String, Object>> tempList = new ArrayList<>();
for (int i = tag - 1; i >= 0; i--) {
Map<String, Object> temp = new HashMap<>();
int index = count - i;
int columnType = rd.getColumnType(index);
int precision = rd.getPrecision(index);
int scale = rd.getScale(index);
String columnName = rd.getColumnName(index);
temp.put("columnType", columnType);
temp.put("precision", precision);
temp.put("scale", scale);
temp.put("columnName", columnName);
tempList.add(temp);
}
Gson gson = new Gson();
String s = gson.toJson(tempList);
return s;
}
/**
 * Description: stops this task.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:43
 */
public void stop() {
log.info("querytask {} stopped", tableBean.getDatabase() + "." + tableBean.getTable());
stop = true;
isRunning = false;
}
/**
 * Description: writes a batch to a file.
 * Logic: writes to a .temp file, then renames it to .hik in the FTP directory.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:43
 */
private void flushFile(List<String> header, List<String> datas, String prefix, String suffix, String timesys) {
String fileName = getFileName(prefix, suffix, timesys);
CommonUtils.createDir(tempPath);
String filePath = tempPath + File.separator + fileName + NOT_COMPELTE;
BufferedWriter fw = null;
try {
fw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(filePath), "UTF-8"));
if (header != null) {
for (String k : header) {
fw.write(k + "\n");
}
}
if (datas != null) {
for (String data : datas) {
fw.write(data + "\n");
}
}
fw.flush();
no++;
} catch (Exception e) {
log.error("flushFile error", e);
} finally {
try {
if (fw != null) {
fw.close();
}
changeFile(fileName);
} catch (IOException e) {
log.error("flushFile catch error", e);
}
}
}
/**
 * Description: moves the finished temp file into the FTP directory.
 * Logic: renames tempPath/name.temp to ftpPath/name.hik.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:43
 */
private void changeFile(String fileName) {
String oldFileName = tempPath + File.separator + fileName + NOT_COMPELTE;
String targetFile = ftpPath + File.separator + fileName + COMPELTE;
CommonUtils.renameFile(oldFileName, targetFile);
}
/**
 * Description: builds the output file name.
 * Logic: no#database#table#timestamp, with optional prefix/suffix parts.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:44
 */
private String getFileName(String prefix, String suffix, String timesys) {
if (timesys == null) {
timesys = String.valueOf(System.currentTimeMillis());
}
String result = no + "#" + tableBean.getDatabase() + "#" + tableBean.getTable() + "#" + timesys;
if (!CommonUtils.isBlank(prefix)) {
result = prefix + "#" + result;
}
if (!CommonUtils.isBlank(suffix)) {
result = result + "#" + suffix;
}
return result;
}
/**
 * Description: builds the file header lines.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:44
 */
private List<String> getHeader(String nf) {
List<String> header = new ArrayList<>();
header.add("database:" + tableBean.getDatabase());
header.add("table:" + tableBean.getTable());
header.add("offset:" + offset);
header.add("time:" + CommonUtils.getCurrentDate());
header.add("new_fields:" + nf);
header.add("datas:" + "");
return header;
}
/**
 * Description: advances the offset.
 * Logic: keeps the larger of the current and the new value, per the offset type.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:44
 */
private void updateOffset(String value) throws ParseException {
if (CommonUtils.isBlank(value)) {
return;
}
switch (tableBean.getIncrType()) {
case TableBean.NUM_TYPE:
long newOffset = Long.parseLong(value);
long oldOffset = Long.parseLong(offset);
if (newOffset > oldOffset) {
offset = value;
}
break;
case TableBean.TIME_TYPE:
if (!CommonUtils.isDate2(value)) {
log.info("!CommonUtils.isDate2(value)");
break;
}
newOffset = CommonUtils.stringParseDate(value, CommonUtils.getDateFormat(value)).getTime();
oldOffset = CommonUtils.stringParseDate(offset, CommonUtils.getDateFormat(offset)).getTime();
if (newOffset > oldOffset) {
offset = value;
}
break;
case TableBean.STR_TYPE:
offset = value.compareTo(offset) > 0 ? value : offset;
break;
}
}
public boolean getStopFlag() {
return stop;
}
/**
 * Description: returns the task's current offset.
 *
 * @param
 * @return
 * @throws
 * @author chencong13
 * @Time 2020/8/20 15:44
 */
public String getOffset() {
return offset;
}
/**
 * Description: returns the current file number.
 *
 * @param
 * @return
 * @author chencong13
 * @Time 2020/8/21 16:09
 */
public long getFileNo() {
return no;
}
@Override
public String toString() {
return tableBean.getDatabase() + "." + tableBean.getTable() + " offset = " + offset + ",no = " + no + ", is stop " + stop;
}
}
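To summarize the output format QueryTask produces: getFileName yields no#database#table#timestamp (full loads get an "all" prefix and their last chunk an "end" suffix), files are written under tempPath with the .temp suffix and then renamed to .hik under ftpPath, and each file starts with the getHeader lines followed by data rows whose columns are joined by the \001 separator (SQL NULLs become fromjdbcsource-null). An illustrative file named 12#trff_app#vio_surveil#1598457590000.hik (all values hypothetical):
database:trff_app
table:vio_surveil
offset:2020-08-27 00:19:50
time:2020-08-27 10:00:00
new_fields:
datas:
6f9619ff-8b86-d011-b42d-00c04fc964ff\0012020-08-27 00:19:49\001fromjdbcsource-null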
package com.hikcreate.bigdata.hdip.source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * @author chencong13
 * @Description Scheduler that decides when each task should run
 * @Time 2020/9/17 18:28
 */
public class Scheduler implements Runnable {
Logger logger = LoggerFactory.getLogger(Scheduler.class.getName());
Map<String, QueryTask> datas; // task map
private volatile boolean flag; // stop flag, set from another thread
private ConcurrentLinkedQueue<QueryTask> queryTasks; // execution queue
public Scheduler(Map<String, QueryTask> datas, ConcurrentLinkedQueue<QueryTask> queryTasks) {
this.datas = datas;
this.queryTasks = queryTasks;
flag = true;
}
/**
 * Description: drives scheduling with a one-second tick; an incrementing counter plus modulo (or a wall-clock match) decides when to enqueue a task.
 * @param:
 * @author chencong13
 * @Time 2020/10/20 10:08
 **/
@Override
public void run() {
logger.info("start scheduler...");
long count = 0;
while (flag) {
count++;
long startTime = System.currentTimeMillis();
for (QueryTask qt : datas.values()) {
String queryMin = qt.getTableBean().getQueryMin();
boolean running = qt.isRunning();
if (running) {
continue;
}
if (queryMin.contains(":")) {
// daily wall-clock trigger: compute today's occurrence of the configured time
String today = CommonUtils.getTargetDate(startTime);
String tday = today.split(" ")[0];
long targetDate = CommonUtils.parseDate(tday + " " + queryMin.split(" ")[1]);
if (targetDate == -1) {
logger.warn("parse error,CommonUtils.parseDate");
} else {
long lag = Math.abs(targetDate - startTime);
if (lag < 60 * 1000) {
qt.getTableBean().setQueryMin(CommonUtils.getTargetDate(targetDate + 24 * 60 * 60 * 1000L));
queryTasks.add(qt);
logger.info("scheduler = " + qt.toString() + " cur day " + CommonUtils.getCurrentDate());
}
}
} else {
int queryMinInt = Integer.parseInt(queryMin);
if (count % (queryMinInt * 60) == 0) {
logger.info("scheduler = " + qt.toString() + " count = " + count + " cur day " + CommonUtils.getCurrentDate());
queryTasks.add(qt);
}
}
}
long endTime = System.currentTimeMillis();
long sleepTime = 1000 - (endTime - startTime);
if (sleepTime < 0) {
logger.info("count-- = " + count + ",datas size = " + datas.size() + ",sleeptime = " + sleepTime);
sleepTime = 1000;
}
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
logger.error("InterruptedException e", e);
}
}
}
public void stop() {
flag = false;
}
}
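The queryMin field drives two scheduling modes in the loop above: a plain integer such as "5" enqueues the task roughly every 5 minutes (count % (5 * 60), one tick per second), while a value containing a colon, e.g. "2020-10-20 02:00:00", acts as a daily wall-clock trigger: the task is enqueued when the current time falls within 60 seconds of the configured HH:mm:ss, and queryMin is then advanced by 24 hours.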
package com.hikcreate.bigdata.hdip.source;
import java.util.List;
/**
 * Entity class describing an incremental table.
 *
 * @author Administrator
 * @Time 2020-08-20 12:12:12
 */
public class TableBean {
/**
 * Marks the incremental field as a time type.
 */
public static final String TIME_TYPE = "1";
/**
 * Marks the incremental field as a numeric type.
 */
public static final String NUM_TYPE = "2";
/**
 * Marks the incremental field as a string type.
 */
public static final String STR_TYPE = "3";
/**
 * Marks a full (non-incremental) load.
 */
public static final String WHOLE_TYPE = "4";
private String database; // database
private String table; // table
private String incrField; // incremental field
private String incrType; // incremental field type
private String queryMin; // query interval
private String initOffset;
private List<String> fields; // fields to query
private String sinkSize; // rows per file flush, e.g. 10000
private String genSql;
public String getQueryMin() {
return queryMin;
}
public void setQueryMin(String queryMin) {
this.queryMin = queryMin;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public String getIncrField() {
return incrField;
}
public void setIncrField(String incrField) {
this.incrField = incrField;
}
public String getIncrType() {
return incrType;
}
public void setIncrType(String incrType) {
this.incrType = incrType;
}
public List<String> getFields() {
return fields;
}
public void setFields(List<String> fields) {
this.fields = fields;
}
public String getInitOffset() {
return initOffset;
}
public void setInitOffset(String initOffset) {
this.initOffset = initOffset;
}
public String getSinkSize() {
return sinkSize;
}
public void setSinkSize(String sinkSize) {
this.sinkSize = sinkSize;
}
public String getGenSql() {
return genSql;
}
public void setGenSql(String genSql) {
this.genSql = genSql;
}
}
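A hypothetical queryTables entry for a numeric-offset table, mirroring the fields of this bean (compare the time-based example in queryConfig.json below; all values illustrative):
{
"database": "test",
"table": "orders",
"incrField": "id",
"incrType": "2",
"queryMin": "5",
"initOffset": "0",
"sinkSize": "10000",
"fields": ["*"]
}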
driverClassName=oracle.jdbc.driver.OracleDriver
url=jdbc:oracle:thin:@10.160.118.146:1521/orcl
username=5201
password=gzgyzd559910
# initial pool size
initialSize=10
# max pool size
maxActive=10
# connection wait timeout (ms)
maxWait=3000
# minimum idle connections
minIdle=10
# whether to cache PreparedStatements (PSCache); a big win for databases with cursor support, such as Oracle
poolPreparedStatements=true
maxPoolPrepareStatementPerConnectionSize=20
testWhileIdle=true
driverClassName=oracle.jdbc.driver.OracleDriver
url=jdbc:oracle:thin:@193.5.111.23:1521/orcl
username=test
password=hiktest
# initial pool size
initialSize=10
# max pool size
maxActive=10
# connection wait timeout (ms)
maxWait=3000
# minimum idle connections
minIdle=10
# whether to cache PreparedStatements (PSCache); a big win for databases with cursor support, such as Oracle
poolPreparedStatements=true
maxPoolPrepareStatementPerConnectionSize=20
testWhileIdle=true
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<property name="log.dir" value="./log" />
<timestamp key="ymd" datePattern="yyyy-MM-dd"/>
<property name="moduleName" value="jdbcsource"/>
<timestamp key="bySecond" datePattern="yyyyMMdd'-'HHmmss"/>
<property name="CONSOLE_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%highlight(%level)|${moduleName}|%yellow(%thread)|%blue(%logger{0}:%line)|%msg%n"/>
<property name="FILE_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%level|${moduleName}|%thread|%logger{0}:%line|%msg%n"/>
<!-- Quiet noisy third-party loggers -->
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
<logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
<logger name="org.apache.avro.ipc.netty.NettyServer" level="WARN"/>
<logger name="com.client.DataServer" level="WARN"/>
<logger name="com.client.SessionManager" level="WARN"/>
<!-- Console output [%yellow(%thread)] -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_PATTERN}</pattern>
</encoder>
</appender>
<!--ROLLING_FILE [%thread]-->
<appender name="ROLLING_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Log file name; if the file property is absent, only the FileNamePattern path rule is used -->
<file>${log.dir}/my.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${log.dir}/my.log.%d{yyyy-MM-dd}</FileNamePattern>
<MaxHistory>10</MaxHistory>
</rollingPolicy>
<encoder>
<charset>UTF-8</charset>
<pattern>${FILE_PATTERN}</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="ROLLING_FILE" />
</root>
</configuration>
{
"druidProperty": "E:\\hik\\o65\\druid.properties",
"tempPath": "E:\\hik\\temp",
"offsetPath": "E:\\hik\\offset.txt",
"ftpPath": "E:\\ftp",
"queryTables": [
{
"database": "trff_app",
"table": "vio_surveil",
"incrField": "gxsj",
"incrType": "1",
"queryMin": "5",
"initOffset": "2020-08-27 00:19:50",
"fields": [
"*"
]
}
]
}