Consider: when should you use a synchronous Kafka producer, and when should you use an asynchronous one?

Learning Jafka: Getting Started with the Producer (from asialee's ITeye blog)
First, let's look at a producer example. We start with the message-sending code in the unit test class ProducerTest:
public void testSend() {
    Jafka jafka = createJafka();
    Properties producerConfig = new Properties();
    producerConfig.setProperty("broker.list", "0:localhost:" + jafka.getPort());
    producerConfig.setProperty("serializer.class", StringEncoder.class.getName());
    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerConfig));
    for (int i = 0; i < 1000; i++) {
        producer.send(new StringProducerData("demo").add("Hello jafka").add("/adyliu/jafka"));
    }
    producer.close();
    close(jafka);
}
This is actually quite simple: configure the broker address and the encoder class, then build the message and send it.
It is worth spending a moment on the supporting classes first. The producer's configuration class, ProducerConfig, extends ZKConfig and holds one config for the synchronous producer and one for the asynchronous producer.
Next, let's look at the ProducerData class that we deal with directly.
ProducerData is simple and clear: it holds the topic to send to, a key that decides which partition the message is sent to, and the data, which is the messages the producer generates.
3.3 Producer Configuration
Essential configuration properties for the producer include:
metadata.broker.list
request.required.acks
producer.type
serializer.class
metadata.broker.list
This is for bootstrapping and the producer will only use it
for getting metadata (topics, partitions and replicas). The socket
connections for sending the actual data will be established based on the
broker information returned in the metadata. The format is
host1:port1,host2:port2, and the list can be a subset of brokers or a
VIP pointing to a subset of brokers.
request.required.acks
This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are:
0, which means that the producer never waits for an
acknowledgement from the broker (the same behavior as 0.7). This option
provides the lowest latency but the weakest durability guarantees (some
data will be lost when a server fails).
1, which means that the producer gets an
acknowledgement after the leader replica has received the data. This
option provides better durability as the client waits until the server
acknowledges the request as successful (only messages that were written
to the now-dead leader but not yet replicated will be lost).
-1, The producer gets an acknowledgement after
all in-sync replicas have received the data. This option provides the
greatest level of durability. However, it does not completely eliminate
the risk of message loss because the number of in sync replicas may, in
rare cases, shrink to 1. If you want to ensure that some minimum number
of replicas (typically a majority) receive a write, then you must set
the topic-level min.insync.replicas setting. Please read the Replication
section of the design documentation for a more in-depth discussion.
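As a concrete illustration of the durability end of this trade-off, here is a minimal sketch using the same 0.8-era producer API as the examples later in this article; the broker address and topic name are placeholders:

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
// -1: wait until all in-sync replicas have the message; pair this with a
// topic-level min.insync.replicas setting to guarantee a minimum replica count.
props.put("request.required.acks", "-1");
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
producer.send(new KeyedMessage<String, String>("demo-topic", "hello"));
producer.close();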
request.timeout.ms
The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
producer.type
This parameter specifies whether the messages are sent
asynchronously in a background thread. Valid values are (1) async for
asynchronous send and (2) sync for synchronous send. By setting the
producer to async we allow batching together of requests (which is great
for throughput) but open the possibility of a failure of the client
machine dropping unsent data.
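This setting is the heart of the sync-versus-async question posed at the top of this article. A minimal sketch of switching modes, assuming the same API as the examples below (only the property value changes):

// "sync" (the default): send() blocks until the broker has handled the
// request, so an acknowledged message is not sitting in a client-side queue.
// "async": send() enqueues into an in-memory queue that a background
// thread flushes in batches -- better throughput, but queued messages
// are lost if the client machine fails.
props.put("producer.type", "async");

A reasonable rule of thumb following from this trade-off: prefer sync where losing a message is unacceptable, and async for high-volume streams (metrics, page views) where throughput matters more than occasional loss.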
serializer.class
kafka.serializer.DefaultEncoder
The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
key.serializer.class
The serializer class for keys (defaults to the same as for messages if nothing is given).
partitioner.class
kafka.producer.DefaultPartitioner
The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
compression.codec
This parameter allows you to specify the compression codec
for all data generated by this producer. Valid values are "none", "gzip"
and "snappy".
compressed.topics
This parameter allows you to set whether compression should
be turned on for particular topics. If the compression codec is anything
other than NoCompressionCodec, enable compression only for specified
topics if any. If the list of compressed topics is empty, then enable
the specified compression codec for all topics. If the compression codec
is NoCompressionCodec, compression is disabled for all topics.
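A short sketch of how these two compression settings combine; the topic name is illustrative:

// Compress with gzip, but only for the "website-hits" topic; all other
// topics from this producer remain uncompressed.
props.put("compression.codec", "gzip");
props.put("compressed.topics", "website-hits");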
message.send.max.retries
This property will cause the producer to automatically retry a
failed send request. This property specifies the number of retries when
such failures occur. Note that setting a non-zero value here can lead
to duplicates in the case of network errors that cause a message to be
sent but the acknowledgement to be lost.
retry.backoff.ms
Before each retry, the producer refreshes the metadata of
relevant topics to see if a new leader has been elected. Since leader
election takes a bit of time, this property specifies the amount of time
that the producer waits before refreshing the metadata.
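The two retry settings above work together; a sketch with illustrative values:

// Retry a failed send up to 3 times, waiting 100 ms before each retry so
// the refreshed metadata can pick up a newly elected leader. Note that
// retries can produce duplicates if an acknowledgement is lost in transit.
props.put("message.send.max.retries", "3");
props.put("retry.backoff.ms", "100");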
topic.metadata.refresh.interval.ms
600 * 1000
The producer generally refreshes the topic metadata from
brokers when there is a failure (partition missing, leader not
available...). It will also poll regularly (default: every 10min so
600000ms). If you set this to a negative value, metadata will only get
refreshed on failure. If you set this to zero, the metadata will get
refreshed after each message sent (not recommended). Important note: the refresh happens only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed.
queue.buffering.max.ms
Maximum time to buffer data when using async mode. For example
a setting of 100 will try to batch together 100ms of messages to send
at once. This will improve throughput but adds message delivery latency
due to the buffering.
queue.buffering.max.messages
The maximum number of unsent messages that can be queued up by the producer when using async mode before either the producer must be blocked or data must be dropped.
queue.enqueue.timeout.ms
The amount of time to block before dropping messages when
running in async mode and the buffer has reached
queue.buffering.max.messages. If set to 0 events will be enqueued
immediately or dropped if the queue is full (the producer send call will
never block). If set to -1 the producer will block indefinitely and
never willingly drop a send.
batch.num.messages
The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages is ready to send or queue.buffering.max.ms is reached.
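Taken together, the async-mode settings above control the producer-side queue. A hedged sketch of tuning them (values are illustrative, not recommendations):

props.put("producer.type", "async");
// Flush a batch when 200 messages have accumulated or 100 ms have passed,
// whichever comes first.
props.put("batch.num.messages", "200");
props.put("queue.buffering.max.ms", "100");
// Hold at most 10000 unsent messages; when the queue is full, block send()
// for up to 500 ms, then drop the message.
props.put("queue.buffering.max.messages", "10000");
props.put("queue.enqueue.timeout.ms", "500");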
send.buffer.bytes
100 * 1024
Socket write buffer size
client.id
The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
More details about producer configuration can be found in the Scala class kafka.producer.ProducerConfig.
[Apache Kafka] Developing a Producer
A producer is an application that generates messages and publishes them to brokers.
The producer connects to any live node and requests metadata about the leader of a topic's partition, so that it can send messages directly to that partition's lead broker.
For efficiency, the producer can publish messages in batches, but only in asynchronous mode. In async mode, the producer can be configured with one of the two parameters queue.time or batch.size to publish messages in batches after a certain amount of time or number of messages. Messages accumulate on the producer side and are then published to the broker in a single request. Async mode therefore also brings the risk of message loss: if the producer crashes, the accumulated, not-yet-published messages held in memory are lost.
For an async-mode producer, a callback can be registered as a handler for capturing errors.
Java producer API
Kafka provides the class kafka.javaapi.producer.Producer (class Producer<K, V>) for creating messages for one or more topics, with the option of specifying the message's partition. K and V specify the types of the partition key and of the message value, respectively.
KeyedMessage
The constructor of the class kafka.producer.KeyedMessage takes the topic name, the partition key, and the message value:
class KeyedMessage[K, V](val topic: String, val key: K, val message: V)
Here, K and V are again the types of the partition key and the message value; the topic is always of type String.
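For illustration, a message can be constructed with or without an explicit key (the topic and values here are hypothetical); without a key, partition assignment falls back to the default partitioner's behavior:

// With a partition key: the key is passed to the partitioner.
KeyedMessage<String, String> keyed =
    new KeyedMessage<String, String>("page-visits", "192.168.14.1", "payload");
// Without a key: topic and message only.
KeyedMessage<String, String> unkeyed =
    new KeyedMessage<String, String>("page-visits", "payload");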
ProducerConfig
The class kafka.producer.ProducerConfig encapsulates the parameters needed to establish a connection with brokers, such as the broker list, the partitioner class, the message serializer class, and the partition key.
The producer API wraps the implementations of both the sync-mode and the async-mode producer, selected via producer.type. For example, the async-mode kafka.producer.Producer handles buffering of the data before serialization and dispatch. Internally, an instance of kafka.producer.async.ProducerSendThread dequeues the batch of messages, and kafka.producer.EventHandler serializes and sends the data. A custom handler can also be plugged in via the event.handler configuration parameter.
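Wiring in such a custom handler is a configuration change; a sketch, where the handler class name is hypothetical and would have to implement the event-handler interface mentioned above:

props.put("producer.type", "async");
// Hypothetical class; plugged in by name and instantiated reflectively.
props.put("event.handler", "com.example.LoggingEventHandler");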
A simple Java producer
Next, we will write a simple class SimpleProducer that creates messages for a given topic and uses the default partitioner.
1. Import the following classes:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
2. Define properties:
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092, localhost:9093, localhost:9094");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
A look at the properties used in the code:
metadata.broker.list: specifies the brokers the producer connects to (in the format host1:port1,host2:port2). The Kafka producer automatically determines the lead broker for the topic and connects to the correct broker when publishing messages.
serializer.class: specifies the class used to serialize the message while preparing it for transmission. In this example we use the string encoder provided by Kafka. By default, the serializer class for the key is the same as for the message. A custom serializer can be implemented by extending kafka.serializer.Encoder; setting the parameter key.serializer.class selects a custom encoder for keys (see the sketch after this list).
request.required.acks: instructs the broker to send an acknowledgment to the producer when a message is received. 1 means the producer receives an acknowledgment as soon as the lead replica has received the message.
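A minimal sketch of such a custom encoder, assuming the 0.8-era kafka.serializer.Encoder interface (the class name and the transformation are hypothetical):

import java.nio.charset.StandardCharsets;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class UpperCaseEncoder implements Encoder<String> {
    // The producer instantiates encoders reflectively and passes its
    // configuration in, so this constructor signature is required.
    public UpperCaseEncoder(VerifiableProperties props) {
    }

    public byte[] toBytes(String message) {
        // Hypothetical transformation applied before encoding to bytes.
        return message.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}

It would then be selected with props.put("serializer.class", "UpperCaseEncoder"), or via key.serializer.class to apply it to keys only.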
3. Build the message and send it:
String runtime = new Date().toString();
String msg = "Message Publishing Time - " + runtime;
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
producer.send(data);
The complete code is as follows:
package kafka.examples.producer;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
    private static Producer<String, String> producer;

    public SimpleProducer() {
        Properties props = new Properties();
        // Set the broker list for requesting metadata to find the lead broker
        props.put("metadata.broker.list",
                "192.168.146.132:9092, 192.168.146.132:9093, 192.168.146.132:9094");
        // This specifies the serializer class for messages
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 1 means the producer receives an acknowledgment once the lead replica
        // has received the data. This option provides better durability as the
        // client waits until the server acknowledges the request as successful.
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public static void main(String[] args) {
        int argsCount = args.length;
        if (argsCount == 0 || argsCount == 1)
            throw new IllegalArgumentException(
                    "Please provide topic name and Message count as arguments");
        // Topic name and the message count to be published are passed from the
        // command line
        String topic = (String) args[0];
        String count = (String) args[1];
        int messageCount = Integer.parseInt(count);
        System.out.println("Topic Name - " + topic);
        System.out.println("Message Count - " + messageCount);
        SimpleProducer simpleProducer = new SimpleProducer();
        simpleProducer.publishMessage(topic, messageCount);
    }

    private void publishMessage(String topic, int messageCount) {
        for (int mCount = 0; mCount < messageCount; mCount++) {
            String runtime = new Date().toString();
            String msg = "Message Publishing Time - " + runtime;
            System.out.println(msg);
            // Creates a KeyedMessage instance
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, msg);
            // Publish the message
            producer.send(data);
        }
        // Close producer connection with broker.
        producer.close();
    }
}
Before running the code, make sure a topic named kafkatopic has been created:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafkatopic
Add an environment variable KAFKA_LIB pointing to Kafka's lib folder, and add the jars in that folder to the classpath.
Compile the code:
javac -d . kafka/examples/producer/SimpleProducer.java
Run the program; SimpleProducer takes two arguments, the topic name and the number of messages:
java kafka.examples.producer.SimpleProducer kafkatopic 10
Then run a consumer to receive the messages:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic kafkatopic
A Java producer with a custom partitioner
The previous example is a very simple producer for a multi-broker cluster, with no explicit partitioning of messages. Next we will write one with a custom message partitioner. The scenario: capture and publish log messages for website visits from various IP addresses. Each log message contains the timestamp of the visit, the name of the website, and the IP address that accessed it.
1. Import the following classes:
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
2. Define properties:
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092, localhost:9093, localhost:9094");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "kafka.examples.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
The property partitioner.class specifies the class used to determine the partition within the topic to which a message is sent. If it is null, the hash value of the key is used.
3. Implement the partitioner class
Write a custom partitioner class SimplePartitioner, an implementation of the Partitioner interface.

package kafka.examples.producer;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    public SimplePartitioner(VerifiableProperties props) {
    }

    /*
     * The method takes the key, which in this case is the IP address.
     * It finds the last octet and does a modulo operation on the number
     * of partitions defined within Kafka for the topic.
     * @see kafka.producer.Partitioner#partition(java.lang.Object, int)
     */
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String partitionKey = (String) key;
        int offset = partitionKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(partitionKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}
4. Build the message and send it
The complete code is as follows:
package kafka.examples.producer;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class CustomPartitionProducer {
    private static Producer<String, String> producer;

    public CustomPartitionProducer() {
        Properties props = new Properties();
        // Set the broker list for requesting metadata to find the lead broker
        props.put("metadata.broker.list",
                "192.168.146.132:9092, 192.168.146.132:9093, 192.168.146.132:9094");
        // This specifies the serializer class for messages
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Defines the class to be used for determining the partition
        // in the topic where the message needs to be sent.
        props.put("partitioner.class", "kafka.examples.producer.SimplePartitioner");
        // 1 means the producer receives an acknowledgment once the lead replica
        // has received the data. This option provides better durability as the
        // client waits until the server acknowledges the request as successful.
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public static void main(String[] args) {
        int argsCount = args.length;
        if (argsCount == 0 || argsCount == 1)
            throw new IllegalArgumentException(
                    "Please provide topic name and Message count as arguments");
        // Topic name and the message count to be published are passed from the
        // command line
        String topic = (String) args[0];
        String count = (String) args[1];
        int messageCount = Integer.parseInt(count);
        System.out.println("Topic Name - " + topic);
        System.out.println("Message Count - " + messageCount);
        CustomPartitionProducer simpleProducer = new CustomPartitionProducer();
        simpleProducer.publishMessage(topic, messageCount);
    }

    private void publishMessage(String topic, int messageCount) {
        Random random = new Random();
        for (int mCount = 0; mCount < messageCount; mCount++) {
            String clientIP = "192.168.14." + random.nextInt(255);
            String accessTime = new Date().toString();
            String message = accessTime + ",kafka.apache.org," + clientIP;
            System.out.println(message);
            // Creates a KeyedMessage instance
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, clientIP, message);
            // Publish the message
            producer.send(data);
        }
        // Close producer connection with broker.
        producer.close();
    }
}
Before running the code, make sure a topic named website-hits has been created:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic website-hits
Compile the code:
javac -d . kafka/examples/producer/SimplePartitioner.java
javac -d . kafka/examples/producer/CustomPartitionProducer.java
Run the program:
java kafka.examples.producer.CustomPartitionProducer website-hits 100
Run a consumer to receive the messages:
bash bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic website-hits
Producer properties
metadata.broker.list: the producer uses this property to fetch metadata (topics, partitions, replicas); the format is host1:port1,host2:port2.
serializer.class: specifies the serializer class for messages. The default is kafka.serializer.DefaultEncoder.
producer.type: specifies whether messages are sent in synchronous or asynchronous mode. Valid values are async and sync; the default is sync.
request.required.acks: specifies whether the broker sends an acknowledgment to the producer when a produce request completes. The default is 0. 0 means the producer does not wait for any acknowledgment from the broker, which minimizes latency but weakens durability. 1 means the producer receives an acknowledgment as soon as the lead replica has received the data, which improves durability because the client waits until the server confirms the request succeeded. -1 means the producer receives an acknowledgment once all in-sync replicas have received the data, which provides the best durability.
key.serializer.class: specifies the serializer class for keys. It defaults to ${serializer.class}.
partitioner.class: specifies the class that partitions messages within the topic. The default is kafka.producer.DefaultPartitioner, which is based on the hash of the key.
compression.codec: specifies the codec for compressing producer data; valid values are none, gzip, and snappy. The default is none.
batch.num.messages: specifies the number of messages to send per batch in async mode. The default is 200. The producer waits until either this many messages are ready or queue.buffering.max.ms is reached.
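To close the loop on the question this article opened with, here is a hedged side-by-side sketch of the two modes (the broker address and values are illustrative):

// Synchronous: send() blocks per request; prefer this when losing a
// message is unacceptable.
Properties syncProps = new Properties();
syncProps.put("metadata.broker.list", "localhost:9092");
syncProps.put("serializer.class", "kafka.serializer.StringEncoder");
syncProps.put("producer.type", "sync");
syncProps.put("request.required.acks", "1");

// Asynchronous: batched background sends; prefer this for high-volume,
// loss-tolerant streams.
Properties asyncProps = new Properties();
asyncProps.put("metadata.broker.list", "localhost:9092");
asyncProps.put("serializer.class", "kafka.serializer.StringEncoder");
asyncProps.put("producer.type", "async");
asyncProps.put("batch.num.messages", "200");
asyncProps.put("queue.buffering.max.ms", "100");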
Source: Learning Apache Kafka, Second Edition