kafka consumer 配置java代码为什么可以一直运行

org.apache.kafka - kafka_2.10 - 源码查看 - 拨云剑
请点击左侧目录树,选择待查看的源码文件.&&&& &&&&&&
&& &&&&&&&&&&&&
版权所有 鲁ICP备号-4
打开技术之扣,分享程序人生!当前访客身份:游客 [
热衷于大数据处理技术研究、使用 关注中间件技术
:引用来自“21克爱杰伦”的评论我使用的是Java EE...
:引用来自“21克爱杰伦”的评论我那个直接从eclip...
:/ressources/VanDyke_S...
:我使用的是Java EE IDE 版本的eclipse Eclipse J...
:我那个直接从eclipse里面安装加载不到; 然后直接...
:请问我在Red Hat 4.8.2-16 搭建了hdfs,在使用htt...
:这里metadata.broker.list直接指定broker节点地址...
:昨天的问题,今天看到这个,问题解决了,类和函数...
今日访问:713
昨日访问:238
本周访问:713
本月访问:12457
所有访问:152039
Kafka JAVA客户端代码示例--高级应用
发表于1年前( 11:37)&&
阅读(2685)&|&评论()
0人收藏此文章,
kafka是一种高吞吐量的分布式发布订阅消息系统,本文提供最新版本的JAVA客户端高级应用代码示例
什么时间使用高级应用?
针对一个消息读取多次
在一个process中,仅仅处理一个topic中的一组partitions
使用事务,确保每个消息只被处理一次
使用高级应用(调用较底层函数)的缺点?
&&&&SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要)
在应用程序中跟踪上次消息处理的offset
确定一个topic partition的lead broker
手工处理broker leander的改变
使用底层函数(SimpleConsumer)开发的步骤
&通过active broker,确定topic partition的lead broker
确定topic partition的replicat brokers
根据需要,创建数据请求
识别lead brokder改变并进行恢复
import java.nio.ByteB
import java.util.ArrayL
import java.util.C
import java.util.HashM
import java.util.L
import java.util.M
import kafka.api.FetchR
import kafka.api.FetchRequestB
import kafka.api.PartitionOffsetRequestI
import kafka.cluster.B
mon.ErrorM
mon.TopicAndP
import kafka.javaapi.FetchR
import kafka.javaapi.OffsetR
import kafka.javaapi.PartitionM
import kafka.javaapi.TopicM
import kafka.javaapi.TopicMetadataR
import kafka.javaapi.TopicMetadataR
import kafka.javaapi.consumer.SimpleC
import kafka.message.MessageAndO
* https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
* @author Fung
public class ConsumerSimpleExample {
public static void main(String arg[]) {
String[] args={"20","page_visits","2","172.168.63.233","9092"};
ConsumerSimpleExample example = new ConsumerSimpleExample();
long maxReads = Long.parseLong(args[0]);
String topic = args[1];
int partition = Integer.parseInt(args[2]);
List&String& seeds = new ArrayList&String&();
seeds.add(args[3]);
int port = Integer.parseInt(args[4]);
example.run(maxReads, topic, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
e.printStackTrace();
private List&String& m_replicaBrokers = new ArrayList&String&();
public ConsumerSimpleExample() {
m_replicaBrokers = new ArrayList&String&();
public void run(long a_maxReads, String a_topic, int a_partition,
List&String& a_seedBrokers, int a_port) throws Exception {
// find the meta data about the topic and partition we are interested in
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
a_partition);
if (metadata == null) {
System.out
.println("Can't find metadata for Topic and Partition. Exiting");
if (metadata.leader() == null) {
System.out
.println("Can't find Leader for Topic and Partition. Exiting");
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
* 1024, clientName);
long readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
int numErrors = 0;
while (a_maxReads & 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000,
64 * 1024, clientName);
// Note: this fetchSize of 100000 might need to be increased if
// large batches are written to Kafka
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
if (numErrors & 5)
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for
// the last element to reset
readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
consumer.close();
consumer =
leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset & readOffset) {
System.out.println("Found an old offset: " + currentOffset
+ " Expecting: " + readOffset);
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset())
+ ": " + new String(bytes, "UTF-8"));
numRead++;
a_maxReads--;
if (numRead == 0) {
Thread.sleep(1000);
} catch (InterruptedException ie) {
if (consumer != null)
consumer.close();
public static long getLastOffset(SimpleConsumer consumer, String topic,
int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map&TopicAndPartition, PartitionOffsetRequestInfo& requestInfo = new HashMap&TopicAndPartition, PartitionOffsetRequestInfo&();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out
.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
long[] offsets = response.offsets(topic, partition);
return offsets[0];
private String findNewLeader(String a_oldLeader, String a_topic,
int a_partition, int a_port) throws Exception {
for (int i = 0; i & 3; i++) {
boolean goToSleep =
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
a_topic, a_partition);
if (metadata == null) {
goToSleep =
} else if (metadata.leader() == null) {
goToSleep =
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
&& i == 0) {
// first time through if the leader hasn't changed give
// ZooKeeper a second to recover
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
goToSleep =
return metadata.leader().host();
if (goToSleep) {
Thread.sleep(1000);
} catch (InterruptedException ie) {
System.out
.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception(
"Unable to find new leader after Broker failure. Exiting");
private PartitionMetadata findLeader(List&String& a_seedBrokers,
int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData =
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer =
consumer = new SimpleConsumer(seed, a_port,
"leaderLookup");
List&String& topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);
List&TopicMetadata& metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData =
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed
+ "] to find Leader for [" + a_topic + ", "
+ a_partition + "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
return returnMetaD
更多开发者职位上
1)">1)">1" ng-class="{current:{{currentPage==page}}}" ng-repeat="page in pages"><li class='page' ng-if="(endIndex<li class='page next' ng-if="(currentPage
相关文章阅读storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景。因此,storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供storm应用使用。这里结合自己的应用做个简单总结。
由于storm已经提供了storm-kafka,因此可以直接使用,使用kafka的低级api读取数据。如果有需要的话,自己实现也并不困难。使用方法如下:
// 设置kafka的zookeeper集群
BrokerHosts hosts = new ZkHosts(&10.1.80.249:.80.250:.80.251:2181/kafka&);
// 初始化配置信息
SpoutConfig conf = new SpoutConfig(hosts, &topic&, &/zkroot&,&topo&);
// 在topology中设置spout
builder.setSpout(&kafka-spout&, new KafkaSpout(conf));
这里需要注意的是,spout会根据config的后面两个参数在zookeeper上为每个kafka分区创建保存读取偏移的节点,如:/zkroot/topo/partition_0。默认情况下,spout下会发射域名为bytes的binary数据,如果有需要,可以通过设置schema进行修改。
如上面所示,使用起来还是很简单的,下面简单的分析一下实现细节。
KafkaSpout.open
// 初始化用于读写zookeeper的客户端对象_state
Map stateConf = new HashMap(conf);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
// 初始化用于读取kafka数据coordinator,真正数据读取使用的是内部的PartitionManager
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
读取数据:
KafkaSpout.nextTuple
// 通过各个分区对应的PartitionManager读取数据
List&PartitionManager& managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i & managers.size(); i++) {
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
// 调用manager的next方法读取数据并emit
EmitState state = managers.get(_currPartitionIndex).next(_collector);
// 提交读取到的位置到zookeeper
long now = System.currentTimeMillis();
if((now - _lastUpdateMs) & _spoutConfig.stateUpdateIntervalMs) {
ack和fail:
KafkaSpout.ack
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
//调用PartitionManager的ack
m.ack(id.offset);
KafkaSpout.fail
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
//调用PartitionManager的fail
m.fail(id.offset);
可以看出,主要的逻辑都在PartitionManager这个类中。下面对它做个简单的分析:
//从zookeeper中读取上一次的偏移
Map&Object, Object& json = _state.readJSON(path);
//根据当前时间获取一个偏移
Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
//maxOffsetBehind为两个偏移的最大范围,如果超过这个范围,则用最新偏移覆盖读取偏移,两个偏移间的数据会被丢弃。如果不希望这样,应该将它设置成一个较大的值或者MAX_VALUE
if (currentOffset - _committedTo & spoutConfig.maxOffsetBehind || _committedTo &= 0) {
_committedTo = currentO
//初始化当前偏移
_emittedToOffset = _committedTo;
next和fill:
PartitionManager.next
//调用fill填充待发送队列
if (_waitingToEmit.isEmpty()) {
//发送数据
while (true) {
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
Iterable&List&Object&& tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
for (List&Object& tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
ack(toEmit.offset);
PartitionManager.fill
//初始化当前偏移,读取消息
if (had_failed) {
//先处理失败的偏移
offset = failed.first();
offset = _emittedToO
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
for (MessageAndOffset msg : msgs) {
final Long cur_offset = msg.offset();
if (cur_offset & offset) {
// Skip any old offsets.
if (!had_failed || failed.contains(cur_offset)) {
numMessages += 1;
//将偏移添加到pending中
_pending.add(cur_offset);
//将消息添加到待发送中
_waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
if (had_failed) {
failed.remove(cur_offset);
PartitionManager.ack
//从_pending中移除该偏移,如果该偏移与当前偏移的差大于maxOffsetBehind,则清空pending
if (!_pending.isEmpty() && _pending.first() & offset - _spoutConfig.maxOffsetBehind) {
// Too many things pending!
_pending.headSet(offset).clear();
_pending.remove(offset);
numberAcked++;
PartitionManager.fail
//将偏移添加到失败队列中
failed.add(offset);
numberFailed++;
最后,加上一张图做个总结:
相关 [storm 笔记 kafka] 推荐:
- 开源软件 - ITeye博客
storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景. 因此,storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供storm应用使用. 这里结合自己的应用做个简单总结.
由于storm已经提供了storm-kafka,因此可以直接使用,使用kafka的低级api读取数据. 如果有需要的话,自己实现也并不困难.
这里需要注意的是,spout会根据config的后面两个参数在zookeeper上为每个kafka分区创建保存读取偏移的节点,如:/zkroot/topo/partition_0.
- 行业应用 - ITeye博客
大数据我们都知道hadoop,但并不都是hadoop.我们该如何构建大数据库项目. 对于离线处理,hadoop还是比较适合的,但是对于实时性比较强的,数据量比较大的,我们可以采用Storm,那么Storm和什么技术搭配,才能够做一个适合自己的项目. 可以带着下面问题来阅读本文章:. 1.一个好的项目架构应该具备什么特点. 2.本项目架构是如何保证数据准确性的. 4.flume+kafka如何整合. 5.使用什么脚本可以查看flume有没有往Kafka传输数据. 做软件开发的都知道模块化思想,这样设计的原因有两方面:. 一方面是可以模块化,功能划分更加清晰,从“数据采集--数据接入--流失计算--数据输出/存储”.
- 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,. 执行结果如下:列出了所有消费者组的所有信息,包括Group(消费者组)、Topic、Pid(分区id)、Offset(当前已消费的条数)、LogSize(总条数)、Lag(未消费的条数)、Owner. 细看kafka-run-class.sh脚本,它是调用 了ConsumerOffsetChecker的main方法,所以,我们也可以通过java代码来访问scala的ConsumerOffsetChecker类,代码如下:.
- 搜索技术博客-淘宝
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高. 举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了. 再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是却发现系统在不遗余力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们心里就会想推荐你妹呀. 其实稍微了解点背景知识的码农们都知道,这是因为后台系统做的是每天一次的全量处理,而且大多是在夜深人静之时做的,那么你今天白天做的事情当然要明天才能反映出来啦.
- 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献
http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解. 不少人虽然在用消息队列,却并没有搞清楚消息队列是干嘛的. 有人会回答,消息队列就是为了分发消息用的. 那么,消息队列是用来提高性能,加速消息传输的吗. 显然不是,消息队列虽然提供了数据上的冗余,但它不是一种缓存. 如果你想加速,直接在把生产者与消费者合在一起写,中间自己加一个全内存的queue,没有了持久化,没有了网络传输,岂不更快.
- ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1. num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些. 配置线程数量为cpu核数2倍,最大不超过3倍. 2.log数据文件刷新策略. 为了大幅度提高producer写入吞吐量,需要定期批量写文件. # 每当producer写入10000条消息时,刷数据到磁盘.
- 互联网 - ITeye博客
kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目. 在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ. 在这片博文中,作者简单提到了开发kafka而不选择已有MQ系统的原因. Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB). Kafka版本:0.8.0. 官网:http://kafka.apache.org/. 官方文档:http://kafka.apache.org/documentation.html#quickstart. Kafka是用Scala写的,SBT是Simple Build Tool的简称,如果读者使用过Maven,那么可以简单将SBT看做是Scala世界的Maven,虽然二者各有优劣,但完成的工作基本是类似的.
- 互联网 - ITeye博客
//启动zookeeper server (用&是为了能退出命令行):. //启动kafka server: . 已有
0 人发表留言,猛击-&&
这里&&-参与讨论. —软件人才免语言低担保 赴美带薪读研.
- CSDN博客云计算推荐文章
Producer是一个应用程序,它创建消息并发送它们到Kafka broker中. 这些producer在本质上是不同. 比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer. 这些不同的Producer能够使用不同的语言实现,比如java、C和Python. 下面的这部图表解释了消息producer的Kafka API.. 下面将详细介绍如果编写一个简单的Producer和Consumer应用程序. 发送简单消息给Kafka broker,Producer端编写类ClusterProducer. 定于Consumer获取端,获取对应topic的数据:.
- 编程语言 - ITeye博客
在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)
运行在本地机器的单一JVM上,这个模式主要用来开发、调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式. 写一个HelloWord Storm.
我们现在创建这么一个应用,统计文本文件中的单词个数,详细学习过Hadoop的朋友都应该写过.
坚持分享优质有趣的原创文章,并保留作者信息和版权声明,任何问题请联系:@。}

我要回帖

更多关于 kafka consumer group 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信