
Synchronization and data merging in multithreaded Python

andre ahmed • 4 years ago • 641 views

I have a list of zip files that need to be processed in parallel.

This is how I do it:

for i in range(len(list_files)):
    p.apply_async(process_data,
                  args=(general_pd, list_files[i], TimeStamp[i],
                        S1_Sync, S2_Sync, S3_Sync, S4_Sync, S5_Sync,
                        S6_Sync, S7_Sync, T1_Sync, T2_Sync, T3_Sync,
                        T4_Sync, T5_Sync, Time_Sync))
p.close()
p.join()

S1_Sync and the rest are manager lists:

S1_Sync = manager.list()
S2_Sync = manager.list()

process_data:

def process_data(general_dic, path, time, s1, s2, s3, s4, s5, s6, s7,
                 t1, t2, t3, t4, t5, t_sync):
    if os.path.isfile(path):
        # (reading the bz2 file into df_conv is omitted in the post)
        s1.append(df_conv['C_strain_COY'].T.max())
        s2.append(df_conv['C_strain_CUY'].T.max())
        s3.append(df_conv['C_strain_ROX'].T.max())
        s4.append(df_conv['C_strain_CUX'].T.max())
        s5.append(df_conv['C_strain_CMX'].T.max())
        s6.append(df_conv['C_strain_COX'].T.max())
        s7.append(df_conv['C_strain_LOX'].T.max())

        t1.append(df_conv['C_temp_CUY'].T.max())
        t2.append(df_conv['C_temp_COY'].T.max())
        t3.append(df_conv['C_temp_CUX'].T.max())
        t4.append(df_conv['C_temp_CMX'].T.max())
        t5.append(df_conv['C_temp_COX'].T.max())
        print("file does exist at this time")
    else:
        s1.append(np.nan)
        s2.append(np.nan)
        s3.append(np.nan)
        s4.append(np.nan)
        s5.append(np.nan)
        s6.append(np.nan)
        s7.append(np.nan)
        t1.append(np.nan)
        t2.append(np.nan)
        t3.append(np.nan)
        t4.append(np.nan)
        t5.append(np.nan)

If the file exists, I take the median value of that data and store it in s1, s2, and so on.

After p.close(), I write the data to Excel:

print("writing data")
    result['TimeStamp'] = pd.Series(convert_proxy(Time_Sync))
    result['C_strain_COY'] = pd.Series(convert_proxy(S1_Sync))
    result['C_strain_CUY'] = pd.Series(convert_proxy(S2_Sync))
    result['C_strain_ROX'] = pd.Series(convert_proxy(S3_Sync))
    result['C_strain_CUX'] = pd.Series(convert_proxy(S4_Sync))
    result['C_strain_CMX'] = pd.Series(convert_proxy(S5_Sync))
    result['C_strain_COX'] = pd.Series(convert_proxy(S6_Sync))
    result['C_strain_LOX'] = pd.Series(convert_proxy(S7_Sync))

    result['C_temp_CUY'] = pd.Series(convert_proxy(T1_Sync))
    result['C_temp_COY'] = pd.Series(convert_proxy(T2_Sync))
    result['C_temp_CUX'] = pd.Series(convert_proxy(T3_Sync))
    result['C_temp_CMX'] = pd.Series(convert_proxy(T4_Sync))
    result['C_temp_COX'] = pd.Series(convert_proxy(T5_Sync))
    #general_pd.sort_values(by='TimeStamp')
    writer = pd.ExcelWriter(r'c:\ahmed\median_data_meter_1_max.xlsx', engine='xlsxwriter')
    # Convert the dataframe to an XlsxWriter Excel object.
    general_pd.to_excel(writer, sheet_name='Sheet1')
    # Close the Pandas Excel writer and output the Excel file.
    writer.save()

I convert each proxy list to a regular list and save it in the Excel file.
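(convert_proxy is not shown here; presumably it just copies the manager's ListProxy into a plain list, something like the following.)

def convert_proxy(proxy_list):
    # Hypothetical reconstruction of the helper, which the post omits:
    # materialize the manager's ListProxy as a plain Python list.
    return list(proxy_list)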

The problem is that the data ends up at random positions relative to the timestamps I generate; for example, some values land in the wrong rows of the Excel sheet.

Is it a synchronization problem? How do I correctly merge the data from each worker?
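To illustrate what I suspect is happening, here is a minimal, self-contained sketch (hypothetical, not my real pipeline): with apply_async, each worker appends to the shared manager list whenever it happens to finish, so the list ends up in completion order rather than submission order.

import random
import time
from multiprocessing import Manager, Pool

def work(i, out):
    time.sleep(random.random())  # tasks finish at random times
    out.append(i)                # append order == completion order

if __name__ == '__main__':
    with Manager() as manager:
        out = manager.list()
        p = Pool(4)
        for i in range(8):
            p.apply_async(work, args=(i, out))
        p.close()
        p.join()
        print(list(out))  # e.g. [2, 0, 3, 1, ...] -- rarely [0, 1, 2, ...]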

This is how I generate the timestamps:

ts = datetime.datetime.strptime('2018-08-21', '%Y-%m-%d')
end_ts = datetime.datetime.strptime('2018-08-23', '%Y-%m-%d')
while ts < end_ts:
    print(ts)
    ts += datetime.timedelta(minutes=15)
    path = os.path.join(
        r'\\file01\vnxmo\\TIT\NAS\\Baudynamik\\_meter\\SpeedFT-meter1\\peakdata\\'
        + ts.strftime("%Y-%m-%d") + r'\\peakdata_'
        + ts.strftime("%Y%m%d_%H%M") + r'.bz2')
    TimeStamp.append(ts)
    list_files.append(path)
 
Replies [ 1 ]  |  Latest reply 4 years ago
Neil   •   Reply   •   #1   •   5 years ago

If the job is embarrassingly parallel and the workers don't need to communicate, I would just use a process-pool context manager. You have a lot of arguments, which can be tricky, but there are plenty of workarounds for passing them; I sometimes use nested functions or global variables to collapse them into a single variable. Otherwise, you can always pack them into a tuple or the like.

from multiprocessing import Pool as ProcessPool # Use a process pool if you're CPU bound. If you're IO bound, use a thread pool (multiprocessing.dummy.Pool) instead.

my_zip_files = [
    ("arg1", "arg2", "arg3"),      # Whatever the arguments are. Just roll them into a tuple and store all the tuples in a list.
    ("arg12", "arg22", "arg23")
]

def do_something(arg):
    arg1 = arg[0]   # recover the individual arguments. Can use python's * syntax as well.
    arg2 = arg[1]
    arg3 = arg[2]
    result = _do_something(arg1, arg2, arg3)
    return result

def _do_something(arg1, arg2, arg3):
    ...  # whatever logic

if __name__ == '__main__':  # guard required for multiprocessing on Windows
    with ProcessPool(4) as pool:
        result = pool.map(do_something, my_zip_files)
    # pool.map returns results in the same order as the inputs,
    # so result[i] corresponds to my_zip_files[i].
    print(result)
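Applied to your case, that would mean having process_data return one row of values instead of appending to shared manager lists, and letting pool.map hand the rows back in submission order. A rough sketch (assumptions: the file-loading code from your process_data fills df_conv, and names like process_one and COLS are mine):

import os
import numpy as np
import pandas as pd
from multiprocessing import Pool

COLS = ['C_strain_COY', 'C_strain_CUY', 'C_strain_ROX', 'C_strain_CUX',
        'C_strain_CMX', 'C_strain_COX', 'C_strain_LOX',
        'C_temp_CUY', 'C_temp_COY', 'C_temp_CUX', 'C_temp_CMX', 'C_temp_COX']

def process_one(args):
    """Return one row of max values instead of appending to shared lists."""
    path, ts = args
    if not os.path.isfile(path):
        return {'TimeStamp': ts, **{c: np.nan for c in COLS}}
    df_conv = ...  # load/convert the bz2 file exactly as in your process_data
    return {'TimeStamp': ts, **{c: df_conv[c].T.max() for c in COLS}}

if __name__ == '__main__':
    # list_files and TimeStamp are built exactly as in the question.
    with Pool(4) as pool:
        rows = pool.map(process_one, list(zip(list_files, TimeStamp)))
    result = pd.DataFrame(rows)  # row i corresponds to list_files[i]
    result.to_excel(r'c:\ahmed\median_data_meter_1_max.xlsx', sheet_name='Sheet1')

Because map preserves input order, each row lands next to the timestamp it was submitted with, and no proxy lists or locking are needed.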