从零实现一个高并发内存池
1. 初识高并发内存池
1.1 项目介绍
当前项目是实现一个高并发的内存池,它的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存1管理,用于替代系统的内存分配相关的函数(malloc 、free)。
1.2 项目所需的知识
这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的知识。
1.3 了解池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
1.4 内存池主要解决的问题
内存池主要解决的是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那什么是内存碎片呢?
这是我们申请空间的一般步骤:
当我们将指针a与c管理的空间释放后,堆区现在有2KB的空间,但是我们要申请2KB的空间却申请不出来,因为这两块空间碎片化,不连续了!
上面讲的是外碎片问题,指的是一些空闲的连续内存区域太小,且这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配的申请需求。内存碎片问题也包括内部碎片的问题,指的是由于一些对齐的要求,导致分配出去的空间中一些内存无法被利用(比如struct结构体中的内存对齐)。
2. 小试牛刀–设计一个定长内存池
我们先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以设计它的目的有两个,先熟悉一下简单内存池是如何控制的,第二它会作为我们后面内存池的一个基础组件。
内存池框架代码如下:
template<class T>
class ObjectPool {
public:
T* New(){} //从内存池申请一块内存
void Delete(T* obj){} //释放一块内存至内存池
private:
char* _memory = nullptr; //管理向系统申请的大块内存
void* _freeList = nullptr; //管理返回定长内存的头指针
size_t _remainBytes = 0; //记录大块内存所剩字节数
};
2.1 将内存还给定长内存池
因为我们采用了池化技术,所以对于还回来的内存,我们需要将它们组织管理起来,那么如何管理呢,我们采用类似于单链表的方式,如下图:
每块内存的前4个字节(32位下)或8个字节(64位下)存放的是指向下一块空间的指针,增加头指针是为了方便对还回的内存进行插入,否则每次插入都选择尾插效率太低。
对应Delete的实现:
void Delete(T* obj) {
//显示调用析构函数处理对象
obj->~T();
//将该内存头插进自由链表中
(*(void**))obj = _freeList;
_freeList = obj;
}
2.2 向定长内存池中申请内存
- 当对应的内存池中已经有被还回来的内存块,即自由链表不为空时
我们直接从自由链表中获取内存块,头删一块内存块,如下图示意:
该部分代码实现如下:
T* New() {
T* obj;
if (_freeList) {
void* next = (*(void**))_freeList;
obj = (T*)_freeList;
_freeList = next;
}
return obj;
}
- 当自由链表为空并且所剩内存字节数不满足我们要求时,就要向系统申请以页为单位的空间(这里一页为8KB)
申请出的空间逻辑示意图如下:
当自由链表为空时,我们就在该大内存上切下我们需要的空间,示意图如下:
切下我们所需要大小的内存块,剩下的内存继续由_memory管理。
在此需要注意内存对齐的问题,比如我们的T是char类型的,当我们返回这块空间时,这块空间实际上是1字节,但自由链表存储空间的最低要求是4字节或8字节(因为要存指针),因此切内存时要注意内存对齐。
整体代码如下:
T* New() {
T* obj;
if (_freeList) {
void* next = (*(void**))_freeList;
obj = (T*)_freeList;
_freeList = next;
}
else {
if (_remainBytes < sizeof(T)) { //如果剩余内存不够了,向系统申请
_remainBytes = 1024 * 128;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr) {
throw std::bad_alloc();
}
}
obj = (T*)_memory;
size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*); //内存对齐
_memory += objSize;
_remainBytes -= objSize;
}
new(obj)T; //显示调用new初始化
return obj;
}
3. 高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
1. 性能问题。
2. 多线程环境下,锁竞争问题。
3. 内存碎片问题。
concurrent memory pool主要由以下3个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
4. 梳理向内存池中申请内存的步骤
4.1 哈希表与内存对齐
在使用ThreadCache和CentralCache时,都需要根据所需内存的字节数去对应的哈希桶中申请内存,若我们采用一对一的哈希映射关系,即以字节为单位的任意大小的内存均要对应上哈希表的下标,且因为管理内存采用的是单链表的方式(申请的内存大小至少为4或8字节),即哈希表的下标范围应为[8,256*1024],显然下标范围这么大的哈希表不切实际,且实用性低。因此要采用一定的内存对齐原则。我们采用如下原则进行内存对齐:
这样不仅缩小哈希表长度,大幅减少哈希表占用的空间,而且有效控制了内碎片的浪费问题。
这样我们可以根据所需字节数算出对齐后的字节数,进而根据对齐后的字节数算出哈希表对应的桶的位置。
代码实现如下:
static const size_t MAX_BYTES = 256 * 1024; //申请空间的最大字节数
static const size_t NFREELIST = 208; //哈希表的长度
class SizeClass {
public:
static size_t _RoundUp(size_t size, size_t align) {
if (size % align == 0) {
return size;
}
return (align - (size % align) + size);
}
static size_t _Index(size_t alignSize, size_t align) {
return alignSize / align - 1;
}
static size_t RoundUp(size_t size) { //算内存大小的对齐数
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
if (size <= 128) {
return _RoundUp(size, 8);
}
else if (size <= 1024) {
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024) {
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024) {
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024) {
return _RoundUp(size, 8 * 1024);
}
else {
assert(size < 256 * 1024);
}
}
static size_t Index(size_t size) { //计算哈希桶下标
assert(size < 256 * 1024);
size_t alignSize = RoundUp(size);
static int group_array[4] = { 16,56,56,56 };//每个对齐区间有多长
if (alignSize <= 128) {
return _Index(alignSize,8);
}
else if (alignSize <= 1024) {
return _Index(alignSize - 128 , 16) + group_array[0];
}
else if (alignSize <= 8 * 1024) {
return _Index(alignSize - 1024 , 128) + group_array[0] + group_array[1];
}
else if (alignSize <= 64 * 1024) {
return _Index(alignSize - 8 * 1024, 1024) + group_array[0] + group_array[1] + group_array[2];
}
else if (alignSize <= 256 * 1024) {
return _Index(alignSize - 64 * 1024, 8 * 1024) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
}
return -1;
}
};
4.2 ThreadCache结构分析与申请内存流程
- 线程如何独享ThreadCache
每个线程去申请ThreadCache时是在堆上申请的,但一个进程中每个线程共享一份堆区,所以无法保证每个线程独享一份ThreadCache,只能通过加锁的方式来共用ThreadCache。那有办法支持线程无锁访问ThreadCache并且每个线程都独享一份ThreadCache吗?答案是有的。即线程局部存储:线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
我们通过加关键字静态编译来实现该方法:
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
- ThreadCache基本结构
ThreadCache中是哈希表的结构,哈希表中的哈希桶采用单链表的形式将内存块链接起来,与定长内存池链接内存块的方式一致,结构逻辑图如下:
下面是哈希桶中自由链表的实现代码:
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
class FreeList {
public:
void Push(void* obj) { //插入一块内存
assert(obj);
NextObj(obj) = _freeList;
_freeList = obj;
}
void* Pop() { //弹出一块内存
assert(_freeList);
void* obj = nullptr;
obj = _freeList;
_freeList = NextObj(obj);
return obj;
}
bool Empty() { //查看自由链表是否为空
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;
};
当每个线程去自己独享的ThreadCache中申请内存时,ThreadCache会根据所需字节数先按照内存对齐的原则,再找到对应桶的下标,去取内存块给线程。当对应下标的桶中没有内存时,则该Threadcache应该向CentralCache申请内存。Threadcache结构代码如下:
class ThreadCache {
public:
//申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
//当内存不够时,向CentralCache获取对象
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[NFREELIST]; //内存对齐的哈希表
};
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
Threadcache中Allocate函数实现较为简单,不再图解,实现代码如下:
void* ThreadCache::Allocate(size_t size) {
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
void* obj = nullptr;
if (!_freeLists[index].Empty()) {
//证明该桶中有内存块
obj = _freeLists[index].Pop();
}
else {
//该桶为空,那么我们要向CentralCache申请内存
obj = FetchFromCentralCache(index, alignSize);
}
return obj;
}
4.3 CentralCache结构分析与申请内存流程
4.3.1 CentralCache结构分析(承上启下,非常关键)
CentralCache也是一个哈希桶结构,它的哈希桶的映射关系跟ThreadCache是一样的。不同的是他的每个哈希桶位置挂的是SpanList链表结构,不过每个映射桶下面的Span中的大内存块被按映射关系切成了所对应特定字节的一个个小内存块对象挂在Span的自由链表中。结构示意图如下:
因为已经知晓了STL中的List,且该SpanList与STL中的List结构相似,因此这里直接将SpanList结构的代码给出
class SpanList {
public:
SpanList() {
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
void Insert(Span* pos, Span* newSpan) {
//在pos位之前插入span
assert(pos);
assert(newSpan);
Span* prevSpan = pos->_prev;
newSpan->_prev = prevSpan;
pos->_prev = newSpan;
prevSpan->_next = newSpan;
newSpan->_next = pos;
}
void Erase(Span* pos) {
assert(pos);
assert(pos != _head);
Span* prevSpan = pos->_prev;
Span* nextSpan = pos->_next;
prevSpan->_next = nextSpan;
nextSpan->_prev = prevSpan;
}
private:
Span* _head;
public:
std::mutex _mtx; // 桶锁
};
那么Span的结构是什么样呢,我们将Google中Span的核心框架拿出来,一个个给大家解释:
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
#endif
static const size_t PAGE_SHIFT = 13;//一页是8kb,也就是2^13字节
struct Span {
//1.一页的内存我们假定为8KB
//2.Span管理的是根据起始地址到末尾地址的大块内存
//3.管理以页为单位的大块内存
//4.管理多个连续页大块内存跨度结构
PAGE_ID _pageId = 0;//span管理的大块内存起始页的页号
size_t _n = 0; //span管理的有多少页
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _objectSize = 0; //切好的小内存对象的大小
size_t _useCount = 0; //对被分配给threadcache的小块内存计数
void* _freeList = nullptr; //切好的小块内存的自由链表
bool _isUse = false; //该span是否在被使用
};
- 关于_pageId与_n
当我们走到最底层,即向系统申请内存空间时,系统返回给我们的指向这段空间起始地址的指针(32位系统下是4字节,64位系统下是八字节)。而_pageId就是与指针建立起映射关系!!假如现在我们申请的起始地址是(32位系统下)0xffff0000,那么它对应的起始页号_pageId就是524280。因为我们一页的大小为8KB,所以让它除以8192(十进制)就可以得出起始页号,同理根据它的页数_n的数量,就可以知道Span管理的大块内存的末尾地址,假设_n=1,那么它的末尾地址就是0xffff2000。而为了在不同的系统下都能够保存下_pageId的值,所以采用条件编译,32位系统下采用size_t类型,64位系统下采用unisgned long long类型。
- 关于_objectSize与_useCount
因为CentralCache采用的哈希表映射规则与Threadcache一致,所以每个Span挂到相应位置的桶的时候,它所切成的小块内存的大小必须是符合ThreadCache哈希表映射关系的,所以_objectSize可以是8字节-1024*256字节。前文说过,CentralCache是起一个调度作用的中间媒介,当ThreadCache中内存过多时,要进行回收并整合,同理当Span过多时,也要返回给PageCache,而_useCount就是对分出去的小内存块进行计数,并在必要时回收。
分析完CentralCache的哈希表结构后,再来思考一下如何将CentralCache只在全局中设计出一份,让每个线程都共享CentralCache。我们采用单例模式,即将CentralCache的构造和拷贝构造都加上delete,在全局只创建出唯一一个CentralCache。 代码如下:
class CentralCache {
public:
CentralCache(const CentralCache& c) = delete;
CentralCache& operator=(const CentralCache& c) = delete;
static CentralCache* GetInstance() { //单例模式
return &_sInst;
}
private:
CentralCache(){}
SpanList _spanLists[NFREELIST]; //与ThreadCache结构相似的哈希表
static CentralCache _sInst; //单例模式
};
ThreadCache来CentralCache中申请内存时,我们先要获取一个非空的Span,再从这个Span中拿出一段内存给ThreadCache。基于此要增加两个成员函数。
class CentralCache {
public:
CentralCache() = delete;
CentralCache(const CentralCache& c) = delete;
CentralCache& operator=(const CentralCache& c) = delete;
static CentralCache* GetInstance() { //单例模式
return &_sInst;
}
//获取一个非空的Span
Span* GetOneSpan(SpanList& list, size_t size);
//从中心缓存获取一定的数量给ThreadCache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
private:
SpanList _spanLists[NFREELIST];
static CentralCache _sInst; //单例模式
};
4.3.2 CentralCache如何将内存给ThreadCache
上文讲到ThreadCache没有内存后就向CentralCache索要内存,那么为了更好的体现池化思想,CentralCache应该每次都多给ThreadCache一点内存。在这里我们采用类似于TCP协议中的慢增长算法。
慢开始反馈调节算法
1 .最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
2. 如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
3. size越大,一次向central cache要的batchNum就越小
4. size越小,一次向central cache要的batchNum就越大
ThreadCache中FetchFromCentralCache代码如下:
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) {
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (batchNum == _freeLists[index].MaxSize()) {
++_freeLists[index].MaxSize();
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);
if (actualNum == 1) {
assert(start == end);
return start;
}
else {
_freeLists[index].PushRange(NextObj(start), end);
return start;
}
}
void PushRange(void* start, void* end) {
NextObj(end) = _freeList;
_freeList = start;
}
static size_t NumMoveSize(size_t size) {
assert(size);
//小对象一次批量上限高
//大对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2) {
num = 2;
}
if (num > 512) {
num = 512;
}
return num;
}
此时ThreadCache采用了一个慢增长算法向CentralCache索要内存,那站在CentralCache的角度,它该如何将内存给ThreadCache呢。
- 首先CentralCache要找出桶中不为空的span
因为CentralCache中存的是SpanList,且整个进程只有一份CentralCache,所以每次访问对应的Spanlist桶时要加锁,访问时要找出Spanlist中不为空的Span
- 对该Span进行切分
拿出Span后由于它可能满足我们的需求,也有可能不满足,所以我们返回的是实际拿到的内存块数量。代码如下:
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) {
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock(); //访问该桶的时候加上锁
Span* span = CentralCache::GetInstance()->GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
//从span中获取batchnum个内存块
//有多少拿多少
start = span->_freeList;
end = start;
size_t actualNum = 1;
int i = 0;
while (i < batchNum - 1 && NextObj(end) != nullptr) {
end = NextObj(end);
++i;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return actualNum;
}
4.3 PageCache结构分析与申请内存流程
4.3.1 PageCache结构分析
PageCache 的核心结构是Spanlist的哈希桶,但是和CentralCache与ThreadCache有本质区别的,CentralCache中哈希桶,是按跟ThreadCache一样的大小对齐关系映射的,他的Spanlist中挂的Span中的内存都被按映射关系切好链接成小块内存的自由链表。而PageCache 中的Spanlist则是按下标桶号映射的,也就是说第i号桶中挂的Span都是i页内存。同样全局只有唯一一个PageCache,将PageCache设计为单例。且为了线程安全对其上锁。
示意图如下
class PageCache {
private:
SpanList _spanLists[NPAGES];
static PageCache _sInst;
PageCache(){}
public:
PageCache(const PageCache& p) = delete;
PageCache& operator=(const PageCache& p) = delete;
std::mutex _pageMtx;
static PageCache* GetInstance() {
return &_sInst;
}
Span* NewSpan(size_t k);
};
4.3.2 PageCache如何将内存给CentralCache
上面提到,CentralCache要想给ThreadCache内存,则它当中必须有空余的Span,当没有空余的Span时,就要向PageCache申请,而申请下来的Span是一大块内存,我们还需要将它切好并挂入CentralCache的SpanList中。
刚申请出Span时的逻辑示意图:
我们要对这大块内存进行切分,如何找到大块内存的起始地址呢?,前面提到,Span管理大块内存,根据Span的页号,就可以得到大块内存的起始地址,再根据Span管理的页数,就可以得出大块内存的结束地址。
为了方便进行切分,我们先将大块内存切下来一小块做头部,并不断进行尾插,逻辑图如下:
就这样循环往复,直到start到达end位置时,证明该大块内存已经被切完了。
代码如下:
Span* CentralCache::GetOneSpan(SpanList& list, size_t size) {
//先查看该桶中的链表是否有空余的Span
Span* it = list.Begin();
while (it != list.End()) {
if (it->_freeList) {
return it;
}
else {
it = it->_next;
}
}
//走到这一步该桶里没有多余的Span,要向Page拿Span
//可以将桶锁解开,因为我们要去访问Page了
list._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
PageCache::GetInstance()->_pageMtx.unlock();
//先将该span切好成小块内存,再插入到spanlist中。
char* start = (char*)(span->_pageId << PAGE_SHIFT);
char* end = start + (span->_n << PAGE_SHIFT);
span->_objectSize = size;
//如何切呢?先切一块下来,再方便尾插
span->_freeList = start;
void* tail = span->_freeList;
start += size;
int i = 0;
while (start < end) {
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
++i;
}
NextObj(tail) = nullptr;
//再插入到链表中
list._mtx.lock();
list.PushFront(span);
return span;
}
//计算一次向系统获取几个页
static size_t NumMovePage(size_t size) {
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0) {
npage = 1;
}
return npage;
}
4.3.3 PageCache切分内存与系统调用
根据上面提到,PageCache将自己的大内存块Span给CentralCache,那PageCache中是如何调度呢。
- 当PageCache中有CentralCache所需页数的Span时,PageCache直接返回给CentralCache。
- 当PageCache中没有CentralCache所需页数的Span时,但有超过它需求的页数时,先将超出这部分切开,再还给CentralCache。
- 当PageCache上述两种情况都不满足时,它向操作系统申请128页的大内存块,然后再重复情况2.
假设我们需要1页的Span,情况2图示如下:
代码如下:
Span* PageCache::NewSpan(size_t k) {
assert(k);
assert(k < NPAGES);
if (!_spanLists[k].Empty()) {
return _spanLists[k].PopFront();
}
//走到这里代表没有多余的Span,可以去大于n(n>k)页的spanlist中找,把它切成k页与n-k页
//比如要2页,但桶2是空的,但桶5不为空,可以把桶5切为桶3和桶2,桶3保存,桶2返回
for (int i = k + 1; i < NPAGES; ++i) {
if (!_spanLists[i].Empty()) {
Span* newSpan = new Span;
Span* curSpan = _spanLists[i].PopFront();
newSpan->_n = k;
newSpan->_pageId = curSpan->_pageId;
curSpan->_n -= k;
curSpan->_pageId += k;
_spanLists[curSpan->_n].PushFront(curSpan);
return newSpan;
}
}
//整个PageCache都是空的,向操作系统申请
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1); //申请128页的内存空间
bigSpan->_pageId = ((PAGE_ID)ptr >> PAGE_SHIFT);
bigSpan->_n = NPAGES - 1;
_spanLists[NPAGES - 1].PushFront(bigSpan);
return NewSpan(k);
}
5. 梳理将内存还给内存池的步骤
5.1 将内存还给ThreadCache
ThreadCache是哈希桶的结构,与内存字节数映射。所以当我们还内存时,只需将内存块挂到(头插)对应的哈希桶上即可。
void* ThreadCache::Deallocate(void* ptr, size_t size) {
assert(ptr);
assert(size <= MAX_BYTES);
//找到对应的链表桶,头插进去
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//当还回内存块的数量大于等于申请的最多内存块时,要还给CentralCache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize()) {
ListToLong(_freeLists[index], size);
}
}
前面提到,该项目能有效解决内存碎片化的问题,即还给ThreadCache的内存块过多时,就要还给CentralCache
void ThreadCache::ListToLong(FreeList& list, size_t size) {
void* start = nullptr;
void* end = nullptr;
//将链表中的内存块都取出来
list.PopRange(start, end, list.MaxSize());
//CentralCache回收ThreadCache中的内存
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
void PopRange(void*& start, void*& end, size_t n) {
assert(n >= _size);
start = _freeList;
end = start;
for (int i = 0; i < n - 1; ++i) {
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void PushRange(void* start, void* end, size_t pushNum) {
assert(start);
assert(pushNum > 0);
NextObj(end) = _freeList;
_freeList = start;
_size += pushNum;
}
5.2 CentralCache利用Span回收管理小内存块
我们有每块内存的地址,但如何靠地址找到它属于哪块Span呢?前文提到,靠Span的页号,建立起页号与地址的对应关系,这里页号与地址是一对多的关系,关系图如下:
在PageCache里增加关联式容器unordered_map,且为了方便页与页之间的合并,故unordered_map存放<PAGE_ID,Span*>的关系。
void CentralCache::ReleaseListToSpans(void* start, size_t size) {
//找桶上锁
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start) {
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
--span->_useCount;
// 说明span的切分出去的所有小块内存都回来了
// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
if (span->_useCount == 0) {
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
// 释放span给page cache时,使用page cache的锁就可以了
// 这时把桶锁解掉
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
Span* PageCache::MapObjToSpan(void* obj) {
assert(obj);
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end()) {
return ret->second;
}
else {
assert(false);
return nullptr;
}
}
5.3 PageCache合并更大的Span
当Span中切出的小内存块都还回来时,将该Span还给PageCache,利用该Span去与其它空闲的Span合并,合并出更大的页的Span,解决内存外碎片问题。
void PageCache::ReleaseSpanToPageCache(Span* span) {
//先向前找,合并
while (1) {
PAGE_ID prevId = span->_n - 1;
auto ret = _idSpanMap.find(prevId);
//前面的页号不存在
if (ret == _idSpanMap.end()) {
break;
}
//找到了在使用
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true) {
break;
}
//合并的页超出128页没法管理了
if (prevSpan->_n + span->_n > NPAGES - 1) {
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
delete prevSpan;
}
//再向后找,合并
while (1) {
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
//再有合并的时候,要不从前面找到,要不从后面找到,所以只映射首尾页号
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
6. 超过128页内存的申请与性能测试
当申请的页数超过128时,即意味着PageCache中的哈希桶无法再对管理128页的Span进行管理,所以我们直接调用系统接口,去堆上申请与释放。
该项目研究的是多线程环境下与malloc效率的对比,所以我们设计多线程的测试环境。设计代码如下:
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k) {
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t r = 0; r < rounds; ++r) {
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; ++i) {
v.push_back(ConcurrentAlloc(16));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; ++i) {
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
int main()
{
size_t n = 1000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
可以看到释放内存时,时间差好多,猜测一下。
- 一方面应该是因为在map中查找时浪费时间过多。
- 另一方面应该是获取锁时花费的时间。
7.性能瓶颈优化
为了查找到瓶颈所在,我们使用Visual Studio自带的性能分析工具
点击确定后等待编译器分析完成即可,下图是分析报告
验证了我们刚刚的猜想,锁占用和map查找的时间过长。但这两个问题本质上是一个问题,因为查找慢所以锁住的时间长,关键所在即解决查找慢的问题。
这里采用基数树进行优化。
基数树有如下优点:
- 逻辑和物理上是直接映射的顺序结构,无需考虑结构的变化。
- 不用考虑结构的变化就可以无需考虑对整个结构进行读写时加锁的问题(不论多少线程读写,因为结构不变,所以不加锁)。
- 基数树最多有三层,避免一次向堆区申请空间过大的问题。
基数树一层结构如下:
基数树二层结构如下
最终引入基数树,替换我们代码中用到map的地方,基数树代码如下:
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1<<PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize>>PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
改进后效果如下:
注:该项目只将tcmalloc的精华提炼出来学习并简单实现,与真正的tcmalloc存在一定差距。
创作不易,希望各位伙伴多多三连!
该项目已上传至码云:https://gitee.com/enjoy-a-cup-of-tea/project/tree/master/ReivewConcurrentMemoryPool
更多推荐