关于上一节读者某些疑问:为什么你用进程并行不是线程并行?

回答:由于Python解释器有GIL(全局解释器锁),在单进程的解释器上有线程安全锁,也就是说每次只能一个线程访问解释器,因此Python在语法上的多线程(multithreads)实现是不会提高并行性能的。
这一点和C\C++上的编译级别的并行是不一样的,Python能做到的极限是多进程的解释级别并行。(上一节我实现的是多进程并行,和老师课上MPI的多线程是不一样的!!)

0. 引言

OpenMP是一种C/C++的并行编译标准方案,严谨地说,在Python上使用OpenMP是不可能的,因为本来Python是一门解释语言。

但如果在某个解释流程实现并行优化是可行的,也有一些方案。

如下所示C和Python在实现OpenMP接口的可能性对比:

在这里插入图片描述

如上两个地方可以实现mp的优化,分别为:在可交互级别的转化在解释阶段级别的转化

  • 在可交互级别的转化:一些第三方包pymp,pyopenmp,使用fork脱离GIL

  • 在解释阶段级别的转化:使用Cython直接编写C code,这样写出来的module在解释的时候会被解释器做并行优化。

早期Cython是不支持openmp的,但如今已支持了,我们可以非常方便地使用Cython语法编写支持openmp并行优化编译的代码。

因此,本文通过第一种实现方式对上一节的矩阵乘法优化做探究。

生成示例矩阵向量、计算代码都使用上一节内容。

链接高性能计算的矩阵乘法优化 - Python +MPI的实现

import numpy as np
from functools import wraps
import time

def generate_example_matrix(h, w):
    _vs = (-1, 2, -1)
    _i = -1  # shift bits
    example_data = np.zeros([h, w], dtype=np.int32)
    for i in range(3):
        example_data_eye_mask = np.eye(h, w, _i + i,
                                       dtype=np.bool_)  # build eyes and shift
        example_data[example_data_eye_mask == True] = _vs[i]
    return example_data


def generate_example_vector(w):
    _rest_dict = {
        1: [1],
        2: [1, 2],
        3: [1, 2, 3],
    }
    rest_bits = int(w % 3)
    repeat_times = int(w // 3)
    example_vector = np.repeat([[1, 2, 3]], repeat_times, axis=0).ravel()

    if rest_bits > 0:
        tail_vec = _rest_dict[rest_bits]
        tail_vec = np.array(tail_vec, dtype=np.int32)
        example_vector = np.concatenate([example_vector, tail_vec], axis=0)
    return example_vector
    

计算的naive code如下:

def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)

单进程单线程执行:

from utils import generate_example_matrix, generate_example_vector
import pymp
import time
import numpy as np

def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)

def main():
    start_time = time.perf_counter()
    h = 5000
    w = 5000
    print('--- Current matrix scale is: {} x {} ---'.format(h,w))
    example_matrix = generate_example_matrix(h, w)
    example_vector = generate_example_vector(w)
    result = naive_method(example_matrix, example_vector)
    end_time = time.perf_counter()
    print('single method used time is: {:.2f}s\n'.format(end_time - start_time))

if __name__ == '__main__':
    main()

1. 在可交互级别的转化

一个实现上是openmp stye的第三方库:pymp-pypi.

注意,该方法不能在Windows上实现,因为他使用fork来绕过GIL的。

操作系统的fork方法可以绕过GIL(全局解释器锁),从而实现多线程,这比Python原生的multithreads会更加并行高效。

题外话:当然还有一堆其他类似的第三方库,看看别人的教程也可以轻松实现。我这里只挑一个

安装pip install pymp-pypi

基本用法

优化前:

ex_array = np.zeros((100,), dtype='uint8')

for index in range(0, 100):
    ex_array[index] = 1
    print('Yay! {} done!'.format(index))
    

优化后:

import pymp

ex_array = pymp.shared.array((100,), dtype='uint8')

with pymp.Parallel(4) as p:
    for index in p.range(0, 100):
        ex_array[index] = 1
        # The parallel print function takes care of asynchronous output.
        p.print('Yay! {} done!'.format(index))

1.1 OpenMP Style的改写

将该用法改写到我们原来的运算代码中:

优化后:

from utils import generate_example_matrix, generate_example_vector
import pymp
import time
import numpy as np

def naive_multi_method(example_matrix, example_vector, threads):
    # result = pymp.shared.list()
    result = []
    h, w = example_matrix.shape
    with pymp.Parallel(num_threads = threads) as p:
        for hi in p.range(0, h):
            colv = example_matrix[hi, :]
            temp = 0
            for wi in p.range(0, w):
                temp += colv[wi] * example_vector[wi]
            result.append(temp)

def main():
    start_time = time.perf_counter()
    h = 50000
    w = 50000
    threads_num = 200 # 250, 200, 100, 50, 25, 16, 8
    print('--- Current matrix scale is: {} x {} ---'.format(h,w))
    print('<- Threads num is: {} ->'.format(threads_num))
    example_matrix = generate_example_matrix(h, w)
    example_vector = generate_example_vector(w)
    result = naive_multi_method(example_matrix, example_vector, threads = threads_num)
    end_time = time.perf_counter()
    print('multi-thread method used time is: {:.2f}s\n'.format(end_time - start_time))
    return np.array(result)
if __name__ == '__main__':
    main()

2.2 实验对比:

相较于上一次的实验,我换了一台设备,CPU配置如下:

11th Gen Intel® Core™ i5-11600K @ 3.90GHz,此外,我超频到了4.5GHz

这一次直接测试50000规模的运算。

单线程运行:

--- Current matrix scale is: 50000 x 50000 ---
single method used time is: 475.64s

Baseline:475秒

多线程优化之后

8线程

--- Current matrix scale is: 50000 x 50000 ---
<- Threads num is: 8 ->
multi-thread method used time is: 19.46s

T8:19秒

16线程:13秒

--- Current matrix scale is: 50000 x 50000 ---
<- Threads num is: 16 ->
multi-thread method used time is: 13.28s

T16:13秒

25线程

--- Current matrix scale is: 50000 x 50000 ---
<- Threads num is: 25 ->
multi-thread method used time is: 11.12s

T25:11秒

50线程

--- Current matrix scale is: 50000 x 50000 ---
<- Threads num is: 50 ->
multi-thread method used time is: 9.18s

T50:9.18秒

100线程

--- Current matrix scale is: 50000 x 50000 ---
<- Threads num is: 100 ->
multi-thread method used time is: 8.27s

T100:8.27秒

200线程

--- Current matrix scale is: 50000 x 50000 ---
<- Threads num is: 200 ->
multi-thread method used time is: 8.23s

T200:8.23秒

250线程

--- Current matrix scale is: 50000 x 50000 ---
<- Threads num is: 250 ->
multi-thread method used time is: 8.47s

T250:8.47秒

从以上数据可见,线程瓶颈数在200到250之间,接下来可以使用二分查找测试出最佳的线程数

Logo

汇聚原天河团队并行计算工程师、中科院计算所专家以及头部AI名企HPC专家,助力解决“卡脖子”问题

更多推荐