社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

python multiprocessing模块进行多进程处理

生信修炼手册 • 5 年前 • 636 次点击  

欢迎关注”生信修炼手册”!
多进程可以有效利用服务器多核CPU的计算资源,加速运行效率,在python中,通过内置模块multiprocessing来进行多进程编程。
子进程通过Process类来设置,示例如下
from  multiprocessing import Processimport subprocessimport shlex
def cal_seqs(fq): print('calculate fastq sequences') cnt = 0 with open(fq) as f: for line in f: cnt += 1 print('fastq sequences : {}'.format(cnt / 4))
def fastqc(fq): cmd = 'fastqc -t 10 {}'.format(fq) cmd_args = shlex.split(cmd) p = subprocess.Popen(cmd_args, stdout = subprocess.PIPE) p.stdout.read()
if __name__ == '__main__': fq = 'test.fq' p = Process(target = fastqc, args = (fq, )) p.start() cal_seqs(fq) print('Finish calculate fastq sequences')
上述代码启动了一个子进程来运行fastqc, 主进程则执行计算fastq文件的序列数,运行上述代码,你会看到类似如下的输出
calculate fastq sequencesfastq sequences : 100000Finish calculate fastq sequencesStarted analysis of test.fqApprox 5% complete for test.fqApprox 10% complete for test.fqApprox 15% complete for test.fqApprox 20% complete for test.fq

主进程中的行数计算先执行完毕,子进程后执行完毕, 说明子进程是非阻塞的。在主进程中启动了子进程后,主进程不管子进程是否运行结束,接着往下执行计算行数的结果,这就是非阻塞。如果需要等子进程执行完毕后,主进程再直接执行,也就是阻塞式的运行,需要join函数来进行阻塞,上述代码修改如下

from  multiprocessing import Processimport subprocessimport shlex
def cal_seqs(fq): print('calculate fastq sequences') cnt = 0 with open(fq) as f: for line in f: cnt += 1 print('fastq sequences : {}'.format(cnt / 4))
def fastqc(fq): cmd = 'fastqc -t 10 {}'.format(fq) cmd_args = shlex.split(cmd) p = subprocess.Popen(cmd_args, stdout = subprocess.PIPE) p.stdout.read()
if __name__ == '__main__': fq = 'test.fq' p = Process(target = fastqc, args = (fq, )) p.start() p.join() cal_seqs(fq) print('Finish calculate fastq sequences')

再次运行,可以看到如下输出

Started analysis of test.fq


    
Approx 5% complete for test.fqApprox 10% complete for test.fq...Approx 95% complete for test.fqApprox 100% complete for test.fqcalculate fastq sequencesfastq sequences : 100000Finish calculate fastq sequences

join的作用就是阻塞子程序,等待子进程执行完毕之后,主进程才可以接着往下执行。如果只是需要执行某些程序,而且下文中也不依赖其结果,可以选择非阻塞式的运行,如果下文需要依赖其结果,就需要阻塞式的运行了,应该根据实际情况,灵活进行选择。

对于多个样本的重复处理,可以用多进程达到并行的目的,代码示例如下

process_list = []samples = ['control1', 'control2', 'control3', 'case1', 'case2', 'case3']for sample in sample:    fq = '{}.fq'.format(sample)    p = Process(target = fastqc, args = (sample, ))    p.start()    process_list.append(p)
# 根据需要选择是否阻塞for p in process_list: p.join()

上述代码有一个问题,会一次性将所有的样本都并行,在实际开发中,我们需要结合计算资源来控制最大的并行数,此时可以使用进程池Pool类,可以方便的指定最大进程数,示例如下

p = Pool(3)process_list = []samples = ['control1', 'control2', 'control3', 'case1', 'case2', 'case3']for sample in sample:    fq = '{}.fq'.format(sample)    p.append_async(target = fastqc, args = (sample, ))
p.close()p.join()

上述代码还可以用map方法进行改写,更加简便,改写之后的完整代码如下

from multiprocessing import Poolimport subprocessimport shlex
def fastqc(fq): cmd = 'fastqc -t 10 {}'.format(fq) cmd_args = shlex.split(cmd) p = subprocess.Popen(cmd_args, stdout = subprocess.PIPE) p.stdout.read()
if __name__ == '__main__': samples = ['control1.fq', 'control2.fq', 'control3.fq', 'case1.fq', 'case2.fq', 'case3.fq'] with Pool(3) as p: samples = ['control1.fq', 'control2.fq', 'control3.fq', 'case1.fq', 'case2.fq', 'case3.fq'] p.map(fastqc, samples) print('Finish')

以上就是python多进程的基本用法,除此以外,python还支持进程间通信以及共享变量,更高级的用法请查看官方的API文档。

·end·

—如果喜欢,快分享给你的朋友们吧—



原创不易,欢迎收藏,点赞,转发!生信知识浩瀚如海,在生信学习的道路上,让我们一起并肩作战!
本公众号深耕耘生信领域多年,具有丰富的数据分析经验,致力于提供真正有价值的数据分析服务,擅长个性化分析,欢迎有需要的老师和同学前来咨询。
  更多精彩



  写在最后


转发本文至朋友圈,后台私信截图即可加入生信交流群,和小伙伴一起学习交流。


扫描下方二维码,关注我们,解锁更多精彩内容!


一个只分享干货的

生信公众号



Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/63105
 
636 次点击