Python多进程处理:如何将大量java 将数据放入缓存有限

Python多进程处理:如何将大量数据放入有限内存
简介这是一篇有关如何将大量的数据放入有限的内存中的简略教程。与客户工作时,有时会发现他们的数据库实际上只是一个csv或Excel文件仓库,你只能将就着用,经常需要在不更新他们的数据仓库的情况下完成工作。大部分情况下,如果将这些文件存储在一个简单的数据库框架中或许更好,但时间可能不允许。这种方法对时间、机器硬件和所处环境都有要求。下面介绍一个很好的例子:假设有一堆表格(没有使用Neo4j、MongoDB或其他类型的数据库,仅仅使用csvs、tsvs等格式存储的表格),如果将所有表格组合在一起,得到的数据帧太大,无法放入内存。所以第一个想法是:将其拆分成不同的部分,逐个存储。这个方案看起来不错,但处理起来很慢。除非我们使用多核处理器。目标这里的目标是从所有职位中(大约1万个),找出相关的的职位。将这些职位与政府给的职位代码组合起来。接着将组合的结果与对应的州(行政单位)信息组合起来。然后用通过word2vec生成的属性信息在我们的客户的管道中增强已有的属性。这个任务要求在短时间内完成,谁也不愿意等待。想象一下,这就像在不使用标准的关系型数据库的情况下进行多个表的连接。数据职位数据referencenumbertitlepostdateurlcompanycitystatedescription
Sales Associate 13:47:18URL linkCompany NameCityStateOur Sales Associates are…“表格太长,请到查看。”标题数据IDTitle82Pediatricians, GeneralOES数据areaarea_titlearea_typenaicsnaics_titleown_code后略…99U.S.1000000Cross-industry123500-0000“表格太长,请到查看。”SOC表2010 SOC Code2010 SOC Title2010 SOC Direct Match Titlellustrative Example11-1011Chief ExecutivesCEO 示例脚本下面的是一个示例脚本,展示了如何使用multiprocessing来在有限的内存空间中加速操作过程。脚本的第一部分是和特定任务相关的,可以自由跳过。请着重关注第二部分,这里侧重的是multiprocessing引擎。Python1234567891011121314151617181920212223242526272829303132333435363738394041424344#import the necessary packagesimport pandas as pdimport usimport numpy as npfrom multiprocessing import Pool,cpu_count,Queue,Manager # the data in one particular column was number in the form that horrible excel version # of a number where '12000' is '12,000' with that beautiful useless comma in there. # did I mention I excel bothers me?# instead of converting the number right away, we only convert them when we need todef median_maker(column):
return np.median([int(x.replace(',','')) for x in column]) # dictionary_of_dataframes contains a dataframe with inform e.g title is 'Data Scientist'# related_title_score_df is the dataframe of infor columns = ['title','score'] ### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871# code_title_df contains columns ['code','title']# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!) def job_title_location_matcher(title,location):
related_title_score_df = dictionary_of_dataframes[title]
# we limit dataframe1 to only those related_titles that are above
# a previously established threshold
related_title_score_df = related_title_score_df[title_score_df['score']&80]
#we merge the related titles with another table and its codes
codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)
codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()
# merge the two dataframes by the codes
merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
#limit the BLS data to the state we want
all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]
#calculate some summary statistics for the time we want
group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)
row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]
#convert it all to strings so we can combine them all when writing to file
row_string = [str(x) for x in row]
return row_string
# if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant
'do nothing'这里发生了神奇的事情:Python1234567891011121314151617181920212223242526272829303132333435363738394041#runs the function and puts the answers in the queuedef worker(row, q):
ans = job_title_location_matcher(row[0],row[1])
q.put(ans) # this writes to the file while there are still things that could be in the queue# this allows for multiple processes to write to the same file without blocking eachotherdef listener(q):
f = open(filename,'wb')
m = q.get()
if m =='kill':
f.write(','.join(m) + 'n')
f.close() def main():
#load all your data, then throw out all unnecessary tables/columns
filename = 'skill_TEST_POOL.txt'
#sets up the necessary multiprocessing tasks
manager = Manager()
q = manager.Queue()
pool = Pool(cpu_count() + 2)
watcher = pool.map_async(listener,(q,))
#titles_states is a dataframe of millions of job titles and states they were found in
for i in titles_states.iloc:
job = pool.map_async(worker, (i, q))
jobs.append(job)
for job in jobs:
q.put('kill')
pool.close()
pool.join() if __name__ == "__main__":
main()由于每个数据帧的大小都不同(总共约有100Gb),所以将所有数据都放入内存是不可能的。通过将最终的数据帧逐行写入内存,但从来不在内存中存储完整的数据帧。我们可以完成所有的计算和组合任务。这里的“标准方法”是,我们可以仅仅在“job_title_location_matcher”的末尾编写一个“write_line”方法,但这样每次只会处理一个实例。根据我们需要处理的职位/州的数量,这大概需要2天的时间。而通过multiprocessing,只需2个小时。虽然读者可能接触不到本教程处理的任务环境,但通过multiprocessing,可以突破许多计算机硬件的限制。本例的工作环境是c3.8xl ubuntu ec2,硬件为32核60Gb内存(虽然这个内存很大,但还是无法一次性放入所有数据)。这里的关键之处是我们在60Gb的内存的机器上有效的处理了约100Gb的数据,同时速度提升了约25倍。通过multiprocessing在多核机器上自动处理大规模的进程,可以有效提高机器的利用率。也许有些读者已经知道了这个方法,但对于其他人,可以通过multiprocessing能带来非常大的收益。顺便说一句,这部分是这篇博文的延续。
没有更多推荐了,扫一扫体验手机阅读
【Python之旅】第六篇(六):Python多进程使用
<span type="1" blog_id="1702270" userid='
256篇文章,118W+人气,602粉丝
我是叶子哈,欢迎大家多多支持我写的文章!
大数据时代的微服务之路
¥51.00433人订阅
<span type="1" blog_id="1702270" userid='基于ArcGIS的python编程 11、利用多进程优化根据Excel表格批量生成点数据,批量裁剪有问题,上知乎。知乎作为中文互联网最大的知识分享平台,以「知识连接一切」为愿景,致力于构建一个人人都可以便捷接入的知识分享网络,让人们便捷地与世界分享知识、经验和见解,发现更大的世界。前面《》的程序顺利跑完大概要1小时20分,确实很慢,想起之前看到有利用python的多线程多进程去提高效率的案例,这里尝试利用多线程多进程对程序进行优化,提高程序的运行效率。自己对于多线程多进程的理解与应用都是一知半解,这里就不再班门弄斧,分享一篇文章《》,我觉得这蛮好解释了多进程多线程的含义。有兴趣的可以去好好理解一下。批量生成点数据工具的优化(多进程的运用):思路还是与上一篇一样,只不过把上一次的代码进行功能拆分,拆分为一个个功能函数,然后通过多进程多线程去充分利用电脑的CPU,多核同时运行同一个功能函数,这样就可以提高程序的效率。因为对多线程多进程不太熟悉,只能参考别人的案例,去模仿;虽然也应用了多线程多进程,但是程序还有很大的改善空间,刚刚接触这个新的知识,只能接下来继续去完善。完整代码如下:#coding=utf-8
import arcpy
import xlrd
import time
import multiprocessing
#获得表格的名字
def gettablename(in_table):
tablenamelist=[]
tablepaths=in_table.split(';')
for path in tablepaths:
name=path.split('\\')[-1].split('.')[0]
tablenamelist.append(name)
return tablenamelist
#根据表名,保存路径,坐标参考新建点要素
def CreateFeaturclass(tablename,savepath,spatial):
if arcpy.Exists(savepath + '\\' + tablename + '.shp')==False:
arcpy.CreateFeatureclass_management(savepath,tablename,'POINT','','','',spatial)
#根据表的路径,保存的路径,为刚刚创建的点要素添加字段
def AddField(tablepath,savepath):
featurename=tablepath.split('\\')[-1].split('.')[0]
data=xlrd.open_workbook(tablepath)
sh=data.sheets()[0]
ziduanNamelist=sh.row_values(0)
for ziduan in ziduanNamelist:
arcpy.AddField_management(savepath+'\\'+featurename+'.shp',ziduan,'TEXT')
#根据表名,点要素的路径,表里的内容写入点要素中
def InsertRow(savepath,tablepath):
featurename = tablepath.split('\\')[-1].split('.')[0]
data = xlrd.open_workbook(tablepath)
sh = data.sheets()[0]
ziduanNamelist = sh.row_values(0)
rownum = sh.nrows
Insercur = arcpy.InsertCursor(savepath + '\\' + featurename + '.shp')
for rowid in range(1,rownum):
newpnt = arcpy.Point()
newpnt.X = float(str(sh.cell(rowid, 2).value))
newpnt.Y = float(str(sh.cell(rowid, 1).value))
pointGeo = arcpy.PointGeometry(newpnt)
newrow = Insercur.newRow()
newrow.shape = pointGeo
for c in range(0, len(ziduanNamelist)):
newrow.setValue(ziduanNamelist[c],sh.cell(rowid,c).value)
Insercur.insertRow(newrow)
if __name__=="__main__":
in_table = arcpy.GetParameterAsText(0)
savepath = arcpy.GetParameterAsText(1)
spatial = arcpy.GetParameterAsText(2)
tablenamelist=gettablename(in_table)
tablpathlist = in_table.split(';')
pool=multiprocessing.Pool(processes=4)#进程processes的个数根据电脑的核数,我的电脑是4核
#利用多进程创建要素
for tablename in tablenamelist:
pool.apply_async(CreateFeaturclass,(tablename,savepath,spatial,))
pool.close()
pool.join()
time.sleep(1)
#利用多进程添加字段,注意多次运用多进程需要重新创建进程池Pool
pool = multiprocessing.Pool(processes=4)
for tablepath in tablpathlist:
pool.apply_async(AddField,(tablepath,savepath,))
pool.close()
pool.join()
time.sleep(1)
#利用多进程插入数据
pool = multiprocessing.Pool(processes=4)
for tablepath in tablpathlist:
pool.apply_async(InsertRow,(savepath,tablepath,))
pool.close()
pool.join()
前面的文章有些网友问我要源代码,其实源代码我都已经全部帖子文章中了,只需要复制这些代码下来保存为 .py文件,然后再Arcmap中创建脚本工具即可运行,前面的文章已有创建脚本工具的过程,详细过程请参考前面的文章《》;这里不再详细阐述,只是简单的说明一下注意情况;参数的设置:三个,注意参数的数据类型,是否多值就好。如下图1图1 工具参数设置工具的界面如图2图2 工具的页面工具的运行模式:选择第一个,这样的话易于调试,另外如果选择第二个,运行脚本时会不停的打开Arcmap这个程序,有点奇怪;如图3图3 工具的运行模式运行成功页面如图4;效率有点难以置信,上面一篇文章运行需要1小20分钟左右,相当漫长,这里利用多进程以后,只需要2分钟左右,效率提高了40多倍,节省了很多时间,看来多进程是个好东西,要好好运用。图4 运行成功后的页面批量裁剪的工具的优化:这里添加的功能主要是使裁剪要素也可以输入多个,然后利用循环的嵌套,以实现多对对的裁剪功能。因为很多时候我们都遇到过分类或者提取某个要素的需求;比如说我有梅州市的基础数据(国道,省道,县道,高速公路,水系,POI点等),现在我需要梅州行政区域内每个县区的基础数据,那么我们就需要利用梅州市的各个县区的行政区域(面要素,多个)区裁剪梅州市的基础数据(国道,省道,县道,高速公路,水系,POI点等),这样就可以获得每个县区的基础数据了。以前不会程序,也做过类似重复的工作,只能一个面与一个基础数据去裁剪,工作量挺大的。完整代码如下:# coding=UTF-8
import arcpy
in_feature= arcpy.GetParameterAsText(0)
in_featurepath = in_feature.split(';')
clip_feature = arcpy.GetParameterAsText(1)
clip_feature_paths = clip_feature.split(';')
out_file = arcpy.GetParameterAsText(2) #保存路径
for clipfeature in clip_feature_paths:
for in_layer in in_featurepath:
name2 = clipfeature.split('\\')[-1].split('.')[0]
name1=in_layer.split('\\')[-1].split('.')[0]
name=name1+"_"+name2
out_layer=out_file+'\\'+name
arcpy.Clip_analysis(in_layer, clipfeature, out_layer)
如果多对对裁剪的数量比较大,也需要一定的时间去运行的,如果想提高效率,也可参考上面程序的案例,巧用多进程即可,这个工具就不再详细阐述了,读者可以自行尝试一下。总结:有点奇怪,在python的独立环境中,读取Excel的时候,路径中不能出现中文,否则会报错;但是在Arcgis脚本工具运行时却可以包含中文,不知道什么原因。另外,之前以为python的xlrd模块只能操作.xls版本的表格,但是经过这两次实验,原来xlrd模块可以操作.xls或者.xlsx版本的表格,之前的认知不够。程序中多次利用进程池Pool调用函数,因为对多进程不太熟悉,所以只创建了一次进程池pool=multiprocessing.Pool(processes=4),导致了第二次利用多进程运行AddField()函数总是报错,后面每使用一次进程池就创建一个,这样就解决了错误。使用多进程会使电脑的使用内存瞬间爆满,此时电脑很容易卡死或者崩溃;使用time模块的sleep()函数睡眠几秒钟(time.sleep(1),让程序睡眠/暂停1秒钟),可以有效防止电脑由于运行过快,使用内存过高而崩溃。追求高效率的同时,也要注意,要不然电脑很容易吃不消。另外,自己想在插入属性值的时候(newrow.setValue(ziduanNamelist[c],sh.cell(rowid,c).value))使用多进程,因为如果有好几万个点,如果多进程运行这个函数,同时插入属性值,那么这样效率就大大提高了;这个思路没错,但是忽略了游标里设有表锁,就是防止多进程同时操作一张表,官网的表述如下(斜体字部分为从Esri官网摘录):游标和锁定插入和更新游标遵循由 ArcGIS 应用程序设置的表锁。锁能够防止多个进程同时更改同一个表。有两种锁的类型:共享和排它。只要访问表或数据集就会应用共享锁。同一表中可以存在多个共享锁,但存在共享锁时,将不允许存在排它锁。应用共享锁的示例包括:在 ArcMap 中显示要素类时以及在 ArcCatalog 中预览表时。对表或要素类进行更改时,将应用排它锁。在 ArcGIS 中应用排它锁的示例包括:在 ArcMap 中编辑和保存要素类时;在 ArcCatalog 中更改表的方案时;或者在 Python IDE(例如 PythonWin)中在要素类上使用插入游标时。如果数据集上存在排它锁,则无法为表或要素类创建更新和插入游标。 或
函数会因数据集上存在排它锁而失败。如果这些函数成功地创建了游标,它们将在数据集上应用排它锁,从而使两个脚本无法在同一数据集上创建更新和插入游标。在 Python 中,在游标释放前保持锁定状态。否则,将会阻止所有其他应用程序或脚本访问数据集,而这是毫无必要的。可通过以下其中一种方法来释放游标:在 with 语句中加入游标,这样可以确保无论游标是否成功完成,都将释放锁在游标上调用 reset()完成游标使用 Python 的 del 语句显示删除游标ArcMap 中的编辑会话将在其会话期间对数据应用共享锁。保存编辑内容时将应用排它锁。已经存在排它锁时,数据集是不可编辑的。欢迎大家一起交流,一起学习,一起进步!参考:Esri15分享收藏文章被以下专栏收录菜鸟一个,第一次在论坛写文章,有什么不足的地方还望大家不吝指点。Python多进程分块读取超大文件的方法
转载 &更新时间:日 09:52:10 & 作者:asdfsx
这篇文章主要介绍了Python多进程分块读取超大文件的方法,涉及Python多进程操作与文件分块读取的相关技巧,需要的朋友可以参考下
本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:
读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件
# -*- coding: GBK -*-
import urlparse
import datetime
from multiprocessing import Process,Queue,Array,RLock
多进程分块读取文件
WORKERS = 4
BLOCKSIZE =
FILE_SIZE = 0
def getFilesize(file):
获取要读取文件的大小
global FILE_SIZE
fstream = open(file,'r')
fstream.seek(0,os.SEEK_END)
FILE_SIZE = fstream.tell()
fstream.close()
def process_found(pid,array,file,rlock):
global FILE_SIZE
global JOB
global PREFIX
pid:进程编号
array:进程间共享队列,用于标记各进程所读的文件块结束位置
file:所读文件名称
各个进程先从array中获取当前最大的值为起始位置startpossition
结束的位置endpossition (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)&FILE_SIZE else FILE_SIZE
if startpossition==FILE_SIZE则进程结束
if startpossition==0则从0开始读取
if startpossition!=0为防止行被block截断的情况,先读一行不处理,从下一行开始正式处理
if 当前位置 &=endpossition 就readline
否则越过边界,就从新查找array中的最大值
fstream = open(file,'r')
while True:
rlock.acquire()
print 'pid%s'%pid,','.join([str(v) for v in array])
startpossition = max(array)
endpossition = array[pid] = (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)&FILE_SIZE else FILE_SIZE
rlock.release()
if startpossition == FILE_SIZE:#end of the file
print 'pid%s end'%(pid)
elif startpossition !=0:
fstream.seek(startpossition)
fstream.readline()
pos = ss = fstream.tell()
ostream = open('/data/download/tmp_pid'+str(pid)+'_jobs'+str(endpossition),'w')
while pos&endpossition:
line = fstream.readline()
ostream.write(line)
pos = fstream.tell()
print 'pid:%s,startposition:%s,endposition:%s,pos:%s'%(pid,ss,pos,pos)
ostream.flush()
ostream.close()
ee = fstream.tell()
fstream.close()
def main():
global FILE_SIZE
print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")
file = "/data/pds/download/scmcc_log/tmp_format_2011004.log"
getFilesize(file)
print FILE_SIZE
rlock = RLock()
array = Array('l',WORKERS,lock=rlock)
threads=[]
for i in range(WORKERS):
p=Process(target=process_found, args=[i,array,file,rlock])
threads.append(p)
for i in range(WORKERS):
threads[i].start()
for i in range(WORKERS):
threads[i].join()
print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")
if __name__ == '__main__':
更多关于Python相关内容感兴趣的读者可查看本站专题:《》、《》及《》
希望本文所述对大家Python程序设计有所帮助。
您可能感兴趣的文章:
大家感兴趣的内容
12345678910
最近更新的内容
常用在线小工具博客访问: 3334081
博文数量: 275
博客积分: 7846
博客等级: 少将
技术积分: 6485
注册时间:
分类: Mysql/postgreSQL 17:20:15
& & & 前两天开了两个进程,把Python抓回的数据链接并发写入Mysql中,结果显示出错。后来一查才知道需要自己设置锁,好生麻烦。这时PostgreSQL进入了我的视野,因为这家伙原生就是多进程的,但它是否支持多进程并发写入呢,还需要实际实验一下才知道。
2、安装PostgreSQL
& & &第一步,进入官网:http://www.postgresql.org/,点击Download
& & &第二步,选择操作系统对应的版本
& & & 第三步,我选择的Windows平台,因此下载说明该套件还包括视窗管理工具pgAdmin III。
& & & 继续下载,我选择的是64位,然后安装。接下来就是用pgAdmin创建数据库和表,老一套了,在此省略不表。
3、编写Python脚本
& & &首先,需要安装Psycopg,这是Python访问PostgreSQL的链接库。官网上说windows版本需要下载安装包,其实也有pip的安装方式:
pip install psycopg2
& & & 遗憾的是pip方式出现了问题,不知道是不是阅兵时的网络问题。
& & & 所以,我选择下载安装包:
4、测试结果
& & & 写入测试结果显示:可以实现两个进程对同一数据表的并发写入
& & &但是,我也将同样的代码跑了一下Mysql,发现并发的很好。。。郁闷
& & &所以,现在要更多的测试
# -*- coding: utf-8 -*-
import threading,time
#import psycopg2
import MySQLdb
import string
def multiGet(who):
&&&&start = time.clock()
&&&&#conn = psycopg2.connect(user='postgres',password='123456',database='links',host='localhost',port="5432")
&&&&conn = MySQLdb.connect(host='localhost',user='root',passwd='',db='rmrb')
&&&&cursor = conn.cursor()
&&&&for i in range(1000):
&&&&&&&&sql = "INSERT INTO delta (ID,WHO) VALUES (NULL, '" + who + "' );"
&&&&&&&&cursor.execute(sql)
&&&&conn.commit()
&&&&cursor.close()
&&&&conn.close()
&&&&end = time.clock()
&&&&print who + " processing time is: %f s" % (end - start)
task1 = threading.Thread(target = multiGet, args = ("a"))
task1.start()
task2 = threading.Thread(target = multiGet, args = ("b"))
task2.start()
& & & 这时,出现了问题,主要是关键字段ID不能为空。所以,改善一下代码:
# -*- coding: utf-8 -*-
import threading,time
#import psycopg2
import MySQLdb
import string
def multiGet(who):
&&&&start = time.clock()
&&&&#conn = psycopg2.connect(user='postgres',password='123456',database='links',host='localhost',port="5432")
&&&&conn = MySQLdb.connect(host='localhost',user='root',passwd='',db='rmrb')
&&&&cursor = conn.cursor()
&&&&for i in range(1000):
&&&&&&&&if who == 'a':
&&&&&&&&&&&&sql = "INSERT INTO delta (ID,WHO) VALUES (" + str(i) + ", '" + who + "' );"
&&&&&&&&else:
&&&&&&&&&&&&sql = "INSERT INTO delta (ID,WHO) VALUES (" + str(i+1000) + ", '" + who + "' );"
&&&&&&&&cursor.execute(sql)
&&&&conn.commit()
&&&&cursor.close()
&&&&conn.close()
&&&&end = time.clock()
&&&&print who + " processing time is: %f s" % (end - start)
task1 = threading.Thread(target = multiGet, args = ("a"))
task1.start()
task2 = threading.Thread(target = multiGet, args = ("b"))
task2.start()
& & & Mysql的结果如下:
& & & 上述结果是最后全部写入的,改成每条都commit呢?结果如下:
b processing time is: 0.161019 sa processing time is: 0.162407 s
& & &但是,这是InnoDB引擎的结果,数据量特大时,这个引擎超级慢。所以设置为MyISAM再试试。
a processing time is: 0.160377 sb processing time is: 0.159764 s
& & & 速度加快了,程度的调度其实还是分片,a一片然后b一片。
& & &看来Mysql足够,只是需要在关键字段的设置上做好功夫就可以。
阅读(6250) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~
请登录后评论。}

我要回帖

更多关于 java 将数据放入缓存 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信