Commit 60186cf5 by shuyulong

Image data processing: migrate image blobs from Oracle tables to FastDFS and store the resulting file URLs in the Hive ODS tables.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hikcreate</groupId>
<artifactId>hptmsp-pic</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
<!--<dependency>
<groupId>com.github.tobato</groupId>
<artifactId>fastdfs-client</artifactId>
<version>1.26.5</version>
</dependency>-->
<dependency>
<groupId>net.oschina.zcx7878</groupId>
<artifactId>fastdfs-client-java</artifactId>
<version>1.27.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<!-- Maven assembly plugin: builds the jar and bundles third-party dependencies into it -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!-- Replace this with the class that contains the jar's main method -->
<mainClass></mainClass>
</manifest>
<manifestEntries>
<Class-Path>.</Class-Path>
</manifestEntries>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- run the jar assembly during the package phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.hikcreate.utils;
import org.csource.fastdfs.*;
import java.util.Properties;
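/**
* Thin wrapper around fastdfs-client-java: init() connects to the tracker
* (the tracker address and timeouts below are environment-specific),
* uploadFile() stores a byte array and returns the FastDFS file path,
* deleteFIle() removes a stored file, and close() releases the connections.
* Call init() before any upload or delete, and close() when finished.
*/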
public class FastdfsUtils {
private TrackerClient trackerClient = null;
private TrackerServer trackerServer = null;
private StorageServer storageServer = null;
//uploads are performed with StorageClient1
private StorageClient1 storageClient1 = null;
public void init() {
try {
Properties prop = new Properties();
prop.put("fastdfs.tracker_servers","172.16.25.23:22122");
prop.put("fastdfs.connect_timeout_in_seconds","120");
prop.put("fastdfs.network_timeout_in_seconds","120");
ClientGlobal.initByProperties(prop);
trackerClient = new TrackerClient();
trackerServer = trackerClient.getConnection();
storageServer = trackerClient.getStoreStorage(trackerServer);
storageClient1 = new StorageClient1(trackerServer, storageServer);
ProtoCommon.activeTest(trackerServer.getSocket());
} catch (Exception e) {
e.printStackTrace();
System.out.println("初始化异常====================");
}
}
/**
* Upload an image
*
* @param file_id file id
* @param file_buff file content as a byte array
* @param file_ext_name file extension
* @return the file path returned by FastDFS
* @throws Exception
*/
public String uploadFile(String file_id, byte[] file_buff, String file_ext_name) throws Exception {
//init();
/* FileInfo file = storageClient1.get_file_info1(file_id);
if(file!=null && file.getFileSize()>0){
storageClient1.delete_file1(file_id);
}
String result = storageClient1.upload_file1(file_id,"",file_buff,file_ext_name,null);*/
String result = storageClient1.upload_file1(file_id, "abc", file_buff, file_ext_name, null);
//String result=storageClient1.upload_file1(file_buff,"txt",null);
return result;
}
public String uploadFile(byte[] file_buff, String file_ext_name) throws Exception {
//init();
String result = storageClient1.upload_file1(file_buff, file_ext_name, null);
return result;
}
public void deleteFIle(String file_id) throws Exception{
//init();
storageClient1.delete_file1(file_id);
}
public void close() {
try {
if (storageServer != null) {
storageServer.close();
}
if (trackerServer != null) {
trackerServer.close();
}
} catch (Exception e) {
}
}
public static void main(String[] args) {
FastdfsUtils fdfs = new FastdfsUtils();
try {
fdfs.init();
String result = fdfs.uploadFile(args[0], "123".getBytes(), "txt");
String result1 = fdfs.uploadFile("123".getBytes(), "txt");
System.out.println("result: " + result);
System.out.println("result1: " + result1);
} catch (Exception e) {
e.printStackTrace();
} finally {
fdfs.close();
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>false</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>hadoop.security.auth_to_local</name>
<value>DEFAULT</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.mapred.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.mapred.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.HTTP.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.HTTP.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.yarn.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.yarn.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
</property>
<property>
<name>hadoop.security.instrumentation.requires.admin</name>
<value>false</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/etc/hadoop/conf.cloudera.yarn/topology.py</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>65536</value>
</property>
<property>
<name>hadoop.ssl.enabled</name>
<value>false</value>
</property>
<property>
<name>hadoop.ssl.require.client.cert</name>
<value>false</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.keystores.factory.class</name>
<value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.server.conf</name>
<value>ssl-server.xml</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.client.conf</name>
<value>ssl-client.xml</value>
<final>true</final>
</property>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.nameservices</name>
<value>nameservice1</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hiknode3:9083</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.nameservice1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.nameservice1</name>
<value>true</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>hiknode2:2181,hiknode3:2181,hiknode4:2181,hiknode5:2181,hiknode6:2181</value>
</property>
<property>
<name>dfs.ha.namenodes.nameservice1</name>
<value>namenode237,namenode293</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode237</name>
<value>hiknode3:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode237</name>
<value>hiknode3:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.nameservice1.namenode237</name>
<value>hiknode3:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.nameservice1.namenode237</name>
<value>hiknode3:50470</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode293</name>
<value>hiknode6:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode293</name>
<value>hiknode6:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.nameservice1.namenode293</name>
<value>hiknode6:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.nameservice1.namenode293</name>
<value>hiknode6:50470</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
</property>
<property>
<name>fs.permissions.umask-mode</name>
<value>022</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.client.use.legacy.blockreader</name>
<value>false</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>false</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hdfs-sockets/dn</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.skip.checksum</name>
<value>false</value>
</property>
<property>
<name>dfs.client.domain.socket.data.traffic</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hiknode3:9083</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/home/cdh/user/hive/warehouse</value>
</property>
<property>
<name>hive.warehouse.subdir.inherit.perms</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>20971520</value>
</property>
<property>
<name>hive.optimize.bucketmapjoin.sortedmerge</name>
<value>false</value>
</property>
<property>
<name>hive.smbjoin.cache.rows</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.logging.operation.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.server2.logging.operation.log.location</name>
<value>/home/cdh/var/log/hive/operation_logs</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>2147483648</value>
</property>
<property>
<name>hive.exec.copyfile.maxsize</name>
<value>33554432</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>1099</value>
</property>
<property>
<name>hive.vectorized.groupby.checkinterval</name>
<value>4096</value>
</property>
<property>
<name>hive.vectorized.groupby.flush.percent</name>
<value>0.1</value>
</property>
<property>
<name>hive.compute.query.using.stats</name>
<value>false</value>
</property>
<property>
<name>hive.vectorized.execution.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.execution.reduce.enabled</name>
<value>false</value>
</property>
<property>
<name>hive.merge.mapfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.mapredfiles</name>
<value>false</value>
</property>
<property>
<name>hive.cbo.enable</name>
<value>false</value>
</property>
<property>
<name>hive.fetch.task.conversion</name>
<value>minimal</value>
</property>
<property>
<name>hive.fetch.task.conversion.threshold</name>
<value>268435456</value>
</property>
<property>
<name>hive.limit.pushdown.memory.usage</name>
<value>0.1</value>
</property>
<property>
<name>hive.merge.sparkfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.smallfiles.avgsize</name>
<value>16777216</value>
</property>
<property>
<name>hive.merge.size.per.task</name>
<value>268435456</value>
</property>
<property>
<name>hive.optimize.reducededuplication</name>
<value>true</value>
</property>
<property>
<name>hive.optimize.reducededuplication.min.reducer</name>
<value>4</value>
</property>
<property>
<name>hive.map.aggr</name>
<value>true</value>
</property>
<property>
<name>hive.map.aggr.hash.percentmemory</name>
<value>0.5</value>
</property>
<property>
<name>hive.optimize.sort.dynamic.partition</name>
<value>false</value>
</property>
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
<property>
<name>spark.executor.memory</name>
<value>10737418240</value>
</property>
<property>
<name>spark.driver.memory</name>
<value>4294967296</value>
</property>
<property>
<name>spark.executor.cores</name>
<value>6</value>
</property>
<property>
<name>spark.yarn.driver.memoryOverhead</name>
<value>2048</value>
</property>
<property>
<name>spark.yarn.executor.memoryOverhead</name>
<value>4096</value>
</property>
<property>
<name>spark.dynamicAllocation.enabled</name>
<value>false</value>
</property>
<property>
<name>spark.dynamicAllocation.initialExecutors</name>
<value>5</value>
</property>
<property>
<name>spark.dynamicAllocation.minExecutors</name>
<value>1</value>
</property>
<property>
<name>spark.dynamicAllocation.maxExecutors</name>
<value>20</value>
</property>
<property>
<name>spark.executor.instances</name>
<value>12</value>
</property>
<property>
<name>hive.metastore.execute.setugi</name>
<value>true</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>hiknode3,hiknode4,hiknode2,hiknode6,hiknode5</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.zookeeper.namespace</name>
<value>hive_zookeeper_namespace_hive</value>
</property>
<property>
<name>hive.cluster.delegation.token.store.class</name>
<value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>true</value>
</property>
<property>
<name>hive.server2.use.SSL</name>
<value>false</value>
</property>
<property>
<name>spark.shuffle.service.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.spark.client.server.connect.timeout</name>
<value>300000</value>
</property>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>mapreduce.job.split.metainfo.maxsize</name>
<value>10000000</value>
</property>
<property>
<name>mapreduce.job.counters.max</name>
<value>120</value>
</property>
<property>
<name>mapreduce.job.counters.groups.max</name>
<value>50</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>false</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>BLOCK</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.DefaultCodec</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>zlib.compress.level</name>
<value>DEFAULT_COMPRESSION</value>
</property>
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>64</value>
</property>
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.8</value>
</property>
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>10</value>
</property>
<property>
<name>mapreduce.task.timeout</name>
<value>600000</value>
</property>
<property>
<name>mapreduce.client.submit.file.replication</name>
<value>3</value>
</property>
<property>
<name>mapreduce.job.reduces</name>
<value>100</value>
</property>
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>256</value>
</property>
<property>
<name>mapreduce.map.speculative</name>
<value>false</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>false</value>
</property>
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
<value>0.8</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>hiknode3:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hiknode3:19888</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.https.address</name>
<value>hiknode3:19890</value>
</property>
<property>
<name>mapreduce.jobhistory.admin.address</name>
<value>hiknode3:10033</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.app.mapreduce.am.staging-dir</name>
<value>/user</value>
</property>
<property>
<name>mapreduce.am.max-attempts</name>
<value>2</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>40960</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.cpu-vcores</name>
<value>1</value>
</property>
<property>
<name>mapreduce.job.ubertask.enable</name>
<value>false</value>
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Djava.net.preferIPv4Stack=true -Xmx4294967296</value>
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Djava.net.preferIPv4Stack=true -Xmx17179869184</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Djava.net.preferIPv4Stack=true -Xmx8589934592</value>
</property>
<property>
<name>yarn.app.mapreduce.am.admin.user.env</name>
<value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native:$JAVA_LIBRARY_PATH</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>20480</value>
</property>
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>2</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>10240</value>
</property>
<property>
<name>mapreduce.reduce.cpu.vcores</name>
<value>2</value>
</property>
<property>
<name>mapreduce.job.heap.memory-mb.ratio</name>
<value>0.8</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*,$MR2_CLASSPATH</value>
</property>
<property>
<name>mapreduce.jobhistory.jhist.format</name>
<value>binary</value>
</property>
<property>
<name>mapreduce.admin.user.env</name>
<value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native:$JAVA_LIBRARY_PATH</value>
</property>
<property>
<name>mapreduce.job.redacted-properties</name>
<value>fs.s3a.access.key,fs.s3a.secret.key,fs.adl.oauth2.credential,dfs.adls.oauth2.credential</value>
</property>
<property>
<name>mapreduce.job.acl-view-job</name>
<value> </value>
</property>
<property>
<name>mapreduce.job.acl-modify-job</name>
<value> </value>
</property>
<property>
<name>mapreduce.cluster.acls.enabled</name>
<value>false</value>
</property>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>yarn.acl.enable</name>
<value>true</value>
</property>
<property>
<name>yarn.admin.acl</name>
<value>*</value>
</property>
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.ha.automatic-failover.embedded</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>hiknode3:2181,hiknode4:2181,hiknode2:2181,hiknode6:2181,hiknode5:2181</value>
</property>
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
<name>yarn.client.failover-sleep-base-ms</name>
<value>100</value>
</property>
<property>
<name>yarn.client.failover-sleep-max-ms</name>
<value>2000</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarnRM</value>
</property>
<property>
<name>yarn.resourcemanager.address.rm252</name>
<value>hiknode3:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address.rm252</name>
<value>hiknode3:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address.rm252</name>
<value>hiknode3:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address.rm252</name>
<value>hiknode3:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm252</name>
<value>hiknode3:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.https.address.rm252</name>
<value>hiknode3:8090</value>
</property>
<property>
<name>yarn.resourcemanager.address.rm312</name>
<value>hiknode6:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address.rm312</name>
<value>hiknode6:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address.rm312</name>
<value>hiknode6:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address.rm312</name>
<value>hiknode6:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm312</name>
<value>hiknode6:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.https.address.rm312</name>
<value>hiknode6:8090</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm252,rm312</value>
</property>
<property>
<name>yarn.resourcemanager.client.thread-count</name>
<value>50</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.client.thread-count</name>
<value>50</value>
</property>
<property>
<name>yarn.resourcemanager.admin.client.thread-count</name>
<value>1</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>
<property>
<name>yarn.scheduler.increment-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>122880</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-vcores</name>
<value>1</value>
</property>
<property>
<name>yarn.scheduler.increment-allocation-vcores</name>
<value>1</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>120</value>
</property>
<property>
<name>yarn.resourcemanager.amliveliness-monitor.interval-ms</name>
<value>1000</value>
</property>
<property>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>2</value>
</property>
<property>
<name>yarn.resourcemanager.container.liveness-monitor.interval-ms</name>
<value>600000</value>
</property>
<property>
<name>yarn.resourcemanager.nm.liveness-monitor.interval-ms</name>
<value>1000</value>
</property>
<property>
<name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
<value>50</value>
</property>
<property>
<name>yarn.application.classpath</name>
<value>$HADOOP_CLIENT_CONF_DIR,$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*,$HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,$HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/*</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
<property>
<name>yarn.resourcemanager.max-completed-applications</name>
<value>10000</value>
</property>
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/tmp/logs</value>
</property>
<property>
<name>yarn.nodemanager.remote-app-log-dir-suffix</name>
<value>logs</value>
</property>
</configuration>
package com.hikcreate.picservice
import java.sql.Timestamp
import java.util.Properties
import java.math.BigDecimal
import com.hikcreate.picservice.utils.ParameterUtil
import com.hikcreate.utils.FastdfsUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
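* Reads the Oracle acd_photo table in 10 parallel JDBC partitions (MOD(TPXH,10)),
* uploads the TP and SLTP image blobs to FastDFS, deletes the files previously
* referenced by the target Hive table, and overwrites ods_jg_yp.ods_acd_photo
* with the new metadata and FastDFS URLs.
*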
* @author yulong shu
* @date 2021/8/18 16:20
* @version 1.0
*/
object AcdPhotoPicService {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
doTask(sparkSession, args)
}
protected def doTask(sparkSession: SparkSession, args: Array[String]): Unit = {
val map: mutable.Map[String, String] = ParameterUtil.fromArgs(args)
val outputTable = map.getOrElse("outputTable", "ods_jg_yp.ods_acd_photo")
val url = map.getOrElse("url", "jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
val dbuser = map.getOrElse("user", "gyjg")
val dbpassword = map.getOrElse("password", "gyjg2018")
val inputTable = map.getOrElse("inputTable", "acd_photo")
val properties = new Properties()
properties.put("driver", "oracle.jdbc.driver.OracleDriver")
properties.put("user", dbuser)
properties.put("password", dbpassword)
//read the Oracle data in parallel partitions
val arr = ArrayBuffer[Int]()
for (i <- 0 until (10)) {
arr.append(i)
}
val predicates = arr.map(i => {
s" MOD(TPXH,10) = $i "
}).toArray
val acdDF: DataFrame = sparkSession.read.jdbc(url, inputTable, predicates, properties)
import sparkSession.implicits._
val resultDF: DataFrame = acdDF.mapPartitions(
iter => {
val fdfs = new FastdfsUtils()
fdfs.init()
val acdItr: Iterator[AcdPhoto] = iter.map(
data => {
val sgbh = data.getAs[String]("SGBH")
val zjlx = data.getAs[String]("ZJLX")
val wsbh = data.getAs[String]("WSBH")
val tpxh = data.getAs[BigDecimal]("TPXH")
var tpxhStr = ""
if (tpxh != null) {
tpxhStr = tpxh.toString
}
val zjlxfl = data.getAs[String]("ZJLXFL")
val dsrbh = data.getAs[BigDecimal]("DSRBH")
var dsrbhStr = ""
if (dsrbh != null) {
dsrbhStr = dsrbh.toString
}
val tpsm = data.getAs[String]("TPSM")
val slr = data.getAs[String]("SLR")
val jbr = data.getAs[String]("JBR")
val lrsj = data.getAs[Timestamp]("LRSJ")
val tp = data.getAs[Array[Byte]]("TP")
var tpUrl = ""
if (tp != null && tp.size > 10) {
tpUrl = fdfs.uploadFile(tp, "jpeg")
}
val sltp = data.getAs[Array[Byte]]("SLTP")
var sltpUrl = ""
if (sltp != null && sltp.size > 10) {
sltpUrl = fdfs.uploadFile(sltp, "jpeg")
}
val bz = data.getAs[String]("BZ")
val sxh = data.getAs[BigDecimal]("SXH")
var sxhStr = ""
if (sxh != null) {
sxhStr = sxh.toString
}
AcdPhoto(sgbh, zjlx, wsbh, tpxhStr, zjlxfl, dsrbhStr, tpsm, slr, jbr, if (lrsj != null) lrsj.toString else "", tpUrl, sltpUrl, bz, sxhStr)
}
)
acdItr
}).toDF()
resultDF.createOrReplaceTempView("acd_photo")
//after processing the current batch of images, delete the old files from FastDFS
if (resultDF.take(1).nonEmpty) {
val photoDF: DataFrame = sparkSession.sql(
s"""
|select
|tp,
|sltp
|from
|${outputTable}
|""".stripMargin)
if (photoDF.take(1).nonEmpty) {
photoDF.foreachPartition(
iter=>{
val fdfs = new FastdfsUtils
fdfs.init()
iter.foreach(
data=>{
val tp=data.getAs[String]("tp")
val sltp=data.getAs[String]("sltp")
fdfs.deleteFIle(tp)
fdfs.deleteFIle(sltp)
}
)
fdfs.close()
}
)
}
}
sparkSession.sql(s"insert overwrite table ${outputTable} select * from acd_photo")
}
}
case class AcdPhoto(sgbh: String, zjlx: String, wsbh: String, tpxh: String, zjlxfl: String, dsrbh: String, tpsm: String, slr: String, jbr: String, lrsj: String, tp: String, sltp: String, bz: String, sxh: String)
package com.hikcreate.picservice
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Properties}
import com.hikcreate.picservice.utils.ParameterUtil
import com.hikcreate.utils.FastdfsUtils
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
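* Reads the Oracle drv_photo table in 10 parallel JDBC partitions (keyed on the
* last digit of XH) and prints the row count; the FastDFS upload of the
* ZP/SFZZP/MQZP blobs and the overwrite of ods_jg_yp.ods_drv_photo are
* currently commented out.
*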
* @author yulong shu
* @date 2021/8/13 14:36
* @version 1.0
*/
object DriverPhotoPicService {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
doTask(sparkSession, args)
}
protected def doTask(sparkSession: SparkSession, args: Array[String]): Unit = {
val map: mutable.Map[String, String] = ParameterUtil.fromArgs(args)
val outputTable = map.getOrElse("outputTable", "ods_jg_yp.ods_drv_photo")
val url = map.getOrElse("url", "jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
val dbuser = map.getOrElse("user", "gyjg")
val dbpassword = map.getOrElse("password", "gyjg2018")
val inputTable = map.getOrElse("inputTable", "drv_photo")
val properties = new Properties()
properties.put("driver", "oracle.jdbc.driver.OracleDriver")
properties.put("user", dbuser)
properties.put("password", dbpassword)
//read the Oracle data in parallel partitions
val arr = ArrayBuffer[Int]()
for (i <- 0 until (10)) {
arr.append(i)
}
val predicates = arr.map(i => {
s" REGEXP_SUBSTR(REVERSE(XH),'[0-9]') = $i "
}).toArray
val drvDF: DataFrame = sparkSession.read.jdbc(url, inputTable, predicates, properties)
println("count:" + drvDF.count())
import sparkSession.implicits._
/* val resultDF: DataFrame = drvDF.mapPartitions(
iter => {
val fdfs = new FastdfsUtils()
fdfs.init()
val drvItr: Iterator[DrvPhoto] = iter.map(
data => {
val zp: Array[Byte] = data.getAs[Array[Byte]]("ZP")
val sfzmhm = data.getAs[String]("SFZMHM")
var zpUrl: String = ""
if (zp != null && zp.size > 10) {
//zpUrl = pic.uploadFileStor()
zpUrl=fdfs.uploadFile(zp,"jpeg")
}
val gxsj = data.getAs[Timestamp]("GXSJ")
var gxsjStr = ""
if (gxsj != null) {
gxsjStr = gxsj.toString
}
val xzqh = data.getAs[String]("XZQH")
val flag = data.getAs[String]("FLAG")
val xh = data.getAs[String]("XH")
val rksj = data.getAs[Timestamp]("RKSJ")
var rksjStr = ""
if (rksj != null) {
rksjStr = rksj.toString
}
val fzjg = data.getAs[String]("FZJG")
val sfzzp = data.getAs[Array[Byte]]("SFZZP")
var sfzzpUrl = ""
if (sfzzp != null && sfzzp.size > 10) {
sfzzpUrl=fdfs.uploadFile(sfzzp,"jpeg")
}
val mqzp = data.getAs[Array[Byte]]("MQZP")
var mqzppUrl = ""
if (mqzp != null && mqzp.size > 10) {
mqzppUrl=fdfs.uploadFile(mqzp,"jpeg")
}
val bjcsbj = data.getAs[String]("BJCSBJ")
val zplx = data.getAs[String]("ZPLX")
val zply = data.getAs[String]("ZPLY")
val jcjg = data.getAs[String]("JCJG")
val jcjgms = data.getAs[String]("JCJGMS")
DrvPhoto(sfzmhm, zpUrl, gxsjStr, xzqh, flag, xh, rksjStr, fzjg, sfzzpUrl, mqzppUrl, bjcsbj, zplx, zply, jcjg, jcjgms)
}
)
fdfs.close()
drvItr
}
).toDF()
resultDF.createOrReplaceTempView("drv_photo")
//after processing the current batch of images, delete the old files from FastDFS
if(resultDF.take(5)!=null && resultDF.take(5).size>0){
val photoDF: DataFrame = sparkSession.sql(
s"""
|select
|zp,
|sfzzp,
|mqzp
|from
|${outputTable}
|""".stripMargin)
if(photoDF.take(10)!=null && photoDF.take(10).size>0){
photoDF.foreachPartition(
iter=>{
val fdfs = new FastdfsUtils
fdfs.init()
iter.map(
data=>{
val zp=data.getAs[String]("zp")
val sfzzp=data.getAs[String]("sfzzp")
val mqzp=data.getAs[String]("mqzp")
fdfs.deleteFIle(zp)
fdfs.deleteFIle(sfzzp)
fdfs.deleteFIle(mqzp)
}
)
fdfs.close()
}
)
}
}
sparkSession.sql(s"insert overwrite table ${outputTable} select * from drv_photo")*/
/* resultDF
.write
.format("hive")
.option("spark.sql.catalogImplementation", "hive")
.mode(SaveMode.Overwrite)
.saveAsTable(outputTable)
sparkSession.stop() */
}
}
case class DrvPhoto(sfzmhm: String, zp: String, gxsj: String, xzqh: String, flag: String, xh: String, rksj: String, fzjg: String, sfzzp: String, mqzp: String, bjcsbj: String, zplx: String, zply: String, jcjg: String, jcjgms: String)
package com.hikcreate.picservice
import java.sql.Timestamp
import java.util.Properties
import com.hikcreate.picservice.utils.ParameterUtil
import com.hikcreate.utils.FastdfsUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
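* Reads the Oracle net_drv_photo table in 10 parallel JDBC partitions (keyed on
* the last digit of WWLSH), uploads the ZP blobs to FastDFS, deletes the files
* previously referenced by the target table, and overwrites
* ods_jg_yp.ods_net_drv_photo.
*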
* @author yulong shu
* @date 2021/8/18 17:03
* @version 1.0
*/
object NetDrvPhotoPicService {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
doTask(sparkSession, args)
}
protected def doTask(sparkSession: SparkSession, args: Array[String]): Unit = {
val map: mutable.Map[String, String] = ParameterUtil.fromArgs(args)
val outputTable = map.getOrElse("outputTable", "ods_jg_yp.ods_net_drv_photo")
val url = map.getOrElse("url", "jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
val dbuser = map.getOrElse("user", "gyjg")
val dbpassword = map.getOrElse("password", "gyjg2018")
val inputTable = map.getOrElse("inputTable", "net_drv_photo")
val properties = new Properties()
properties.put("driver", "oracle.jdbc.driver.OracleDriver")
properties.put("user", dbuser)
properties.put("password", dbpassword)
//read the Oracle data in parallel partitions
val arr = ArrayBuffer[Int]()
for (i <- 0 until (10)) {
arr.append(i)
}
val predicates = arr.map(i => {
s" REGEXP_SUBSTR(REVERSE(WWLSH),'[0-9]') = $i "
}).toArray
val drvDF: DataFrame = sparkSession.read.jdbc(url, inputTable, predicates, properties)
import sparkSession.implicits._
val resultDF: DataFrame = drvDF.mapPartitions(
iter => {
val fdfs = new FastdfsUtils()
fdfs.init()
val pcbItr: Iterator[NetDrvPhoto] = iter.map(
data => {
val wwlsh=data.getAs[String]("WWLSH")
val sfzmhm=data.getAs[String]("SFZMHM")
val zp=data.getAs[Array[Byte]]("ZP")
var zpUrl=""
if(zp!=null && zp.size>10){
zpUrl=fdfs.uploadFile(zp,"jpeg")
}
val gxsj=data.getAs[Timestamp]("GXSJ")
val fzjg=data.getAs[String]("FZJG")
NetDrvPhoto(wwlsh, sfzmhm, zpUrl, if (gxsj != null) gxsj.toString else "", fzjg)
})
pcbItr
}).toDF()
resultDF.createOrReplaceTempView("pcb_st_photo")
//after processing the current batch of images, delete the old files from FastDFS
if (resultDF.take(1).nonEmpty) {
val photoDF: DataFrame = sparkSession.sql(
s"""
|select
|zp
|from
|${outputTable}
|""".stripMargin)
if (photoDF.take(1).nonEmpty) {
photoDF.foreachPartition(
iter=>{
val fdfs = new FastdfsUtils
fdfs.init()
iter.foreach(
data=>{
val zp=data.getAs[String]("zp")
fdfs.deleteFIle(zp)
}
)
fdfs.close()
}
)
}
}
sparkSession.sql(s"insert overwrite table ${outputTable} select * from pcb_st_photo")
}
}
case class NetDrvPhoto(wwlsh:String,sfzmhm:String,zp:String,gxsj:String,fzjg:String)
package com.hikcreate.picservice
import java.sql.Timestamp
import java.util.Properties
import com.hikcreate.picservice.utils.ParameterUtil
import com.hikcreate.utils.FastdfsUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
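* Reads the Oracle pcb_st_photo table in 10 parallel JDBC partitions (keyed on
* the last digit of ZPBH), uploads the ZP blobs to FastDFS, deletes the files
* previously referenced by the target table, and overwrites
* ods_jg_yp.ods_pcb_st_photo.
*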
* @author yulong shu
* @date 2021/8/18 16:55
* @version 1.0
*/
object PcbStPhotoPicService {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
doTask(sparkSession, args)
}
protected def doTask(sparkSession: SparkSession, args: Array[String]): Unit = {
val map: mutable.Map[String, String] = ParameterUtil.fromArgs(args)
val outputTable = map.getOrElse("outputTable", "ods_jg_yp.ods_pcb_st_photo")
val url = map.getOrElse("url", "jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
val dbuser = map.getOrElse("user", "gyjg")
val dbpassword = map.getOrElse("password", "gyjg2018")
val inputTable = map.getOrElse("inputTable", "pcb_st_photo")
val properties = new Properties()
properties.put("driver", "oracle.jdbc.driver.OracleDriver")
properties.put("user", dbuser)
properties.put("password", dbpassword)
//read the Oracle data in parallel partitions
val arr = ArrayBuffer[Int]()
for (i <- 0 until (10)) {
arr.append(i)
}
val predicates = arr.map(i => {
s" REGEXP_SUBSTR(REVERSE(ZPBH),'[0-9]') = $i "
}).toArray
val pcbDF: DataFrame = sparkSession.read.jdbc(url, inputTable, predicates, properties)
import sparkSession.implicits._
val resultDF: DataFrame = pcbDF.mapPartitions(
iter => {
val fdfs = new FastdfsUtils()
fdfs.init()
val pcbItr: Iterator[PcbStPhoto] = iter.map(
data => {
val zpbh=data.getAs[String]("ZPBH")
val zp=data.getAs[Array[Byte]]("ZP")
var zpUrl=""
if(zp!=null && zp.size>10){
zpUrl=fdfs.uploadFile(zp,"jpeg")
}
val gxrq=data.getAs[Timestamp]("GXRQ")
val jyw=data.getAs[String]("JYW")
PcbStPhoto(zpbh, zpUrl, if (gxrq != null) gxrq.toString else "", jyw)
})
pcbItr
}).toDF()
resultDF.createOrReplaceTempView("pcb_st_photo")
//after processing the current batch of images, delete the old files from FastDFS
if (resultDF.take(1).nonEmpty) {
val photoDF: DataFrame = sparkSession.sql(
s"""
|select
|zp
|from
|${outputTable}
|""".stripMargin)
if (photoDF.take(1).nonEmpty) {
photoDF.foreachPartition(
iter=>{
val fdfs = new FastdfsUtils
fdfs.init()
iter.foreach(
data=>{
val zp=data.getAs[String]("zp")
fdfs.deleteFIle(zp)
}
)
fdfs.close()
}
)
}
}
sparkSession.sql(s"insert overwrite table ${outputTable} select * from pcb_st_photo")
}
}
case class PcbStPhoto(zpbh:String, zp:String, gxrq:String, jyw:String)
package com.hikcreate.picservice
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Properties, UUID}
import com.hikcreate.picservice.utils.ParameterUtil
import com.hikcreate.utils.FastdfsUtils
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
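* Reads the Oracle veh_picture table in 10 parallel JDBC partitions (keyed on
* the last digit of XH), uploads the ZP blobs to FastDFS, deletes the files
* previously referenced by the target table, and overwrites
* ods_jg_yp.ods_veh_picture.
*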
* @author yulong shu
* @date 2021/8/16 11:31
* @version 1.0
*/
object VehPicturePicService {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
doTask(sparkSession,args)
}
protected def doTask(sparkSession: SparkSession,args:Array[String]): Unit = {
val map: mutable.Map[String, String] = ParameterUtil.fromArgs(args)
val outputTable=map.getOrElse("outputTable","ods_jg_yp.ods_veh_picture")
val url = map.getOrElse("url", "jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
val dbuser = map.getOrElse("user", "gyjg")
val dbpassword = map.getOrElse("password", "gyjg2018")
val inputTable = map.getOrElse("inputTable", "veh_picture")
val properties = new Properties()
properties.put("driver", "oracle.jdbc.driver.OracleDriver")
properties.put("user", dbuser)
properties.put("password", dbpassword)
//read the Oracle data in parallel partitions
val arr = ArrayBuffer[Int]()
for (i <- 0 until (10)) {
arr.append(i)
}
val predicates = arr.map(i => {
s" REGEXP_SUBSTR(REVERSE(XH),'[0-9]') = $i "
}).toArray
val vehDF: DataFrame = sparkSession.read.jdbc(url, inputTable, predicates, properties)
import sparkSession.implicits._
val resultDF : DataFrame=vehDF.mapPartitions(
iter => {
val fdfs = new FastdfsUtils()
fdfs.init()
val picItr: Iterator[VehPicture] = iter.map(
data => {
val xh: String = data.getAs[String]("XH")
val hpzl: String = data.getAs[String]("HPZL")
val hphm: String = data.getAs[String]("HPHM")
var zpUrl = ""
val zp: Array[Byte] = data.getAs[Array[Byte]]("ZP")
if (zp != null && zp.size > 10) {
// zpUrl = pic.upload(UUID.randomUUID() + ".jpg", zp, photoHost, photoPort.toInt, photoUser, photoPassword, photoBasePath, photoFilepath + "_" + today, nginxUrl)
zpUrl= fdfs.uploadFile(zp, "jpeg")
}
val gxsj: Timestamp = data.getAs[Timestamp]("GXSJ")
var gxsjStr = ""
if (gxsj != null) {
gxsjStr = gxsj.toString
}
VehPicture(xh, hpzl, hphm, zpUrl, gxsjStr)
}
)
picItr
}
).toDF()
resultDF.createOrReplaceTempView("veh_picture")
//after processing the current batch of images, delete the old files from FastDFS
if (resultDF.take(1).nonEmpty) {
val photoDF: DataFrame = sparkSession.sql(
s"""
|select
|zp
|from
|${outputTable}
|""".stripMargin)
if (photoDF.take(1).nonEmpty) {
photoDF.foreachPartition(
iter=>{
val fdfs = new FastdfsUtils
fdfs.init()
iter.foreach(
data=>{
val zp=data.getAs[String]("zp")
fdfs.deleteFIle(zp)
}
)
fdfs.close()
}
)
}
}
sparkSession.sql(s"insert overwrite table ${outputTable} select * from veh_picture")
/*resultDF
.write
.format("hive")
.option("spark.sql.catalogImplementation","hive")
.mode(SaveMode.Overwrite)
.saveAsTable(outputTable)*/
}
}
case class VehPicture(xh:String,hpzl:String,hphm:String,zp:String,gxsj:String)
package com.hikcreate.picservice
import java.sql.Timestamp
import java.util.Properties
import com.hikcreate.picservice.utils.ParameterUtil
import com.hikcreate.utils.FastdfsUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
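* Reads the Oracle vio_jdczp table in 10 parallel JDBC partitions (keyed on the
* last digit of XH), uploads the PHOTO1/PHOTO2/PHOTO3 blobs to FastDFS, deletes
* the files previously referenced by the target table, and overwrites
* ods_jg_yp.ods_vio_jdczp.
*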
* @author yulong shu
* @date 2021/8/18 17:11
* @version 1.0
*/
object VioJdczpPicService {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
doTask(sparkSession, args)
}
protected def doTask(sparkSession: SparkSession, args: Array[String]): Unit = {
val map: mutable.Map[String, String] = ParameterUtil.fromArgs(args)
val outputTable = map.getOrElse("outputTable", "ods_jg_yp.ods_vio_jdczp")
val url = map.getOrElse("url", "jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
val dbuser = map.getOrElse("user", "gyjg")
val dbpassword = map.getOrElse("password", "gyjg2018")
val inputTable = map.getOrElse("inputTable", "vio_jdczp")
val properties = new Properties()
properties.put("driver", "oracle.jdbc.driver.OracleDriver")
properties.put("user", dbuser)
properties.put("password", dbpassword)
//read the Oracle data in parallel partitions
val arr = ArrayBuffer[Int]()
for (i <- 0 until (10)) {
arr.append(i)
}
val predicates = arr.map(i => {
s" REGEXP_SUBSTR(REVERSE(XH),'[0-9]') = $i "
}).toArray
val vioDF: DataFrame = sparkSession.read.jdbc(url, inputTable, predicates, properties)
import sparkSession.implicits._
val resultDF: DataFrame = vioDF.mapPartitions(
iter => {
val fdfs = new FastdfsUtils()
fdfs.init()
val pcbItr: Iterator[VioJdczp] = iter.map(
data => {
val xh=data.getAs[String]("XH")
val hpzl=data.getAs[String]("HPZL")
val hphm=data.getAs[String]("HPHM")
val photo1=data.getAs[Array[Byte]]("PHOTO1")
var photo1Url=""
if(photo1!=null && photo1.size>10){
photo1Url=fdfs.uploadFile(photo1,"jpeg")
}
val photo2=data.getAs[Array[Byte]]("PHOTO2")
var photo2Url=""
if(photo2!=null && photo2.size>10){
photo2Url=fdfs.uploadFile(photo2,"jpeg")
}
val photo3=data.getAs[Array[Byte]]("PHOTO3")
var photo3Url=""
if(photo3!=null && photo3.size>10){
photo3Url=fdfs.uploadFile(photo3,"jpeg")
}
val lrsj=data.getAs[Timestamp]("LRSJ")
VioJdczp(xh, hpzl, hphm, photo1Url, photo2Url, photo3Url, if (lrsj != null) lrsj.toString else "")
})
pcbItr
}).toDF()
resultDF.createOrReplaceTempView("vio_jdczp")
//after processing the current batch of images, delete the old files from FastDFS
if (resultDF.take(1).nonEmpty) {
val photoDF: DataFrame = sparkSession.sql(
s"""
|select
|photo1,
|photo2,
|photo3
|from
|${outputTable}
|""".stripMargin)
if (photoDF.take(1).nonEmpty) {
photoDF.foreachPartition(
iter=>{
val fdfs = new FastdfsUtils
fdfs.init()
iter.foreach(
data=>{
val photo1=data.getAs[String]("photo1")
val photo2=data.getAs[String]("photo2")
val photo3=data.getAs[String]("photo3")
fdfs.deleteFIle(photo1)
fdfs.deleteFIle(photo2)
fdfs.deleteFIle(photo3)
}
)
fdfs.close()
}
)
}
}
sparkSession.sql(s"insert overwrite table ${outputTable} select * from vio_jdczp")
}
}
case class VioJdczp(xh:String,hpzl:String,hphm:String,photo1:String,photo2:String,photo3:String,lrsj:String)
package com.hikcreate.picservice.utils
import scala.collection.mutable
import scala.collection.mutable.HashMap
/**
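* Parses command-line arguments of the form key=value into a mutable Map;
* an argument without "=" raises an IllegalArgumentException.
*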
* @author yulong shu
* @date 2021/5/14 11:24
* @version 1.0
*/
object ParameterUtil extends Serializable {
def fromArgs(args: Array[String]):mutable.Map[String,String] ={
val map: mutable.Map[String, String] = new HashMap
args.foreach(
arg=>{
if(!arg.contains("=")){
throw new IllegalArgumentException(s"parameter ${arg} is illegal")
}else{
map+=(arg.split("=")(0) -> arg.split("=")(1))
}
}
)
map
}
def main(args: Array[String]): Unit = {
val props=Array("url=jdbc:oracle:thin:@172.16.17.82:1521:gyjg")
val map: mutable.Map[String, String] = fromArgs(props)
println(map.getOrElse("url",""))
}
}
package com.hikcreate.picservice.utils
import java.io.ByteArrayInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.io.IOUtils
/**
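* Writes an image byte array to HDFS under basePath + filePath + "/" + fileName,
* creating the directory when it does not exist, and returns true when the copy
* succeeds (nginxUrl is currently unused).
*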
* @author yulong shu
* @date 2021/8/17 9:09
* @version 1.0
*/
class PicUtils {
def upload(file: Array[Byte], basePath: String, filePath: String, fileName: String, nginxUrl: String): Boolean = {
var result = false
val conf = new Configuration()
val fs: FileSystem = FileSystem.get(conf)
val inStream = new ByteArrayInputStream(file)
//create the target directory if it does not exist
if (!fs.exists(new Path(basePath + filePath))) {
fs.mkdirs(new Path(basePath + filePath))
}
//write the file under basePath + filePath + "/" + fileName
val outStream: FSDataOutputStream = fs.create(new Path(basePath + filePath + "/" + fileName))
try {
IOUtils.copyBytes(inStream, outStream, 4096, false)
result = true
} finally {
inStream.close()
outStream.close()
}
result
}
}
#!/bin/bash
export HIVE_CONF=/etc/hive/conf
APP_HOME=$(dirname $0)/..
export SPARK_HOME=/opt/cloudera/parcels/spark2
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hadoop/etc/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hive
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 1G \
--executor-cores 1 \
--driver-memory 1G \
--driver-cores 1 \
--class "com.hikcreate.picservice.AcdPhotoPicService" \
--name "acdPhotoPicService" \
$APP_HOME/package/hptmsp-pic-1.0-SNAPSHOT-jar-with-dependencies.jar url="jdbc:oracle:thin:@172.16.17.82:1521:gyjg" user="gyjg" password="gyjg2018" inputTable="acd_photo" outputTable="ods_jg_yp.ods_acd_photo"
#!/bin/bash
export HIVE_CONF=/etc/hive/conf
APP_HOME=$(dirname $0)/..
export SPARK_HOME=/opt/cloudera/parcels/spark2
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hadoop/etc/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hive
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--driver-memory 2G \
--driver-cores 1 \
--class "com.hikcreate.picservice.DriverPhotoPicService" \
--name "drvPhotoService" \
$APP_HOME/package/hptmsp-pic-1.0-SNAPSHOT-jar-with-dependencies.jar url="jdbc:oracle:thin:@172.16.17.82:1521:gyjg" user="gyjg" password="gyjg2018" inputTable="drv_photo" outputTable="ods_jg_yp.ods_drv_photo"
#!/bin/bash
export HIVE_CONF=/etc/hive/conf
APP_HOME=$(dirname $0)/..
export SPARK_HOME=/opt/cloudera/parcels/spark2
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hadoop/etc/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hive
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 1G \
--executor-cores 1 \
--driver-memory 1G \
--driver-cores 1 \
--class "com.hikcreate.picservice.NetDrvPhotoPicService" \
--name "netDrvPhotoPicService" \
$APP_HOME/package/hptmsp-pic-1.0-SNAPSHOT-jar-with-dependencies.jar url="jdbc:oracle:thin:@172.16.17.82:1521:gyjg" user="gyjg" password="gyjg2018" inputTable="net_drv_photo" outputTable="ods_jg_yp.ods_net_drv_photo"
#!/bin/bash
export HIVE_CONF=/etc/hive/conf
APP_HOME=$(dirname $0)/..
export SPARK_HOME=/opt/cloudera/parcels/spark2
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
#export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hadoop/etc/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hive
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 1G \
--executor-cores 1 \
--driver-memory 1G \
--driver-cores 1 \
--class "com.hikcreate.picservice.PcbStPhotoPicService" \
--name "pcbStPhotoPicService" \
$APP_HOME/package/hptmsp-pic-1.0-SNAPSHOT-jar-with-dependencies.jar url="jdbc:oracle:thin:@172.16.17.82:1521:gyjg" user="gyjg" password="gyjg2018" inputTable="pcb_st_photo" outputTable="ods_jg_yp.ods_pcb_st_photo"
#!/bin/bash
export HIVE_CONF=/etc/hive/conf
APP_HOME=$(dirname $0)/..
export SPARK_HOME=/opt/cloudera/parcels/spark2
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hadoop/etc/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hive
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--driver-memory 2G \
--driver-cores 1 \
--class "com.hikcreate.picservice.VehPicturePicService" \
--name "vehPictureService" \
$APP_HOME/package/hptmsp-pic-1.0-SNAPSHOT-jar-with-dependencies.jar url="jdbc:oracle:thin:@172.16.17.82:1521:gyjg" user="gyjg" password="gyjg2018" inputTable="veh_picture" outputTable="ods_jg_yp.ods_veh_picture"
#!/bin/bash
export HIVE_CONF=/etc/hive/conf
APP_HOME=$(dirname $0)/..
export SPARK_HOME=/opt/cloudera/parcels/spark2
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hadoop/etc/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/hive
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 1G \
--executor-cores 1 \
--driver-memory 1G \
--driver-cores 1 \
--class "com.hikcreate.picservice.VioJdczpPicService" \
--name "vioJdczpPicService" \
$APP_HOME/package/hptmsp-pic-1.0-SNAPSHOT-jar-with-dependencies.jar url="jdbc:oracle:thin:@172.16.17.82:1521:gyjg" user="gyjg" password="gyjg2018" inputTable="vio_jdczp" outputTable="ods_jg_yp.ods_vio_jdczp"