如何手动更新kafka 删除topic中某个Topic的偏移量

kafka topic 问题
各位为好:
就是kafka将topic分为多个区,然后将区分布在多个sever上;如果将10个topic,每个topic分1个区。那么这个topic的10个区会分布在不同的server上吗?还是会集中在1,2台server上?如果分为多个区的话,我知道是会均匀分布的;
另外每个区是在那个server上怎么知道?owner 只是告诉了是消费者在消费当前分区,不知道当前分区在那个server上。
我也在研究这个 有空一起研究下
对KAFKA高可用表示怀疑
各个参数的含义:
1 &--zk: zookeeper的地址
2 &--prot 端口号
3 &--refresh 刷新频率,更新到DB。
4 &--retain 保留DB的时间
5 & 设置dbName在哪里存储记录(默认'offsetapp')
更详细的你可以看这篇文章呀:/54
Spark+Kafka实时流机器学习实战
课程观看地址:
课程出自学途无忧网:
一、课程使用到的软件及版本
①Spark1.6.2
②kafka0.8.2.1
③centos6.5
二、课程适合人群
适合想学kafka,Spark实时流计算,Spark机器学习的学员
三、课程目标
①学好Kafka,及其整个架构实现原理
②熟练运用Spark机器学习
③熟练掌控Spark与Kafka结合,实现实时流计算
四、课程目录
第1课、spark与kafka的介绍
第2课、spark的集群安装
第3课、Spark RDD函数讲解与实战分析
第4课、Spark 的java操作实现简单程序
第5课、SparkRDD原理详细剖析
第6课、Spark 机器学习,API阅读
第7课、Kafka架构介绍以及集群安装
第8课、Kafka生产者Producer的实战
第9课、Kafka消费者Consumer剖析与实战
第10课、Kafka复杂消费者的详细讲解
第11课、Kafka数据安全,以及Spark Kafka Streaming API
第12课、Spark+Kafka+Mysql整合
第13课、Spark 机器学习ALS设计
第14课、Spark ALS协同过滤java实战
第15课、Spark ALS给用户推荐产品
第16课、Spark机器学习后存储到Mysql
第17课、Spark读取Kafka流构建Als模型
第18课、Spark处理Kafka流构建Als模型&
第19课、Spark处理Kafka流实现实时推荐算法
第20课、Spark学习经验总结,spark2与spark1的区别,下期预告
推荐组合学习:《深入浅出Spark机器学习实战(用户行为分析)》
课程观看地址:菜菜光 的BLOG
用户名:菜菜光
文章数:149
评论数:119
访问量:179234
注册日期:
阅读量:5863
阅读量:12276
阅读量:380557
阅读量:1072813
51CTO推荐博文
结构:nginx-flume-&kafka-&flume-&kafka(因为牵扯到跨机房问题,在两个kafka之间加了个flume,蛋疼。。)现象:在第二层,写入kafka的topic和读取的kafka的topic相同,手动设定的sink topic不生效打开debug日志:source实例化:21&Apr&:03,146&INFO&[conf-file-poller-0]&(org.apache.flume.source.DefaultSourceFactory.create:41)&-&Creating&instance&of&source&kafka1,&type&org.apache.flume.source.kafka.KafkaSource
21&Apr&:03,146&DEBUG&[conf-file-poller-0]&(org.apache.flume.source.DefaultSourceFactory.getClass:61)&&-&Source&type&org.apache.flume.source.kafka.KafkaSource&is&a&custom&type
21&Apr&:03,152&INFO&&[conf-file-poller-0]&(org.apache.flume.source.kafka.KafkaSourceUtil.getKafkaProperties:37)&&-&context={&parameters:{topic=bigdata_api_ele_me_access,&batchDurationMillis=5000,&groupId=nginx,&zookeeperConnect=xxx,&channels=bigdata_api_ele_me_access-channel4,&batchSize=2000,&type=org.apache.flume.source.kafka.KafkaSource}&}sink实例化:21&Apr&:03,185&INFO&&[conf-file-poller-0]&(org.apache.flume.sink.DefaultSinkFactory.create:42)&&-&Creating&instance&of&sink:&web-sink2,&type:&org.apache.flume.sink.kafka.KafkaSink
21&Apr&:03,185&DEBUG&[conf-file-poller-0]&(org.apache.flume.sink.DefaultSinkFactory.getClass:63)&&-&Sink&type&org.apache.flume.sink.kafka.KafkaSink&is&a&custom&type
21&Apr&:03,190&DEBUG&[conf-file-poller-0]&(org.apache.flume.sink.kafka.KafkaSink.configure:220)&&-&Using&batch&size:&2000
21&Apr&:03,190&INFO&&[conf-file-poller-0]&(org.apache.flume.sink.kafka.KafkaSink.configure:229)&&-&Using&the&static&topic:&nginx-access&this&may&be&over-ridden&by&event&headers
21&Apr&:03,191&INFO&&[conf-file-poller-0]&(org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)&&-&context={&parameters:{topic=nginx-access,&brokerList=1xxx,&requiredAcks=1,&batchSize=2000,&type=org.apache.flume.sink.kafka.KafkaSink,&channel=bigdata_api_ele_me_access-channel4}&}
21&Apr&:03,191&DEBUG&[conf-file-poller-0]&(org.apache.flume.sink.kafka.KafkaSink.configure:236)&&-&Kafka&producer&properties:&{metadata.broker.list=192.168.101.43:.101.44:.101.45:9092,&request.required.acks=1,&key.serializer.class=kafka.serializer.StringEncoder,&serializer.class=kafka.serializer.DefaultEncoder}可以看到创建sink和source实例的时候配置上下文context中topic是按设置的来的,但是看到日志中有下面一段:Using&the&static&topic:&nginx-access&this&may&be&over-ridden&by&event&headers分析KafkaSink源码:org.apache.flume.sink.kafka.KafkaSink.process方法中:&&public&static&final&String&KEY_HDR&=&"key";
&&public&static&final&String&TOPIC_HDR&=&"topic";
&&&&&&&&if&((eventTopic&=&headers.get(TOPIC_HDR))&==&null)&{
&&&&&&&&&&eventTopic&=&
&&&&&&&&}&//eventTopic的取值,会从header中获取,如果header中没有才会使用配置的topic
&&&&&&&&...
&&&&&&&&eventKey&=&headers.get(KEY_HDR);
&&&&&&&&...
&&&&&&&&KeyedMessage&String,&byte[]&&data&=&new&KeyedMessage&String,&byte[]&
&&&&&&&&&&(eventTopic,&eventKey,&eventBody);
&&&&&&&&messageList.add(data);其中topic的取值在configure中:&&&&topic&=&context.getString(KafkaSinkConstants.TOPIC,
&&&&&&KafkaSinkConstants.DEFAULT_TOPIC);&//通过flume的配置获取topic,如果没有设置topic按默认default-flume-topic处理
&&&&if&(topic.equals(KafkaSinkConstants.DEFAULT_TOPIC))&{
&&&&&&logger.warn("The&Property&'topic'&is&not&set.&"&+
&&&&&&&&"Using&the&default&topic&name:&"&+
&&&&&&&&KafkaSinkConstants.DEFAULT_TOPIC);
&&&&}&else&{
&&&&&&("Using&the&static&topic:&"&+&topic&+
&&&&&&&&"&this&may&be&over-ridden&by&event&headers");&//这里提示可能会被header覆盖
&&&&}header的来源:1)kafka中的数据是没有header的概念的2)flume中的消息分header/body概念这种结构下,数据由kafkasource进入flume,添加了header信息,然后流入到kafkasinkkafkasource中header的添加处理在org.apache.flume.source.kafka.KafkaSource.process方法中:&&&&&&&&if&(iterStatus)&{
&&&&&&&&&&//&get&next&message
&&&&&&&&&&MessageAndMetadata&byte[],&byte[]&&messageAndMetadata&=&it.next();
&&&&&&&&&&kafkaMessage&=&messageAndMetadata.message();
&&&&&&&&&&kafkaKey&=&messageAndMetadata.key();
&&&&&&&&&&//&Add&headers&to&event&(topic,&timestamp,&and&key)
&&&&&&&&&&headers&=&new&HashMap&String,&String&();
&&&&&&&&&&headers.put(KafkaSourceConstants.TIMESTAMP,
&&&&&&&&&&&&&&&&&&String.valueOf(System.currentTimeMillis()));
&&&&&&&&&&headers.put(KafkaSourceConstants.TOPIC,&topic);因为kafka中不需要header,注释掉org.apache.flume.sink.kafka.KafkaSink.process中这几段代码即可:&&&&&&&&/*
&&&&&&&&if&((eventTopic&=&headers.get(TOPIC_HDR))&==&null)&{
&&&&&&&&&&eventTopic&=&
&&&&&&&&*/
&&&&&&&&eventTopic&=&&//增加这一段,否则会有npe错误本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)Kafka介绍Kafka的介绍可参考:http://blog.csdn.net/eric_sunah/article/details/Zookeeper在Kafka中的使用从上面的介绍可以看出Kafka是一个分布式的消息系统,分布式主要体现在Producer,Broker,Consumer的分布式,下面章节主要讲解Zookeeper如何支持相关对象的分布式特性Broker注册Broker在zookeeper中保存为一个临时节点,节点的路径是/brokers/ids/[brokerid],每个节点会保存对应broker的IP以及端口等信息.Topic注册在kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况都保存在zookeeper中,根节点路径为/brokers/topics,每个topic都会在topics下建立独立的子节点,每个topic节点下都会包含分区以及broker的对应信息,例如下图中的状态生产者负载均衡当Broker启动时,会注册该Broker的信息,以及可订阅的topic信息。生产者通过注册在Broker以及Topic上的watcher的感知Broker以及Topic的分区情况,从而将Topic的分区动态的分配到broker上. &kafka有消费者分组的概念,每个分组中可以包含多个消费者,每条消息只会发给分组中的一个消费者,且每个分组之间是相互独立互不影响的。消费者与分区的对应关系对于每个消费者分组,kafka都会为其分配一个全局唯一的Group ID,分组内的所有消费者会共享该ID,kafka还会为每个消费者分配一个consumer ID,通常采用hostname:uuid的形式。在kafka的设计中规定,对于topic的每个分区,最多只能被一个消费者进行消费,也就是消费者与分区的关系是一对多的关系。消费者与分区的关系也被在zookeeper中节点的路劲为 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id],该节点的内容就是消费者的Consumer ID例如下图的状态:消费者负载均衡消费者服务启动时,会创建一个属于消费者节点的临时节点,节点的路径为 /consumers/[group_id]/ids/[consumer_id],该节点的内容是该消费者订阅的Topic信息。每个消费者会对/consumers/[group_id]/ids节点注册Watcher监听器,一旦消费者的数量增加或减少就会触发消费者的负载均衡。例如下图的状态:消费者还会对/brokers/ids/[brokerid]节点进行监听,如果发现的Broker服务器列表发生变化,也会进行消费者的负载均衡消费者的offset在kafka的消费者API分为两种(1)High Level Api:由zookeeper维护消费者的offset (2) Low Level API,自己的代码实现对offset的维护。由于自己维护offset往往比较复杂,所以多数情况下都是使用High Level的APIoffset在zookeeper中的节点路径为/consumers/[group_id]/offsets/[topic]/[broker_id-part_id],该节点的值就是对应的offset&
版权声明:本文为博主原创,未经博主允许不得转载。q5725827 的BLOG
用户名:q5725827
访问量:7777
注册日期:
阅读量:5863
阅读量:12276
阅读量:380557
阅读量:1072813
51CTO推荐博文
APIProducer API此处只简介一个procedure的例子生产类是用来创建新消息的主题和可选的分区。如果使用Java你需要包括几个包和支持类:import kafka.javaapi.producer.Pimport kafka.producer.KeyedMimport kafka.producer.ProducerC&第一步首先定义producer如何找到集群,如何序列化消息和为消息选择适合的分区。下面吧这些定义在一个标准的JAVA& Properties类中Properties&props&=&new&Properties();
props.put("metadata.broker.list","broker1:9092,broker2:9092");
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("partitioner.class","example.producer.SimplePartitioner");
props.put("request.required.acks","1");
ProducerConfig&config&=&new&ProducerConfig(props);1.metadata.broker.list 定义了生产者如何找到一个或多个Broker去确定每个topic的Leader。这不需要你设置集群中全套的brokers但至少应包括两个,第一个经纪人不可用可以替换。不需要担心需要指出broker为主题的领袖(分区),生产者知道如何连接到代理请求元数据并连接到正确的broker。2.第二个属性“序列化类定义“。定义使用什么序列化程序传递消息。在我们的例子中,我们使用一个卡夫卡提供的简单的字符串编码器。请注意,encoder必须和下一步的keyedmessage使用相同的类型可以适当的定义"key.serializer.class"根据key改变序列化类。默认的是与"serializer.class"相同3.第三个属性partitioner.class 定义了决定topic中的分区发送规则。这个属性是可选的,但是对于你的特殊的分区实现是重要的。如果存在key将使用kafka默认的分组规则,如果key为null 则使用随机的分区发送策略。4.最后一个属性“request.required.acks”将设置kafka知否需要broker的回应。如果不设置可能将导致数据丢失。1.1此处可以设置为0 生产者不等待broker的回应。会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失)1.2此处可以设置为1生产者会收到leader的回应在leader写入之后。(在当前leader服务器为复制前失败可能会导致信息丢失)1.3此处可以设置为-1生产者会收到leader的回应在全部拷贝完成之后。之后可以定义生产者Producer&String, String& producer =new Producer&String, String&(config);此处泛型的第一个type是分区的key的类型。第二个是消息的类型。与上面Properties中定义的对应。现在定义messgaeRandom&rnd&=&new&Random();
long&runtime&=&new&Date().getTime();
String&ip&=&“192.168.2.”&+rnd.nextInt(255);
String&msg&=&runtime&+&“,,”+&此处模拟一个website的访问记录。之后想broker中写入信息.KeyedMessage&String, String& data =new KeyedMessage&String, String&("page_visits",ip, msg);producer.send(data);此处的“page_visits”是要写入的Topic。此处我们将IP设置为分区的key值。注意如果你没有设置键值,即使你定义了一个分区类,kafka也将使用随机发送.Full Code:import&java.util.*;
import&kafka.javaapi.producer.P
import&kafka.producer.KeyedM
import&kafka.producer.ProducerC
public&class&TestProducer&{
&&&public&static&void&main(String[]&args)&{
&&&&&&&long&events&=&Long.parseLong(args[0]);
&&&&&&&Random&rnd&=&new&Random();
&&&&&&&Properties&props&=&new&Properties();
&&&&&&&props.put("metadata.broker.list","broker1:9092,broker2:9092&");
&&&&&&&props.put("serializer.class","kafka.serializer.StringEncoder");
&&&&&&&props.put("partitioner.class","example.producer.SimplePartitioner");
&&&&&&&props.put("request.required.acks",&"1");
&&&&&&&ProducerConfig&config&=&new&ProducerConfig(props);
&&&&&&&Producer&String,&String&&producer&=&new&Producer&String,String&(config);
&&&&&&&for&(long&nEvents&=&0;&nEvents&&&&nEvents++)&{
&&&&&&&&&&&&&&&long&runtime&=&newDate().getTime();&
&&&&&&&&&&&&&&&String&ip&=&“192.168.2.”&+rnd.nextInt(255);
&&&&&&&&&&&&&&&String&msg&=&runtime&+“,,”&+&
&&&&&&&&&&&&&&&KeyedMessage&String,String&&data&=&new&KeyedMessage&String,&String&("page_visits",ip(key),&msg);
&&&&&&&&&&&&&&&producer.send(data);
&&&&&&&producer.close();
Partitioning&Code:&(分区函数)
import&kafka.producer.P
import&kafka.utils.VerifiableP
public&class&SimplePartitioner&implementsPartitioner&String&&{
&&&public&SimplePartitioner&(VerifiableProperties&props)&{
&&&public&int&partition(String&key,&int&a_numPartitions)&{
&&&&&&&int&partition&=&0;
&&&&&&&int&offset&=&key.lastIndexOf('.');
&&&&&&&if&(offset&&&0)&{
&&&&&&&&&&partition&=&Integer.parseInt(&key.substring(offset+1))&%a_numP
&&&&&&return&
}上面分区的作用是相同的IP将发送至相同的分区。但此时你的消费者需要知道如何去处理这样的规则消息。使用前需要建立topicbin/kafka-create-topic.sh&--topicpage_visits&--replica&3&--zookeeper&localhost:2181&--partition&5可以使用下面的工具验证你发送的消息bin/kafka-console-consumer.sh&--zookeeperlocalhost:2181&--topic&page_visits&--from-beginningHigh Level Consumer API顶层接口:class&Consumer&{
&&*&&创建java的消费者与kafka的connect
&&*&&@param&config&&至少需要提供consumer的groupId和zookeeper.connect.
public&statickafka.javaapi.consumer.ConsumerConnector&createJavaConsumerConnector(config:ConsumerConfig);
ConsumerConnector:
public&interfacekafka.javaapi.consumer.ConsumerConnector&{
&&*&&为每一个主题创建一个泛型的消息流
&&*&&@param&topicCountMap&&提供topic和Stream的一一对应
&&*&&@param&decoder&解析器&
&&*&&@return&Map&&&&topic&,List&#streams&&
&&*&&&&&&&&&&&&&&&&&&&此处的KafkaStream提供对内容的Iterable读取
&public&&K,V&&Map&String,&List&KafkaStream&K,V&&&
&&&&createMessageStreams(Map&String,Integer&&topicCountMap,&Decoder&K&&keyDecoder,&Decoder&V&valueDecoder);
&&*&&同上.
&public&Map&String,&List&KafkaStream&byte[],&byte[]&&&createMessageStreams(Map&String,&Integer&&topicCountMap);
&&&*&&&&&&&&&&建一个匹配的通配符主题的消息流的List
&&*&&@param&topicFilter一个topicfilter指定Consumer订阅的话题(
&&*&&包含了一个白名单和黑名单).
&&*&&@param&numStreams&messagestreams的数量
&&*&&@param&keyDecoder&message&key解析器
&&*&&@param&valueDecoder&a&message解析器
&&*&&@return&同上
&public&&K,V&&List&KafkaStream&K,V&&
&&&createMessageStreamsByFilter(TopicFilter&topicFilter,&int&numStreams,Decoder&K&&keyDecoder,&Decoder&V&&valueDecoder);
&………………………….(其余接口类似,是上述方法的重载方法)
&&*&&提交本连接器所连接的所有分区和主题
&public&void&commitOffsets();
&&*&&停止当前Consumer
&public&void&shutdown();
}&e.g &example1. 为什使用高级消费者(High Level Consumer)&&&&&&&& 有时消费者从卡夫卡读取消息不在乎处理消息的偏移量逻辑,只是消费消息内部的信息。高级消费者提供了消费信息的方法而屏蔽了大量的底层细节。&&&&&&&& 首先要知道的是,高级消费者从zookeeper的特殊分区存储最新偏离。这个偏移当kafka启动时准备完毕。这一般是指消费者群体(Consumer group)[此处的意思,kafka中的消息是发送到Consumer group中的任一个consumer上的,kafka保存的是整体的偏移。此处不知是否理解正确请大虾指点。]&&&&&&&& 请小心,对于kafka集群消费群体的名字是全局的,任何的“老”逻辑的消费者应该被关闭,然后运行新的代码。当一个新的进程拥有相同的消费者群的名字,卡夫卡将会增加进程的线程消费topic并且引发的“重新平衡(reblannce)”。在这个重新平衡中,卡夫卡将分配现有分区到所有可用线程,可能移动一个分区到另一个进程的消费分区。如果此时同时拥有旧的的新的代码逻辑,将会有一部分逻辑进入旧得Consumer而另一部分进入新的Consumer中的情况.2. Designing a High Level Consumer了解使用高层次消费者的第一件事是,它可以(而且应该!)是一个多线程的应用。线程围绕在你的主题分区的数量,有一些非常具体的规则:1.&如果你提供比在主题分区多的线程数量,一些线程将不会看到消息2.&如果你提供的分区比你拥有的线程多,线程将从多个分区接收数据3.&如果你每个线程上有多个分区,对于你以何种顺序收到消息是没有保证的。举个栗子,你可能从分区10上获取5条消息和分区11上的6条消息,然后你可能一直从10上获取消息,即使11上也拥有数据。4.添加更多的进程/线程将使卡夫卡重新平衡,可能改变一个分区到线程的分配。这里是一个简单的消费者栗子:package&com.test.
import&kafka.consumer.ConsumerI
import&kafka.consumer.KafkaS
public&class&ConsumerTest&implements&Runnable&{
&&&&privateKafkaStream&m_
&&&&private&intm_threadN
&&&&publicConsumerTest(KafkaStream&a_stream,&int&a_threadNumber)&{
&&&&&&&m_threadNumber&=&a_threadN
&&&&&&&&m_stream&=a_
&&&&public&void&run()&{
&&&&&&&ConsumerIterator&byte[],&byte[]&&it&=&m_stream.iterator();
&&&&&&&&while(it.hasNext())
System.out.println("Thread&"&+&m_threadNumber+&":&"&+&new&String(it.next().message()));
&&&&&&&System.out.println("Shutting&down&Thread:&"&+&m_threadNumber);
}在这里有趣的是,(it.hasnext())。这个代码将从卡夫卡读取直到你停下来。3. Config不像simpleconsumer高层消费者为你很多的提供需要bookkeeping(?)和错误处理。但是你要告诉卡夫卡这些信息。下面的方法定义了创建高级消费者基础配置:private&static&ConsumerConfigcreateConsumerConfig(String&a_zookeeper,&String&a_groupId)&{
&&&&&&&&Propertiesprops&=&new&Properties();
&&&&&&&props.put("zookeeper.connect",&a_zookeeper);
&&&&&&&&props.put("group.id",&a_groupId);
&&&&&&&props.put("zookeeper.session.timeout.ms",&"400");
&&&&&&&props.put("zookeeper.sync.time.ms",&"200");
&&&&&&&props.put("mit.interval.ms",&"1000");
&&&&&&&&return&newConsumerConfig(props);
&&&&}zookeeper.connect& 指定zookeeper集群中的一个实例,kafka利用zookeeper储存topic的分区偏移值。Groupid 消费者所属的Consumer Group(消费者群体)zookeeper.session.timeout.ms&zookeeper的超时处理<mit.interval.ms&& 属性自动提交的间隔。这将替代消息被消费后提交。如果发生错误,你将从新获得未更新的消息。4.使用线程池处理消息public&void&run(int&a_numThreads)&{
&&&Map&String,&Integer&&topicCountMap&=&new&HashMap&String,Integer&();
&&&topicCountMap.put(topic,&new&Integer(a_numThreads));
&&&Map&String,&List&KafkaStream&byte[],&byte[]&&&consumerMap&=&consumer.createMessageStreams(topicCountMap);
&&&List&KafkaStream&byte[],&byte[]&&&streams&=consumerMap.get(topic);
&&&//&now&launch&all&the&threads
&&&executor&=&Executors.newFixedThreadPool(a_numThreads);
&&&//&now&create&an&object&to&consume&the&messages
&&&int&threadNumber&=&0;
&&&for&(final&KafkaStream&stream&:&streams)&{
&&&&&&&executor.submit(new&ConsumerTest(stream,&threadNumber));
&&&&&&&threadNumber++;
}首先我们创建一个map,告诉kafka提供给哪个topic多少线程。consumer.createmessagestreams是我们如何把这个信息传递给卡夫卡。返回的是一个包含kafkastream 的以topic 为键list的map结合。(注意,这里我们只向卡夫卡注册一个话题,但我们可以为map中多添加一个元素的)最后,我们创建的线程池和通过一项新的consumertest对象,每个线程运转我们的业务逻辑。5.清理和异常处理Kafka在每次处理后不会立即更新zookeeper上的偏移值,她会休息上一段时间后提交。在这段时间内,你的消费者可能已经消费了一些消息,但并没有提交到zookeeper上。这样你可能会重复消费数据。同时一些时候,broker失败从新选取leader是也可能会导致重复消费消息。为了避免这种情况应该清理完成后再关闭,而不是直接使用kill命令。e.gtry&{
&&&Thread.sleep(10000);
}&catch&(InterruptedException&ie)&{
example.shutdown();full codepackage&com.test.
import&kafka.consumer.ConsumerC
import&kafka.consumer.KafkaS
importkafka.javaapi.consumer.ConsumerC
import&java.util.HashM
import&java.util.L
import&java.util.M
import&java.util.P
importjava.util.concurrent.ExecutorS
import&java.util.concurrent.E
public&class&ConsumerGroupExample&{
&&&private&final&ConsumerConnector&
&&&private&final&String&
&&&private&&ExecutorService&
&&&public&ConsumerGroupExample(String&a_zookeeper,&String&a_groupId,&Stringa_topic)&{
&&&&&&&consumer&=&kafka.consumer.Consumer.createJavaConsumerConnector(
&&&&&&&&&&&&&&&createConsumerConfig(a_zookeeper,&a_groupId));
&&&&&&&this.topic&=&a_
&&&public&void&shutdown()&{
&&&&&&&if&(consumer&!=&null)&consumer.shutdown();
&&&&&&&if&(executor&!=&null)&executor.shutdown();
&&&public&void&run(int&a_numThreads)&{
&&&&&&&Map&String,&Integer&&topicCountMap&=&new&HashMap&String,Integer&();
&&&&&&&topicCountMap.put(topic,&new&Integer(a_numThreads));
&&&&&&&Map&String,&List&KafkaStream&byte[],&byte[]&&&consumerMap&=&consumer.createMessageStreams(topicCountMap);
&&&&&&&List&KafkaStream&byte[],&byte[]&&&streams&=consumerMap.get(topic);
&&&&&&&//&now&launch&all&the&threads
&&&&&&&executor&=&Executors.newFixedThreadPool(a_numThreads);
&&&&&&&//&now&create&an&object&to&consume&the&messages
&&&&&&&int&threadNumber&=&0;
&&&&&&&for&(final&KafkaStream&stream&:&streams)&{
&&&&&&&&&&&executor.submit(new&ConsumerTest(stream,&threadNumber));
&&&&&&&&&&&threadNumber++;
&&&private&static&ConsumerConfig&createConsumerConfig(String&a_zookeeper,String&a_groupId)&{
&&&&&&&Properties&props&=&new&Properties();
&&&&&&&props.put("zookeeper.connect",&a_zookeeper);
&&&&&&&props.put("group.id",&a_groupId);
&&&&&&&props.put("zookeeper.session.timeout.ms",&"400");
&&&&&&&props.put("zookeeper.sync.time.ms",&"200");
&&&&&&&props.put("mit.interval.ms",&"1000");
&&&&&&&return&new&ConsumerConfig(props);
&&&public&static&void&main(String[]&args)&{
&&&&&&&String&zooKeeper&=&args[0];
&&&&&&&String&groupId&=&args[1];
&&&&&&&String&topic&=&args[2];
&&&&&&&int&threads&=&Integer.parseInt(args[3]);
&&&&&&&ConsumerGroupExample&example&=&new&ConsumerGroupExample(zooKeeper,groupId,&topic);
&&&&&&&example.run(threads);
&&&&&&&try&{
&&&&&&&&&&&Thread.sleep(10000);
&&&&&&&}&catch&(InterruptedException&ie)&{
&&&&&&&example.shutdown();
}此处的启动命令需提供1:2181&group3 &&myTopic& 41.1:2181 zookeeper 的端口和地址2.group3&& Consumer Group Name3.myTopic& consumer消费消息的message4.消费topic的线程数
了这篇文章
类别:┆阅读(0)┆评论(0)}

我要回帖

更多关于 kafka 删除topic 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信