如何用Flume实现实时flume日志收集实例系统

您所在位置: &
&nbsp&&nbsp&nbsp&&nbsp
基于Flume日志收集统计系统.pdf58页
本文档一共被下载:
次 ,您可全文免费在线阅读后下载本文档。
文档加载中...广告还剩秒
需要金币:200 &&
你可能关注的文档:
··········
··········
摘要 摘要 随着计算机技术和互联网技术的快速发展,网站的规模以及数据正以指
数级形式飞速增长,电子商务网站每天需要处理的日志信息多达几terabyte。
然而,作为致力于网络的企业而言,web日志正是他们一笔宝贵的财富,企
业可以从日志信息中统计出在线产品的客户数量,以及客户喜好,这样便可
以更好为公司了解自己产品的不足,有利于制定更有效的方针和政策,进而
提高公司的效益。因此,如何应对海量的web日志进行收集统计,已经成为
国内外企业都特别关注的一个课题。 在分布式计算如此火热的背景下,本文针对海量web日志收集统计问题设 and
计开发了一套数据收集统计系统DCSS DataCollectionStatistics Client来发送日志的,然后通过thrift
System 。系统是通过日志发送端Flume Server,Flume
协议连接FlumeJ服务器,将日志文件发送到日志收集模块Flume DistributedFile
布式文件系统HDFS Hadoop System ,最后通过分布编程模型
MapReduce对web日志进行必要的统计,根据日志所提供的信息统计出某独立
用户在前端应用的某个页面操作的次数,统计用户行为上的不同表现,配置
满足用户需求的web信息的内容和结构,提升软件产品的综合竞争力。 本论文的主要工作如下: 1 实现java编写的日志发送模块FlumeClient,并将它封装成jar包, 前端应用通过调用封装好的iar包来实现日志的发送 2 通过thrift协议完成日志发送模块FlumeClient与日志收集模块 FlumeServer之间的连接 3 实现日志收集模块FlumeServer对日志数据的收集 4 通过HDFS,MapReduce来实现数据的存储和统计 5 实现FlumeClient模块的部分单元测试,以及系统的集成结果测试
关键词:网络日志:Flume;分布式系统 Abstract Abstract Withthe of andIntemet r
正在加载中,请稍后...当前位置: &
6,117 次阅读 -
数据流向图
(是visio画的,图太大,放上来字看起来比较小,如果有需要的朋友留邮箱)
实时日志分析系统架构简介
系统主要分为四部分:
1).数据采集
负责从各节点上实时采集数据,选用cloudera的flume来实现
2).数据接入
由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka
3).流式计算
对采集到的数据进行实时分析,选用apache的storm
4).数据输出
对分析后的结果持久化,暂定用mysql
详细介绍各个组件及安装配置:
操作系统:centos6.4
Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
下图为flume典型的体系结构:
Flume数据源以及输出方式:
Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。
Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。在我们系统中由kafka来接收。
Flume版本:1.4.0
Flume下载及文档:
Flume安装:
$tar zxvf apache-flume-1.4.0-bin.tar.gz /usr/local
Flume启动命令:
$bin/flume-ng agent –conf conf –conf-file conf/flume-conf.properties –name producer -Dflume.root.logger=INFO,console
注意事项:需要更改conf目录下的配置文件,并且添加jar包到lib目录下。
Kafka是一个消息中间件,它的特点是:
1、关注大吞吐量,而不是别的特性
2、针对实时性场景
3、关于消息被处理的状态是在consumer端维护,而不是由kafka server端维护。
4、分布式,producer、broker和consumer都分布于多台机器上。
下图为kafka的架构图:
Kafka版本:0.8.0
Kafka下载及文档:http://kafka.apache.org/
Kafka安装:
& tar xzf kafka-&VERSION&.tgz
& cd kafka-&VERSION&
& ./sbt update
& ./sbt package
& ./sbt assembly-package-dependency Kafka
启动及测试命令:
(1) start server
& bin/zookeeper-server-start.sh config/zookeeper.properties
& bin/kafka-server-start.sh config/server.properties
(2)Create a topic
& bin/kafka-create-topic.sh –zookeeper localhost:2181 –replica 1 –partition 1 –topic test
& bin/kafka-list-topic.sh –zookeeper localhost:2181
(3)Send some messages
& bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
(4)Start a consumer
& bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
Storm是一个分布式的、高容错的实时计算系统。
Storm架构图:
storm工作任务topology:
Storm 版本:0.9.0
Storm 下载:
Storm安装:
第一步,安装Python2.7.2
# wget http://www.python.org/ftp/python/2.7.2/Python-2.7.2.tgz
# tar zxvf Python-2.7.2.tgz
# cd Python-2.7.2
# ./configure
# make install
# vi /etc/ld.so.conf
第二步,安装zookeeper(kafka自带zookeeper,如果选用kafka的,该步可省略)
#wget http://ftp.meisei-u.ac.jp/mirror/apache/dist//zookeeper/zookeeper-3.3.3/zoo keeper-3.3.3.tar.gz
# tar zxf zookeeper-3.3.3.tar.gz
# ln -s /usr/local/zookeeper-3.3.3/ /usr/local/zookeeper
# vi ~./bashrc (设置ZOOKEEPER_HOME和ZOOKEEPER_HOME/bin)
第三步,安装JAVA
jdk-7u45-linux-x64.tar.gz
/usr/local
如果使用storm0.9以下版本需要安装zeromq及jzmq。
第四步,安装zeromq以及jzmq
jzmq的安装貌似是依赖zeromq的,所以应该先装zeromq,再装jzmq。
1)安装zeromq(非必须):
# tar zxf zeromq-2.1.7.tar.gz
# cd zeromq-2.1.7
# ./configure
# make install
# sudo ldconfig (更新LD_LIBRARY_PATH)
缺少c++环境:yum install gcc-c++
之后遇到的问题是:Error:cannot link with -luuid, install uuid-dev
这是因为没有安装uuid相关的package。
解决方法是:# yum install uuid*
# yum install e2fsprogs*
# yum install libuuid*
2)安装jzmq(非必须)
# yum install git
# git clone git:///nathanmarz/jzmq.git
# ./autogen.sh
# ./configure
# make install
然后,jzmq就装好了,这里有个网站上参考到的问题没有遇见,遇见的童鞋可以参考下。在./autogen.sh这步如果报错:autogen.sh:error:could not find libtool is required to run autogen.sh,这是因为缺少了libtool,可以用#yum install libtool*来解决。
如果安装的是storm0.9及以上版本不需要安装zeromq和jzmq,但是需要修改storm.yaml来指定消息传输为netty:
storm.local.dir: “/tmp/storm/data”
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
storm.messaging.netty.min_wait_ms: 100
第五步,安装storm
$unzip storm-0.9.0-wip16.zip
备注:单机版不需要修改配置文件,分布式在修改配置文件时要注意:冒号后必须加空格。
测试storm是否安装成功:
1. 下载strom starter的代码 git clone
2. 使用mvn -f m2-pom.xml package 进行编译
如果没有安装过maven,参见如下步骤安装:
1.从maven的官网下载
tar zxvf apache-maven-3.1.1-bin.tar.gz /usr/local
配置maven环境变量
export MAVEN_HOME=/usr/local/maven
export PATH=$PATH:$MAVEN_HOME/bin
验证maven是否安装成功:mvn -v
修改Storm-Starter的pom文件m2-pom.xml ,修改dependency中twitter4j-core 和 twitter4j-stream两个包的依赖版本,如下:
org.twitter4j
twitter4j-core
org.twitter4j
twitter4j-stream
编译完后生成target文件夹
启动zookeeper
zkServer.sh start
启动nimbus supervisor ui
storm nimbus
storm supervisor
jps查看启动状态
进入target目录执行:
storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology wordcountTop
然后查看http://localhost:8080
注释:单机版 不用修改storm.yaml
kafka和storm整合
1.下载kafka-storm0.8插件:
2.该项目下载下来需要调试下,找到依赖jar包。然后重新打包,作为我们的storm项目的jar包。
3.将该jar包及kafka_2.9.2-0.8.0-beta1.jar
metrics-core-2.2.0.jar
scala-library-2.9.2.jar (这三个jar包在kafka-storm-0.8-plus项目依赖中能找到)
备注:如果开发的项目需要其他jar,记得也要放进storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下
flume和kafka整合
1.下载flume-kafka-plus:
2.提取插件中的flume-conf.properties文件
修改该文件:#source section
producer.sources.s.type = exec
producer.mand = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c
修改所有topic的值改为test
将改后的配置文件放进flume/conf目录下
在该项目中提取以下jar包放入环境中flume的lib下:
以上为单机版的flume+kafka+storm的配置安装
flume+storm插件
/xiaochawan/edw-Storm-Flume-Connectors
安装好storm,flume,kafka之后开始项目部署启动(在部署启动之前最好按照安装文档进行storm kafka flume各个组件测试)。
将编写好的storm项目打成jar包放入服务器上,假如放在/usr/local/project/storm.xx.jar
注:关于storm项目的编写见安装文档中的 kafka和storm整合 。
启动zookeeper(这里可以启动kafka自带的zookeeper或者启动单独安装的kafka,以下以kafka自带为例)
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
cd /usr/local/kafka
& bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server.properties
& bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
注:因为kafka消息的offset是由zookeeper记录管理的,所以在此需指定zookeeper的ip,replica 表示该主题的消息被复制几份,partition 表示每份主题被分割成几部分。test表示主题名称。
注:因为kafka消息的offset是由zookeeper记录管理的,所以在此需指定zookeeper的ip,replica 表示该主题的消息被复制几份,partition 表示每份主题被分割成几部分。test表示主题名称。
& storm nimbus
> storm nimbus
& storm supervisor
> storm supervisor
& storm ui
> storm ui
cd /usr/local/project/
cd /usr/local/project/
& storm jar storm.xx.jar storm.testTopology test
> storm jar storm.xx.jar storm.testTopology test
注:storm.xx.jar 为我们编写好的storm项目jar包,第一步完成的工作。 storm.testTopology 为storm项目中main方法所在的类路径。test为此次topology的名字。
注:storm.xx.jar 为我们编写好的storm项目jar包,第一步完成的工作。 storm.testTopology 为storm项目中main方法所在的类路径。test为此次topology的名字。
cd /usr/local/flume
cd /usr/local/flume
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer
注:flume.conf.properties为我们自定义的flume配置文件,flume安装好后是没有此文件的,需要我们自己编写,编写方式见flume安装的文章。
注:flume.conf.properties为我们自定义的flume配置文件,flume安装好后是没有此文件的,需要我们自己编写,编写方式见flume安装的文章。
至此需要启动的程序已经全部启动,storm项目已经开始运行,可以打开storm ui 观察运行是否正常。
至此需要启动的程序已经全部启动,storm项目已经开始运行,可以打开storm ui 观察运行是否正常。
http://localhost:8080
http://localhost:8080
注:此处ip为storm nimbus所在机器Ip 端口可在storm配置文件 storm/conf/storm.yaml中修改
注:此处ip为storm nimbus所在机器Ip 端口可在storm配置文件 storm/conf/storm.yaml中修改
注:转载文章均来自于公开网络,仅供学习使用,不会用于任何商业用途,如果侵犯到原作者的权益,请您与我们联系删除或者授权事宜,联系邮箱:contact@dataunion.org。转载数盟网站文章请注明原文章作者,否则产生的任何版权纠纷与数盟无关。
相关文章!
期待你一针见血的评论,Come on!
不用想啦,马上 发表自已的想法.
做最棒的数据科学社区
扫描二维码,加微信公众号
联系我们:君,已阅读到文档的结尾了呢~~
扫扫二维码,随身浏览文档
手机或平板扫扫即可继续访问
基于ApacheFlume的分布式日志收集系统设计与实现
举报该文档为侵权文档。
举报该文档含有违规或不良信息。
反馈该文档无法正常浏览。
举报该文档为重复文档。
推荐理由:
将文档分享至:
分享完整地址
文档地址:
粘贴到BBS或博客
flash地址:
支持嵌入FLASH地址的网站使用
html代码:
&embed src='/DocinViewer--144.swf' width='100%' height='600' type=application/x-shockwave-flash ALLOWFULLSCREEN='true' ALLOWSCRIPTACCESS='always'&&/embed&
450px*300px480px*400px650px*490px
支持嵌入HTML代码的网站使用
您的内容已经提交成功
您所提交的内容需要审核后才能发布,请您等待!
3秒自动关闭窗口使用flume+kafka+storm构建实时日志分析系统_服务器应用_Linux公社-Linux系统门户网站
你好,游客
使用flume+kafka+storm构建实时日志分析系统
来源:Linux社区&
作者:yueys_canedy
本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他文章。
Kafka-Storm 集成部署
1. flume安装使用& & 下载flume安装包& & 解压$ tar -xzvf apache-flume-1.5.2-bin.tar.gz -C /opt/flume& & flume配置文件放在conf文件目录下,执行文件放在bin文件目录下。& & 1)配置flume& & 进入conf目录将flume-conf.properties.template拷贝一份,并命名为自己需要的名字& & $ cp flume-conf.properties.template flume.conf& &
修改flume.conf的内容,我们使用file sink来接收channel中的数据,channel采用memory channel,source采用exec source,配置文件如下:
agent.sources = seqGenSrcagent.channels = memoryChannelagent.sinks = loggerSink
# For each one of the sources, the type is definedagent.sources.seqGenSrc.type = execagent.mand = tail -F /data/mongodata/mongo.log#agent.sources.seqGenSrc.bind = 172.168.49.130
# The channel can be defined as follows.agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be definedagent.sinks.loggerSink.type = file_rollagent.sinks.loggerSink.sink.directory = /data/flume
#Specify the channel the sink should useagent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)# can be defined as well# In this case, it specifies the capacity of the memory channelagent.channels.memoryChannel.capacity = 1000agent.channels.memory4log.transactionCapacity = 100& & 2)运行flume agent& & 切换到bin目录下,运行一下命令:& & $ ./flume-ng agent --conf ../conf -f ../conf/flume.conf --n agent -Dflume.root.logger=INFO,console& & 在/data/flume目录下可以看到生成的日志文件。
2. 结合kafka& & 由于flume1.5.2没有kafka sink,所以需要自己开发kafka sink& & 可以参考flume 1.6里面的kafka sink,但是要注意使用的kafka版本,由于有些kafka api不兼容的& & 这里只提供核心代码,process()内容。
Sink.Status status = Status.READY;
& & Channel ch = getChannel();& & Transaction transaction =& & Event event =& & String eventTopic =& & String eventKey =& & & & try {& & & & transaction = ch.getTransaction();& & & & transaction.begin();& & & & messageList.clear();& & & & & & & & if (type.equals("sync")) {& & & & & & event = ch.take();
if (event != null) {& & & &
byte[] tempBody = event.getBody();& & & &
String eventBody = new String(tempBody,"UTF-8");& & & &
Map&String, String& headers = event.getHeaders();
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {& & & &
eventTopic =& & & &
eventKey = headers.get(KEY_HDR);
if (logger.isDebugEnabled()) {& & & &
logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "& & & &
+ eventBody);& & & &
}& & & & & & & &
ProducerData&String, Message& data = new ProducerData&String, Message&& & & &
(eventTopic, new Message(tempBody));& & & & & & & &
long startTime = System.nanoTime();& & & &
logger.debug(eventTopic+"++++"+eventBody);& & & &
producer.send(data);& & & &
long endTime = System.nanoTime();& &
}& & & & } else {& & & & & & long processedEvents = 0;& & & & & & for (; processedEvents & batchS processedEvents += 1) {& & & & & & & & event = ch.take();
if (event == null) {& & & && & & &
byte[] tempBody = event.getBody();& & & &
String eventBody = new String(tempBody,"UTF-8");& & & &
Map&String, String& headers = event.getHeaders();
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {& & & &
eventTopic =& & & &
eventKey = headers.get(KEY_HDR);
if (logger.isDebugEnabled()) {& & & &
logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "& & & &
+ eventBody);& & & &
logger.debug("event #{}", processedEvents);& & & &
// create a message and add to buffer& & & &
ProducerData&String, String& data = new ProducerData&String, String&& & & &
(eventTopic, eventBody);& & & &
messageList.add(data);& & & & & & }& & & & & & & & & & & & // publish batch and commit.& &
if (processedEvents & 0) {& &
long startTime = System.nanoTime();& &
long endTime = System.nanoTime();& &
}& & & & }& & & & & & & & mit();& & } catch (Exception ex) {& & & & String errorMsg = "Failed to publish events";& & & & logger.error("Failed to publish events", ex);& & & & status = Status.BACKOFF;& & & & if (transaction != null) {& & & & & try {& & & & & & transaction.rollback();& & & & & } catch (Exception e) {& & & & & & logger.error("Transaction rollback failed", e);& & & & & & throw Throwables.propagate(e);& & & & & }& & & & }& & & & throw new EventDeliveryException(errorMsg, ex);& & & } finally {& & & & if (transaction != null) {& & & & & transaction.close();& & & & }& & & }& & & && & 下一步,修改flume配置文件,将其中sink部分的配置改成kafka sink,如:& &
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.brokerList = bigdata-node00:9092producer.sinks.r.requiredAcks = 1producer.sinks.r.batchSize = 100#producer.sinks.r.kafka.producer.type=async#producer.sinks.r.kafka.customer.encoding=UTF-8producer.sinks.r.topic = testFlume1& & type指向kafkasink所在的完整路径& & 下面的参数都是kafka的一系列参数,最重要的是brokerList和topic参数
现在重新启动flume,就可以在kafka的对应topic下查看到对应的日志
分布式发布订阅消息系统 Kafka 架构设计
Apache Kafka 代码实例
Apache Kafka 教程笔记
Apache kafka原理与特性(0.8V)&
Kafka部署与代码实例&
Kafka介绍和集群环境搭建&
Kafka 的详细介绍:Kafka 的下载地址:
本文永久更新链接地址:
相关资讯 & & &
& (05月30日)
& (04月10日)
& (06月06日)
& (05月16日)
& (03月09日)
图片资讯 & & &
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款分布式海量日志采集、聚合和传输系统:Cloudera Flume
作者: on 星期二, 三月 1, 2011 & &【阅读:2,556 次】
原文地址:
Flume是Cloudera提供的日志收集系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
上图的Flume的Architecture,在Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。在上图中,实线描述了data flow。
其中,Agent用于采集数据,agent是flume中产生数据流的地方,同时,agent会将产生的数据流传输到collector。对应的,collector用于对数据进行聚合,往往会产生一个更大的流。
Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。同时,Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。
其中,收集数据有2种主要工作模式,如下:
Push Sources:外部系统会主动地将数据推送到Flume中,如RPC、syslog。
Polling Sources:Flume到外部系统中获取数据,一般使用轮询的方式,如text和exec。
注意,在Flume中,agent和collector对应,而source和sink对应。Source和sink强调发送、接受方的特性(如数据格式、编码等),而agent和collector关注功能。
Flume Master用于管理数据流的配置,如下图。
为了保证可扩展性,Flume采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。
Flume Master间使用gossip协议同步数据。
下面简要分析Flume如何支持Reliability、Scalability、Manageability和Extensibility。
Reliability:Flume提供3中数据可靠性选项,包括End-to-end、Store on failure和Best effort。其中End-to-end使用了磁盘日志和接受端Ack的方式,保证Flume接受到的数据会最终到达目的。Store on failure在目的不可用的时候,数据会保持在本地硬盘。和End-to-end不同的是,如果是进程出现问题,Store on failure可能会丢失部分数据。Best effort不做任何QoS保证。
Scalability:Flume的3大组件:collector、master和storage tier都是可伸缩的。需要注意的是,Flume中对事件的处理不需要带状态,它的Scalability可以很容易实现。
Manageability:Flume利用ZooKeeper和gossip,保证配置数据的一致性、高可用。同时,多Master,保证Master可以管理大量的节点。
Extensibility:基于Java,用户可以为Flume添加各种新的功能,如通过继承Source,用户可以实现自己的数据接入方式,实现Sink的子类,用户可以将数据写往特定目标,同时,通过SinkDecorator,用户可以对数据进行一定的预处理。
anyShare一切看了好文章不转的行为,都是耍流氓!
关注与订阅
NoSQLFan图书频道}

我要回帖

更多关于 flume kafka 收集日志 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信