当前位置: 首页 > news >正文

【项目】高并发内存池

目录

项目介绍

内存池介绍

高并发内存池整体框架设计

threadcache

threadcache整体设计

ThreadCache哈希桶映射与对齐规则

threadcacheTLS无锁访问

centralcache

centralcache整体设计

centralcache结构设计

centralcache核心实现

pagecache

pagecache整体设计

pagecache中获取Span

threadcache回收内存

centralcache回收内存

pagecache回收内存

大于256KB的大块内存申请问题

使用定长内存池配合脱离使用new

释放对象时优化为不传对象大小

多线程环境下对比malloc测试

针对性能瓶颈使用基数树进行优化

使用基数树进行优化代码实现

项目源码


项目介绍

  本项目实现的是一个高并发的内存池,原型是Google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替换系统的内存分配相关函数malloc和free。

tcmalloc的知名度也是非常高的,不少公司都在用它,比如Go语言就直接用它做了内存分配器。

本项目是将tcmalloc最核心的框架简化后拿出来,模拟实现出一个高并发内存池,目的是为了学习tcamlloc的精华。

该项目主要涉及C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的技术。


内存池介绍

池化技术

池化技术,就是程序先向系统申请过量的资源,然后自行进行管理,以备不时之需。

之所以要申请过量的资源,是因为申请和释放资源都有较大的开销,不如提前申请一些资源放入"池"中,当需要资源时则可以直接从"池"中获取,不需要时就将该资源重新放回"池"中即可。这样使用时就会变得较为快捷,可以达到提高程序的运行效率的目的。

在计算机中,有很多地方都使用了"池"这种技术,如连接池、线程池、对象池等。以服务器上的线程池为例,其主要思想就是:先启动若干数量的线程,让它们处于睡眠状态。当接收到客户端的请求时,再唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求后,线程又进入睡眠状态。

内存池

内存池是指程序预先向操作系统申请一块足够大的内存.此后,当程序中需要申请内存的时候,不需直接向操作系统申请,而是直接从内存池中获取;同理,当释放内存的时候,并不是真正将内存返回给操作系统,而是将内存返回给内存池。当程序退出时(或某个特定时间),内存池才将之前申请的内存真正释放。

内存池所解决问题

内存池主要解决的就是效率的问题,其能够避免程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题

内存碎片分为内部碎片和外部碎片:

外部碎片是一些空闲的小块内存区域,由于这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配申请需求

内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用

注意: 内存池尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生

malloc

C语言中动态申请内存并不是直接向堆申请的,而是通过malloc函数去申请的;C++中的new实际上也是封装了malloc函数

申请内存块时是先调用malloc,malloc再去向操作系统申请内存。malloc实际就是一个内存池,malloc相当于向操作系统"批发"了一块较大的内存空间,然后"零售"给程序用,当全部"售完"或程序有大量的内存需求时,再根据实际需求向操作系统"进货"

malloc的实现方式有很多种,一般情况下不同编译器平台用的是不同的。比如Windows的VS系列中的malloc就是微软自行实现的,而Linux下的gcc用的是glibc中的ptmalloc

定长内存池

malloc其实是一个通用的内存池,在什么场景下都适用,但也意味着malloc在什么场景下都不会具有很高的性能,因为malloc并不是针对某种场景专门设计的

定长内存池则是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此性能可以达到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为申请/释放的都是固定大小的内存块

通过实现定长内存池可以熟悉对简单内存池的控制,其次,这个定长内存池也会在后面会作为高并发内存池的一个基础组件(代替new操作符)

实现定长

在实现定长内存池时要做到"定长"有许多方式,比如可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N

template<size_t N>
class ObjectPool
{};

定长内存池也可以被称为"对象池"。在创建对象池时,对象池可以根据传入的对象类型的大小来实现"定长",比如创建定长内存池时传入对象类型int,那么该内存池就只支持 sizeof(int) 字节大小内存的申请和释放

template<class T>
class ObjectPool
{};

向堆区申请内存

既然是内存池,那么首先得向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下可以调用VirtualAlloc()系统接口;在Linux下可以调用brk()或mmap()系统接口

#ifdef WIN32#include <windows.h>
#else#include <sys/mman.h>#include <unistd.h>
#endifinline static void* SystemAlloc(size_t kpage)//按页申请内存
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else//Linux下brk mmap等
#endifif (ptr == nullptr) throw std::bad_alloc();return ptr;
}

通过条件编译将对应平台下向堆区申请内存的函数进行封装,此后就不必再关心当前所在平台,当需要直接向堆区申请内存时直接调用封装后的SystemAlloc()函数即可

定长内存池中的成员变量

对于向堆区申请到的大块内存,可以用一个指针来对其进行管理,但仅用一个指针肯定不够,还需要一个变量来记录这块内存的长度

由于此后需要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的类型决定了指针的步长以及查看数据的大小。对于字符指针而言,当需要向后移动n个字节时,直接对字符指针进行加n操作即可

使用完后释放回来的定长内存块也需被管理,可以将这些释放回来的定长内存块链接成一个链表。管理释放回来的内存块的链表被称为自由链表,为了能找到这个自由链表,因此还需要一个指向自由链表的指针

char* _memory = nullptr;//char*便于切割分配内存 指向大块内存
void* _freeList = nullptr;//自由链表 管理归还的内存块
size_t _remainBytes = 0;//记录剩余字节数

管理被释放内存块的具体方案

对于回收的定长内存块,可以使用自由链表将其链接起来,但并不需要为其专门定义链式结构,可以让内存块的前4个字节(32位平台)或8个字节(64位平台)存储后面内存块的起始地址

指针在32位平台上占用4个字节,在64位平台上占用8个字节,那么如何写出既适应32位平台也适应64位平台的代码呢?

当需要访问一个内存块的前4/8个字节时,可以先该内存块的首地址强转为二级指针,由于二级指针存储的是一级指针的地址,二级指针解引用能向后访问一个指针的大小(在32位下为4个字节、64位平台为8个字节,自动适应了环境),此时就访问到了该内存块的前4/8个字节,即下一个内存块的首地址

void*& NextObj(void* ptr) { return *(void**)ptr; }

申请对象

申请对象时,内存池应该优先把还回来的内存块对象再次重复利用,因此若自由链表中有内存块的话,就直接从自由链表中头删一个内存块直接返回即可

若自由链表中没有空闲内存块,那么就在大块内存中切出定长的内存块进行返回。当内存块切出后及时更行 _memory 指针的指向,以及 _remainBytes 的值即可

若大块内存已经不足以切分出一个对象时,就应该调用封装的SystemAlloc()函数,再次向堆申请一块内存空间,此时也要及时更新_memory指针的指向,以及_remainBytes的值(可能存在浪费内存,即所剩内存不足以切出一个对象但_memory却有了新的指向)

由于当内存块释放时需要将内存块链接到自由链表当中,因此必须保证切出来的对象至少能够存储得下一个地址,所以当对象的大小小于当前所在平台指针的大小时,需要按指针的大小进行内存块的切分,即需向上对齐

T* New()
{T * obj = nullptr;if (_freeList != nullptr)//优先使用分配过的内存{obj = (T*)_freeList;_freeList = *(void**)_freeList;//强转为void**后解引用为void*,即在32位系统下可以看到4个字节,64位系统下可以看到8个字节}else{size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*);//确保至少能存储一个指针大小if (_remainBytes < objSize)//大块内存空间不足{_remainBytes = 128 * 1024;_memory = (char*)SystemAlloc(_remainBytes >> 13);}obj = (T*)_memory;_memory += objSize;_remainBytes -= objSize;}new(obj)T;//定位new 调用对象构造函数初始化return obj;
}

注意:这是一个定长对象内存池,当内存块切分出来后,应使用定位new,显示调用该对象的构造函数对其进行初始化

释放对象

在释放对象时,应该显示调用该对象的析构函数清理该对象,因为该对象可能还管理着其他某些资源,若不对其进行清理那么这些资源将无法被释放,就会导致内存泄漏,析构后将该内存块头插入_freeList中即可

完整代码

