pyspark 中怎么做matlab中的for循环环

Python中for循环详解
字体:[ ] 类型:转载 时间:
这篇文章主要介绍了Python中for循环,有需要的朋友可以参考一下
与其它大多数语言一样,Python 也拥有 for 循环。你到现在还未曾看到它们的唯一原因就是,Python 在其它太多的方面表现出色,通常你不需要它们。
其它大多数语言没有像 Python 一样的强大的 list 数据类型,所以你需要亲自做很多事情,指定开始,结束和步长,来定义一定范围的整数或字符或其它可重复的实体。但是在 Python 中,for 循环简单地在一个列表上循环,与 list 解析的工作方式相同。
1. for& 循环介绍
代码如下:&&& li = ['a', 'b', 'e']&&& for s in li:&&&&&&&& (1)...&&&& print s&&&&&&&&& (2)ae&&& print "\n".join(li)& (3)ae
(1)& for 循环的语法同 list 解析相似。li 是一个 list,而 s 将从第一个元素开始依次接收每个元素的值。(2)& 像 if 语句或其它任意缩进块,for 循环可以包含任意数目的代码行。(3)& 这就是你以前没看到过 for 循环的原因:至今我们都不需要它。太令人吃惊了,当你想要的只是一个 join 或是 list 解析时,在其它语言中常常需要使用 for 循环。
要做一个 “通常的” (Visual Basic 标准的) 计数 for 循环也非常简单。
2. 简单计数
代码如下:&&& for i in range(5):&&&&&&&&&&&& (1)...&&&& print i01234&&& li = ['a', 'b', 'c', 'd', 'e']&&& for i in range(len(li)):&&&&&& (2)- 104 -Dive Into Python http://diveintopython.org/...&&&& print li[i]acde
(1)& range 生成一个整数的 list,通过它来控制循环。我知道它看上去有些奇怪,但是它对计数循环偶尔 (我只是说偶尔) 会有用 。(2)& 我们从来没这么用过。这是 Visual Basic 的思维风格。摆脱它吧。正确遍历 list 的方法是前面的例子所展示的。
for 循环不仅仅用于简单计数。它们可以遍历任何类型的东西。下面的例子是一个用 for 循环遍历 dictionary 的例子。
3. 遍历& dictionary 代码如下:&&& import os&&& for k, v in os.environ.items():&&&&& (1) (2)...&&&& print "%s=%s" % (k, v)USERPROFILE=C:\Documents and Settings\mpilgrimOS=Windows_NTCOMPUTERNAME=MPILGRIMUSERNAME=mpilgrim[...略...]&&& print "\n".join(["%s=%s" % (k, v)...&&&& for k, v in os.environ.items()]) (3)USERPROFILE=C:\Documents and Settings\mpilgrimOS=Windows_NTCOMPUTERNAME=MPILGRIMUSERNAME=mpilgrim[...略...]
(1)& os.environ 是在你的系统上所定义的环境变量的 dictionary。在 Windows 下,这些变量是可以从 MS-DOS 访问的用户和系统变量。在 UNIX 下,它们是在你的 shell 启动脚本中所 export (输出) 的变量。在 Mac OS 中,没有环境变量的概念,所以这个 dictionary 为空。(2)& os.environ.items() 返回一个 tuple 的 list:[(key1, value1), (key2, value2), ...]。for 循环对这个 list 进行遍历。第一轮,它将 key1 赋给 k ,value1 赋给 v,所以 k = USERPROFILE,v = C:\Documents and Settings\mpilgrim。第二轮,k 得到第二个键字 OS,v 得到相应的值 Windows_NT。(3)& 使用多变量赋值和 list 解析,你可以使用单行语句来替换整个 for 循环。在实际的编码中是否这样做只是个人风格问题;我喜欢它是因为,将一个dictionary 映射到一个 list,然后将 list 合并成一个字符串,这一过程显得很清晰。其它的程序员宁愿将其写成一个 for 循环。请注意在两种情况下输出是一样的,然而这一版本稍微快一些,因为它只有一条 print 语句而不是许多。
现在我们来看看在 第 5 章介绍的样例程序 fileinfo.py 中 MP3FileInfo 的 for 循环 。
代码如下:&&& tagDataMap = {"title"&& : (& 3,& 33, stripnulls),&&&&&&&&&&&&&&&&& "artist"& : ( 33,& 63, stripnulls),&&&&&&&&&&&&&&&&& "album"&& : ( 63,& 93, stripnulls),&&&&&&&&&&&&&&&&& "year"&&& : ( 93,& 97, stripnulls),&&&&&&&&&&&&&&&&& "comment" : ( 97, 126, stripnulls),&&&&&&&&&&&&&&&&& "genre"&& : (127, 128, ord)}&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& (1)&&& .&&& .&&& .&&&&&&&&&&& if tagdata[:3] == "TAG":&&&&&&&&&&&&&&& for tag, (start, end, parseFunc) in self.tagDataMap.items(): (2)&&&&&&&&&&&&&&&&&&& self[tag] = parseFunc(tagdata[start:end])&&&&&&&&&&&&&&& (3)
(1)& tagDataMap 是一个类属性,它定义了我们正在一个 MP3 文件中搜索的标记。标记存储为定长字段,只要我们读出文件最后 128 个字节,那么第 3 到 32 字节总是歌曲的名字,33-62 总是歌手的名字,63-92 为专辑的名字,等等。请注意 tagDataMap 是一个 tuple 的 dictionary,每个 tuple 包含两个整数和一个函数引用。(2)& 这个看上去复杂一些,但其实并非如此。这里的 for 变量结构与 items 所返回的 list 的元素的结构相匹配。记住,items 返回一个形如 (key, value) 的 tuple 的 list。list 第一个元素是 ("title", (3, 33, &function stripnulls&)),所以循环的第一轮,tag 为 "title",start 为 3,end 为 33,parseFunc 为函数 stripnulls。(3)& 现在我们已经从一个单个的 MP3 标记中提取出了所有的参数,将标记数据保存起来挺容易。我们从 start 到 end 对 tagdata 进行分片,从而得到这个标记的实际数据,调用 parseFunc 对数据进行后续的处理,接着将 parseFunc 的返回值作为值赋值给伪字典 self 中的键字 tag。在遍历完 tagDataMap 中所有元素之后,self 拥有了所有标记的值,你知道看上去是什么样。
您可能感兴趣的文章:
大家感兴趣的内容
12345678910
最近更新的内容
常用在线小工具spark(26)
1、http://spark.apache.org/docs/latest/streaming-programming-guide.html
2、/apache/spark/tree/v2.2.0
Spark Streaming编程指南
一个快速的例子
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext(&local[2]&, &NetworkWordCount&)
ssc = StreamingContext(sc, 1)
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream(&localhost&, 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(& &))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
ssc.start()
# Start the computation
ssc.awaitTermination()
# Wait for the computation to terminate
初始化StreamingContext
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
streamingContext.textFileStream(dataDirectory)
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount)
# add the new values with the previous running count to get the new count
runningCounts = pairs.updateStateByKey(updateFunction)
spamInfoRDD = sc.pickleFile(...)
# RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
Stream-stream joins
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
DStreams的输出操作
saveAsTextFiles(prefix, [suffix])
saveAsObjectFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
foreachRDD(func)
使用foreachRDD的设计模式
def sendRecord(rdd):
connection = createNewConnection()
# executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
DataFrame and SQL 操作
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if (&sparkSessionSingletonInstance& not in globals()):
globals()[&sparkSessionSingletonInstance&] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()[&sparkSessionSingletonInstance&]
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print(&========= %s =========& % str(time))
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView(&words&)
# Do word count on table using SQL and print it
wordCountsDataFrame = spark.sql(&select word, count(*) as total from words group by word&)
wordCountsDataFrame.show()
words.foreachRDD(process)
如何配置 Checkpointing
# Function to create and setup a new StreamingContext
def functionToCreateContext():
sc = SparkContext(...)
# new context
ssc = StreamingContext(...)
lines = ssc.socketTextStream(...)
# create DStreams
ssc.checkpoint(checkpointDirectory)
# set checkpoint directory
return ssc
# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...
# Start the context
context.start()
context.awaitTermination()
Accumulators, Broadcast Variables, and Checkpoints
def getWordBlacklist(sparkContext):
if (&wordBlacklist& not in globals()):
globals()[&wordBlacklist&] = sparkContext.broadcast([&a&, &b&, &c&])
return globals()[&wordBlacklist&]
def getDroppedWordsCounter(sparkContext):
if (&droppedWordsCounter& not in globals()):
globals()[&droppedWordsCounter&] = sparkContext.accumulator(0)
return globals()[&droppedWordsCounter&]
def echo(time, rdd):
# Get or register the blacklist Broadcast
blacklist = getWordBlacklist(rdd.context)
# Get or register the droppedWordsCounter Accumulator
droppedWordsCounter = getDroppedWordsCounter(rdd.context)
# Use blacklist to drop words and use droppedWordsCounter to count them
def filterFunc(wordCount):
if wordCount[0] in blacklist.value:
droppedWordsCounter.add(wordCount[1])
counts = &Counts at time %s %s& % (time, rdd.filter(filterFunc).collect())
wordCounts.foreachRDD(echo)
数据接收中的并行级别
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:10634次
积分:1508
积分:1508
排名:千里之外
原创:48篇
转载:127篇
译文:74篇
(7)(182)(55)(4)(1)
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'1被浏览13分享邀请回答暂时还没有回答,开始写第一个回答pyspark实现Apriori算法、循环迭代、并行处理 - CSDN博客
pyspark实现Apriori算法、循环迭代、并行处理
from pyspark import
SparkContext
myDat=[ [ 1, 3, 4,5 ], [ 2, 3, 5 ], [ 1, 2, 3,4, 5 ], [ 2,3,4, 5 ] ]
sc = SparkContext( 'local', 'pyspark')
myDat=sc.parallelize(myDat) #得到输入数据RDD #myDat.collect(): [[1, 3, 4, 5], [2, 3, 5], [1, 2, 3, 4, 5], [2, 3, 4, 5]]
C1=myDat.flatMap(lambda x: set(x)).distinct().collect() #distinct()是去重操作,对应C1=createC1(myDat) #得到1项集 #[1, 2, 3, 4, 5],
C1=[frozenset([var]) for var in C1] #需要这样做,因为python的代码里需要处理集合操作
D=myDat.map(lambda x: set(x)).collect() #将输入数据RDD转化为set的列表 #[{1, 3, 4, 5}, {2, 3, 5}, {1, 2, 3, 4, 5}, {2, 3, 4, 5}]
D_bc=sc.broadcast(D)
length=len(myDat.collect())
suppData=sc.parallelize(C1).map(lambda x: (x,len([var for var in D_bc.value if x.issubset(var)])/length) if len([var for var in D_bc.value \
if x.issubset(var)])/length &=0.75 else ()).filter(lambda x: x).collect()
L1=[frozenset(var) for var in map(lambda x:x[0],suppData)] #筛选出大于最小支持度
L.append(L1)
#D_bc=sc.broadcast(D)
while (len(L[k-2])&0):
Ck=[var1|var2 for index,var1 in enumerate(L[k-2]) for var2 in L[k-2][index+1:] if list(var1)[:k-2]==list(var2)[:k-2]]
#count_each_ele=myDat.flatMap(lambda x:x).map(lambda x: (x,1)).countByKey()
#count_each_ele=sc.parallelize(Ck).map(lambda x: filter(lambda y: x.issubset(y),D_bc.value))
suppData_temp=sc.parallelize(Ck).map(lambda x: (x,len([var for var in D_bc.value if x.issubset(var)])/length) if len([var for var in D_bc.value \
if x.issubset(var)])/length &=0.75 else ()).filter(lambda x: x).collect()
#Ck中的多个子集会分布到多个分布的机器的任务中运行,D_bc是D的分发共享变量,在每个任务中,都可以使用D_bc来统计本任务中包含某子集的个数
suppData+=suppData_temp
L.append([var[0] for var in suppData_temp]) #使用这行代码,最后跳出while后再过滤一下空的项
L=[var for var in L if var]
print(suppData)
def calcConf(freqSet, H, supportData, brl, minConf=0.7 ):
prunedH=[]
#sc.parallelize(H).map(lambda x: ...) #这里也无法并行,因为,freqSet是局部的,如果弄成广播,那得好多副本
for conseq in H:
conf = supportData[ freqSet ] / supportData[ freqSet - conseq ]
if conf &= minConf:
print(freqSet - conseq, '--&', conseq, 'conf:', conf)
brl.append( ( freqSet - conseq, conseq, conf ) )
prunedH.append( conseq )
return prunedH
def rulesFromConseq(freqSet,H,supportData,brl,minConf=0.7):
m=len(H[0])
if len(freqSet)&m+1:
Hmp1=[var1|var2 for index,var1 in enumerate(H) for var2 in H[index+1:] if list(var1)[:m+1-2]==list(var2)[:m+1-2]]
Hmp1 = calcConf( freqSet, Hmp1, supportData, brl, minConf )
if len( Hmp1 ) & 1:
rulesFromConseq( freqSet, Hmp1, supportData, brl, minConf )
def generateRules( L, supportData, minConf=0.7 ):
bigRuleList = []
for i in range( 1, len( L ) ):
for freqSet in L[ i ]:
H1 = [ frozenset( [ item ] ) for item in freqSet ]
rulesFromConseq( freqSet, H1, supportData, bigRuleList, minConf )
calcConf( freqSet, H1, supportData, bigRuleList, minConf )
return bigRuleList
suppData_dict={}
suppData_dict.update(suppData) #查字典类型的update用法
sD_bc=sc.broadcast(suppData_dict)
rules = generateRules( L, sD_bc.value, minConf=0.9 )
print('rules:\n', rules)
python版:
进一步优化,将计算rules的部分的代码写成如下形式:
#上述计算rules的代码的进一步修剪
newL=[[x,[]] for x in sc.parallelize(L).flatMap(lambda x: x).collect() if len(x)&1]
suppData_dict={}
suppData_dict.update(suppData)
sD_bc=sc.broadcast(suppData_dict) #查字典类型的update用法
def f2(freqSet, H, supportData, minConf=0.7 ):
prunedH=[]
for conseq in H:
conf = supportData[ freqSet[0] ] / supportData[ freqSet[0] - conseq ]
if conf &= minConf:
#print(freqSet[0] - conseq, '--&', conseq, 'conf:', conf)
freqSet[1]=freqSet[1]+[( freqSet[0] - conseq, conseq, conf )]
prunedH.append( conseq )
return (prunedH,freqSet)
def f1(freqSet,H,supportData,minConf=0.7): #需要这个H,因为H并不一定都由freqSet面来
m=len(H[0])
if len(freqSet[0])&m+1:
Hmp1=[var1|var2 for index,var1 in enumerate(H) for var2 in H[index+1:] if list(var1)[:m+1-2]==list(var2)[:m+1-2]]
Hmp1 = f2( freqSet, Hmp1, supportData, minConf )
if len( Hmp1[0] ) & 1:
f1( freqSet, Hmp1[0], supportData, minConf )
return Hmp1[1]
result=sc.parallelize(newL).map(lambda x: f1(x,[frozenset([var]) for var in x[0]],sD_bc.value,0.9) if len(x[0])&2 else f2(x,[frozenset([var]) for var in x[0]],sD_bc.value,0.9)[1]).collect()
rules=[var[1] for var in result]
print(rules)
本文已收录于以下专栏:
相关文章推荐
val mydata = Array(Array(1,3,4,5),Array(2,3,5),Array(1,2,3,4,5),Array(2,3,4,5))
val pamydata = sc.pa...
2014届全国高校云计算大赛技能赛
K-频繁项集挖掘并行化算法
? 环境描述: 本题目需要运行在 Apache Spark 1.0.1Apache Spark 1.0.1Apache Spa...
1、首先在官网上下载:spark-1.6.0-bin-hadoop2.6.tgz
2、解压后,放到D盘下,并改名为spark-1.6.0,将D:\spark-1.6.0\bin加入到环境变量中,在终端...
原始链接:基于Python的机器学习实战:Apriori
原始链接里的代码是在python2下写的,有的地方我看的不是太明白,在这里,我把它修改成能在python3下运行了,还加入了一些方便自己理解的...
他的最新文章
讲师:宋宝华
讲师:何宇健
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)Python中for循环控制语句用法实例
作者:way_testlife
字体:[ ] 类型:转载 时间:
这篇文章主要介绍了Python中for循环控制语句用法,较为详细的分析了for循环语句的原理与相关使用技巧,需要的朋友可以参考下
本文实例讲述了Python中for循环控制语句用法。分享给大家供大家参考。具体分析如下:
第一个:求 50 - 100 之间的质数
import math
for i in range(50, 100 + 1):
for j in range(2, int(math.sqrt(i)) + 1):
if i % j == 0:
输出如下:
第二个:把else的位置与if处于同一缩进。
import math
for i in range(50, 100 + 1):
for j in range(2, int(math.sqrt(i)) + 1):
if i % j == 0:
第三个:在else后加一个break语句。
import math
for i in range(50, 100 + 1):
for j in range(2, int(math.sqrt(i)) + 1):
if i % j == 0:
for语句是python中的循环控制语句。可用来遍历某一对象,还具有一个附带的可选的else块,主要用于处理for语句中包含的break语句。
如果for循环未被break终止,则执行else块中的语句。
break 在需要时终止for循环
continue 跳过位于其后的语句,开始下一轮循环。
for语句的格式如下:
&&&for && in &对象集合&:
...&&& if &条件&:
...&&&&&&& break
...&&& if &条件&:
...&&&&&&& continue
...&&& &其他语句&
关于第一个程序
在这里,我解释一下为何导入math模块:导入math模块就是为了开方。
如果导入了math模块,然后对 i 进行开方,可以减少运算次数。
求一个数是否质数。只需对它进行这样的运算:
&&& 将这个数n,循环与 2 到 这个n的开平方 进行相除
如果这个区间内的所有整数不能整除n,则n为质数。
这样,就节省了运算 ‘大于n的开平方 小于n' 之间这段运算的时间。
第二,我解释一下那‘+1':
int(math.sqrt(i)) 输出的是比 i的开平方 小 的最大整数。
比如说:math.sqrt(51) 结果比7大一点,而 int(math.sqrt(51)) 输出的是7
而且在range(m, n)这里,range()函数产生的是一个从 m至n-1的整数列表,因而需要‘+1',使运算完整。
顺便提一下range()函数。
range([start,] stop [, step])
# start& 可选参数,起始数
#stop&& 终止数,如果 range 只有一个参数x,则产生一个包含 0 至 x-1 的整数列表
#step&& 可选参数,步长
第二个程序
else那行不对,如果else放在那个地方的话,一旦有某个数遇到不能整除自己的数,就会输出i,直道找到一个整除自己等于0的数。那样就会连续输出这个数。
例如:i = 77,他不是质数,但是也会连续输出5次77,懂不?
只不过,只是自己不明白当else与if位于同一缩进的话,它是怎样运行的。
你解释得很详细,用‘茅塞顿开'来形容一点都不过分。
而且,我必觉得画图是理解循环一个非常好的办法。
希望本文所述对大家的Python程序设计有所帮助。
您可能感兴趣的文章:
大家感兴趣的内容
12345678910
最近更新的内容
常用在线小工具}

我要回帖

更多关于 shell中for循环 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信