HashingEncoder-多进程支持

HashingEncoder-多进程支持

对数据做特征工程时哈希编码器是常用分类器,Scikit-Learn 扩展工具包提供了一个默认基于 MD5 的哈希工具,可以将特征转换为指定长度的数字特征;但这个工具包的默认哈希有一个很致命的问题:慢。

测试发现慢主要原因是没有利用多核 CPU 的并行能力,基于该结论通过增加并发的方式来提高编码速度,完整源码参阅:ScikitLearn_Contrib - hashing.py


1. 源码

遇到问题第一步:查接口。通常 Python 中支持并行处理的接口都会有类似 n_thread 的参数,可惜 sklearn 内置的 HashingEncoder 并没有。

遇到问题第二部:看源码。HashingEncoder 的核心源码部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@staticmethod
def hashing_trick(X_in, hashing_method='md5', N=2, cols=None, make_copy=False):
"""A basic hashing implementation with configurable dimensionality/precision

Performs the hashing trick on a pandas dataframe, `X`, using the hashing method from hashlib
identified by `hashing_method`. The number of output dimensions (`N`), and columns to hash (`cols`) are
also configurable.

Parameters
----------

X_in: pandas dataframe
description text
hashing_method: string, optional
description text
N: int, optional
description text
cols: list, optional
description text
make_copy: bool, optional
description text

Returns
-------

out : dataframe
A hashing encoded dataframe.

References
----------
Cite the relevant literature, e.g. [1]_. You may also cite these
references in the notes section above.
.. [1] Kilian Weinberger; Anirban Dasgupta; John Langford; Alex Smola; Josh Attenberg (2009). Feature Hashing
for Large Scale Multitask Learning. Proc. ICML.

"""

try:
if hashing_method not in hashlib.algorithms_available:
raise ValueError('Hashing Method: %s Not Available. Please use one from: [%s]' % (
hashing_method,
', '.join([str(x) for x in hashlib.algorithms_available])
))
except Exception as e:
try:
_ = hashlib.new(hashing_method)
except Exception as e:
raise ValueError('Hashing Method: %s Not Found.')

if make_copy:
X = X_in.copy(deep=True)
else:
X = X_in

if cols is None:
cols = X.columns.values

def hash_fn(x):
tmp = [0 for _ in range(N)]
for val in x.values:
if val is not None:
hasher = hashlib.new(hashing_method)
if sys.version_info[0] == 2:
hasher.update(str(val))
else:
hasher.update(bytes(str(val), 'utf-8'))
tmp[int(hasher.hexdigest(), 16) % N] += 1
return pd.Series(tmp, index=new_cols)

new_cols = ['col_%d' % d for d in range(N)]

X_cat = X.loc[:, cols]
X_num = X.loc[:, [x for x in X.columns.values if x not in cols]]

X_cat = X_cat.apply(hash_fn, axis=1)
X_cat.columns = new_cols

X = pd.concat([X_cat, X_num], axis=1)

return X

一个 for 冲到底,什么叫简单粗暴啊(后仰)

默认情况 Python 没有对多核环境做多线程优化,这意味着多个线程一定跑在同一个核心上,这在处理 UI、IO 密集型任务时可以更彻底发挥单核性能,但对于特征工程这种计算密集型任务来说,GIL 全局锁就是 Debuff。


2. 改进思路

2.1 理论支持

关于什么是哈希编码的内容网上有很多了。但在开始优化之前,需要考虑一个问题:

按顺序依次编码,和多进程分别编码再拼接,最后的结果是一样的吗?

换言之,哈希编码是否依赖数据之间的联系?如果多进程编码会导致结果变化,那一切都是白费功夫。

幸运的是,哈希编码是对数据原始信息的编码,与数据间的联系无关,只要规定好输入特征和输出维度,最终的输出是确定的。也就是说,每一次送去编码的进程,其数据都是独立的,并且完全可以分开编码而不需要考虑别的数据,只要给每个进程设置相同的编码算法和输出维度,最后再将每个独立的数据块按照原数据的顺序重新排列即可。