#pragma once
#include <iostream>
using std::cout;
using std::endl;#ifdef WIN32#include <windows.h>
#else#include <sys/mman.h>#include <unistd.h>
#endifinline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr) throw std::bad_alloc();return ptr;
}template <class T>
class ObjectPool
{
public:T* New(){T * obj = nullptr;if (_freeList != nullptr)//优先使用分配过的内存{obj = (T*)_freeList;_freeList = *(void**)_freeList;//强转为void**后解引用为void*,即在32位系统下可以看到4个字节,64位系统下可以看到8个字节}else{size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*);//确保至少能存储一个指针大小if (_remainBytes < objSize)//大块内存空间不足{_remainBytes = 128 * 1024;_memory = (char*)SystemAlloc(_remainBytes >> 13);}obj = (T*)_memory;_memory += objSize;_remainBytes -= objSize;}new(obj)T;//定位new 调用对象构造函数初始化return obj;}void Delete(T* obj){obj->~T();//调用对象析构函数*(void**)obj = _freeList;//头插_freeList = obj;}private:char* _memory = nullptr;//char*便于切割分配内存 指向大块内存void* _freeList = nullptr;//自由链表 管理归还的内存块size_t _remainBytes = 0;//记录剩余字节数
};

性能对比

在只创建定长对象的情况下,使用下面的代码对new/delete和定长内存池进行性能对比

#include "ObjectPool.h"
#include <vector>
#include <ctime>
using std::vector;struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode() :_val(0), _left(nullptr), _right(nullptr) {}
};void TestObjectPool()
{const size_t Rounds = 3;// 申请释放的轮次const size_t N = 1000000;// 每轮申请释放多少次std::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i) v1.push_back(new TreeNode);for (int i = 0; i < N; ++i) delete v1[i];v1.clear();}size_t end1 = clock();std::vector<TreeNode*> v2;v2.reserve(N);ObjectPool<TreeNode> TNPool;size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i) v2.push_back(TNPool.New());for (int i = 0; i < N; ++i) TNPool.Delete(v2[i]);v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}
int main()
{TestObjectPool();return 0;
}

在代码中,我们先用new申请若干个TreeNode对象,然后再用delete将这些对象再释放,通过clock函数得到整个过程消耗的时间。(new和delete底层就是封装的malloc和free)

然后再重复该过程,只不过将其中的new和delete替换为定长内存池当中的New和Delete,此时再通过clock函数得到该过程消耗的时间。

可以看到在这个过程中,定长内存池消耗的时间比malloc/free消耗的时间要短。这就是因为malloc是一个通用的内存池,而定长内存池是专门针对申请定长对象而设计的,因此在这种特殊场景下定长内存池的效率更高,正所谓“尺有所短,寸有所长”。


高并发内存池整体框架设计

该项目解决的是什么问题?

如今很多的开发环境都是多核多线程,因此在申请内存的时,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,但是在并发场景下可能会因为频繁的加锁和解锁导致效率有所降低,而该项目的原型tcmalloc实现的就是一种在多线程高并发场景下更胜一筹的内存池

在实现内存池时一般需要考虑到效率问题和内存碎片的问题,但对于高并发内存池来说,还需要考虑在多线程环境下的锁竞争问题

高并发内存池整体框架设计

高并发内存池主要由以下三个部分构成:

  • thread cache: 线程缓存是每个线程独有的,用于小于等于256KB的内存分配,每个线程独享一个thread cache。
  • central cache: 中心缓存是所有线程所共享的,当thread cache需要内存时会按需从central cache中获取内存,而当thread cache中的内存满足一定条件时,central cache也会在合适的时机对其进行回收。
  • page cache: 页缓存中存储的内存是以页为单位进行存储及分配的,当central cache需要内存时,page cache会分配出一定数量的页分配给central cache,而当central cache中的内存满足一定条件时,page cache也会在合适的时机对其进行回收,并将回收的内存尽可能的进行合并,组成更大的连续内存块,缓解内存碎片的问题。

进一步说明:

  • 每个线程都有一个属于自己的thread cache,也就意味着线程在thread_cache申请内存时是不需加锁的,而一次性申请大于256KB内存的情况是很少的,因此大部分情况下申请内存时都是无锁的,这也是高并发内存池高效的地方
  • 每个线程的thread cache会根据自己的情况向central cache申请或归还内存,这避免了出现单个线程的thread cache占用太多内存,而其余thread cache出现内存吃紧的问题
  • 多线程的thread cache可能会同时找central cache申请内存,此时就会涉及线程安全的问题,因此在访问central cache时是需要加锁的,但central cache实际上是一个哈希桶的结构,只有当多个线程同时访问同一个桶时才需要加锁,所以这里的锁竞争也不会很激烈

各个模块的作用

thread_cache主要解决锁竞争的问题,每个线程独享thread_cache,当自己的thread_cache中有内存时该线程不会去和其他线程进行竞争,每个线程只要在各自的thread_cache申请内存就行了

central_cache主要起到一个居中调度的作用,每个线程的thread_cache需要内存时从central _cache获取,而当thread_cache的内存多了就会将内存还给central_cache,其作用类似于一个中枢,因此取名为中心缓存

page_cache就负责提供以页为单位的大块内存,当central_cache需要内存时就会去向page_cache申请,而当page_cache没有内存了就会直接去找系统,也就是直接去堆上按页申请内存块


threadcache

threadcache整体设计

定长内存池只需支持固定大小内存块的申请释放,因此定长内存池中只需一个自由链表管理释放回来的内存块。现在要支持申请和释放不同大小的内存块,那么就需要多个自由链表来管理释放回来的内存块。ThreadCache实际上是一个哈希桶结构,每个桶中存放的都是一个自由链表

ThreadCache支持小于等于256KB内存的申请,若将每种字节数的内存块都用一个自由链表进行管理的话,那么就需要20多万个(256*1024)自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的

此时可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如让这些字节数都按照8字节进行向上对齐。譬如当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推

因此当线程要申请某一大小的内存块时,就需要经过对齐规则计算得到对齐后的字节数,进而找到对应的哈希桶,若该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;若该自由链表已经为空了,那么就需要向下一层的CentralCache进行获取

但由于对齐的原因,就会产生一些碎片化的内存无法被利用,比如线程只申请了6Byte的内存,而ThreadCache却直接给了8Byte的内存,多给出的2Byte就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内碎片问题

ThreadCache哈希桶映射与对齐规则

内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针

但若所有的字节数都按照8字节进行对齐的话,那么就需要建立256 * 1024 ÷ 8 = 32768 个桶,这个数量还是比较多的,实际上可以让不同范围的字节数按照不同的对齐数进行对齐

字节数对齐数哈希桶下标

[1,128]

8[0,16)
[128 + 1,1024]16[16,72)
[128 + 1,8 × 1024]128[72,128)
[8 × 128 + 1,64 × 1024]1024[128,184
[64 × 1024 + 1,256 × 1024]8 × 1024[184,208)

空间浪费率

虽然对齐产生的内碎片会引起一定程度上的空间浪费,但按照上面的对齐规则,可以将浪费率控制到百分之十左右。

需要说明的是,1~128这个区间不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,并且小区间就算浪费率较高也并不会产生太大的浪费,这里从第二个区间开始进行计算

根据上面的公式,要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。

比如 129~1024 这个区间的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,即144。那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42%。同样的道理,后面两个区间的最大浪费率分别是127 ÷ 1152 ≈ 11.02% 和1023 ÷ 9216 ≈ 11.10%。

对齐和映射相关函数的编写

此时有了字节数的对齐规则后,我们就需要提供两个对应的函数,分别用于获取某一字节数对齐后的字节数,以及该字节数对应的哈希桶下标。关于处理对齐和映射的函数,我们可以将其封装到一个类当中。

//管理对齐和映射等关系
class SizeClass
{
public://获取向上对齐后的字节数static inline size_t RoundUp(size_t bytes);//获取对应哈希桶的下标static inline size_t Index(size_t bytes);
};

需要注意的是,SizeClass类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。

在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理。

//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{if (bytes <= 128){return _RoundUp(bytes, 8);}else if (bytes <= 1024){return _RoundUp(bytes, 16);}else if (bytes <= 8 * 1024){return _RoundUp(bytes, 128);}else if (bytes <= 64 * 1024){return _RoundUp(bytes, 1024);}else if (bytes <= 256 * 1024){return _RoundUp(bytes, 8 * 1024);}else{assert(false);return -1;}
}

此时我们就需要编写一个子函数,该子函数需要通过对齐数计算出某一字节数对齐后的字节数,最容易想到的就是下面这种写法。

//一般写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{size_t alignSize = 0;if (bytes%alignNum != 0){alignSize = (bytes / alignNum + 1)*alignNum;}else{alignSize = bytes;}return alignSize;
}

除了上述写法,我们还可以通过位运算的方式来进行计算,虽然位运算可能并没有上面的写法容易理解,但计算机执行位运算的速度是比执行乘法和除法更快的。

//位运算写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{return ((bytes + alignNum - 1)&~(alignNum - 1));
}

对于上述位运算,以10字节按8字节对齐为例进行分析。8 − 1 = 7,7就是一个低三位为1其余位为0的二进制序列,将10与7相加,相当于将10字节当中不够8字节的剩余字节数补上了

然后我们再将该值与7按位取反后的值进行与运算,而7按位取反后是一个低三位为0其余位为1的二进制序列,该操作进行后相当于屏蔽了该值的低三位而该值的其余位保持不变,此时得到的值就是10字节按8字节对齐后的值,即16字节。

映射函数的编写

在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用子函数进一步处理

//获取对应哈希桶的下标
static inline size_t Index(size_t bytes)
{//每个区间有多少个自由链表static size_t groupArray[4] = { 16, 56, 56, 56 };if (bytes <= 128){return _Index(bytes, 3);}else if (bytes <= 1024){return _Index(bytes - 128, 4) + groupArray[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1];}else if (bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];}else{assert(false);return -1;}
}

此时我们需要编写一个子函数来继续进行处理,容易想到的就是根据对齐数来计算某一字节数对应的下标。

//一般写法
static inline size_t _Index(size_t bytes, size_t alignNum)
{size_t index = 0;if (bytes%alignNum != 0){index = bytes / alignNum;}else{index = bytes / alignNum - 1;}return index;
}

为了提高效率下面也提供了一个用位运算来解决的方法,需要注意的是,此时我们并不是传入该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3。

//位运算写法
static inline size_t _Index(size_t bytes, size_t alignShift)
{return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}

以10字节按8字节对齐为例进行分析。此时传入的alignShift就是3,将1左移3位后得到的实际上就是对齐数8,8 − 1 = 7 ,即还是让10与7相加。

