sparkspark streaming hivewindow可以多长

现在的位置:
Spark Streaming原理简析
数据的接收
StreamingContext实例化的时候,需要传入一个SparkContext,然后指定要连接的spark matser url,即连接一个spark engine,用于获得executor。
实例化之后,首先,要指定一个接收数据的方式,如
val lines = ssc.socketTextStream("localhost", 9999)
这样从socket接收文本数据。这个步骤返回的是一个ReceiverInputDStream的实现,内含Receiver,可接收数据并转化为RDD放内存里。
ReceiverInputDStream有一个需要子类实现的方法
def getReceiver(): Receiver[T]
子类实现这个方法,worker节点调用后能得到Receiver,使得数据接收的工作能分布到worker上。
如果是local跑,由于Receiver接收数据在本地,所以在启动streaming application的时候,要注意分配的core数目要大于Receiver数目,才能腾出cpu做计算任务的调度。
Receiver需要子类实现
def onStart()def onStop()
来定义一个数据接收器的初始化、接收到数据后如何存、如何在结束的时候释放资源。
Receiver提供了一系列store()接口,如store(ByteBuffer),store(Iterator)等等。这些store接口是实现好了的,会由worker节点上初始化的ReceiverSupervisor来完成这些存储功能。ReceiverSupervisor还会对Receiver做监控,如监控是否启动了、是否停止了、是否要重启、汇报error等等。
ReceiverSupervisor的存储接口的实现,借助的是BlockManager,数据会以RDD的形式被存放,根据StorageLevel选择不同存放策略。默认是序列化后存内存,放不下的话写磁盘(executor)。被计算出来的RDD中间结果,默认存放策略是序列化后只存内存。
ReceiverSupervisor在做putBlock操作的时候,会首先借助BlockManager存好数据,然后往ReceiverTracker发送一个AddBlock的消息。ReceiverTracker内部的ReceivedBlockTracker用于维护一个receiver接收到的所有block信息,即BlockInfo,所以AddBlock会把信息存放在ReceivedBlockTracker里。未来需要计算的时候,ReceiverTracker根据streamId,从ReceivedBlockTracker取出对应的block列表。
RateLimiter帮助控制Receiver速度,spark.streaming.receiver.maxRate参数。
数据源方面,普通的数据源为file, socket, akka, RDDs。高级数据源为Twitter, Kafka, Flume等。开发者也可以自己定制数据源。
JobScheduler在context里初始化。当context start的时候,触发scheduler的start。
scheduler的start触发了ReceiverTracker和JobGenerator的start。这两个类是任务调度的重点。前者在worker上启动Receiver接收数据,并且暴露接口能够根据streamId获得对应的一批Block地址。后者基于数据和时间来生成任务描述。
JobScheduler内含一个线程池,用于调度任务执行。spark.streaming.concurrentJobs可以控制job并发度,默认是1,即它只能一个一个提job。
job来自JobGenerator生成的JobSet。JobGenerator根据时间,生成job并且执行cp。
JobGenerator的生成job逻辑:
- 调用ReceiverTracker的allocateBlocksToBatch方法,为本批数据分配好block,即准备好数据
- 间接调用DStream的generateJob(time)方法,制造可执行的RDD
DStream切分RDD和生成可执行的RDD,即getOrCompute(time):
- 如果这个时间点的RDD已经生成好了,那么从内存hashmap里拿出来,否则下一步
- 如果时间是批次间隔的整数倍,则下一步,否则这个时间点不切
- 调用DStream的子类的compute方法,得到RDD。可能是一个RDD,也可以是个RDD列表
- 对每个RDD,调用persist方法,制定默认的存储策略。如果时间点合适,同时调用RDD的checkpoint方法,制定好cp策略
- 得到这些RDD后,调用SparkContext.runJob(rdd, emptyFunction)。把这整个变成一个function,生成Job类。未来会在executor上触发其runJob
JobGenerator成功生成job后,调用JobScheduler.submitJobSet(JobSet),JobScheduler会使用线程池提交JobSet中的所有job。该方法调用结束后,JobGenerator发送一个DoCheckpoint的消息,注意这里的cp是driver端元数据的cp,而不是RDD本身的cp。如果time合适,会触发cp操作,内部的CheckpointWriter类会完成write(streamingContext, time)。
JobScheduler提交job的线程里,触发了job的run()方法,同时,job跑完后,JobScheduler处理JobCompleted(job)。如果job跑成功了,调用JobSet的handleJobCompletion(Job),做些计时和数数工作,如果整个JobSet完成了,调用JobGenerator的onBatchCompletion(time)方法,JobGenerator接着会做clearMetadata的工作,然后JobScheduler打印输出;如果job跑失败了,JobScheduler汇报error,最后会在context里抛异常。
transform:可以与外部RDD交互,比如做维表的join
updateStateByKey:生成StateDStream,比如做增量计算。WordCount例子
每一批都需要与增量RDD进行一次cogroup之后,然后执行update function。两个RDD做cogroup过程有些开销:RDD[K, V]和RDD[K, U]合成RDD[K, List[V], List[U]],List[U]一般size是1,理解为oldvalue,即RDD[K, batchValueList, Option[oldValue]]。然后update function处理完,变成RDD[K, newValue]。
批与批之间严格有序,即增量合并操作,是有序的,批之间没发并发
增量RDD的分区数可以开大,即这步增量的计算可以调大并发
window:batch size,window length, sliding interval三个参数组成的滑窗操作。把多个批次的RDD合并成一个UnionRDD进行计算。
foreachRDD: 这个操作是一个输出操作,比较特殊。
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
def foreachRDD(foreachFunc: (RDD[T], Time) =& Unit) { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
DStream.foreachRDD()操作使开发者可以直接控制RDD的计算逻辑,而不是通过DStream映射过去。所以借助这个方法,可以实现MLlib, Spark SQL与Streaming的集合,如:结合Spark SQL、DataFrame做Wordcount。
如果是window操作,默认接收的数据都persist在内存里。
如果是flume, kafka源头,默认接收的数据replicate成两份存起来。
Checkpoint
与state有关的流计算,计算出来的结果RDD,会被cp到HDFS上,原文如下:
Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depends on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
cp的时间间隔也可以设定,可以多批做一次cp。
cp的操作是同步的。
简单的不带state操作的流任务,可以不开启cp。
driver端的metadata也有cp策略。driver cp的时候是将整个StreamingContext对象写到了可靠存储里。
EasyQuery的目标是不需要写一行java代码就可以实现非常非常复杂的查询,省时省力,提高效率。
【上篇】【下篇】
您可能还会对这些文章感兴趣!
您必须才能发表留言!
籍贯山东,落户北京,IT行业。
工作经历:
2014年至今&,自主创业
,传智播客
,超人学院
,亚信科技
教育经历:
,中科院研究生院
,河北大学Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析 -
- ITeye技术网站
博客分类:
最近在玩spark streaming, 感觉到了他的强大。 然后看 StreamingContext的源码去理解spark是怎么完成计算的。 大部分的源码比较容易看懂, 但是这个
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
还是花了不少时间。 主要还是由于对spark不熟悉造成的吧, 还好基本弄明白了。
总的来说SparkStreaming提供这个方法主要是出于效率考虑。 比如说我要每10秒计算一下前15秒的内容,(每个batch 5秒), 可以想象每十秒计算出来的结果和前一次计算的结果其实中间有5秒的时间值是重复的。
那么就是通过如下步骤
1. 存储上一个window的reduce值
2.计算出上一个window的begin 时间到 重复段的开始时间的reduce 值 =》 oldRDD
3.重复时间段的值结束时间到当前window的结束时间的值 =》 newRDD
4.重复时间段的值等于上一个window的值减去oldRDD
这样就不需要去计算每个batch的值, 只需加加减减就能得到新的reduce出来的值。
从代码上面来看, 入口为:
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:
代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
.mapValues(mergeValues)
先计算oldRDD 和newRDD
//currentWindow& 就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:
我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值
然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值
_____________________________
previous window
_________|___________________
// |___________________|
current window
--------------& Time
|_____________________________|
// |________ _________|
|________ _________|
val currentTime = validTime
val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
currentTime)
val previousWindow = currentWindow - slideDuration
val oldRDDs =
reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
val newRDDs =
reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
logDebug("# new RDDs = " + newRDDs.size)
得到newRDD和oldRDD后就要拿到previous windows的值: 如果第一次没有previous window那么建一个空RDD, 为最后计算结果时 arrayOfValues(0).isEmpty 铺垫
val previousWindowRDD =
getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))
然后把所有的值放到一个数组里面 0是previouswindow, 1到oldRDD.size是oldrdd, oldRDD.size到newRDD.size是newrdd
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
将每个RDD的(K,V) 转变成(K, Iterator(V))的形式:
比如说有两个值(K,a) 和(K,b) 那么coGroup后就会成为(K, Iterator(a,b))这种形式
val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
partitioner)
进行最后的计算:
val mergeValues = (arrayOfValues: Array[Iterable[V]]) =& {
首先判断RDD的value数量是不是正确 previous window因为已经计算过所以只有一组值
正确值为 1 (previous window value) + numOldValues (oldRDD 每个RDD的value) + numNewValues (newRDD 每个RDD的value)
if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
throw new Exception("Unexpected number of sequences of reduced values")
接下来取出oldRDD的值和newRDD的值:
val oldValues = (1 to numOldValues).map(i =& arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
val newValues =
(1 to numNewValues).map(i =& arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
如果previous window是空的, 那么就直接计算newRDD的值(这也是为什么每次计算时候第一次打出来的值都比较少, 因为他只有newRDD部分没有重合部分, 也就是只有10秒的内容而不是15秒)
if (arrayOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
throw new Exception("Neither previous window has value for key, nor new values found. " +
"Are you sure your key class hashes consistently?")
// Reduce the new values
newValues.reduce(reduceF) // return
如果有previous window的值, 那么先存到tempValue, 如果有oldRDD那么减去oldRDD, 如果有newRDD (一般都有) 那么加上newRDD的值 这样就组成上图里面10到25秒区间的值了
// Get the previous window's reduced value
var tempValue = arrayOfValues(0).head
// If old values exists, then inverse reduce then from previous value
if (!oldValues.isEmpty) {
tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
// If new values exists, then reduce them with previous value
if (!newValues.isEmpty) {
tempValue = reduceF(tempValue, newValues.reduce(reduceF))
tempValue // return
最后如果有filter的function的话就filter一下:
if (filterFunc.isDefined) {
Some(mergedValuesRDD.filter(filterFunc.get))
Some(mergedValuesRDD)
这样就返回了新window内的值
humingminghz
浏览: 10065 次Spark Streaming编程指南_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
Spark Streaming编程指南
上传于||文档简介
&&Spark Streaming编程指南
阅读已结束,如果下载本文需要使用0下载券
想免费下载更多文档?
定制HR最喜欢的简历
下载文档到电脑,查找使用更方便
还剩8页未读,继续阅读
定制HR最喜欢的简历
你可能喜欢Spark 2.0介绍:Spark SQL中的Time Window使用 - 推酷
Spark 2.0介绍:Spark SQL中的Time Window使用
《Spark 2.0技术预览:更容易、更快速、更智能》文章中简单地介绍了
带来的新技术等。
的下一个主要版本。此版本在架构抽象、API以及平台的类库方面带来了很大的变化,为该框架明年的发展奠定了方向,所以了解
的一些特性对我们能够使用它有着非常重要的作用。本博客将对
2.0进行一序列的介绍(参见Spark 2.0分类),欢迎关注。
Spark SQL中Window API
Spark SQL中的window API是从1.4版本开始引入的,以便支持更智能的分组功能。这个功能对于那些有SQL背景的人来说非常有用;但是在Spark 1.x中,window API一大缺点就是无法使用时间来创建窗口。时间在诸如金融、电信等领域有着非常重要的角色,基于时间来理解数据变得至关重要。
不过值得高兴的是,在Spark 2.0中,window API内置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常类似。在这篇文章中,我将介绍如何在Spark SQL中使用time windows。
时间序列数据
在我们介绍如何使用time window之前,我们先来准备一份时间序列数据。本文将使用Apple公司从1980年到2016年期间的股票交易信息。如下(完整的数据点击获取):
Date,Open,High,Low,Close,Volume,Adj Close
,96.75,97...003
,96....68,.68
,95..5,95..002
,94....999
,95....001
,95....999
,94....998
,93....002
,92....996
,93,93..5,92.001
,92....002
,95...25,96.998
,96.25,96...003
,94...68,95.004
,96,96.57,95..998
,96....002
,96..75,96.07,97.003
,97.82,98...999
,97.32,98..75,97.999
,98....996
,98....002
,98.5,99...002
,99...68,98.002
,99.25,99...999
,97....997
,97....998
,97....001
,99....999
,99...82,99.001
股票数据一共有六列,但是这里我们仅关心Date和Close两列,它们分别代表股票交易时间和当天收盘的价格。
将时间序列数据导入到DataFrame中
我们有了样本数据之后,需要将它导入到DataFrame中以便下面的计算。所有的time window API需要一个类型为timestamp的列。我们可以使用spark-csv工具包来解析上面的Apple股票数据(csv格式),这个工具可以自动推断时间类型的数据并自动创建好模式。代码如下:
spark.read.option(
&inferSchema&
org.apache.spark.sql.DataFrame
timestamp, Open
double ...
more fields]
计算2016年Apple股票周平均收盘价格
现在我们已经有了初始化好的数据,所以我们可以进行一些基于时间的窗口分析。在本例中我们将计算2016年Apple公司每周股票的收盘价格平均值。下面将一步一步进行介绍。
步骤一:找出2016年的股票交易数据
因为我们仅仅需要2016年的交易数据,所以我们可以对原始数据进行过滤,代码如下:
* User: 过往记忆
* Date: 日
* Time: 下午23:45
* 本文地址:
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
stocksDF.filter(
&year(Date)==2016&
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
timestamp, Open
double ...
more fields]
上面代码片段我们使用了内置的year函数来提取出日期中的年。
步骤二:计算平均值
现在我们需要对每个星期创建一个窗口,这种类型的窗口通常被称为tumbling window,代码片段如下:
tumblingWindowDS
.groupBy(window(stocks
)).agg(avg(
&weekly_average&
tumblingWindowDS
org.apache.spark.sql.DataFrame
struct&start
timestamp, end
timestamp&, weekly
上面代码中展示了如何使用 time window API。window一般在group by语句中使用。window方法的第一个参数指定了时间所在的列;第二个参数指定了窗口的持续时间(duration),它的单位可以是seconds、minutes、hours、days或者weeks。创建好窗口之后,我们可以计算平均值。
步骤三:打印window的值
我们可以打印出window中的值,我们先定义好打印的公共函数,代码片段如下:
* User: 过往记忆
* Date: 日
* Time: 下午23:45
* 本文地址:
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
printWindow(windowDF
DataFrame, aggCol
windowDF.sort(
&window.start&
&window.start&
&window.end&
show(truncate
然后我们打印出tumblingWindowDS中的值:
printWindow(tumblingWindowDS,
&weekly_average&
+---------------------+---------------------+------------------+
|start&&&&&&&&&&&&&&& |end&&&&&&&&&&&&&&&&& |weekly
average&&& |
+---------------------+---------------------+------------------+
97.6719984
96.2525005
104.226001
106.0699996
107.8549995
110.4520004
101.5520004
93.9979994
93.3299974
+---------------------+---------------------+------------------+
only showing top
上面的输出按照
window.start
进行了排序,这个字段标记了窗口的开始时间。上面的输出你可能已经看到了第一行的开始时间是,结束时间是。但是你从原始数据可以得到:2016年Apple公司的股票交易信息是从开始的;原因是是元旦,而和正好是周末,期间没有股票交易。
我们可以手动指定窗口的开始时间来解决这个问题。
带有开始时间的Time window
在前面的示例中,我们使用的是tumbling window。为了能够指定开始时间,我们需要使用sliding window(滑动窗口)。到目前为止,没有相关API来创建带有开始时间的tumbling window,但是我们可以通过将窗口时间(window duration)和滑动时间(slide duration)设置成一样来创建带有开始时间的tumbling window。代码如下:
* User: 过往记忆
* Date: 日
* Time: 下午23:45
* 本文地址:
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
iteblogWindowWithStartTime
.groupBy(window(stocks
)).agg(avg(
&weekly_average&
上面的示例中,
参数就是开始时间的偏移量;前两个参数分别代表窗口时间和滑动时间,我们打印出这个窗口的内容:
printWindow(iteblogWindowWithStartTime,
&weekly_average&
+---------------------+---------------------+------------------+
|start&&&&&&&&&&&&&&& |end&&&&&&&&&&&&&&&&& |weekly
average&&& |
+---------------------+---------------------+------------------+
105.349998
99.0699982
98.1220016
96.2539976
94.2374975
96.7880004
101.6199998
110.3820004
96.8759994
93.6240004
+---------------------+---------------------+------------------+
only showing top
从上面的结果可以看出,我们已经有了一个从的结果;不过结果中还有2015年的数据。原因是我们的开始时间是
,之前的一周数据也会被显示出,我们可以使用filter来过滤掉那行数据:
filteredWindow
iteblogWindowWithStartTime.filter(
&year(window.start)=2016&
现在来看看输出的结果:
printWindow(filteredWindow,
&weekly_average&
+---------------------+---------------------+------------------+
|start&&&&&&&&&&&&&&& |end&&&&&&&&&&&&&&&&& |weekly
average&&& |
+---------------------+---------------------+------------------+
99.0699982
98.1220016
96.2539976
94.2374975
96.7880004
101.6199998
110.3820004
96.8759994
93.6240004
+---------------------+---------------------+------------------+
only showing top
到目前为止,我们已经了解了如何在Spark中使用Window了。
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明: 转载自过往记忆(/)
本文链接: (/archives/1705)
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致spark streaming 的 state 操作示例_ IT技术精华
聚合国内IT技术精华文章,分享IT技术精华,帮助IT从业人士成长
>> 技术文章 >> 正文
spark streaming 的 state 操作示例
浏览: 1805 次
前一篇学习演示了 spark streaming 的基础运用。下一步进入稍微难一点的,利用 checkpoint 来保留上一个窗口的状态,这样可以做到移动窗口的更新统计。首先还是先演示一下 spark 里传回调函数的用法,上一篇里用 DStream 处理模拟了 SUM(),这个纯加法是最简单的了,那么如果 AVG() 怎么做呢?val r = logs.filter(l =& l.path.equals(&/var/log/system.log&)).filter(l =& l.lineno & 70)
r.map(l =& l.message -& (l.lineno, 1)).reduceByKey((a, b) =& {
(a._1 + b._1, a._2 + b._2)
}).map(t =& AlertMsg(t._1, t._2._2, t._2._1/t._2._2)).print()这段跟之前做 SUM 的那段的区别: DStream 处理成 PairDStream 的时候,Value 不是单纯的 1,而是一个 Seq[Double, Int]。避免了上一个示例里分开两个 DStream 然后再 join 起来的操作; 给 reduceByKey 传了一个稍微复杂的匿名函数。在这一个函数里计算了 SUM 和 COUNT,后面 map 只需要做一下除法就是 AVG 了。不过这里还用不上上一次窗口的状态。真正需要上一次窗口状态的,是 reduceByKeyAndWindow 和 updateStateByKey。reduceByKeyAndWindow 和 reduceByKey 的区别,就是除了计算新数据的函数,还要传递一个处理过期数据的函数。下面用 updateStateByKey ,演示一下如何计算每个窗口的平均值,跟上一个窗口的平均值的涨跌幅度,如果波动超过 10%,则输出:import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.SparkContext._import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.StreamingContext._import scala.util.parsing.json.JSONobject LogStash {
case class LogStashV1(message:String, path:String, host:String, lineno:Double, timestamp:String)
case class Status(sum:Double = 0.0, count:Int = 0) {
val avg = sum / scala.math.max(count, 1)
var countTrend = 0.0
var avgTrend = 0.0
def +(sum:Double, count:Int): Status = {
val newStatus = Status(sum, count)
if (this.count & 0 ) {
newStatus.countTrend = (count - this.count).toDouble / this.count
if (this.avg & 0 ) {
newStatus.avgTrend = (newStatus.avg - this.avg) / this.avg
override def toString = {
s&Trend($count, $sum, $avg, $countTrend, $avgTrend)&
def updatestatefunc(newValue: Seq[(Double, Int)], oldValue: Option[Status]): Option[Status] = {
val prev = oldValue.getOrElse(Status())
var current = prev + ( newValue.map(_._1).sum, newValue.map(_._2).sum )
Some(current)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster(&local[2]&).setAppName(&LogStash&)
= new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream(&localhost&, 8888)
val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val logs = jsonf.map(data =& LogStashV1(data(&message&).toString, data(&path&).toString, data(&host&).toString, data(&lineno&).toString.toDouble, data(&@timestamp&).toString))
val r = logs.filter(l =& l.path.equals(&/var/log/system.log&)).filter(l =& l.lineno & 70)
r.map(l =& l.message -& (l.lineno, 1)).reduceByKey((a, b) =& {
(a._1 + b._1, a._2 + b._2)
}).updateStateByKey(updatestatefunc).filter(t =& t._2.avgTrend.abs & 0.1).print()
ssc.start()
ssc.awaitTermination()
}}这里因为流数据只有 sum 和 count,但是又想留存两个 trend 数据,所以使用了一个新的 cast class,把 trend 数据作为 class 的 value member。对于 state 来说,看到的就是一整个 class 了。依然有参考资料:
本页关键字}

我要回帖

更多关于 spark streaming 实例 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信