Commit 6fa68d1d by shuyulong

kafka转发程序

parents
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
<option name="TYPE_ANNOTATION_PRIVATE_MEMBER" value="true" />
<option name="TYPE_ANNOTATION_LOCAL_DEFINITION" value="true" />
<option name="TYPE_ANNOTATION_FUNCTION_PARAMETER" value="true" />
<option name="TYPE_ANNOTATION_UNDERSCORE_PARAMETER" value="true" />
</ScalaCodeStyleSettings>
</code_scheme>
</component>
\ No newline at end of file
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="kafka_forword" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka_forword</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>com.hikcreate.app.KafkaForword</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka_forword</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.12.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.hikcreate.app.KafkaForword</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.hikcreate.app;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Properties;
import static sun.misc.Version.println;
/**
* @author yulong shu
* @version 1.0
* @date 2021/6/9 16:18
*/
public class KafkaForword {
public static void main(String[] args) {
Logger logger = LogManager.getLogger(KafkaForword.class);
Properties consumerprops = new Properties();
consumerprops.put("bootstrap.servers", "slave7:6667,slave8:6667,slave9:6667");
consumerprops.put("group.id", "jw_forword");
consumerprops.put("enable.auto.commit", "false");
consumerprops.put("session.timeout.ms", "30000");
consumerprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerprops.put("auto.offset.reset", "latest");
KafkaConsumer<String, String> consumer = new KafkaConsumer(consumerprops);
Properties producerprops = new Properties();
producerprops.put("bootstrap.servers", "10.11.57.119:9092,10.11.57.108:9092,10.11.57.109:9092,10.11.57.110:9092,10.11.57.111:9092");
producerprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerprops.put("acks", "1");
producerprops.put("timeout.ms", "30000");
producerprops.put("batch.size","1000");
consumer.subscribe(Collections.singletonList("JW_CREDIT"));
KafkaProducer<String, String> producer = new KafkaProducer(producerprops);
logger.info("初始化完成");
int flag=0;
try {
for (; ; ) {
ConsumerRecords<String, String> records = consumer.poll(1000);
if (!records.isEmpty() && records.count()>0) {
flag=0;
logger.info("消费到:" + records.count() + "条数据");
for (ConsumerRecord<String, String> record : records) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("creditScoreTopic", record.value());
producer.send(producerRecord);
}
producer.flush();
consumer.commitSync();
}else{
Thread.sleep(1000);
flag+=1;
}
if(flag>180){
logger.info("180s内无kafka消息,退出转发程序");
System.exit(0);
}
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally {
if(producer!=null) producer.close();
if(consumer!=null) consumer.close();
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- status=debug 可以查看log4j的装配过程 -->
<Configuration status="off" monitorInterval="1800">
<properties>
<property name="LOG_HOME">/home/shuyulong/kafka_forword/logs/</property>
<property name="FILE_NAME">forword</property>
</properties>
<Appenders>
<!-- 定义控制台输出 -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n" />
</Console>
<RollingRandomAccessFile name="running-log"
fileName="${LOG_HOME}/${FILE_NAME}.log" filePattern="${LOG_HOME}/
%d{yyyy-MM-dd}.log">
<PatternLayout
pattern="%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n"/>
<Policies>
<!-- 每天生成一个日志文件 -->
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
</Policies>
<!-- 最大保存文件数 -->
<DefaultRolloverStrategy max="20" />
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<Logger name="com.hikcreate.app" level="info"
additivity="true">
<AppenderRef ref="running-log" />
</Logger>
<Root level="info">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment