flumeflume logstash 对比v1appender 稳定吗

随着实时分析技术的发展及成本的降低,用户已经不仅仅满足于离线分析。目前我们服务的用户包括微博,微盘,云存储,弹性计算平台等十多个部门的多个产品的日志搜索分析业务,每天处理约32亿条(2TB)日志。
简单介绍一下服务的技术架构:
这是一个再常见不过的架构了:
(1)Kafka:接收用户日志的消息队列
(2)Logstash:做日志解析,统一成json输出给Elasticsearch
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。
努力提供更好的服务
我这次分享的重点不是这种架构的优劣或为什么选择这样的架构,而是在如此的架构上如何更好地传递实时日志分析的价值。为用户做好服务也不是修改几个配置文件,调优几个程序运行参数就能搞定的。为了提供更好的服务,我们在下面三个方向做了努力:
一、提升服务质量
我们首先做了Elasticsearch优化,Hardware Level由于我们当时拿到机器没有选择余地,只开启了超线程;System Level的优化如关闭swap,调整max open files等;App Level的优化如Java运行环境版本的选择,ES_HEAP_SIZE的设置,修改bulk index的queue size等,另外还设置了默认的index template, 目的是更改默认的shard,replica数并将string改为not_analyzed, 开启doc_values,以应对elasticsearch进程OOM。详细的优化内容见&&。
随着用户数据的不断增长,index管理也成了大问题,我们需要基于大量不同的用户配置定期的create, optimize, close, delete, snapshot不同的index,在某个服务器上手工配置crontab已是不可能,而且cron是单点。于是我们开发了一个独立的 Elasticsearch Index管理系统,负责以上任务的调度及执行。这个管理系统背后使用的技术是Celery,一个用Python开发的任务队列及执行系统,提供了类似 crontab的定时任务配置语法,并且实现了分布式,可用性更高的架构。
最近的服务升级,我们为Elasticsearch安装了Hdfs Snapshot插件,可以定期将index 备份到Hdfs,这个功能目前主要用于备份Kibana的配置index,用以恢复用户查看或配置可视化界面时的错误操作。
监控报警方面,System Level的监控报警(如硬盘满,损坏,服务器宕机)直接使用了在新浪内部提供了多年服务的sinawatch;App Level(如Elasticsearch JVM Heap Usage过高,Kibana能否正常访问,Kafka topic的consumer offset lag), 我们开发了对应的监控报警脚本。User Level(如日志解析失败数量),主要通过elasticsearch python client执行&query&去统计或搜索。常见的报警是Logstash-filter-grok,logstash-filter-json解析日志失败会输出的json中添加_grokparserfailure,_jsonparsefailure,我们执行query判断解析错误的量。
要说明的是,Marvel是Elasticsearch很好的监控工具和插件,但是它们是商业软件,我们没有采用。Marvel是基于Kibana做的,里面对一些重要指标(如index bulk reject number)的展示很有价值。
二、增强易用性
增强服务的易用性就是给用户更好的用户体验,减少用户的抱怨。ELK性能优化是一方面,但它是远远不够的,我们遇到的实际情况是,用户在其他方面抱怨更多,如下:
(1)用户最先抱怨的是IP解析成地区、ISP信息一点都不准,完全没有参考意义
如对于CDN这种服务,我们解析用户IP不准,定位问题边缘节点错误,问题没法查,这是帮倒忙。原因:Logstash默认自带的IP库是国外 maxmind公司的免费版本,中国的信息尤其不准。解决方案:使用我浪较新较全的IP库生成能适配maxmind geoip2 api的二进制格式IP库(maxmindDB),再开发logstash-filter-geoip2来解析IP。实测不仅IP解析准确率与公司IP库 相同了,解析速度也提高了。
(2)然后我们与用户都发现日志接入流程复杂,沟通困难。
人做不到机器那样分毫不差,有啥说啥。接入用户日志的时候,例如常常因为用户对日志格式表达的不全面,模棱两可,导致日志解析失败,服务对接人多 次重写配置。从用户提需求到用户可以看到数据可视化效果或搜到日志,需要几个小时到几天。一来二去,用户和我们都烦了,只能求变。为此,我们正在逐步实现 用户数据接入的自动化,减少接入时间和沟通成本这个过程需要3个关键:A.用户配置日志格式的界面,尽可能简洁简单;B.根据用户配置自动生成 logstash config, index管理需要的配置;C.自动部署配置(logstash config等),打通日志流。
后来我们做了一个简单的用来协商日志格式的界面:
目前我们已完成了A的一部分:用户日志格式配置界面;B的全部:开发了自动生成logstash conf的 python api;C即将开始,并且考虑使用docker技术为我们提供一些便利。
(3)部分数据可视化需求得不到满足,Kibana配置难度大
我们起初采用官方Kibana v3, 用户提出的类似SQL中的多个group by,画百分比,求指定区间占比等常见需求无法满足。之后通过三斗大神(微博@argv)定制版的&&满足了一些用户需求。Kibana 4诞生后,代码几乎是对Kibana3的重写,做了大幅改进,通过&Elasticsearch Aggregation&的强大数据统计功能及灵活的配置从Kibana 3解放出来。近期我们将迁移到Kibana 4。
三、提供新功能
我们为Elasticsearch安装了国内medcl大神开发的ik中文分词插件&&。之前被分词为&中&和&国&的中国,现在终于可以被当做一个完整的词汇,否则搜索&中国&,&美国&也会出现。微盘的一些离线搜索需求使用了我们的服务,也用到了中文分词,Elasticsearch的搜索天赋满足了他们的需求,减少了他们的痛苦。
我们经历过的坑和坎儿:
(1)elasticsearch 进程JVM Heap High Usage( &90% )
很长一段时间,我们都在应对JVM Heap High Usage,他带了的问题是Old GC次数多,时间长,es节点频繁退出集群,整个集群几乎停止响应。现在我们的主要策略是开启doc_values;限制query执行时占用的JVM Heap size;analyzed string只允许做query, 不允许facets或者aggs;定期close 用户不需要的index。
(2) Elasticsearch Query DSL, Facets, Aggs学习困惑
有人为此开发了使用SQL执行ES Query的插件,一定程度上减轻了进入门槛。我们给出的学习他们的建议是观察Kibana的Request Body或试用Marvel的Senese插件,它有自动完成Query,Facets, Aggs的功能。另外最常用的query是&query string query&,最常用的aggs是&Terms&,&Date Histogram&,可以应付大部分需求。
(3)logstash不工作
非官方的问题插件,及使用logstash-filter-ruby时未考虑到的异常等,导致Logstash运行时工作线程(worker thread)异常退出,Logstash僵死。我们的建议是尽可能不要在config中使用logstash-filter-ruby,尽量使用官方插 件。不过我们也遇到过复杂的日志,写过250行+的config,用尽了ruby filter。当前未发现Logstash有好的成熟的监控方案,Logstash的内部状态也获取不到。我们目前通过间接的监控Kafka topic consumer是否落后或elasticsearch indexing rate来检验logstash的工作情况。
(4)Kibana没有用户的概念,不同用户的数据无法隔离
多个用户共享的Kibana Dashboard, 误操作或误删时常影响其他用户,保存的dashboard太多,找到特定的dashboard很困难。官方到目前为止,未在这方面做过改进。有很多非官方 的改进,我们也曾经用过三斗大神定制的Kibana3,也对Kibana index做了snapshot储存到Hdfs里面。
(5)与用户沟通成本高
与我们的用户协商日志格式,数据可视化配置时,由于人的不确定性容易造成多次来回确定和修改,效率低下。我们毕竟是提供日志分析服务的,不给用户 做日志运维,所以近期也在探索通过日志接入自动化、推荐用户提供给我们json格式数据,定期组织用户的Kibana培训来减少沟通成本。
问:logstash连es出现timeout的情况有没?如何解决的?
答:我们常见的是ES Jvm Heap Usage比较高的时候会timeout,如果是服务内存小换大内存。另外不要对analyzed的string做aggs,facets ,开启doc_values。
问:关于日志中异常报警的,有哪些方式?关键字过滤?
答:对于日志解析失败的情况,logstash 常见的是_grokparsefailuer,和_jsonparsefailure,数据写入es后,执行query查询这两个关键词的数量即可。对于 报警方案,watch是官方刚出的,其实比它早的实现方案,如Yelp的elastalert。
问:大数据分析平台(基于hdfs)跟kibana的展现会有很大区别吗?或者说最大的区别会在哪些方面?
答:你说的区别,我理解是hadoop与elasticsearch的区别,一个是离线分析,以job为单位,一个是实时搜索和统计,以 query为单位。这里有三个关键词:实时,搜索,统计。hadoop是离线的,es是实时的;es本质上是一个搜引擎,可以用来做全文检索等工 作,hadoop显然于此无关。统计是hadoop与es都能做的,我不了解hadoop有没有像Kibana这样的数据可视化组件。
问:你们的ES集群数据节点和查询节点做了分离吗?logstash是直接把数据写入查询节点还是数据节点?另外你们直接用的node模式还是transport模式呢?
答:(1)还没有做分离。(2)我们还在用http protocol模式
参考资料:
Python 多进程日志记录:
Better ELK, 新浪实时日志分析服务进化:
Kafka实战-实时日志统计流程:
基于fluem和kafka的实时日志收集系统:
日志客户端(Logstash,Fluentd, Logtail)横评:
Logstash and Log Monitoring With Nagios:
How can we monitor performance of Kafka, Logstash and elasticsearch? Is there any tool available for monitoring their performance?:
Send Nagios metrics to Graphite via Logstash:
报警到 Nagios:
Logstash and Log Monitoring With Nagios:
scribe、chukwa、kafka、flume日志系统对比:
/blog//fluentd-vs-logstash/
http://logz.io/blog/fluentd-logstash/
/articles/3228
/articles/7FzqeeI
http://blog.csdn.net/benpaobagzb/article/details/
阅读(...) 评论()Logstash,flume,sqoop比较 - Java技术客栈 - ITeye博客
博客分类:
Logstash,flume,sqoop比较
1.插件式组织方式,易于扩展和控制
2.数据源多样不仅限于日志文件,数据处理操作更丰富,可自定义(过滤,匹配过滤,转变,解析......)
3.可同时监控多个数据源(input插件多样),同时也可将处理过的数据同时有不同多种输出(如stdout到控制台,同时存入elasticsearch)
4.安装简单,使用简单,结构也简单,所有操作全在配置文件设定,运行调用配置文件即可
5.管道式的dataSource——input plugin——filter plugin——output plugin——dataDestination
6.有logstash web界面,可搜索日志
7.有一整套的EKL日志追踪技术栈,可收集处理(logstash),存储管理搜索(elasticsearch),图形显示分析(kibana)
8,做到更好的实时监控(插件设置时间间隔属性,对监控的数据源检查更新)
Flume (1.x
flume-ng)
1.分布式的可靠的可用的系统,高效的从不同数据源收集聚合迁移大量数据到一个集中的数据存储
2.安装部署比较logstash复杂
3.同样以配置文件为中心
提供了JavaAPI
4.是一个完整的基于插件的 有独立开发的第三方插件
5.三层架构:source
Flume使用基于事务的数据传递方式来保证事件传递的可靠性。Source和Sink被封装进一个事务。事件被存放在Channel中直到该事件被处理,Channel中的事件才会被移除。这是Flume提供的点到点的可靠机制。从多级流来看,前一个agent的sink和后一个agent的source同样有它们的事务来保障数据的可靠性。
6,一个agent可指定多个数据源(同一agent内多个source连接到同一个channel上)?
一个agent可将收集的数据输出到多个目的地(HDFS,JMS,agent.....)span-out
Sqoop2 (1.99X)
1. 用于结构化数据源()与半结构化()非结构化(HDFS)数据源之间相互转换,是为和关系型相互转换的工具(MYsql orcal postgres 等数据库与HDFS Hbase )利用的是Hadoop的mapreduce技术
2. 分为server和client两部分,server是与所有client连接的接入点,安装在Hadoop client;client 无需Hadoop 数量任意多
3. 访问方式多样化,可用REST API,API,WEB UI,CIL控制台进行
浏览: 102502 次
来自: 武汉日志系统之Flume采集加morphline解析 - zzm - ITeye博客
博客分类:
这段时间花了部分时间在处理消息总线跟日志的对接上。这里分享一下在日志采集和日志解析中遇到的一些问题和处理方案。
日志采集-flume
logstash VS flume
首先谈谈我们在日志采集器上的选型。由于我们选择采用ElasticSearch作为日志的存储与搜索引擎。而基于 ELK(ElasticSearch,Logstash,Kibana)的技术栈在日志系统方向又是如此流行,所以把Logstash列入考察对象也是顺 理成章,Logstash在几大主流的日志收集器里算是后起之秀,被Elastic收购之后更加成熟,社区也比较活跃。
Logstash的设计:input,filter,output。flume的设计source,channel,sink,当然flume也有interceptor。具体的设计就不多废话,大致上都是拆分,解耦,pipeline(管道)的思想。同时,它们都支持分布式扩展,比如Logstash既可以作为shipper也可作为indexer,flume可以多个agent组成分布式事件流。
我对flume的接触早于Logstash。最近调研Logstash的时候,对它强大的filter印象深刻,特别是grok。而之前flume阵营强调最多的是它的source,sink,channel对各种开源组件的扩展支持非常强大。
Logstash固然是一个不错的,但它采用JRuby语言(一种形似Ruby语法的JVM平台的语言)实现使得它的定制性不够灵活, 这是我放弃Logstash的主要原因。因为生态的原因,我确实需要Java技术栈提供的扩展性(这里主要目标是将消息总线作为日志采集的缓存队列),而 这正是flume的强项。但flume里很少有提及对日志的解析支持,即便有支持正则的interceptor,也只是很有限的查找、替换之类的。经过一 番调研发现其实flume提供了这样一个interceptor——morphline。它可以完成对日志的解析。
日志解析-morphline
morphline简介
morphline是由flume的母公司cloudera开源的一个ETL框架。它用于构建、改变基于Hadoop进行ETL(extract、 transfer、load)的流式处理程序。(值得一提的是flume是由cloudera捐献给Apache的,后来经过重构成了flume- ng)。morphline使得你在构建ETL Job不需要编码并且不需要大量的MapReduce技巧。
morphline是一个富配置文件可以很简单得定义一个转化链,用于从任何数据源消费任何类型的数据,处理数据然后加载结果到Hadoop组件中。它用简单的配置步骤代替了Java编程。
morphline是一个类库,可以嵌入任何java程序中。morphline是一个内存容器可以存储转化命令。这些命令以插件的形式被加载到 morphline中以执行任务,比如加载、解析、转化或者处理单条记录。一个记录是在内存中的名称-值对的数据结构。而且morphline是可扩展 的,可以集成已存在的功能和第三方系统。
这篇文章不是morphline的软文,所以更多介绍请移步cloudera的CDK官方文档。
这里有副图,形象地展示了morphline大致的处理模型:系统。利用storm stream做实时解析,利用mapreduce做离线分析,这种高度定制化的使用场景,几乎不需要flume的agent在客户端进行解析的能力,因此flume的morphline也就很少被提及。
但morphline还是不可多得的文本ETL利器,无论你是在采集的时候直接用morphline 做ETL还是在服务端做,flume+morphline加起来带来的灵活性也不输Logstash。
/kf/943.html
http://my.oschina.net/u/2311010/blog/523066?p=1
浏览: 1180649 次
来自: 南京
很不错,收藏啦推荐下rocketmq 事务消息:http:// ...
雪鞋了~~~~
赞!!!!!!!!!!!!!!!!!!!!!!!!赞!!!!! ...
不错。学习一下。谢谢
如果你知道的话,希望你指点一二。。。8731人阅读
Flume(8)
使用Flume Log4j Appender正确的姿势
我们使用Flume-ng的LoadBalancingLog4jAppender,将线上服务的日志实时传输到日志服务器,转交给告警系统和HDFS做存储。
FLume的Log4j Appender必须使用Log4j的异步加载器,否则一旦日志服务器挂掉,将会导致应用服务器宕机。
使用过程中的坑
问题1: Flume Log4j使用异步加载器,日志服务器宕机情况导致业务系统阻塞
在阅读了Flume的RPC源码以及LoadBalancingLog4jAppender的实现之后,发现问题原来在Log4j的异步加载器AsyncAppender。异步加载器的原理见
根本原因是、日志服务器宕机导致消费者消费能力不足,缓冲区满的情况下,AsyncAppender会阻塞程序。设置Blocking=false之后就可以了。
问题2:Flume Log4j失败重连策略异常
当其中一台日志服务器宕机,其他的日志服务器就会不停的接收到链接异常的日志。明显是重连的时间间隔太短。在LoadBalancingRpcClient中,
while (it.hasNext()) {
HostInfo host = it.next();
RpcClient client = getClient(host);
client.append(event);
eventSent = true;
} catch (Exception ex) {
rmFailure(host); //宕机情况标志该主机异常
LOGGER.warn(&Failed to send event to host & + host, ex);
Flume默认不启用back off,也就是说rmFailure(host)这行代码完全没用。简直坑爹。OrderSelector.java:
public void informFailure(T failedObject) {
//If there is no backoff this method is a no-op.
if (!shouldBackOff) {
//将该主机暂时移除可用主机列表
所以解决办法:配置max back off
问题3:Flume Log4j失败重连策略异常
问题体现在,设置了max back off,重连时间居然一直是2000ms,看了一下它的算法,指数退避算法。在OrderSelector.java的informFailure函数中。
public void informFailure(T failedObject) {
//If there is no backoff this method is a no-op.
if (!shouldBackOff) {
FailureState state = stateMap.get(failedObject);
long now = System.currentTimeMillis();
long delta = now - state.lastF
long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 && state.sequentialFails));
long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
if (allowableDiff & delta) {
if (state.sequentialFails & EXP_BACKOFF_COUNTER_LIMIT) {
state.sequentialFails++;
state.sequentialFails = 1;
state.lastFail =
//Depending on the number of sequential failures this component had, delay
//its restore time. Each time it fails, delay the restore by 1000 ms,
//until the maxTimeOut is reached.
state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 && state.sequentialFails));
最后生成的restoreTime即下一次进行重试的时间。我没有去设置avro connect time out 和request time out,默认都是20s,应该算是偏长了。根据他的算法,delta永远是大于40s,但是allowableDiff却一直是3s,4s.所以我直接改了判定条件,allowableDiff & delta,之后就正常。但是还存在一个问题,sequentialFails并不会在一段时间后reset.
问题4:Log4j异步加载器丢失日志数据
AsyncAppender默认缓冲区大小128,满了之后会丢失数据。调大缓冲区,avro connect time out 和request time out也得适当调一下
另外,由于我们的告警系统接收的告警日志必须是时间顺序的,所以我写了个FlumeFailoverAppender了
本博客已迁移至:
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:810709次
积分:5310
积分:5310
排名:第5037名
原创:81篇
转载:14篇
评论:96条
微博: /shicongyu
(3)(1)(3)(2)(4)(1)(1)(10)(16)(13)(1)(4)(2)(4)(4)(1)(1)(10)(13)(6)}

我要回帖

更多关于 flume logstash 区别 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信