哈希编码的大致核心思想:

  1. 哈希表把输入散列到指定的输出范围内
  2. 对于不在训练数据的散列表中的输入,依然可以通过同样的哈希算法计算出散列值
  3. 只要指定了相同的散列范围,新的输入也会具有相同的输出特征维度
  4. 新输入可以直接进行哈希编码,而不需要再对整体数据重新处理,因此支持在线学习
  5. 只有原始值非零,哈希编码才有效,因此可以保持编码前后的稀疏性

参考的理论基础资料:

2.2 代码思想

既然是多核性能没发挥出来,那就从并行的角度出发,大致的实现逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import multithreading

class HashingEncoder:
self.multi_data = []
...

# 每个线程调用的方法
def work():
get_data() # 每个线程都从整体数据中获取一部分数据
hashing_data = hashing() # 每个线程都单独进行哈希编码
self.multi_data.append(hashing_data)

def transform():
for thread in multi_num
multithreading.Thread.Thread(target=work).start

data = pandas.concat(multi_data, ignore_index=True)

简单来说我想做的事情是:

  1. 允许用户自定义并行线程数以及每个线程每次编码的样本数
  2. 根据参数创建 n 个进程
  3. 每个线程都依次从所有数据中不重复地顺序取出参数定义数量的数据
  4. 每个线程都独立对自己取到的数据进行哈希编码
  5. 每个线程哈希编码完后将返回数据存储进一个列表内
  6. 将列表内的数据拼接成回一个整体。

3. 踩过的坑

3.1 多线程和多进程问题

可以看到上面的 Demo 伪代码,由于客户端开发的习惯使我想当然地认为多线程可以解决 CPU 利用率的问题,因此第一次优化时我在一个小型样本 Demo 上选择了 multithreading,尽管从 print 看起来似乎确实是并行处理,但上数据压测就发现一切都还在单核上,因此只能利用 multiprocessing 解决。

3.2 并行顺序读取数据问题

与其他平台的并发工具类似,Python 中多进程管理也有两种方式,一个是自行创建实例,另一个是创建实例「池」。相比之下使用进程池有以下特点:

  • 不必关心每个进程的生命周期;
  • 可以指定进程执行完成后的返回值;
  • ProcessPool 内部维护的进程都是由公共 Process 创建的,每个进程的上下文天然具有继承关系,因此可以互相共享进程锁等资源;
  • 内部已经通过 Pickle 中的 dump(obj, file, protocol=None,) 方法对参数进行了封装处理,已经实现了拷贝的支持;
  • 比手动管理要多一些调度 / 资源分配,会略微损失一些性能;

不过由于 Python 较少用在对性能敏感的场景,因此在很多时候进程池其实是更好的方案,此处选择手动管理仅是为了更好的示例。

现在假设我们开启 4 个进程同时执行一个任务,代码应该大致如下

1
2
3
4
5
6
7
8
9
10
def work():
get_data() # 每个线程都从整体数据中获取第 index 部分的数据

process_list = []
for i in range(4):
process.append(multiprocessing.Process(target=work, args=()))

for p in process_list:
p.start()
p.join()

由于在多核环境下,多进程是真正的并发,因此需要考虑数据异步读写的同步性问题。高端的框架往往使用简单的逻辑,加锁 + 分治就是处理低并发最经济的方式。为了确保每个进程能自己判断所需的数据范围,需要一个变量用于存放已被读取的范围,还需要注意一下锁的范围、锁重入、锁粗化的问题。代码思路就变成了(省略一些校验、判断细节):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def __init__(self):
self.lock = multiprocessing.Lock()

def work(self, done_index):
if self.lock.acquire():
start = done_index
end = done_index + max_samples # max_samples 是用户定义的每个进程单次读取数据量
done_index = end # 更新已完成的下标
self.lock.release()
get_data(start, end)
hashing()
else:
self.lock.release()

done_index = 0 # 初始化下标为从 0 开始
for i in range(max_process): # max_process 是用户定义的最大并行进程数
process.append(multiprocessing.Process(target=work, args=(done_index)))
...
...

