问题的背景是对于一个大批量的数据,想要分割成多个小批量然后使用多进程的方式进行处理,目前这一步已经实现,现在想加上一个进度条显示总的处理进程,目前调研出的结果都是只能显示每个小批量的完成进度(tqdm 配合 imap),每个小批量数据必须在一个进程内进行处理,不能分开(也就是不能将整个大批量数据直接丢给 imap)
举个例子可能更好理解:有 10000 条数据,我想分 10 份 1000 条数据的小批量用 10 个进程进行处理,我想要的效果是[0/10000]这种,而 tqdm 配合 imap 实现效果是[0/10]这种。重点在于 如何在子进程的 for 循环中更新 tqdm 进度条
伪代码如下:
def worker(data, pbar):
for i in data:
print(data)
pbar.update(1)
if __name__ == "__main__":
data = [1]*10000
pbar = tqdm.tqdm(10000)
for i in range(0, 10000, 1000):
p_list.append(Process(target=worker, args=(data[i, i+1000],pbar)))
p_list[-1].start()
for p in p_list:
p.jion()
希望大佬能指点一下,感谢
感谢各位的回复,经过2,3楼大佬的提醒,使用Queue实现了子进程和主进程之间的通信,实现伪代码如下,遇到同样需求的朋友可以参考一下:
def worker(data, pbar_queue):
for i in data:
print(data)
pbar_queue.put(1)
if __name__ == "__main__":
data = [1]*10000
pbar = tqdm.tqdm(10000)
pbar_queue = Queue()
for i in range(0, 10000, 1000):
p_list.append(Process(target=worker, args=(data[i, i+1000],pbar_queue)))
p_list[-1].start()
with tqdm(total=10000) as pbar:
pbar_queue.get()
pbar.update(1)
for p in p_list:
p.jion()
再次感谢🙏
1
renmu 2022-09-20 23:00:25 +08:00 via Android
写个全局变量,每次处理完毕后加一,但是这样处理好像还要加锁。
或者手动调用 tqdm 的 update 函数,虽然我怀疑大概率也会出问题。 要么就单独处理每个进程内的进度条,分别显示十个任务的进度条 |
2
MasterCai OP |
3
leonshaw 2022-09-21 00:01:43 +08:00 1
semaphore 或者 shared memory + 锁,主进程更新
|
4
ysc3839 2022-09-21 00:57:45 +08:00 via Android 1
共享内存,子进程写主进程读,然后把所有子进程的数值相加传递给 tqdm
|