之后再将该值向右移3位,实际上就是让17除以8,此时相当于屏蔽了该值二进制的低三位,因为除以8得到的值与其二进制的低三位无关,所以我们可以说是将10对齐后的字节数除以了8,此时得到了2,而最后还需要减一是因为数组的下标是从0开始的。

ThreadCache类

按照上述的对齐规则,thread cache中桶的个数,也就是自由链表的个数是208,以及thread cache允许申请的最大内存大小256KB,我们可以将这些数据按照如下方式进行定义。

//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;

现在就可以对ThreadCache类进行定义了,thread cache就是一个存储208个自由链表的数组,目前thread cache就先提供一个Allocate函数用于申请对象就行了,后面需要时再进行增加。

class ThreadCache
{
public://申请内存对象void* Allocate(size_t size);private:FreeList _freeLists[NFREELISTS]; //哈希桶
};

在thread cache申请对象时,通过所给字节数计算出对应的哈希桶下标,如果桶中自由链表不为空,则从该自由链表中取出一个对象进行返回即可;但如果此时自由链表为空,那么我们就需要从central cache进行获取了,这里的FetchFromCentralCache函数也是thread cache类中的一个成员函数,在后面再进行具体实现。

//申请内存对象
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}

threadcacheTLS无锁访问

每个线程都有一个各自独享的ThreadCache,那应该如何创建这个ThreadCache?显然不能将这个ThreadCache创建为全局属性,因为全局变量是所有线程共享的,这样就不可避免的需要使用锁来进行控制,会增加了控制成本和代码复杂度,并且效率也会有所降低

要实现每个线程无锁的访问属于独自的ThreadCache,可以使用线程局部存储TLS(Thread Local Storage)。这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性

//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此在申请内存的函数中会包含以下逻辑。

//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{pTLSThreadCache = new ThreadCache;
}

centralcache

centralcache整体设计

当线程申请某一大小的内存时,如果thread cache中对应的自由链表不为空,那么直接取出一个内存块进行返回即可,但如果此时该自由链表为空,那么这时thread cache就需要向central cache申请内存了。

central cache的结构与thread cache是一样的,它们都是哈希桶的结构,并且它们遵循的对齐映射规则都是一样的。这样做的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去取内存就行了。

central cache与thread cache的不同之处

central cache与thread cache有两个明显不同的地方,首先,thread cache是每个线程独享的,而central cache是所有线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是需要加锁的。

但central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。

central cache与thread cache的第二个不同之处就是,thread cache的每个桶中挂的是一个个切好的内存块,而central cache的每个桶中挂的是一个个的span。

每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶被切成了对应的大小

centralcache结构设计

页号的类型

每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是2^{32}字节;而在64位平台下,进程地址空间的大小就是字节2^{64}

页的大小一般是4K或者8K,以8K为例。在32位平台下,进程地址空间就可以被分成 2^{32}÷2^{13}=2^{19}个页;在64位平台下,进程地址空间就可以被分成2^{64}÷2^{13}=2^{51}个页。页号本质与地址一样,都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位

由于页号在64位平台下的取值范围是[0,2^{51}) ,因此不能简单的用一个无符号整型来存储页号(只使用32位环境下),这时需要借助条件编译来解决该问题。

#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else//linux
#endif

需要注意的是,在32位下,_WIN32有定义,_WIN64没有定义;而在64位下,_WIN32和_WIN64都有定义。因此在条件编译时,我们应该先判断_WIN64是否有定义,再判断_WIN32是否有定义。

Span的结构

CentralCache的每个桶里挂的是一个个的Span,Span是管理以页为单位的大块内存的,其结构如下:

//管理以页为单位的大块内存
struct Span
{PAGE_ID _pageId = 0;        //大块内存起始页的页号size_t _n = 0;              //页的数量Span* _next = nullptr;      //双链表结构Span* _prev = nullptr;size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数void* _freeList = nullptr;  //切好的小块内存的自由链表
};

对于Span管理的以页为单位的大块内存,需要知道这块内存具体在哪一个位置,以便于之后PageCache进行前后页的合并缓解内存碎片问题,因此Span结构当中会记录所管理大块内存起始页的页号 (具体如何合并在后面讲解)

每一个Span管理多少页并不是固定的,由后面的算法来控制,因此span结构中有一个_num成员来代表着该Span管理的页的数量

每个Span管理的大块内存,都会被切成小内存块挂到当前Span的自由链表中,比如8Byte哈希桶中的Span,会被切成一个个8Byte大小的内存块挂到当前Span的自由链表中,因此Span结构中需要自由链表_freeList来存储小块内存块

Span结构中的_use_count成员记录的是,当前Span中已经分配给TreadCache的小块内存块,当某个Span的_use_count计数变为0时,代表当前Span分配出去的小内存块已经全部还回来了,此CentralCache就可以将这个Span再还给PageCache

每个桶当中的Span是以双链表的形式组织起来的,当需要将某个Span归还给PageCache时,就可以很方便的将该Span从双链表结构中移出。若用单链表结构的话则较为麻烦,因为单链表在删除时需要知道当前结点的前一个结点

双链表结构

CentralCache的每个哈希桶中存储的都是一个双链表结构,对于该双链表结构可以进行封装:

//带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head); //不能删除哨兵位的头结点Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:Span* _head;
public:std::mutex _mtx; //桶锁
};

需要注意的是,从双链表删除的span会还给下一层的page cache,相当于只是把这个span从双链表中移除,因此不需要对删除的span进行delete操作。

central cache的结构

central cache的映射规则和thread cache是一样的,因此central cache里面哈希桶的个数也是208,但central cache每个哈希桶中存储就是我们上面定义的双链表结构。

class CentralCache
{
public://...
private:SpanList _spanLists[NFREELISTS];
};

central cache和thread cache的映射规则一样,有一个好处就是,当thread cache的某个桶没有内存了,就可以直接去central cache对应的哈希桶进行申请就行了。

centralcache核心实现

所有线程使用的都是同一个CentralCache,即在整个项目中只需要有一个CentralCache对象即可,那么可以使用单例模式进行CentralCache的编写

单例模式可以保证项目中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式较为复杂,这里使用饿汉模式即可。

//单例模式
class CentralCache
{
public://提供一个全局访问点static CentralCache* GetInstance(){return &_sInst;}
private:SpanList _spanLists[NFREELISTS];
private:CentralCache() //构造函数私有{}CentralCache(const CentralCache&) = delete; //防拷贝static CentralCache _sInst;
};

为了保证CentralCache类只能创建一个对象,我们需要将central cache的构造函数和拷贝构造函数设置为私有,或者在C++11中也可以在函数声明的后面加上=delete进行修饰。

CentralCache类当中还需要有一个CentralCache类型的静态的成员变量,当程序运行起来后我们就立马创建该对象,在此后的程序中就只有这一个单例了。

CentralCache CentralCache::_sInst;

最后central cache还需要提供一个公有的成员函数,用于获取该对象,此时在整个进程中就只会有一个central cache对象了。

慢开始反馈调节算法

当thread cache向central cache申请内存时,central cache应该给出多少个对象呢?这是一个值得思考的问题,如果central cache给的太少,那么thread cache在短时间内用完了又会来申请;但如果一次性给的太多了,可能thread cache用不完也就浪费了。

鉴于此,我们这里采用了一个慢开始反馈调节算法。当thread cache向central cache申请内存时,如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。

通过下面这个函数,我们就可以根据所需申请的对象的大小计算出具体给出的对象个数,并且可以将给出的对象个数控制到2~512个之间。也就是说,就算thread cache要申请的对象再小,我最多一次性给出512个对象;就算thread cache要申请的对象再大,我至少一次性给出2个对象。

static size_t MoveSize(size_t size)
{assert(size > 0);// [2, 512] 一次批量移动多少个对象的(慢启动)上限值int num = MAX_BYTES / size;if (num < 2) num = 2; //大对象一次批量上限低if (num > 512) num = 512; //小对象一次批量上限高return num;
}

但就算申请的是小对象,一次性给出512个也是比较多的,基于这个原因,我们可以在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数用于获取这个变量。也就是说,现在thread cache中的每个自由链表都会有一个自己的_maxSize。

//管理切分好的小对象的自由链表
class FreeList
{
public:size_t& MaxSize(){return _maxSize;}private:void* _freeList = nullptr; //自由链表size_t _maxSize = 1;
};

此时当thread cache申请对象时,我们会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSize的值,那么还会将thread cache中该自由链表的_maxSize的值进行加一。

thread cache第一次向central cache申请某大小的对象时,申请到的都是一个,但下一次thread cache再向central cache申请同样大小的对象时,因为该自由链表中的_maxSize增加了,最终就会申请到两个。直到该自由链表中_maxSize的值,增长到超过计算出的值后就不会继续增长了,此后申请到的对象个数就是计算出的个数。(这有点像网络中拥塞控制的机制)

从中心缓存获取对象

每次thread cache向central cache申请对象时,我们先通过慢开始反馈调节算法计算出本次应该申请的对象的个数,然后再向central cache进行申请。

如果thread cache最终申请到对象的个数就是一个,那么直接将该对象返回即可。为什么需要返回一个申请到的对象呢?因为thread cache要向central cache申请对象,其实由于某个线程向thread cache申请对象但thread cache当中没有,这才导致thread cache要向central cache申请对象。因此central cache将对象返回给thread cache后,thread cache会再将该对象返回给申请对象的线程。

但如果thread cache最终申请到的是多个对象,那么除了将第一个对象返回之外,还需要将剩下的对象挂到thread cache对应的哈希桶当中。

//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//慢开始反馈调节算法//1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完//2、如果你不断有size大小的内存需求,那么batchNum就会不断增长,直到上限size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (batchNum == _freeLists[index].MaxSize()){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum >= 1); //至少有一个if (actualNum == 1) //申请到对象的个数是一个,则直接将这一个对象返回即可{assert(start == end);return start;}else //申请到对象的个数是多个,还需要将剩下的对象挂到thread cache中对应的哈希桶中{_freeLists[index].PushRange(NextObj(start), end);return start;}
}

从中心缓存获取一定数量的对象

这里要从central cache获取n个指定大小的对象,这些对象肯定都是从central cache对应哈希桶的某个span中取出来的,因此取出来的这n个对象是链接在一起的,只需要得到这段链表的头和尾即可,这里可以采用输出型参数进行获取。

//从central cache获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock(); //加锁//在对应哈希桶中获取一个非空的spanSpan* span = GetOneSpan(_spanLists[index], size);assert(span); //span不为空assert(span->_freeList); //span当中的自由链表也不为空//从span中获取n个对象//如果不够n个,有多少拿多少start = span->_freeList;end = span->_freeList;size_t actualNum = 1;while (NextObj(end)&&n - 1){end = NextObj(end);actualNum++;n--;}span->_freeList = NextObj(end); //取完后剩下的对象继续放到自由链表NextObj(end) = nullptr; //取出的一段链表的表尾置空span->_useCount += actualNum; //更新被分配给thread cache的计数_spanLists[index]._mtx.unlock(); //解锁return actualNum;
}

由于central cache是所有线程共享的,所以在访问central cache中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后再将桶锁解掉。

在向central cache获取对象时,先是在central cache对应的哈希桶中获取到一个非空的span,然后从这个span的自由链表中取出n个对象即可,但可能这个非空的span的自由链表当中对象的个数不足n个,这时该自由链表当中有多少个对象就给多少就行了。

thread cache实际从central cache获得的对象的个数可能与我们传入的n值是不一样的,因此需要统计本次申请过程中,实际thread cache获取到的对象个数,然后根据该值及时更新这个span中的小对象被分配给thread cache的计数。

需要注意的是,虽然实际申请到对象的个数可能比n要小,但这并不会产生任何影响。因为thread cache的本意就是向central cache申请一个对象,之所以要一次多申请一些对象,是因为这样一来下次线程再申请相同大小的对象时就可以直接在thread cache里面获取了,而不用再向central cache申请对象。

插入一段范围的对象到自由链表

如果thread cache最终从central cache获取到的对象个数是大于一的,那么还需要将剩下的对象插入到thread cache中对应的哈希桶中,为了能让自由链表支持插入一段范围的对象,还需要在FreeList类中增加一个对应的成员函数。

//管理切分好的小对象的自由链表
class FreeList
{
public://插入一段范围的对象到自由链表void PushRange(void* start, void* end){assert(start);assert(end);//头插NextObj(end) = _freeList;_freeList = start;}
private:void* _freeList = nullptr; //自由链表size_t _maxSize = 1;
};

pagecache

pagecache整体设计

page cache与central cache结构的相同之处

page cache与central cache一样,它们都是哈希桶的结构,并且page cache的每个哈希桶中里挂的也是一个个的span,这些span也是按照双链表的结构链接起来的。

PageCache与CentralCache结构的不同之处

CentralCache的映射规则与ThreadCache保持一致,而PageCache的映射规则与它们都不相同PageCache的映射规则采用的是直接定址法,如1号桶挂的都是1页的Span,2号桶挂的都是2页的span,以此类推

CentralCache每个桶中的Span被切成了一个个对应大小的内存块,以供ThreadCache申请。而PageCache当中的Span是没有被进一步切小的,因为PageCache服务的是CentralCache,当CentralCache没有Span时,向PageCache申请某一固定页数的Span,而切分申请到的这个Span由CentralCache完成

page cache当中究竟有多少个桶,这就要看你最大想挂几页的span了,这里就最大挂128页的span,为了让桶号与页号对应起来,我们可以将第0号桶空出来不用,因此需要将哈希桶的个数设置为129。

//page cache中哈希桶的个数
static const size_t NPAGES = 129;

为什么这里最大挂128页的span呢?因为线程申请单个对象最大是256KB,而128页可以被切成4个256KB的对象,因此是足够的。当然,如果想在page cache中挂更大的span也是可以的,根据具体的需求进行设置就行了。

在page cache获取一个n页的span的过程

如果central cache要获取一个n页的span,就可以在page cache的第n号桶中取出一个span返回给central cache即可,但如果第n号桶中没有span了,这时并不是直接转而向堆申请一个n页的span,而是要继续在后面的桶当中寻找span。

直接向堆申请以页为单位的内存时,应该尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时可以将其切小后分配给线程,而当线程将内存释放后又可以将其合并成大块的连续内存。如果向堆申请内存时是小块小块的申请的,那么申请到的内存就不一定是连续的了。

因此,当第n号桶中没有span时,可以继续找第n+1号桶,因为可以将n+1页的span切分成一个n页的span和一个1页的span,这时就可以将n页的span返回,而将切分后1页的span挂到1号桶中。但如果后面的桶当中都没有span,这时就只能向堆申请一个128页的内存块,并将其用一个span结构管理起来,然后将128页的span切分成n页的span和128-n页的span,其中n页的span返回给central cache,而128-n页的span就挂到第128-n号桶中。

也就是说,每次向堆申请的都是128页大小的内存块,central cache要的这些span实际都是由128页的span切分出来的。

page cache的实现方式

当每个线程的thread cache没有内存时都会向central cache申请,此时多个线程的thread cache如果访问的不是central cache的同一个桶,那么这些线程是可以同时进行访问的。这时central cache的多个桶就可能同时向page cache申请内存的,所以page cache也是存在线程安全问题的,因此在访问page cache时也必须要加锁。

但是在page cache这里不能使用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小后再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。

也就是说,在访问page cache时,可能需要访问page cache中的多个桶,如果page cache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此在访问page cache时使用没有使用桶锁,而是用一个大锁将整个page cache给锁住。

而thread cache在访问central cache时,只需要访问central cache中对应的哈希桶就行了,因为central cache的每个哈希桶中的span都被切分成了对应大小,thread cache只需要根据自己所需对象的大小访问central cache中对应的哈希桶即可,不会访问其他哈希桶,因此central cache可以用桶锁。

此外,page cache在整个进程中也是只能存在一个的,因此我们也需要将其设置为单例模式。

//单例模式
class PageCache
{
public://提供一个全局访问点static PageCache* GetInstance(){return &_sInst;}
private:SpanList _spanLists[NPAGES];std::mutex _pageMtx; //大锁
private:PageCache() //构造函数私有{}PageCache(const PageCache&) = delete; //防拷贝static PageCache _sInst;
};

PageCache PageCache::_sInst;

pagecache中获取Span

获取一个非空的span

thread cache向central cache申请对象时,central cache需要先从对应的哈希桶中获取到一个非空的span,然后从这个非空的span中取出若干对象返回给thread cache。那central cache到底是如何从对应的哈希桶中,获取到一个非空的span的呢?

首先当然是先遍历central cache对应哈希桶当中的双链表,如果该双链表中有非空的span,那么直接将该span进行返回即可。为了方便遍历这个双链表,可以模拟迭代器的方式,给SpanList类提供Begin和End成员函数,分别用于获取双链表中的第一个span和最后一个span的下一个位置,也就是头结点。

//带头双向循环链表
class SpanList
{
public:Span* Begin(){return _head->_next;}Span* End(){return _head;}
private:Span* _head;
public:std::mutex _mtx; //桶锁
};

但如果遍历双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请内存块了。

那具体是向page cache申请多大的内存块呢?可以根据具体所需对象的大小来决定,就像之前根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,现在是根据对象的大小计算出,central cache一次应该向page cache申请几页的内存块。

可以先根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,然后将这个上限值乘以单个对象的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。也就是说,central cache向page cache申请内存时,要求申请到的内存尽量能够满足thread cache向central cache申请时的上限。

//管理对齐和映射等关系
class SizeClass
{
public://central cache一次向page cache获取多少页static size_t NumMovePage(size_t size){size_t num = NumMoveSize(size); //计算出thread cache一次向central cache申请对象的个数上限size_t nPage = num*size; //num个size大小的对象所需的字节数nPage >>= PAGE_SHIFT; //将字节数转换为页数if (nPage == 0) //至少给一页nPage = 1;return nPage;}
};

//页大小转换偏移,即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;

需要注意的是,当central cache申请到若干页的span后,还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中。

如何找到一个span所管理的内存块呢?首先需要计算出该span的起始地址,可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址,然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置。

明确了这块内存的起始和结束位置后,就可以进行切分了。根据所需对象的大小,每次从大块内存切出一块固定大小的内存块尾插到span的自由链表中即可。

为什么是尾插呢?因为如果是将切好的对象尾插到自由链表,这些对象看起来是按照链式结构链接起来的,而实际在物理上是连续的,这时把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。

//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{//1、先在spanList中寻找非空的spanSpan* it = spanList.Begin();while (it != spanList.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}//2、spanList中没有非空的span,只能向page cache申请Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));//计算span的大块内存的起始地址和大块内存的大小(字节数)char* start = (char*)(span->_pageId << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT;//把大块内存切成size大小的对象链接起来char* end = start + bytes;//先切一块下来去做尾,方便尾插span->_freeList = start;start += size;void* tail = span->_freeList;//尾插while (start < end){NextObj(tail) = start;tail = NextObj(tail);start += size;}NextObj(tail) = nullptr; //尾的指向置空//将切好的span头插到spanListspanList.PushFront(span);return span;
}

需要注意的是,当把span切好后,需要将这个切好的span挂到central cache的对应哈希桶中。因此SpanList类还需要提供一个接口,用于将一个span插入到该双链表中。这里选择的是头插,这样当central cache下一次从该双链表中获取非空span时,一来就能找到。

由于SpanList类之前实现了Insert和Begin函数,这里实现双链表头插就非常简单,直接在双链表的Begin位置进行Insert即可。

//带头双向循环链表
class SpanList
{
public:void PushFront(Span* span){Insert(Begin(), span);}
private:Span* _head;
public:std::mutex _mtx; //桶锁
};

获取一个k页的span

当调用上述的GetOneSpan从central cache的某个哈希桶获取一个非空的span时,如果遍历哈希桶中的双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请若干页的span了,下面就来说说如何从page cache获取一个k页的span。

因为page cache是直接按照页数进行映射的,因此要从page cache获取一个k页的span,就应该直接先去找page cache的第k号桶,如果第k号桶中有span,可以直接头删一个span返回给central cache就行了。这里需要再给SpanList类添加对应的Empty和PopFront函数。

//带头双向循环链表
class SpanList
{
public:bool Empty(){return _head == _head->_next;}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}
private:Span* _head;
public:std::mutex _mtx; //桶锁
};

如果page cache的第k号桶中没有span,就应该继续找后面的桶,只要后面任意一个桶中有一个n页span,就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页的span返回给central cache,再将n-k页的span挂到page cache的第n-k号桶即可。

但如果后面的桶中也都没有span,此时就需要向堆申请一个128页的span了,在向堆申请内存时,直接调用封装的SystemAlloc函数即可。

需要注意的是,向堆申请内存后得到的是这块内存的起始地址,此时需要将该地址转换为页号。由于向堆申请内存时都是按页进行申请的,因此直接将该地址除以一页的大小即可得到对应的页号。

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//先检查第k个桶里面有没有spanif (!_spanLists[k].Empty()){return _spanLists[k].PopFront();}//检查一下后面的桶里面有没有span,如果有可以将其进行切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切k页下来kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的挂到对应映射的位置_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//尽量避免代码重复,递归调用自己return NewSpan(k);
}

这里说明一下,当向堆申请到128页的span后,需要将其切分成k页的span和128-k页的span,但是为了尽量避免出现重复的代码,最好不要再编写对应的切分代码。可以先将申请到的128页的span挂到page cache对应的哈希桶中,然后再递归调用该函数就行了,此时在往后找span时就一定会在第128号桶中找到该span,然后进行切分。

这里其实有一个问题:当central cache向page cache申请内存时,central cache对应的哈希桶是处于加锁的状态的,那在访问page cache之前我们应不应该把central cache对应的桶锁解掉呢?

这里建议在访问page cache前,先把central cache对应的桶锁解掉。虽然此时central cache的这个桶当中是没有内存供其他thread cache申请的,但thread cache除了申请内存还会释放内存,如果在访问page cache前将central cache对应的桶锁解掉,那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被阻塞。

因此在调用NewSpan函数之前,需要先将central cache对应的桶锁解掉,然后再将page cache的大锁加上,当申请到k页的span后,需要将page cache的大锁解掉,但此时不需要立刻获取到central cache中对应的桶锁。因为central cache拿到k页的span后还会对其进行切分操作,因此可以在span切好后需要将其挂到central cache对应的桶上时,再获取对应的桶锁。

这里为了让代码清晰一点,只写出了加锁和解锁的逻辑,只需要将这些逻辑添加到之前实现的GetOneSpan函数的对应位置即可。

spanList._mtx.unlock(); //解桶锁
PageCache::GetInstance()->_pageMtx.lock(); //加大锁//从page cache申请k页的spanPageCache::GetInstance()->_pageMtx.unlock(); //解大锁//进行span的切分...spanList._mtx.lock(); //加桶锁//将span挂到central cache对应的哈希桶


threadcache回收内存

当某个线程申请的内存不使用了,可以将其归还给ThreadCache。ThreadCache将该内存块插入到对应哈希桶的自由链表中即可

但是随着线程不断的释放,对应自由链表的长度也会越来越长,这些内存堆积在一个ThreadCache中却没被使用就会浪费,可以这些内存还给CentralCache。归还后这些内存对其他线程来说也是可申请的,因此当ThreadCache某个桶当中的自由链表太长时可以进行一些处理

若ThreadCache某个桶当中自由链表的长度 大于等于 其一次批量向CentralCache申请的内存块上限值_maxSize,那么此时就把该自由链表当中的这些内存块还给CentralCache

//释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);//找出对应的自由链表桶将对象插入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);//当自由链表长度大于一次批量申请的对象个数时就开始还一段list给central cacheif (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}

当自由链表的长度大于一次批量申请的对象时,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给CentralCache中对应的Span即可

//释放对象导致链表过长,回收内存到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;//从list中取出一次批量个数的对象list.PopRange(start, end, list.MaxSize());//将取出的对象还给central cache中对应的spanCentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

从上述代码可以看出,FreeList类需要支持用Size函数获取自由链表中对象的个数,还需要支持用PopRange函数从自由链表中取出指定个数的对象。因此需要给FreeList类增加一个对应的PopRange函数,然后再增加一个_size成员变量,该成员变量用于记录当前自由链表中对象的个数,当向自由链表插入或删除对象时,都应该更新_size的值。

//管理切分好的小对象的自由链表
class FreeList
{
public://将释放的对象头插到自由链表void Push(void* obj){assert(obj);//头插NextObj(obj) = _freeList;_freeList = obj;_size++;}//从自由链表头部获取一个对象void* Pop(){assert(_freeList);//头删void* obj = _freeList;_freeList = NextObj(_freeList);_size--;return obj;}//插入一段范围的对象到自由链表void PushRange(void* start, void* end, size_t n){assert(start);assert(end);//头插NextObj(end) = _freeList;_freeList = start;_size += n;}//从自由链表获取一段范围的对象void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);//头删start = _freeList;end = start;for (size_t i = 0; i < n - 1;i++){end = NextObj(end);}_freeList = NextObj(end); //自由链表指向end的下一个对象NextObj(end) = nullptr; //取出的一段链表的表尾置空_size -= n;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}
private:void* _freeList = nullptr; //自由链表size_t _maxSize = 1;size_t _size = 0;
};

而对于FreeList类当中的PushRange成员函数,最好也像PopRange一样给它增加一个参数,表示插入对象的个数,不然这时还需要通过遍历统计插入对象的个数。

因此之前在调用PushRange的地方就需要修改一下,而实际就在一个地方调用过PushRange函数,并且此时插入对象的个数也是很容易知道的。当时thread cache从central cache获取了actualNum个对象,将其中的一个返回给了申请对象的线程,剩下的actualNum-1个挂到了thread cache对应的桶当中,所以这里插入对象的个数就是actualNum-1。


centralcache回收内存

当ThreadCache中某个自由链表太长时,会将自由链表当中的这些内存块还给CentralCache中的Span,但是还给CentralCache的内存块不一定都是属于同一个Span的。CentralCache中的每个哈希桶中可能都不止一个Span,因此当计算出还回的内存应该还给CentralCache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个Span

根据小内存块的地址得到其所在的页号

某个页当中的所有地址除以页的大小都等该页的页号。比如假设一页的大小是100,那么地址0~99都属于第0页,并且除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1

找到小内存块对应的Span

虽然可以通过内存块的地址得到其所在的页号,但还是不能知道这个内存块属于哪个Span。因为一个Span管理的可能是多个页

为了解决这个问题,可以建立页号和Span之间的映射。由于这个映射关系在PageCache进行Span的合并时也需要用到,因此直接将其存放到PageCache中。

所以就需要在PageCache类中添加一个映射关系了,可以用C++当中的unordered_map进行实现,并且添加一个函数接口,用于让CentralCache获取这里的映射关系。(下面代码中只展示了PageCache类当中新增的成员)

//单例模式
class PageCache
{
public://获取从对象到span的映射Span* MapObjectToSpan(void* obj);
private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};

每当PageCache分配Span给CentralCache时,都需要记录一下页号和Span之间的映射关系。此后当ThreadCache还对象给CentralCache时,才知道应该还给哪一个Span

因此当CentralCache在调用NewSpan()接口向PageCache申请k页的Span时,PageCache在返回这个k页的Span给CentralCache之前,应该建立这k个页号与其Span之间的映射关系

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//先检查第k个桶里面有没有spanif (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();//建立页号与span的映射,方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}//检查一下后面的桶里面有没有span,如果有可以将其进行切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切k页下来kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的挂到对应映射的位置_spanLists[nSpan->_n].PushFront(nSpan);//建立页号与span的映射,方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//尽量避免代码重复,递归调用自己return NewSpan(k);
}

此时就可以通过小内存块的地址 找到其对应页号 再找到其对应的Span了,直接将该内存块的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的Span即可

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}

central cache回收内存

这时当thread cache还对象给central cache时,就可以依次遍历这些对象,将这些对象插入到其对应span的自由链表当中,并且及时更新该span的_usseCount计数即可。

在thread cache还对象给central cache的过程中,如果central cache中某个span的_useCount减到0时,说明这个span分配出去的对象全部都还回来了,那么此时就可以将这个span再进一步还给page cache。

//将一定数量的对象还给对应的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock(); //加锁while (start){void* next = NextObj(start); //记录下一个Span* span = PageCache::GetInstance()->MapObjectToSpan(start);//将对象头插到span的自由链表NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--; //更新被分配给thread cache的计数if (span->_useCount == 0) //说明这个span分配出去的对象全部都回来了{//此时这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并_spanLists[index].Erase(span);span->_freeList = nullptr; //自由链表置空span->_next = nullptr;span->_prev = nullptr;//释放span给page cache时,使用page cache的锁就可以了,这时把桶锁解掉_spanLists[index]._mtx.unlock(); //解桶锁PageCache::GetInstance()->_pageMtx.lock(); //加大锁PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock(); //解大锁_spanLists[index]._mtx.lock(); //加桶锁}start = next;}_spanLists[index]._mtx.unlock(); //解锁
}

若把某个Span还给PageCache,需先将这个Span从CentralCache对应的双链表中移除,然后再将该Span的自由链表置空,因为PageCache中的Span是不需要切分成一个个的小对象的,以及该Span的前后指针也都应该置空,因为之后要将其插入到PageCache对应的双链表中。但Span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了

在CentralCache还Span给PageCache时也存在锁的问题,此时需要先将CentralCache中对应的桶锁解开,然后加上PageCache的大锁之后再进入PageCache进行相关操作(此时别的线程可以在桶中进行申请,并不会影响该线程归还,解开桶锁之前已经将要归还给PageCache的Span从CentralCache中的双链表中移除,其他线程无法找到该Span)

当处理完毕回到CentralCache时,除了将PageCache的大锁解开,还需立刻获得CentralCache对应的桶锁,然后将还未还完对象继续还给CentralCache中对应的Span


pagecache回收内存

若CentralCache中有某个Span的_useCount减到0了,那么CentralCache就需将这个Span还给PageCache

这个过程看似是非常简单的,PageCache只需将还回来的Span挂到对应的哈希桶上即可。但实际为了缓解内存外碎片问题,PageCache还需尝试将还回来的Span与其他空闲的Span进合并。

page cache进行前后页的合并

合并的过程可以分为向前合并和向后合并:

若还回来的Span的起始页号是num,该Span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应Span是否空闲,若空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止

而在向后合并时,就需要判断第num+n页对应的Span是否空闲,若空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止

因此PageCache在合并Span时需要通过页号获取到对应的Span的,这就是把页号与Span之间的映射关系存储到page cache的原因

但当通过页号找到其对应的Span时,这个Span此时可能挂在PageCache,也可能挂在Central Cache。而在只能合并挂在PageCache的Span,因为挂在CentralCache的Span中的内存块正在被其他线程使用

可是并不能通过Span结构当中的_useCount成员,来判断某个Span是在CentralCache还是在Page Cache。因为当CentralCache刚向PageCache申请到一个Span时,这个Span的_useCount就是等于0的,这时可能正在对该Span进行切分时,PageCache就把这个Span拿去进行合并了,这显然是不合理的

于是,可以在Span结构中增加一个_isUse成员,用于标记这个Span是否正在被使用,而当一个Span结构被创建时默认该Span是没有被使用的

//管理以页为单位的大块内存
struct Span
{PAGE_ID _pageId = 0;        //大块内存起始页的页号size_t _n = 0;              //页的数量Span* _next = nullptr;      //双链表结构Span* _prev = nullptr;size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数void* _freeList = nullptr;  //切好的小块内存的自由链表bool _isUse = false;        //是否在被使用
};

当CentralCache向PageCache申请到一个span时,需立即将该Span中的_isUse改为true

span->_isUse = true;

当CentralCache将某个Span还给PageCache时,也需将该Span的_isUse改为false

span->_isUse = false;

由于在合并PageCache当中的Span时,需要通过页号找到其对应的Span,因此PageCache中的Span也需要建立页号与Span之间的映射关系

与CentralCache中的Span不同的是,在PageCache中的Span只需建立Span的首尾页号与该Span之间的映射关系。因为当一个Span在尝试进行合并时,若是往前合并,那么只需要通过Span的尾页找到这个Span;若是向后合并,那么只需要通过Span的首页找到这个Span。即在进行合并时只需要用到Span与其首尾页之间的映射关系

因此获取k页的Span时,若是将n页的Span切成了一个k页的Span和一个n-k页的Span,除了需要建立k页Span中每个页与该Span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//先检查第k个桶里面有没有spanif (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();//建立页号与span的映射,方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}//检查一下后面的桶里面有没有span,如果有可以将其进行切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切k页下来kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的挂到对应映射的位置_spanLists[nSpan->_n].PushFront(nSpan);//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立页号与span的映射,方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//尽量避免代码重复,递归调用自己return NewSpan(k);
}

此时PageCache当中的Span就都与其首尾页之间建立了映射关系,合并代码如下:

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//对span的前后页,尝试进行合并,缓解内存碎片问题//1、向前合并while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);//前面的页号没有(还未向系统申请),停止向前合并if (ret == _idSpanMap.end()){break;}//前面的页号对应的span正在被使用,停止向前合并Span* prevSpan = ret->second;if (prevSpan->_isUse == true){break;}//合并出超过128页的span无法进行管理,停止向前合并if (prevSpan->_n + span->_n > NPAGES - 1){break;}//进行向前合并span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;//将prevSpan从对应的双链表中移除_spanLists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}//2、向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);//后面的页号没有(还未向系统申请),停止向后合并if (ret == _idSpanMap.end()){break;}//后面的页号对应的span正在被使用,停止向后合并Span* nextSpan = ret->second;if (nextSpan->_isUse == true){break;}//合并出超过128页的span无法进行管理,停止向后合并if (nextSpan->_n + span->_n > NPAGES - 1){break;}//进行向后合并span->_n += nextSpan->_n;//将nextSpan从对应的双链表中移除_spanLists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}//将合并后的span挂到对应的双链表当中_spanLists[span->_n].PushFront(span);//建立该span与其首尾页的映射_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;//将该span设置为未被使用的状态span->_isUse = false;
}
  • 若没有通过页号获取到对应的Span,说明对应到该页的内存块还未申请,此时需停止合并
  • 若通过页号获取到了其对应的Span,但该Span处于被使用的状态,也必须停止合并
  • 若合并后大于128页则不能进行合并,因为PageCache无法对大于128页的Span进行管理
  • 在合并Span时,由于这个Span是在PageCache的某个哈希桶的双链表当中,因此在合并后需要将其从对应的双链表中移除,然后再将这个被合并了的Span结构进行delete操作
  • 在合并结束后,将合并后的Span挂入PageCache对应哈希桶的双链表中,并且需要建立该Span与其首位页之间的映射关系,便于以后能合并出更大的Span

大于256KB的大块内存申请问题

申请过程

每个线程的ThreadCache是用于申请小于等于256KB的内存的,而对于大于256KB的内存,可以考虑直接向PageCache申请,但PageCache中最大的页也只有128页,因此若是大于128页的内存申请,则只能直接向堆申请

申请内存的大小申请方式
x ≤ 256KB (32页)向thread cache申请
32页 < x ≤ 128页向page cache申请
x ≥ 128页向堆申请
当申请的内存大于256KB时,虽然不是从thread cache进行获取,但在分配内存时也是需要进行向上对齐的,对于大于256KB的内存我们可以直接按页进行对齐。

而我们之前实现RoundUp函数时,对传入字节数大于256KB的情况直接做了断言处理,因此这里需要对RoundUp函数稍作修改。

//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{if (bytes <= 128){return _RoundUp(bytes, 8);}else if (bytes <= 1024){return _RoundUp(bytes, 16);}else if (bytes <= 8 * 1024){return _RoundUp(bytes, 128);}else if (bytes <= 64 * 1024){return _RoundUp(bytes, 1024);}else if (bytes <= 256 * 1024){return _RoundUp(bytes, 8 * 1024);}else{//大于256KB的按页对齐return _RoundUp(bytes, 1 << PAGE_SHIFT);}
}

现在对于之前的申请逻辑就需要进行修改了,当申请对象的大小大于256KB时,就不用向thread cache申请了,这时先计算出按页对齐后实际需要申请的页数,然后通过调用NewSpan申请指定页数的span即可。

static void* ConcurrentAlloc(size_t size)
{if (size > MAX_BYTES) //大于256KB的内存申请{//计算出对齐后需要申请的页数size_t alignSize = SizeClass::RoundUp(size);size_t kPage = alignSize >> PAGE_SHIFT;//向page cache申请kPage页的spanPageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kPage);PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}
}

即申请大于256KB的内存时,会直接调用PageCache当中的NewSpan()函数进行申请

因此需要再对NewSpan()函数进行改造,当需要申请的内存页数大于128页时,就直接向堆区申请对应页数的内存块。而如果申请的内存页数是小于128页的,那就在PageCache中进行申请。因此当申请大于256KB的内存调用NewSpan()函数时也是需要加锁的,因为可能是在PageCache中进行申请的

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0);if (k > NPAGES - 1) //大于128页直接找堆申请{void* ptr = SystemAlloc(k);Span* span = new Span;span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;//建立页号与span之间的映射_idSpanMap[span->_pageId] = span;return span;}//先检查第k个桶里面有没有spanif (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();//建立页号与span的映射,方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}//检查一下后面的桶里面有没有span,如果有可以将其进行切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切k页下来kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的挂到对应映射的位置_spanLists[nSpan->_n].PushFront(nSpan);//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立页号与span的映射,方便central cache回收小块内存时查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//尽量避免代码重复,递归调用自己return NewSpan(k);
}

释放过程

当释放对象时,我们需要判断释放对象的大小:

释放内存的大小释放方式
x ≤ 256KB (32页)释放给thread cache
32页 < x ≤ 128页释放给page cache
x ≥ 128页释放给堆

当释放内存块时,需先找到该内存块对应的Span。之前在申请大于256KB的内存时,已经给申请到的内存建立Span结构,并建立了起始页号与该Span之间的映射关系。此时就可以通过内存块的起始地址计算出起始页号,进而通过页号找到该对象对应的Span

static void ConcurrentFree(void* ptr, size_t size)
{if (size > MAX_BYTES) //大于256KB的内存释放{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}

PageCache在回收Span时也需要进行判断,若该Span的大小是小于等于128页的,那么直接还给PageCache即可,PageCache会尝试对其进行合并。而若该Span的大小是大于128页的,那么说明该Span是直接向堆区申请的,直接将这块内存释放给堆区,然后将这个Span结构进行delete

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{if (span->_n > NPAGES - 1) //大于128页直接释放给堆{void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);delete span;return;}//对span的前后页,尝试进行合并,缓解内存碎片问题//1、向前合并while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);//前面的页号没有(还未向系统申请),停止向前合并if (ret == _idSpanMap.end()){break;}//前面的页号对应的span正在被使用,停止向前合并Span* prevSpan = ret->second;if (prevSpan->_isUse == true){break;}//合并出超过128页的span无法进行管理,停止向前合并if (prevSpan->_n + span->_n > NPAGES - 1){break;}//进行向前合并span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;//将prevSpan从对应的双链表中移除_spanLists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}//2、向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);//后面的页号没有(还未向系统申请),停止向后合并if (ret == _idSpanMap.end()){break;}//后面的页号对应的span正在被使用,停止向后合并Span* nextSpan = ret->second;if (nextSpan->_isUse == true){break;}//合并出超过128页的span无法进行管理,停止向后合并if (nextSpan->_n + span->_n > NPAGES - 1){break;}//进行向后合并span->_n += nextSpan->_n;//将nextSpan从对应的双链表中移除_spanLists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}//将合并后的span挂到对应的双链表当中_spanLists[span->_n].PushFront(span);//建立该span与其首尾页的映射_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;//将该span设置为未被使用的状态span->_isUse = false;
}

向区堆申请内存时调用的系统接口是VirtualAlloc(),则将内存释放的接口为VirtualFree(),而Linux下的brk和mmap对应的释放系统接口为sbrk和unmmap。此时可以将这些释放接口封装成一个叫做SystemFree()的接口,当需要将内存释放给堆时直接调用SystemFree()即可。

//直接将内存还给堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else//linux下sbrk unmmap等
#endif
}


使用定长内存池配合脱离使用new

tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不可能调用malloc函数的,但当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc函数

为了完全脱离掉malloc函数,之前实现的定长对象内存池就该发挥作用了,代码中使用new时基本都是为Span结构的对象申请空间,而Span对象基本都是在PageCache层创建的,因此可以在PageCache类当中定义一个_spanPool,用于Span对象的申请和释放。

//单例模式
class PageCache
{
public://...
private:ObjectPool<Span> _spanPool;
};

然后将代码中使用new的地方替换为调用定长内存池当中的New函数,将代码中使用delete的地方替换为调用定长内存池当中的Delete函数

//申请span对象
Span* span = _spanPool.New();
//释放span对象
_spanPool.Delete(span);

每个线程第一次申请内存时也都会创建其专属的ThreadCache对象,而这个ThreadCache目前也是new出来的,也需要对其进行替换

//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{static std::mutex tcMtx;static ObjectPool<ThreadCache> tcPool;tcMtx.lock();pTLSThreadCache = tcPool.New();tcMtx.unlock();
}

这里将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建自己的thread cache时,都在个定长内存池中申请内存就行了。

但注意在从该定长内存池中申请内存时需要加锁,防止多个线程同时申请自己的ThreadCache对象而导致线程安全问题。


释放对象时优化为不传对象大小

当使用malloc()函数申请内存时,需要指明申请内存的大小;当使用free()函数释放内存时,只需要传入指向这块内存的指针即可

而目前讲述的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小

若也想在释放对象时不用传入对象的大小,那么就需要建立内存块地址与对象大小之间的关系。由于现在可以通过内存块的地址找到其对应的Span,而Span的自由链表中挂的都是相同大小的小内存块,因此可以在Span结构中再增加一个_objSize成员,该成员代表着这个Span管理的小内存块的大小

//管理以页为单位的大块内存
struct Span
{PAGE_ID _pageId = 0;        //大块内存起始页的页号size_t _n = 0;              //页的数量Span* _next = nullptr;      //双链表结构Span* _prev = nullptr;size_t _objSize = 0;        //切好的小对象的大小size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数void* _freeList = nullptr;  //切好的小块内存的自由链表bool _isUse = false;        //是否在被使用
};

而所有的Span都是从PageCache中获取的,因此每当调用NewSpan()获取到一个k页的Span时,就应该将这个Span的_objSize保存下来

Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;

代码中有两处:

  • 在CentralCache获取非空Span时,若CentralCache对应的桶中没有非空的Span,此时会调用NewSpan()获取一个k页的span
  • 另一处是当申请大于256KB内存时,会直接调用NewSpan()获取一个k页的Span

当释放小内存块时,就可以直接从内存块对应的Span中获取到该内存块的大小,准确来说获取到的是对齐以后的大小

static void ConcurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if (size > MAX_BYTES) //大于256KB的内存释放{PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}

读取映射关系时的加锁问题

页号与Span之间的映射关系是存储在PageCache类中的,当访问这个映射关系时是需要加锁的,因为STL容器是不保证线程安全的

若是在CentralCache访问这个unordered_map,或是在调用hcfree()函数释放内存时访问这个容器,那么就存在线程安全的问题。因为可能存在某个线程在PageCache中进行某些操作导致unordered_map修改,而其他线程可能也正在访问这个映射关系,因此当在PageCache外部访问这个映射关系时是需要加锁的

实际就是在调用PageCache对外提供访问映射关系的函数时需要加锁,可以考虑使用C++中的unique_lock

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号std::unique_lock<std::mutex> lock(_pageMtx); //构造时加锁,析构时自动解锁auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}


多线程环境下对比malloc测试

在多线程场景下对比malloc进行测试。

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(ConcurrentAlloc(16));//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}int main()
{size_t n = 10000;cout << "==========================================================" <<endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" <<endl;return 0;
}

其中测试函数各个参数的含义如下:

  • ntimes:单轮次申请和释放内存的次数。
  • nworks:线程数。
  • rounds:轮次。

在测试函数中,通过clock函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后就得到了,nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间。

固定大小内存的申请和释放

先来测试一下固定大小内存的申请和释放:

v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));

此时4个线程执行10轮操作,每轮申请释放10000次,总共申请释放了40万次,运行后可以看到,malloc的效率还是更高的。

由于此时我们申请释放的都是固定大小的对象,每个线程申请释放时访问的都是各自thread cache的同一个桶,当thread cache的这个桶中没有对象或对象太多要归还时,也都会访问central cache的同一个桶。此时central cache中的桶锁就不起作用了,因为让central cache使用桶锁的目的就是为了,让多个thread cache可以同时访问central cache的不同桶,而此时每个thread cache访问的却都是central cache中的同一个桶。

不同大小内存的申请和释放

再来测试一下不同大小内存的申请和释放:

v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));

由于申请和释放内存的大小是不同的,此时CentralCache当中的桶锁就起作用了,hcmalloc的效率也有了较大增长,但总体来说还是差一点点


针对性能瓶颈使用基数树进行优化

基数树实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等。

单层基数树

单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储数组中在以该页号为下标的位置。

最坏的情况下我们需要建立所有页号与其span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应span的指针。

//单层基数树
template <int BITS>
class TCMalloc_PageMap1
{
public:typedef uintptr_t Number;explicit TCMalloc_PageMap1(){size_t size = sizeof(void*) << BITS; //需要开辟数组的大小size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按页对齐后的大小array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申请空间memset(array_, 0, size); //对申请到的内存进行清理}void* get(Number k) const{if ((k >> BITS) > 0) //k的范围不在[0, 2^BITS-1]{return NULL;}return array_[k]; //返回该页号对应的span}void set(Number k, void* v){assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS-1]array_[k] = v; //建立映射}
private:void** array_; //存储映射关系的数组static const int LENGTH = 1 << BITS; //页的数目
};

代码中的非类型模板参数BITS表示存储页号最多需要bit位的个数。在32位下传入的是32-PAGE_SHIFT,在64位下传入的是64-PAGE_SHIFT。而其中的LENGTH成员代表的就是页号的数目,即2^{BITS}

如32位平台下,以一页大小为8K为例,此时页的数目就是2^{32},因此存储页号最多需要19个比特位,此时传入非类型模板参数的值就是32 − 13 = 19 。由于32位平台下指针的大小是4字节,因此该数组的大小就是2^{19} * 4 = 2^{21} =2M,内存消耗不大,是可行的。但若是在64位平台下,此时该数组的大小是2^{51} * 8 = 2^{54} = 2^{24}G,明显是不可行的,对于64位的平台需使用三层基数树。

二层基数树

以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个bit位

而二层基数树实际上就是把这19个比特位分为两次进行映射。如用前5个bit位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的bit位在基数树的第二层进行映射,映射后最终得到该页号对应的Span指针

在二层基数树中,第一层的数组占用2^{5} * 4 =2^{7} Byte空间,第二层的数组最多占用2^{5} * 2^{14} * 4 =  2^{21} = 2M。二层基数树相比一层基数树的好处就是,一层基数树必须一开始就把2M的数组开辟出来,而二层基数树一开始时只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组即可

//二层基数树
template <int BITS>
class TCMalloc_PageMap2
{
private:static const int ROOT_BITS = 5;                //第一层对应页号的前5个比特位static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储元素的个数static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储元素的个数//第一层数组中存储的元素类型struct Leaf{void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; //第一层数组
public:typedef uintptr_t Number;explicit TCMalloc_PageMap2(){memset(root_, 0, sizeof(root_)); //将第一层的空间进行清理PreallocateMoreMemory(); //直接将第二层全部开辟}void* get(Number k) const{const Number i1 = k >> LEAF_BITS;        //第一层对应的下标const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标if ((k >> BITS) > 0 || root_[i1] == NULL) //页号值不在范围或没有建立过映射{return NULL;}return root_[i1]->values[i2]; //返回该页号对应span的指针}void set(Number k, void* v){const Number i1 = k >> LEAF_BITS;        //第一层对应的下标const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标assert(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v; //建立该页号与对应span的映射}//确保映射[start,start_n-1]页号的空间是开辟好了的bool Ensure(Number start, size_t n){for (Number key = start; key <= start + n - 1;){const Number i1 = key >> LEAF_BITS;if (i1 >= ROOT_LENGTH) //页号超出范围return false;if (root_[i1] == NULL) //第一层i1下标指向的空间未开辟{//开辟对应空间static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查}return true;}void PreallocateMoreMemory(){Ensure(0, 1 << BITS); //将第二层的空间全部开辟好}
};

因此在二层基数树中有一个Ensure(0函数,当需要建立某一页号与其Span之间的映射关系时,需先调用该Ensure()函数确保用于映射该页号的空间是开辟了的,若没有开辟则会立即开辟。

而在32位平台下,就算将二层基数树第二层的数组全部开辟出来也就消耗了2M的空间,内存消耗也不算太多,因此可以在构造二层基数树时就把第二层的数组全部开辟出来

三层基数树

上述一层基数树和二层基数树都适用于32位平台,而对于64位的平台就需要用三层基数树

三层基数树与二层基数树类似,三层基数树实际上就是把存储页号的若干bit位分为三次进行映射

只有当要建立某一页号的映射关系时再开辟对应的数组空间,而没有建立映射的页号就可以不用开辟其对应的数组空间,可以在一定程度上节省内存空间,因此也存在Ensure()函数

//三层基数树
template <int BITS>
class TCMalloc_PageMap3
{
private:static const int INTERIOR_BITS = (BITS + 2) / 3;       //第一、二层对应页号的比特位个数static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二层存储元素的个数static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数static const int LEAF_LENGTH = 1 << LEAF_BITS;         //第三层存储元素的个数struct Node{Node* ptrs[INTERIOR_LENGTH];};struct Leaf{void* values[LEAF_LENGTH];};Node* NewNode(){static ObjectPool<Node> nodePool;Node* result = nodePool.New();if (result != NULL){memset(result, 0, sizeof(*result));}return result;}Node* root_;
public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(){root_ = NewNode();}void* get(Number k) const{const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标//页号超出范围,或映射该页号的空间未开辟if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL){return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回该页号对应span的指针}void set(Number k, void* v){assert(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立该页号与对应span的映射}//确保映射[start,start+n-1]页号的空间是开辟好了的bool Ensure(Number start, size_t n){for (Number key = start; key <= start + n - 1;){const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下标值超出范围return false;if (root_->ptrs[i1] == NULL) //第一层i1下标指向的空间未开辟{//开辟对应空间Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二层i2下标指向的空间未开辟{//开辟对应空间static ObjectPool<Leaf> leafPool;Leaf* leaf = leafPool.New();if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查}return true;}void PreallocateMoreMemory(){}
};

因此当要建立某一页号的映射关系时,需要先确保存储该页映射的数组空间是开辟好了的,也就是调用代码中的Ensure函数,如果对应数组空间未开辟则会立马开辟对应的空间。


使用基数树进行优化代码实现

代码更改

现在我们用基数树对代码进行优化,此时将PageCache类当中的unorder_map用基数树进行替换即可,由于当前是32位平台,因此这里随便用几层基数树都可以。

//单例模式
class PageCache
{
public://...
private://std::unordered_map<PAGE_ID, Span*> _idSpanMap;TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
};

需要建立页号与span的映射时,调用基数树中的set函数

_idSpanMap.set(span->_pageId, span);

需要读取某一页号对应的Span时,调用基数树中的get函数

Span* ret = (Span*)_idSpanMap.get(id);

用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号Span* ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret;
}

为什么读取基数树映射关系时不需要加锁?

当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的

因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当在插入数据时其底层的结构都有可能会发生变化。如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此在读取映射关系的时候是需要加锁的

而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且不会同时对同一个页进行读取映射和建立映射的操作,因为只有在释放对象时才需要读取映射,而建立映射的操作都是在PageCache中进行的。即读取映射时读取的都是对应Span的_useCount不等于0的页,而建立映射时建立的都是对应Span的_useCount等于0的页,所以不会同时对同一个页进行读取映射和建立映射的操作


项目源码

Project/ConcurrentMemoryPool at 9244e2ac8364c8fb327d5f719ae676a7c0f0f336 · Qingxvan/Project · GitHub项目. Contribute to Qingxvan/Project development by creating an account on GitHub.icon-default.png?t=O83Ahttps://github.com/Qingxvan/Project/tree/9244e2ac8364c8fb327d5f719ae676a7c0f0f336/ConcurrentMemoryPool

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Rust 数据类型
  • 坐牢第三十七天(Qt)
  • 【C++算法】分治——归并
  • 每日必抢小程序下单总结
  • C++——深部解析哈希
  • 助力汽车零部件产业发展,2025 第十二届广州国际汽车零部件加工技术及汽车模具展览会与您相约“羊城”广州
  • 了解elementUI的底层源码, 进行二次开发
  • SpringBoot项目获取统一前缀配置以及获取非确定名称配置
  • python画图|3D surface基础教程
  • 【诉讼流程-健身房-违约-私教课-多次沟通无效-民事诉讼-自我学习-铺平通往法律的阶梯-讲解(1)】
  • tensor 的运算(加法、点乘、矩阵乘法)
  • node.js框架StrongLoop快速入门实战
  • Python编码系列—Python建造者模式:构建复杂对象的优雅之道
  • C++学习笔记(22)
  • llvm后端之函数栈帧
  • 时间复杂度分析经典问题——最大子序列和
  • #Java异常处理
  • 【腾讯Bugly干货分享】从0到1打造直播 App
  • axios 和 cookie 的那些事
  • chrome扩展demo1-小时钟
  • ECS应用管理最佳实践
  • If…else
  • JS学习笔记——闭包
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • Linux快速复制或删除大量小文件
  • mac修复ab及siege安装
  • Python学习之路16-使用API
  • react-native 安卓真机环境搭建
  • React中的“虫洞”——Context
  • Redis的resp协议
  • Redux 中间件分析
  • 浮动相关
  • 关于springcloud Gateway中的限流
  • 回流、重绘及其优化
  • 基于Dubbo+ZooKeeper的分布式服务的实现
  • 记录:CentOS7.2配置LNMP环境记录
  • 猫头鹰的深夜翻译:Java 2D Graphics, 简单的仿射变换
  • 盘点那些不知名却常用的 Git 操作
  • 前端存储 - localStorage
  • 什么软件可以提取视频中的音频制作成手机铃声
  • 首页查询功能的一次实现过程
  • FaaS 的简单实践
  • Nginx实现动静分离
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • #php的pecl工具#
  • #控制台大学课堂点名问题_课堂随机点名
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (附源码)spring boot建达集团公司平台 毕业设计 141538
  • (七)Activiti-modeler中文支持
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (算法)硬币问题
  • (小白学Java)Java简介和基本配置
  • (转)一些感悟
  • (转)原始图像数据和PDF中的图像数据
  • *算法训练(leetcode)第四十七天 | 并查集理论基础、107. 寻找存在的路径