聪明的你一定会发现一个问题:由于进程在创建时会为自己创建一份上下文拷贝,并运行在自己的独立内存中,所以每个进程在执行完成后,修改的 done_index 都只是自己内存中的引用(即便用 global 修饰为全局变量也同样),因此为了在进程间共享,就需要用 multiprocessing.Value() 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def __init__(self):
self.lock = multiprocessing.Lock()
self.done_index = multiprocessing.Value('d', 0)
self.data_lines = len(data) # 样本总数目

def work(self):
if self.lock.acquire(): # 加锁,已确保自己获取到的 done_index 是最新的
start = self.done_index.value
end = start + max_samples # max_samples 是用户定义的每个进程单次读取数据量
self.done_index.value = end # 更新已完成的下标
self.lock.release() # 释放锁,允许其他进程操作更新 done_index
get_data(start, end)
hashing()
if end < self.data_lines: # 尚有未完成的数据,再次调用分配
self.work()
else:
self.lock.release()

3.3 编码后数据的保存问题

现在并发已经正常了,数据一致性也保证了,只要能在编码完成后把最终结果通知给主进程即可。

这体现了进程池的好处之一:可以指定进程的返回值。

由于手动管理模式并没有返回值,因此想借鉴上面的方法使用一个共享变量来存放,但查了一下 Api 发现 multiprocessing.Value() 只支持几种基本类型,而需要收集的数据类型复杂:

表格来源:

Type code C Type Python Type Minimm size in bytes
'c' char character 1
'b' signed char int 1
'B' unsigned char int 1
'u' Py_UNICODE Unicode character 2 (see note)
'h' signed short int 2
'H' unsigned short int 2
'i' signed int int 2
'I' unsigned int long 2
'l' signed long int 4
'L' unsigned long long 4
'f' float float 4
'd' double float 8

不过 multiprocessing 提供了一个非常好的内置方案:Queuemultiprocessing.Queue 允许进程通过加锁以 FIFO 的顺序使用 put()get() 读写对象,且允许设置读写超时后中断锁并放弃操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def __init__(self):
self.hashing_parts = multiprocessing.Queue()

def work(self):
...
data = hashing()
self.hashing_parts.put(data)

......

# 主线程等待所有子线程执行完毕
list_data = []
while not hashing_parts.empty():
list_data.append(hashing_parts.get())

3.4 结果数据顺序问题

到这一步,已经可以将各个进程的编码结果存在 list_data 中,只需要使用 pandas.concat(list_data, ignore_index = 0) 就能把所有数据拼起来成为完整的编码后数据,但聪明的你一定又会想到,并发处理的返回顺序是不可预知的,直接按进程完成的顺序拼接很可能会改变原始样本数据的顺序:

With return_df=False, a different ordering of the samples could lead into nasty surprises, because numpy.ndarray doesn't preserve index -> the training data would not be aligned with the labels anymore.

因此还需要利用字典来存放当前数据块的索引位置与数据块的关系,最后主进程获取所有的字典数据,并按照键值顺序排序后再拼接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def __init__(self):
self.hashing_parts = multiprocessing.Queue()

def work(self):
...
start = ... # 当前数据块起始下标
end = ... # 当前数据块结束下标
data = hashing()
part_index = int(math.ceil(end_index / self.max_sample))
self.return_data.put({part_index: data}})

list_data = {}
while not hashing_parts.empty():
list_data.update(hashing_parts.get()) # 取出所有的代码块(无序状态)
sort_data = []
for part_index in sorted(list_data): # 将所有字典型数据块按照 key 排序后把 value 取出
sort_data.append(list_data[part_index])
data = pandas.concat(sort_data, ignore_index=True) # 拼接数据

其中 part_index 的算法是用当前已完成的下标除以每个进程单次处理的下标,并向上取整(为了让结果更直观地从 1 开始),来作为当前数据块的定位。

3.5 子进程调用问题

到了这一步,终于 Demo 测试已经完整支持了并发,但聪明的你一定会问:我们写代码不仅要能用还要足够鲁棒,你测试过代码吗?

好了小明你出去,这里不允许提问。

由于 sklearn-category_encoders 的 CI 接入了 nose 单元测试,流水线在跑单元测试代码时报了这么一个错误:

1
2
3
Runtime Error: Lock objects should only be shared between processes through inheritance.

意思大致是进程锁只能被锁的创建者的子进程共享。

