
Saving images in a loop is faster than multithreading/multiprocessing

白鹭 - 2022-02-13

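Here is a timing example that saves several arrays of images, of different sizes, first in a plain loop and then concurrently using threads and processes: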

import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import numpy as np
import cv2


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4
    t1 = perf_counter()
    for ll in l1, l2, l3:
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            with executor(workers) as ex:
                futures = [
                    ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )

I get these durations on an i5 MBP:

Time for 100: 0.09495482999999982 seconds
Time for 100 (ThreadPoolExecutor): 0.14151873999999998 seconds
Time for 100 (ProcessPoolExecutor): 1.5136184309999998 seconds
Time for 1000: 0.36972280300000016 seconds
Time for 1000 (ThreadPoolExecutor): 0.619205703 seconds
Time for 1000 (ProcessPoolExecutor): 2.016624468 seconds
Time for 10000: 4.232915643999999 seconds
Time for 10000 (ThreadPoolExecutor): 7.251599262 seconds
Time for 10000 (ProcessPoolExecutor): 13.963426469999998 seconds

Shouldn't threads/processes need less time to accomplish the same thing? Why not in this case?

Reply from a uj5u.com user:

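The timings in the code are wrong because the timer t is not reset before the pools are tested, so each pool's figure also includes the time of everything that ran before it. The relative order of the timings is still correct, though. A possible version with the timer reset is: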

import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import numpy as np
import cv2


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)

if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4

    for ll in l1, l2, l3:
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            t = perf_counter()
            with executor(workers) as ex:
                futures = [
                    ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )

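Multithreading mainly speeds up I/O-bound work. In this case, compressing the images is CPU-intensive, so depending on the implementation of OpenCV and of the Python wrapper, multithreading can be much slower. In many cases the culprit is CPython's GIL, but I am not sure whether that applies here (I do not know if the GIL is released during the imwrite call). On my setup (i7 8th gen), threading is as fast as the loop for 100 images and barely faster for 1000 and 10000 images. If ThreadPoolExecutor reuses threads, there is overhead in assigning a new task to an existing thread. If it does not reuse threads, there is overhead in launching a new thread.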
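On the question of thread reuse: the concurrent.futures documentation describes ThreadPoolExecutor as using a pool of at most max_workers threads, so the cost per task should be dispatch rather than thread start-up. A minimal sketch to check this on your own machine (the helper names are mine and are not part of the code above):

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def worker_thread_name(_):
    # Report which pool thread executed this task.
    return threading.current_thread().name


def distinct_worker_threads(n_tasks, workers):
    # Run n_tasks trivial tasks and collect the names of the threads that ran them.
    with ThreadPoolExecutor(max_workers=workers) as ex:
        return set(ex.map(worker_thread_name, range(n_tasks)))


if __name__ == '__main__':
    names = distinct_worker_threads(n_tasks=1000, workers=4)
    print(f'{len(names)} distinct threads ran 1000 tasks')
```

The number of distinct names never exceeds the number of workers, however many tasks are submitted.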

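Multiprocessing circumvents the GIL issue, but it has some problems of its own. First, pickling the data to pass it between processes takes time, and for images this can be very expensive. Second, on Windows, spawning a new process takes a lot of time. A simple test to see the overhead (for both processes and threads) is to replace the save_img function with one that does nothing but still requires pickling and so on: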

def save_img(idx, image, dst):
    if idx != idx:
        print("impossible!")

Then replace it with a similar function that takes no arguments, to see the overhead of spawning the processes and so on.

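The timings on my setup show that 2.3 seconds are needed just to run the 10000 empty tasks through the process pool, plus an extra 0.6 seconds for pickling, which is far more than the time needed for the actual processing.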
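The two overhead tests can be put side by side as in the sketch below. This is only an illustration under my own assumptions: noop, noop_with_args, time_tasks and report_overhead are hypothetical names, and the payload is a plain bytes object standing in for an image rather than a real NumPy array.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from time import perf_counter


def noop_with_args(idx, image, dst):
    # Same signature as save_img: a process pool still has to pickle the arguments.
    if idx != idx:
        print("impossible!")


def noop():
    # No arguments: what is left is pool start-up and task dispatch.
    pass


def time_tasks(executor_cls, fn, args_list, workers=4):
    # Submit one task per argument tuple and return the elapsed seconds.
    t = perf_counter()
    with executor_cls(workers) as ex:
        futures = [ex.submit(fn, *args) for args in args_list]
        for f in as_completed(futures):
            f.result()
    return perf_counter() - t


def report_overhead(n=10000):
    # Assumed stand-in payload; the images in the question are NumPy arrays.
    payload = bytes(50 * 50)
    cases = (
        ('with args', noop_with_args, [(i, payload, 'unused') for i in range(n)]),
        ('no args', noop, [()] * n),
    )
    for executor_cls in ThreadPoolExecutor, ProcessPoolExecutor:
        for label, fn, args_list in cases:
            elapsed = time_tasks(executor_cls, fn, args_list)
            print(f'{executor_cls.__name__} ({label}): {elapsed} seconds')
```

Call report_overhead() from under an `if __name__ == '__main__':` guard, which process pools need on Windows and macOS. The difference between the "with args" and "no args" lines for ProcessPoolExecutor gives a rough idea of the pickling cost; the absolute numbers will vary by machine.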

One way to improve throughput while keeping the overhead to a minimum is to split the work into chunks and submit each chunk to a worker:

import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import numpy as np
import cv2


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)

def multi_save_img(idx_start, images, dst):
    for idx, image in zip(range(idx_start, idx_start + len(images)), images):
        cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4

    for ll in l1, l2, l3:
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        # One contiguous chunk per worker; the last chunk absorbs the remainder.
        chunk_size = len(ll) // workers
        ends = [chunk_size * (_ + 1) for _ in range(workers)]
        ends[-1] += len(ll) % workers
        starts = [chunk_size * _ for _ in range(workers)]
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            t = perf_counter()
            with executor(workers) as ex:
                futures = [
                    ex.submit(multi_save_img, start, ll[start:end], temp_dir) for (start, end) in zip(starts, ends)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )

For both the multiprocessing and the multithreading approach, this should give a significant boost over a simple for loop.
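The chunk boundaries can also be computed so that the remainder is spread over the first chunks instead of all landing in the last one. A small sketch, where chunk_bounds is a hypothetical helper of mine and not part of the code above:

```python
def chunk_bounds(n_items, n_chunks):
    # Return (start, end) pairs that cover range(n_items) without gaps.
    # The first n_items % n_chunks chunks get one extra item each.
    base, extra = divmod(n_items, n_chunks)
    bounds = []
    start = 0
    for k in range(n_chunks):
        end = start + base + (1 if k < extra else 0)
        bounds.append((start, end))
        start = end
    return bounds


if __name__ == '__main__':
    for start, end in chunk_bounds(10, 4):
        print(start, end)
```

The pairs can be used in place of zip(starts, ends) when submitting, for example ex.submit(multi_save_img, start, ll[start:end], temp_dir). For the array sizes in the question (100, 1000, 10000 with 4 workers) both schemes give identical chunks, so this only matters when the length is not a multiple of the worker count.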
