storm0.8 kafka0.8 怎样kafka storm整合实例

storm,hbase和storm-kafka-0.8-plus兼容性问题 - jeremychen - 博客园
1 &org.slf4j.impl.StaticLoggerBinder.SINGLETON错误
确保slf4j-api-1.5.6.jar 和slf4j-log4j12-1.5.6.jar 版本一致
如果发布,pom中其他依赖的jar包排除上面两个
2 本地调试不行,在LocalCluster cluster = new LocalCluster();突然停掉;或者报如下错误:
544 [main] INFO .StormTridentKafkaTest - Simple Trident Kafka spout test for Kafka version 0.8 and Storm 0.9 (Version in pom.xml)6167 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] diedjava.lang.NoSuchFieldError: AuthFailed
在pom中加入版本,排除或者替换其他相关依赖库的版本
&dependency&&groupId&org.slf4j&/groupId&&artifactId&slf4j-api&/artifactId&&version&1.6.4&/version&&/dependency&
hbase,或storm-kafka-0.8-plus中的zookeeper版本冲突,
在pom中加入版本,排除或者替换其他相关依赖库的版本
&dependency&
&groupId&org.apache.hadoop&/groupId&
&artifactId&zookeeper&/artifactId&
&version&3.3.1&/version&
&/dependency&kafka与storm集成测试问题小结
kafka与storm集成测试问题小结
[摘要:1 拷贝kafka依附jar包到storm lib
[root@hdmaster libs]# cp kafka_2.10-0.8.2.1.jar /opt/apache-storm-0.9.5/lib/
[root@hdmaster libs]# cp scala-library-2.10.4.ja]
1 拷贝kafka依赖jar包到storm lib
[root@hdmaster libs]# cp kafka_2.10-0.8.2.1.jar /opt/apache-storm-0.9.5/lib/
[root@hdmaster libs]# cp scala-library-2.10.4.jar /opt/apache-storm-0.9.5/lib/
[root@hdmaster libs]# cp metrics-core-2.2.0.jar /opt/apache-storm-0.9.5/lib/
[root@hdmaster libs]# cp zkclient-0.3.jar /opt/apache-storm-0.9.5/lib/
[root@hdmaster libs]# cp log4j-1.2.16.jar /opt/apache-storm-0.9.5/lib/
[root@hdmaster libs]# cp slf4j-api-1.7.6.jar /opt/apache-storm-0.9.5/lib/
[root@hdmaster libs]# cp jopt-simple-3.2.jar /opt/apache-storm-0.9.5/lib/
2 执行storm jar命令
报错信息:
Exception in thread &main& java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
& & & & at java.lang.Class.getDeclaredMethods0(Native Method)
& & & & at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
& & & & at java.lang.Class.getMethod0(Class.java:2856)
& & & & at java.lang.Class.getMethod(Class.java:1668)
& & & & at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
& & & & at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
& & & & at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
& & & & at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
& & & & at java.security.AccessController.doPrivileged(Native Method)
& & & & at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
& & & & at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
& & & & at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
& & & & at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
server端没有storm-kafka相关jar包
从本地maven库找到拷过去
5372 [Thread-11-kafka-reader] ERROR backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
& & & & at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at backtype.storm.daemon.executor$fn__6579$fn__6594.invoke(executor.clj:522) ~[storm-core-0.9.5.jar:0.9.5]
& & & & at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
& & & & at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
& & & & at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
& & & & at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_79]
& & & & at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_79]
& & & & at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_79]
& & & & at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_79]
& & & & at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_79]
& & & & at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_79]
& & & & at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_79]
& & & & ... 5 common frames omitted
从本地maven库找到curator-client-2.7.1.jar curator-framework-2.7.1.jar拷过去
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
& & & & at org.apache.curator.ensemble.fixed.FixedEnsembleProvider.&init&(FixedEnsembleProvider.java:39) ~[curator-client-2.7.1.jar:na]
& & & & at org.apache.curator.framework.CuratorFrameworkFactory$Builder.connectString(CuratorFrameworkFactory.java:193) ~[curator-framework-2.7.1.jar:na]
& & & & at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:94) ~[curator-framework-2.7.1.jar:na]
& & & & at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at storm.kafka.ZkState.&init&(ZkState.java:61) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at backtype.storm.daemon.executor$fn__6579$fn__6594.invoke(executor.clj:522) ~[storm-core-0.9.5.jar:0.9.5]
& & & & at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
& & & & at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
& & & & at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassNotFoundException: mon.base.Preconditions
& & & & at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_79]
& & & & at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_79]
& & & & at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_79]
& & & & at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_79]
& & & & at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_79]
& & & & at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_79]
& & & & at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_79]
需要guava-11.0.2.jar,从hadoop home下的share=common-lib找的
又报zookeeper错误,将zookeeper-3.4.6.jar放进去
又报下列错误:
5481 [Thread-13-kafka-reader] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-test-topic5/partitions
& & & & at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at storm.kafka.trident.ZkBrokerReader.&init&(ZkBrokerReader.java:42) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at backtype.storm.daemon.executor$fn__6579$fn__6594.invoke(executor.clj:522) ~[storm-core-0.9.5.jar:0.9.5]
& & & & at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
& & & & at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
& & & & at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-test-topic5/partitions
& & & & at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & ... 7 common frames omitted
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-test-topic5/partitions
& & & & at org.apache.zookeeper.KeeperException.create(KeeperException.java:111) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
& & & & at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
& & & & at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
& & & & at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214) ~[curator-framework-2.7.1.jar:na]
& & & & at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:203) ~[curator-framework-2.7.1.jar:na]
& & & & at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[curator-client-2.7.1.jar:na]
& & & & at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:200) ~[curator-framework-2.7.1.jar:na]
& & & & at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191) ~[curator-framework-2.7.1.jar:na]
& & & & at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38) ~[curator-framework-2.7.1.jar:na]
& & & & at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:91) ~[storm-kafka-0.9.5.jar:0.9.5]
& & & & ... 8 common frames omitted
原因:KeeperErrorCode = NoNode for /brokers
配置kafka时,如果使用zookeeper create /kafka创建了节点,kafka与storm集成时new ZkHosts(zks) 需要改成 new ZkHosts(zks,”/kafka/brokers”),不然会报java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException:
KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。
storm-kafka插件默认kafka的 zk_path如下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = “/brokers”;
改一下代码:
&BrokerHosts brokerHosts = new ZkHosts(zks,&/kafka/brokers&); //或者把/kafka写到zks字符串中,测试过是一样的效果
&最后以storm jar stormnew-1.0-SNAPSHOT.jar MyKafkaTopology hdmaster成功提交
&看worker日志,有报错信息
&Caused by: java.lang.ClassNotFoundException: storm.kafka.KafkaSpout
&找了下lib包里看到storm-kafka jar里就有这个类,可能因为是后来拷贝进去的没被识别,于是重启了nimbus和supervisor
&然后新错误:
&java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils at kafka.consumer.SimpleConsumer$$anonfun$disconnect$1.apply(SimpleConsumer.scala:49) at kafka.consumer.SimpleConsumer$$anonfun$disconnect$1.apply(SimpleConsumer.scala:49) at kafka.utils.Logging$class.debug(Logging.scala:52)
at kafka.consumer.SimpleConsumer.debug(SimpleConsumer.scala:30) at kafka.consumer.SimpleConsumer.disconnect(SimpleConsumer.scala:49) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:82) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67)
at storm.kafka.PartitionManager.&init&(PartitionManager.java:83) at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135)
at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:565) at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
&storm程序对应java类里的写在各个节点的worker.log中
感谢关注 Ithao123精品文库频道,是专门为互联网人打造的学习交流平台,全面满足互联网人工作与学习需求,更多互联网资讯尽在 IThao123!
Laravel是一套简洁、优雅的PHP Web开发框架(PHP Web Framework)。它可以让你从面条一样杂乱的代码中解脱出来;它可以帮你构建一个完美的网络APP,而且每行代码都可以简洁、富于表达力。
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。
Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。
产品设计是互联网产品经理的核心能力,一个好的产品经理一定在产品设计方面有扎实的功底,本专题将从互联网产品设计的几个方面谈谈产品设计
随着国内互联网的发展,产品经理岗位需求大幅增加,在国内,从事产品工作的大部分岗位为产品经理,其实现实中,很多从事产品工作的岗位是不能称为产品经理,主要原因是对产品经理的职责不明确,那产品经理的职责有哪些,本专题将详细介绍产品经理的主要职责
IThao123周刊当前位置: &
5,895 次阅读 -
数据流向图
(是visio画的,图太大,放上来字看起来比较小,如果有需要的朋友留邮箱)
实时日志分析系统架构简介
系统主要分为四部分:
1).数据采集
负责从各节点上实时采集数据,选用cloudera的flume来实现
2).数据接入
由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka
3).流式计算
对采集到的数据进行实时分析,选用apache的storm
4).数据输出
对分析后的结果持久化,暂定用mysql
详细介绍各个组件及安装配置:
操作系统:centos6.4
Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
下图为flume典型的体系结构:
Flume数据源以及输出方式:
Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。
Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。在我们系统中由kafka来接收。
Flume版本:1.4.0
Flume下载及文档:
Flume安装:
$tar zxvf apache-flume-1.4.0-bin.tar.gz /usr/local
Flume启动命令:
$bin/flume-ng agent –conf conf –conf-file conf/flume-conf.properties –name producer -Dflume.root.logger=INFO,console
注意事项:需要更改conf目录下的配置文件,并且添加jar包到lib目录下。
Kafka是一个消息中间件,它的特点是:
1、关注大吞吐量,而不是别的特性
2、针对实时性场景
3、关于消息被处理的状态是在consumer端维护,而不是由kafka server端维护。
4、分布式,producer、broker和consumer都分布于多台机器上。
下图为kafka的架构图:
Kafka版本:0.8.0
Kafka下载及文档:http://kafka.apache.org/
Kafka安装:
& tar xzf kafka-&VERSION&.tgz
& cd kafka-&VERSION&
& ./sbt update
& ./sbt package
& ./sbt assembly-package-dependency Kafka
启动及测试命令:
(1) start server
& bin/zookeeper-server-start.sh config/zookeeper.properties
& bin/kafka-server-start.sh config/server.properties
(2)Create a topic
& bin/kafka-create-topic.sh –zookeeper localhost:2181 –replica 1 –partition 1 –topic test
& bin/kafka-list-topic.sh –zookeeper localhost:2181
(3)Send some messages
& bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
(4)Start a consumer
& bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
Storm是一个分布式的、高容错的实时计算系统。
Storm架构图:
storm工作任务topology:
Storm 版本:0.9.0
Storm 下载:
Storm安装:
第一步,安装Python2.7.2
# wget http://www.python.org/ftp/python/2.7.2/Python-2.7.2.tgz
# tar zxvf Python-2.7.2.tgz
# cd Python-2.7.2
# ./configure
# make install
# vi /etc/ld.so.conf
第二步,安装zookeeper(kafka自带zookeeper,如果选用kafka的,该步可省略)
#wget http://ftp.meisei-u.ac.jp/mirror/apache/dist//zookeeper/zookeeper-3.3.3/zoo keeper-3.3.3.tar.gz
# tar zxf zookeeper-3.3.3.tar.gz
# ln -s /usr/local/zookeeper-3.3.3/ /usr/local/zookeeper
# vi ~./bashrc (设置ZOOKEEPER_HOME和ZOOKEEPER_HOME/bin)
第三步,安装JAVA
jdk-7u45-linux-x64.tar.gz
/usr/local
如果使用storm0.9以下版本需要安装zeromq及jzmq。
第四步,安装zeromq以及jzmq
jzmq的安装貌似是依赖zeromq的,所以应该先装zeromq,再装jzmq。
1)安装zeromq(非必须):
# tar zxf zeromq-2.1.7.tar.gz
# cd zeromq-2.1.7
# ./configure
# make install
# sudo ldconfig (更新LD_LIBRARY_PATH)
缺少c++环境:yum install gcc-c++
之后遇到的问题是:Error:cannot link with -luuid, install uuid-dev
这是因为没有安装uuid相关的package。
解决方法是:# yum install uuid*
# yum install e2fsprogs*
# yum install libuuid*
2)安装jzmq(非必须)
# yum install git
# git clone git:///nathanmarz/jzmq.git
# ./autogen.sh
# ./configure
# make install
然后,jzmq就装好了,这里有个网站上参考到的问题没有遇见,遇见的童鞋可以参考下。在./autogen.sh这步如果报错:autogen.sh:error:could not find libtool is required to run autogen.sh,这是因为缺少了libtool,可以用#yum install libtool*来解决。
如果安装的是storm0.9及以上版本不需要安装zeromq和jzmq,但是需要修改storm.yaml来指定消息传输为netty:
storm.local.dir: “/tmp/storm/data”
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
storm.messaging.netty.min_wait_ms: 100
第五步,安装storm
$unzip storm-0.9.0-wip16.zip
备注:单机版不需要修改配置文件,分布式在修改配置文件时要注意:冒号后必须加空格。
测试storm是否安装成功:
1. 下载strom starter的代码 git clone
2. 使用mvn -f m2-pom.xml package 进行编译
如果没有安装过maven,参见如下步骤安装:
1.从maven的官网下载
tar zxvf apache-maven-3.1.1-bin.tar.gz /usr/local
配置maven环境变量
export MAVEN_HOME=/usr/local/maven
export PATH=$PATH:$MAVEN_HOME/bin
验证maven是否安装成功:mvn -v
修改Storm-Starter的pom文件m2-pom.xml ,修改dependency中twitter4j-core 和 twitter4j-stream两个包的依赖版本,如下:
org.twitter4j
twitter4j-core
org.twitter4j
twitter4j-stream
编译完后生成target文件夹
启动zookeeper
zkServer.sh start
启动nimbus supervisor ui
storm nimbus
storm supervisor
jps查看启动状态
进入target目录执行:
storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology wordcountTop
然后查看http://localhost:8080
注释:单机版 不用修改storm.yaml
kafka和storm整合
1.下载kafka-storm0.8插件:
2.该项目下载下来需要调试下,找到依赖jar包。然后重新打包,作为我们的storm项目的jar包。
3.将该jar包及kafka_2.9.2-0.8.0-beta1.jar
metrics-core-2.2.0.jar
scala-library-2.9.2.jar (这三个jar包在kafka-storm-0.8-plus项目依赖中能找到)
备注:如果开发的项目需要其他jar,记得也要放进storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下
flume和kafka整合
1.下载flume-kafka-plus:
2.提取插件中的flume-conf.properties文件
修改该文件:#source section
producer.sources.s.type = exec
producer.mand = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c
修改所有topic的值改为test
将改后的配置文件放进flume/conf目录下
在该项目中提取以下jar包放入环境中flume的lib下:
以上为单机版的flume+kafka+storm的配置安装
flume+storm插件
/xiaochawan/edw-Storm-Flume-Connectors
安装好storm,flume,kafka之后开始项目部署启动(在部署启动之前最好按照安装文档进行storm kafka flume各个组件测试)。
将编写好的storm项目打成jar包放入服务器上,假如放在/usr/local/project/storm.xx.jar
注:关于storm项目的编写见安装文档中的 kafka和storm整合 。
启动zookeeper(这里可以启动kafka自带的zookeeper或者启动单独安装的kafka,以下以kafka自带为例)
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
cd /usr/local/kafka
& bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server.properties
& bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
注:因为kafka消息的offset是由zookeeper记录管理的,所以在此需指定zookeeper的ip,replica 表示该主题的消息被复制几份,partition 表示每份主题被分割成几部分。test表示主题名称。
注:因为kafka消息的offset是由zookeeper记录管理的,所以在此需指定zookeeper的ip,replica 表示该主题的消息被复制几份,partition 表示每份主题被分割成几部分。test表示主题名称。
& storm nimbus
> storm nimbus
& storm supervisor
> storm supervisor
& storm ui
> storm ui
cd /usr/local/project/
cd /usr/local/project/
& storm jar storm.xx.jar storm.testTopology test
> storm jar storm.xx.jar storm.testTopology test
注:storm.xx.jar 为我们编写好的storm项目jar包,第一步完成的工作。 storm.testTopology 为storm项目中main方法所在的类路径。test为此次topology的名字。
注:storm.xx.jar 为我们编写好的storm项目jar包,第一步完成的工作。 storm.testTopology 为storm项目中main方法所在的类路径。test为此次topology的名字。
cd /usr/local/flume
cd /usr/local/flume
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer
注:flume.conf.properties为我们自定义的flume配置文件,flume安装好后是没有此文件的,需要我们自己编写,编写方式见flume安装的文章。
注:flume.conf.properties为我们自定义的flume配置文件,flume安装好后是没有此文件的,需要我们自己编写,编写方式见flume安装的文章。
至此需要启动的程序已经全部启动,storm项目已经开始运行,可以打开storm ui 观察运行是否正常。
至此需要启动的程序已经全部启动,storm项目已经开始运行,可以打开storm ui 观察运行是否正常。
http://localhost:8080
http://localhost:8080
注:此处ip为storm nimbus所在机器Ip 端口可在storm配置文件 storm/conf/storm.yaml中修改
注:此处ip为storm nimbus所在机器Ip 端口可在storm配置文件 storm/conf/storm.yaml中修改
注:转载文章均来自于公开网络,仅供学习使用,不会用于任何商业用途,如果侵犯到原作者的权益,请您与我们联系删除或者授权事宜,联系邮箱:contact@dataunion.org。转载数盟网站文章请注明原文章作者,否则产生的任何版权纠纷与数盟无关。
相关文章!
期待你一针见血的评论,Come on!
不用想啦,马上 发表自已的想法.
做最棒的数据科学社区
扫描二维码,加微信公众号
联系我们:}

我要回帖

更多关于 kafka整合storm 例子 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信