这体现了进程池的好处之一:每个进程天然具有继承关系,可以共享进程锁等资源。

为了解决这个问题,可以通过 multiprocessing.Manager.XXX 来代替所有需要进程共享的资源:

1
2
3
4
def __init__(self):
self.lock = multiprocessing.Manager.Lock()
self.hashing_parts = multiprocessing.Manager.Queue()
self.done_index = multiprocessing.Manager.Value('d', 0)

3.6 深拷贝问题

现在数据测试已经通过,CI 测试也已经通过,可当使用这套 HashingEncoder 运行一个大数据量的基准测试时,multiprocessing 的 Manager 又报错了(省略部分信息):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Traceback (most recent call last):
...
...
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
...
...
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Unserializable message: Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 289, in serve_client
send(msg)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

Traceback 中的信息不够清晰,但是发现了一个关键信息:deepcopy,为了验证写个小测试:

1
2
3
4
from copy import deepcopy

he = HashingEncoder()
he_cp = deepcopy(he)

报错复现,看来主要问题就出在深拷贝上。从网上搜,能参考的信息实在太少了,不过参考 Pickle 模块的作用(用于 Python 特有数据类型和基本数据类型转换),聪明的你一定又想到了:这不就是序列化吗?

这体现了进程池的好处之一:内部对参数进行了封装处理,实现了拷贝的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def __init__(self):
self.lock = multiprocessing.Manager.Lock()
self.hashing_parts = multiprocessing.Manager.Queue()
self.done_index = multiprocessing.Manager.Value('d', 0)

def work(self):
self.lock ...
self.hashing_parts ...
self.done_index ...
...
...

for i in range(max_process): # max_process 是用户定义的最大并行进程数
process.append(multiprocessing.Process(target=work, args=()))
...
...

lock, hashing_parts, done_index,都是在 HashingEncoder 初始化时定义的,虽然这些都不是我的自定义类对象,但报错信息的最后一行:TypeError: can't pickle \_thread.lock objects 明确指出:进程锁 Lock 是不能被 pickle 转化的,事实上,另外两种 Value 和 Queue 都不可以,只不过因为执行到 pickle Lock 的时候就报错中断了而已。

不过因为这几个参数的目的只是为了运行时记录一下数据处理进度并不需要实现得太复杂,既然造成这个错误的原因是开启子进程时,参数带有自定义类对象(通常是不可被 pickle 的),那就把这些 Manager() 下的对象都放到创建子进程之前,并通过参数传递进去即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
def work(self, _lock, _hashing_parts, _done_index):
_lock ...
_hashing_parts ...
_done_index ...

lock = multiprocessing.Manager.Lock() # 创建为局部变量
hashing_parts = multiprocessing.Manager.Queue() # 创建为局部变量
done_index = multiprocessing.Manager.Value('d', 0) # 创建为局部变量
for i in range(max_process): # max_process 是用户定义的最大并行进程数
process.append(
# 通过参数传递
multiprocessing.Process(target=work,
args=(lock, hashing_parts, done_index)))

另外,在查找 multiprocessing 相关内容的时候,无意间看到一个因为 logging 导致多进程死锁的问题,出现概率较低,记录在附录【10】。作者自己的解答为:

由于主进程中使用了thread+mutiprocessing(fork),导致logging出现死锁,现象就是遇到子进程里第一句logging就hang住。问题只会发生在Linux下。

以及在本地进行单元测试时通常使用 nose 模块测试,但在提交 PR 时测试是部署在 CI 上进行的,这里面也遇到了一些问题,例如代码文件内的 example 的输入和输出必须是真实运行结果,CI 检测时会真实运行 example 的输入,并且所写的输出也要和 CI 运行结果一致。


4. 优化效果

释放多核能力,相比单核的提升当然是巨大的,不信你问问 Intel 和 AMD。放上一些毫无悬念的对比测试:

样本数量 特征数量 CPU 物理核心数 优化前编码耗时 优化后编码耗时
100,000 2 8 166.02 s 25.79 s
1,000,000 8 8 1.89 h 15.56 min
50,000,000 4 12 48.22 h 4.39 h

(测试都是在 ECS 上跑出来的,真金白银……)

The End


参考文献