调用链监控 pinpointt怎么监控flume

最近公司有个需求,就是用flume将一文件夹下的许多小文件分类,自定义sink端持久化到数据库,但是遇到一个问题,就是那些被flume监听小文件是从外部写进来的,会变化(写完以后才可以让flume读,否则报错),有没有什么办法将它过滤或者设置时间等小文件完成变动后再让flume读,大神们,肿么办
主题帖子积分
一、sink可以设置时间间隔,
引用如下内容
Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。详细可参考:
二、自定义Sink
你也可以看看Sink是否有像数据库一样的触发器,如果没有的话,只能自己开发个了,具体如何开发可以参考:
主题帖子积分
中级会员, 积分 226, 距离下一级还需 774 积分
中级会员, 积分 226, 距离下一级还需 774 积分
一、sink可以设置时间间隔,
引用如下内容
现在的问题就是source端,会抛异常,因为source端监控的文件在变化,而且我监控的不是log文件,就没了log的分割功能
主题帖子积分
现在的问题就是source端,会抛异常,因为source端监控的文件在变化,而且我监控的不是log文件,就没了log ...
你是采用不断追加的,还是什么方式,你参考一下这个帖子:
跟你的错误很类似:
主题帖子积分
中级会员, 积分 226, 距离下一级还需 774 积分
中级会员, 积分 226, 距离下一级还需 774 积分
你是采用不断追加的,还是什么方式,你参考一下这个帖子:
跟你的错误很类似:
flume 用spooldir的sour ...
监控的文件是不断追加的
主题帖子积分
你可以使用exec方式,spooldir经常出问题
主题帖子积分
中级会员, 积分 226, 距离下一级还需 774 积分
中级会员, 积分 226, 距离下一级还需 774 积分
你可以使用exec方式,spooldir经常出问题
我也想用exec,但是里面有很多小文件,不时有新文件读进来,exec处理后的文件还是原样子吗
主题帖子积分
这个就不太知道了,个人觉得你说的这个变与不变不要太重要吧。日志即使发生变化,你也可以通过自定义等其他的方式来保留。变的可能性很小。
更多内容,你可以参考下这个,我觉得还不错。
主题帖子积分
注册会员, 积分 122, 距离下一级还需 78 积分
注册会员, 积分 122, 距离下一级还需 78 积分
现在的问题就是source端,会抛异常,因为source端监控的文件在变化,而且我监控的不是log文件,就没了log ...
我现在也有这个需求,但是要监控log4j日志产生的日志,而已会一直变化,你是怎么监控这个目录的?
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered byhadoop大数据(14)
Flume的内置监控怎么整?这个问题有很多人问。目前了解到的信息是可以使用Cloudera Manager、Ganglia有图形的监控工具,以及从浏览器获取json串,或者自定义向其他监控系统汇报信息。那监控的信息是什么呢?就是各个组件的统计信息,比如成功接收的Event数量、成功发送的Event数量,处理的Transaction的数量等等。而且不同的组件有不同的Countor来做统计,目前直到1.5版本仍然只对三大组件:source、sink、channel进行统计分别是SourceCounter、SinkCounter、ChannelCounter,这三个计数器的统计项是固定的,就是你不能自己设置自己的统计项;另外还有ChannelProcessorCounter和SinkProcessorCounter,这两项目前没有设置统计项,所以是目前还是“摆设”。另外有些同学可能也发现了,有些内置的组件使用CounterGroup这个来统计信息,这个可以自己随意设置统计项,但是遗憾的是目前(1.5版本)这个可以自定义的计数器的信息还无法用在监控上,因为这只是一个单独的类,并没有继承MonitoredCounterGroup这个抽象类。有些内置组件使用的是CounterGroup,所以监控时会没有数据,不同的版本使用此CounterGroup的组件可能不同。下面我们重点介绍:SourceCounter、SinkCounter、ChannelCounter。
Flume-NG的所有统计信息、监控及相关的类都在org.apache.flume.instrumentation.http、org.apache.flume.instrumentation、org.apache.flume.instrumentation.util三个包下。
上面提到了MonitoredCounterGroup,这个类是用来跟踪内部的统计指标的,注册组件的MBean并跟踪和更新统计值。需要监控的组件都要继承这个类,这个类可以跟踪flume内部的所有组件,但是目前只实现了3个。其中比较重要的方法有以下几个:
(1)、构造方法MonitoredCounterGroup(Type type, String name, String... attrs),这个方法主要是设置组件的类型、名称;然后将所有的attrs(这是设定的各个统计项)加入Map&String, AtomicLong& counterMap,值设定为0;然后初始化计数器的开始时间和结束时间,都设为0.
(2)、start()方法,会先注册计数器,然后对所有统计项的统计值设为0;将开始时间设置为当前时间
(3)、register()方法,如果这个计数器还未注册,将这个计数器的MBean进行注册,就可以进行跟踪了
(4)、stop()方法,会设置结束时间为当前时间;输出各个统计项的信息。我们 Ctrl+C 结束进程时,最后显示的统计信息就是来自这里。
其它方法都是获取counterMap的中信息或者更新值等,比较简单。
接下来我们看看,三个组件中各种统计项及其含义吧:
一、SourceCounter,继承了MonitoredCounterGroup。主要统计项如下:
(1)&src.events.received&,表示source接受的event个数;
(2)&src.events.accepted&,表示source处理成功的event个数,和上面的区别就是上面虽然接受了可能没处理成功;
(3)&src.append.received&,表示调用append次数,在avrosource和thriftsource中调用;
(4)&src.append.accepted&,表示append处理成功次数;
(5)&src.append-batch.received&,表示appendBatch被调用的次数,在avrosource和thriftsource中调用;
(6)&src.append-batch.accepted&,表示appendBatch处理成功次数;
(7)&src.open-connection.count&,用在avrosource中表示打开连接的数量;
一般source调用都集中在前俩。
二、SinkCounter,继承了MonitoredCounterGroup
(1)&sink.connection.creation.count&,这个调用的地方颇多,都表示“链接”创建的数量,比如与HBase建立链接,与avrosource建立链接以及文件的打开等;
(2)&sink.connection.closed.count&,对应于上面的stop操作、destroyConnection、close文件操作等。
(3)&sink.connection.failed.count&,表示上面所表示“链接”时异常、失败的次数;
(4)&sink.batch.empty&,表示这个批次处理的event数量为0的情况;
(5)&sink.batch.underflow&,表示这个批次处理的event的数量介于0和配置的batchSize之间;
(6)&plete&,表示这个批次处理的event数量等于设定的batchSize;
(7)&sink.event.drain.attempt&,准备处理的event的个数;
(8)&sink.event.drain.sucess&,这个表示处理成功的event数量,与上面不同的是上面的是还未处理的。
三、ChannelCounter,继承了MonitoredCounterGroup
(1)&channel.current.size&,这个表示这个channel的当前容量;
(2)&channel.event.put.attempt&,一般指的是在channel的事务当中,source的put操作中记录尝试发送event的个数;
(3)&channel.event.take.attempt&,一般指的是在channel的事务中,sink的take操作记录尝试拿event的个数;
(4)&channel.event.put.success&,一般指的是在channel的事务中,put成功的event的数量;
(5)&channel.event.take.success&,一般指的是channel事务中,take成功的event的数量;
(6)&channel.capacity&,指的是channel的容量,在channel的start方法中设置。
上面这些统计项都是固定的,我们可以根据需要增加相应项的值,可以在监控中查看组件的变化情况,从而掌握flume进程的运行情况。比如可以查看channel的容量从而了解到source和sink的相对处理速度,还有可以看source或者sink每个批次处理成功与失败的次数,了解组件的运行状况等等。
当然有些同学可能在自定义自己的组件时,想统计一些自己的统计项,这些统计项在上面三大组件中是没有,怎么办?自己定制啊,上面说了必须要继承MonitoredCounterGroup这个抽象类,设定自己的统计项,然后将统计项设置成数组调用MonitoredCounterGroup的构造函数;然后在自定义的计数器中增加更新数值的方法。最后在自定义的组件中构造自定义的计数器,并启用它的start方法,剩下的就是在该更新统计项数值的地方更新就可以了。
还有一个重要的内容就是监控的实现!没错,内置的有两种HTTP方式(就是json串)和Ganglia,后者需要安装Ganglia,前者非常简单,只需要在Flume的启动命令中加上:-Dflume.monitoring.type=http -Dflume.monitoring.port=XXXX& ,最后的XXXX是你需要设置的端口!然后你就可以在浏览器上通过访问这个Flume所在节点的IP:XXXX/metrics,不断刷新就可以看到最新的组件统计信息。关于Ganglia的请读者自行组件Ganglia集群并餐卡用户指南来操作。
如果我想自己实现一个server向其他系统汇报信息,咋整?目前有至少两个方法:
一、就是上面的HTTP啊,你可以不断去获取json串,自己解析出来各个统计指标,然后剩下的就是你想怎么整就怎么整吧。
二、就是自己实现一个类似HTTP的server,必须实现org.apache.flume.instrumentation.MonitorService接口,这个接口只有俩方法:start和stop。这个接口继承自Configurable接口所以拥有可以读取配置文件的configure(configure(Context context))方法,来获取一些配置信息。
以HTTP为例(对应的类是org.apache.flume.instrumentation.http.HTTPMetricsServer),它的start方法启动了一个jetty作为web server,提供WEB服务。并实现了AbstractHandler的一个处理数据的类HTTPMetricsHandler,这个类的handle(String target, HttpServletRequest request, HttpServletResponse response,int dispatch)方法来设置一些WEB页面的格式以及通过JMXPollUtil.getAllMBeans()获取所有组件注册的MBean构成的Map&String,
Map&String, String&& metricsMap,遍历这个metricsMap将这个metricsMap转换成json输出到web页面。stop方法就是一些清理工作,这里是关闭jetty server。很简单吧,所以我们完全可以实现一个server,在start方法中启动一个线程每隔一秒或者自己定遍历这个metricsMap,写入mysql、HBase或者别的地方,你随便。。。
你可以在定义的组件中调用自己的计数器,然后将计数器、监控类、自定义组件(source、sink、channel)打包放到lib下,在启动命令后加-Dflume.monitoring.type=AAAAA -Dflume.monitoring.node=BBBB,就可以了。注意,Dflume.monitoring.type这个好似必须要设置的,就是你自己的监控类(这里是AAAAA),后面的可有可无都是一些参数,你可以自定义参数名,比如可以设置数据库服务器IP、端口等。
至此,这里介绍完了。这些都是从源码中看出来的,还未曾实现,供大伙借鉴。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:14801次
排名:千里之外
原创:13篇
转载:25篇
(1)(3)(9)(1)(2)(4)(1)(14)(7)Flume实时抽取监控目录数据
什么是flume
Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的。它是一个基于流数据的简单而灵活的架构。具有健壮的可靠性,容错性及故障转移和恢复机制。
flume原理简介
这是一个关于池子的故事。有一个池子,它一头进水,另一头出水,进水口可以配置各种管子,出水口也可以配置各种管子,可以有多个进水口、多个出水口。水术语称为Event,进水口术语称为Source、出水口术语成为Sink、池子术语成为Channel,Source+Channel+Sink,术语称为Agent。如果有需要,还可以把多个Agent连起来。
-source:采集数据,并发送给channel
-channel:管道,用于连接source和sink的
-sink:发送数据,用于采集channel中的数据
flume启动顺序
flume组件启动顺序:channels&&&sinks&&&sources,关闭顺序:sources&&&sinks&&&
tar -zxf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6/
2.修改配置文件
flume-env.sh:修改_home
将hdfs的配置文件放到conf目录
core hdfs两个文件拷贝到flume的conf目录下
3.拷贝四个jar包到flume的lib目录下
《《《《《《source-Hive的log,channel-内存,sink:终端》》》》》》
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = exec
a1.mand = tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a1.sources.s1.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = memory
# define the sink
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
《《《《《《source-hive的log,channel-file,sink:终端》》》》》》
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = exec
a1.mand = tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a1.sources.s1.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/datas/flume-ch/check
a1.channels.c1.dataDirs = /opt/datas/flume-ch/data
# define the sink
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
《《《《《《source:exec,channel:mem,sink:HDFS》》》》》》
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = exec
a1.mand = tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a1.sources.s1.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/hdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
《《《《《《文件的大小》》》》》》
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = exec
a1.mand = tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a1.sources.s1.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/size
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.rollCount = 0
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
《《《《《《时间分区》》》》》》
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = exec
a1.mand = tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a1.sources.s1.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = hive-log
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
《《《《《《时间分区》》》》》》
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = exec
a1.mand = tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a1.sources.s1.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = tru
a1.sinks.k1.hdfs.filePrefix = hive-log
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
不设置a1.sinks.k1.hdfs.useLocalTimeStamp = true会报如下错误
《《《《《《监控文件夹》》》》》》
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/datas/flume-ch/spdir
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/spdir
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
这是轮询模式,不是中断模式。如果检测的目录下没有生成文件,hdfs目录也不会创建,当检测目录下有文件后,HDFS目录也会创建
解决所有文件都会上传的方法:查找配置项,进行配置
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/datas/flume-ch/spdir
a1.sources.s1.ignorePattern = ([^ ]*\.tmp$)
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/spdir
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
《《《《《《监控文件夹和文件》》》》》》
既要监控某一个目录,又要动态读取目录中文件的数据?
exec:动态读一个文件
spooling dir :动态读取文件夹
需要自动编译taildir如何在web页面监控flume_百度知道
如何在web页面监控flume
我有更好的答案
但这些数据如何归集,每时每刻也都在产生大量的数据。 几乎任何规模企业、提炼始终是一个困扰,谁就有可能掌握未来,谁掌握了足够的数据大数据时代,而其中的数据采集就是将来的流动资产积累
为您推荐:
其他类似问题
等待您来回答Flume(23)
可能你认为只要的自定义组件里使用XXXcounter,并调用相关方法就会在flume的监控页面上看到该组件的相关信息,那么你就错了!!!!
flume的http监控里有这样一句话
if (!obj.getObjectName().toString().startsWith(&org.apache.flume&)) {
}这个太狠了,你的组件package的name不是以org.apache.flume开头的,它就跳过你这个组件,不去获取相关的监控信息,所以在监控页面上你找不到的组件信息,你可能认为你的组件出问题了!!
根本原因就在这里,当然你也可以改源码,删掉这句话就行了!!
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:276532次
积分:5042
积分:5042
排名:第4704名
原创:204篇
转载:19篇
译文:12篇
评论:102条
(1)(1)(2)(3)(1)(1)(2)(1)(2)(2)(1)(1)(3)(1)(1)(4)(2)(1)(6)(24)(6)(15)(11)(5)(3)(9)(13)(4)(1)(3)(2)(2)(8)(13)(5)(2)(4)(5)(6)(4)(4)(6)(30)(14)}

我要回帖

更多关于 flume 监控目录 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信