Commit 0a6a342d by mlin

修改包名、git仓库名

parents
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>kafka-read-write</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.7
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kafka</groupId>
<artifactId>io</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>io</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11 </artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
</project>
package com.zhiwei.kafka.io;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}
package com.zhiwei.kafka.io;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConsumerTest{
public static void main(String[] args) {
System.out.println("start consumer..");
Properties props = new Properties();
/* 配置文件
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
*/
//读取配置文件
InputStream ins = Thread.currentThread().getContextClassLoader().getResourceAsStream("./consumer.properties");
try {
props.load(ins);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//订阅 topic list
consumer.subscribe(Arrays.asList("test2"));
//要处理的消息为list<map>
List<Map<String,Object>>messages = new ArrayList<Map<String,Object>>();
int count = 0;
System.out.println("out while");
// 持续接收消息
while (true) {
count = count+1;
System.out.println("in while:");
//测试 跳出循环
if(count>5)
break;
Map<String, Object> map1 = new HashMap<String,Object>();
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
//把获得的messages重新转换成list<map>
map1.put(record.key(), record.value());
//System.out.println(record.value());
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
System.out.println(map1.get("1"));
messages.add(map1);
}
consumer.close();
System.out.println("stop consumer...");
}
}
package com.zhiwei.kafka.io;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
System.out.print("start producer..");
Properties props = new Properties();
//读取配置文件
InputStream ins = Thread.currentThread().getContextClassLoader().getResourceAsStream("./producer.properties");
//加载配置文件
try {
props.load(ins);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//要处理的消息为 list<map>
List<Map<String,Object>> messages = new ArrayList<Map<String,Object>>();
Map<String,Object>map1 = new HashMap<String,Object>();
map1.put("1", "a");
map1.put("2", "b");
messages.add(map1);
//发送消息
for (Map<String,Object> map: messages) {
for (String s : map1.keySet()) {
producer.send(new ProducerRecord<String, String>("myTest",s.toString(),map.get(s).toString()));
//producer.send(new ProducerRecord<String, String>("test1", "from windows!","by lin."));
}
}
producer.close();
System.out.print("stop producer..");
}
}
bootstrap.servers=10.10.103.40:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
\ No newline at end of file
bootstrap.servers=10.10.103.40:9092
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment