社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

如何在进程池中使用共享/托管字典(Python 3.x)

Paul • 5 年前 • 1692 次点击  

我正在做一个项目,需要我从一些文件中提取大量信息。关于这个项目的格式和大部分信息与我要问的问题无关。我不知道如何将此字典与进程池中的所有进程共享。

这是我的代码(更改了变量名,删除了大部分代码,只需要知道部分):

import json

import multiprocessing
from multiprocessing import Pool, Lock, Manager

import glob
import os

def record(thing, map):

    with mutex:
        if(thing in map):
            map[thing] += 1
        else:
            map[thing] = 1


def getThing(file, n, map): 
    #do stuff
     thing = file.read()
     record(thing, map)


def init(l):
    global mutex
    mutex = l

def main():

    #create a manager to manage shared dictionaries
    manager = Manager()

    #get the list of filenames to be analyzed
    fileSet1=glob.glob("filesSet1/*")
    fileSet2=glob.glob("fileSet2/*")

    #create a global mutex for the processes to share
    l = Lock()   

    map = manager.dict()
    #create a process pool, give it the global mutex, and max cpu count-1 (manager is its own process)
    with Pool(processes=multiprocessing.cpu_count()-1, initializer=init, initargs=(l,)) as pool:
        pool.map(lambda file: getThing(file, 2, map), fileSet1) #This line is what i need help with

main()

据我所知,lamda函数应该可以工作。我需要帮助的行是:pool.map(lambda file:getThing(file,2,map),fileSet1)。这给了我一个错误。给出的错误是“AttributeError:Cant pickle local object'main..'”。

任何帮助都将不胜感激!

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/49837
 
1692 次点击  
文章 [ 1 ]  |  最新文章 5 年前
elemakil
Reply   •   1 楼
elemakil    6 年前

为了并行执行任务 multiprocessing “pickles”任务函数。在你的例子中,这个“任务函数”是 lambda file: getThing(file, 2, map) .

不幸的是,默认情况下,lambda函数不能在python中进行pickle(另请参见 this stackoverflow post ). 让我用最少的代码来说明这个问题:

import multiprocessing

l = range(12)

def not_a_lambda(e):
    print(e)

def main():
    with multiprocessing.Pool() as pool:
        pool.map(not_a_lambda, l)        # Case (A)
        pool.map(lambda e: print(e), l)  # Case (B)

main()

案例A 我们有一个适当的,自由的功能,可以腌制 pool.map 手术会成功的。在 案例B 我们有一个lambda函数,会发生崩溃。

一种可能的解决方案是使用适当的模块作用域函数(如 not_a_lambda ). 另一种解决方案是依赖第三方模块,如 dill ,以扩展酸洗功能。在后一种情况下,您可以使用 pathos 作为常规的替代品 多处理 模块。最后,您可以创建 Worker 收集您的 共享状态 作为成员。可能是这样的:

import multiprocessing

class Worker:
    def __init__(self, mutex, map):
        self.mutex = mutex
        self.map = map

    def __call__(self, e):
        print("Hello from Worker e=%r" % (e, ))
        with self.mutex:
            k, v = e
            self.map[k] = v
        print("Goodbye from Worker e=%r" % (e, ))

def main():
    manager = multiprocessing.Manager()
    mutex = manager.Lock()
    map = manager.dict()

    # there is only ONE Worker instance which is shared across all processes
    # thus, you need to make sure you don't access / modify internal state of
    # the worker instance without locking the mutex.
    worker = Worker(mutex, map)

    with multiprocessing.Pool() as pool:
        pool.map(worker, l.items())

main()