Py学习  »  Python

如何在进程池中使用共享/托管字典(Python 3.x)

Paul • 5 年前 • 1777 次点击  

我正在做一个项目,需要我从一些文件中提取大量信息。关于这个项目的格式和大部分信息与我要问的问题无关。我不知道如何将此字典与进程池中的所有进程共享。

这是我的代码(更改了变量名,删除了大部分代码,只需要知道部分):

import json

import multiprocessing
from multiprocessing import Pool, Lock, Manager

import glob
import os

def record(thing, map):

    with mutex:
        if(thing in map):
            map[thing] += 1
        else:
            map[thing] = 1


def getThing(file, n, map): 
    #do stuff
     thing = file.read()
     record(thing, map)


def init(l):
    global mutex
    mutex = l

def main():

    #create a manager to manage shared dictionaries
    manager = Manager()

    #get the list of filenames to be analyzed
    fileSet1=glob.glob("filesSet1/*")
    fileSet2=glob.glob("fileSet2/*")

    #create a global mutex for the processes to share
    l = Lock()   

    map = manager.dict()
    #create a process pool, give it the global mutex, and max cpu count-1 (manager is its own process)
    with Pool(processes=multiprocessing.cpu_count()-1, initializer=init, initargs=(l,)) as pool:
        pool.map(lambda file: getThing(file, 2, map), fileSet1) #This line is what i need help with

main()

据我所知,lamda函数应该可以工作。我需要帮助的行是:pool.map(lambda file:getThing(file,2,map),fileSet1)。这给了我一个错误。给出的错误是“AttributeError:Cant pickle local object'main..'”。

任何帮助都将不胜感激!

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/49837
文章 [ 1 ]  |  最新文章 5 年前
elemakil
Reply   •   1 楼
elemakil    6 年前

为了并行执行任务 multiprocessing “pickles”任务函数。在你的例子中,这个“任务函数”是 lambda file: getThing(file, 2, map) .

不幸的是,默认情况下,lambda函数不能在python中进行pickle(另请参见 this stackoverflow post ). 让我用最少的代码来说明这个问题:

import multiprocessing

l = range(12)

def not_a_lambda(e):
    print(e)

def main():
    with multiprocessing.Pool() as pool:
        pool.map(not_a_lambda, l)        # Case (A)
        pool.map(lambda e: print(e), l)  # Case (B)

main()

案例A 我们有一个适当的,自由的功能,可以腌制 pool.map 手术会成功的。在 案例B 我们有一个lambda函数,会发生崩溃。

一种可能的解决方案是使用适当的模块作用域函数(如 not_a_lambda ). 另一种解决方案是依赖第三方模块,如 dill ,以扩展酸洗功能。在后一种情况下,您可以使用 pathos 作为常规的替代品 多处理 模块。最后,您可以创建 Worker 收集您的 共享状态 作为成员。可能是这样的:

import multiprocessing

class Worker:
    def __init__(self, mutex, map):
        self.mutex = mutex
        self.map = map

    def __call__(self, e):
        print("Hello from Worker e=%r" % (e, ))
        with self.mutex:
            k, v = e
            self.map[k] = v
        print("Goodbye from Worker e=%r" % (e, ))

def main():
    manager = multiprocessing.Manager()
    mutex = manager.Lock()
    map = manager.dict()

    # there is only ONE Worker instance which is shared across all processes
    # thus, you need to make sure you don't access / modify internal state of
    # the worker instance without locking the mutex.
    worker = Worker(mutex, map)

    with multiprocessing.Pool() as pool:
        pool.map(worker, l.items())

main()