当前位置: 首页 > news >正文

【实战项目】高并发内存池(模拟实现mini_tcmalloc)

        本博客主要介绍了从零实现一个高并发内存池的过程。主要包括项目背景、所用的重难点技术、项目整体框架搭建、项目实现细节、项目过程中遇到的问题以及是如何解决的。完成该项目,至少需要掌握的预备知识有:C++数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等。

        项目概述:该项目参考Google的开源项目tcmalloc,模拟实现了一个mini版本的高并发内存池。通过定长内存池、三层缓存内存池的代码结构框架,能够实现在多线程高并发条件下比C++原生malloc、free更高效的获取和回收空间(提高多线程下的效率),并且能够在一定程度上缓解内存碎片问题。

        学习心得:通过该项目的学习,加深了我对操作系统内存管理的理解,熟悉了常见的数据结构(比如链表、哈希桶等),提高了多线程环境下的编程和调试技巧;同时也让我见识了一些大神的解决问题的思路(比如这种自由链表的结构就很巧妙,从内存池中拿取对象时优先从自由链表上拿,提高了效率;再比如形成链表的方式,通过二级指针解引用获取对象前4/8个字节来保存下一个对象的地址)。

        项目源代码已上传gitee:mini_tcmalloc: This is a repository for project code

目录

1. 项目介绍

1.1 背景及概述

1.2 开发环境

1.3 池化技术

1.3.1 内存池

1.3.2 内存碎片

1.4 源码赏析tcmalloc

2. 主要技术点

2.1 TLS机制

2.2 基数树(radix tree)

2.3 windows和Linux下如何直接向堆申请页为单位的大块内存?

3. 整体框架

3.1 项目框架

3.2 代码部署

4. 项目实现

4.1定长内存池的实现

4.2 thread cache 申请内存

4.3 central cache 申请内存

4.4 page cache 申请内存

4.5 thread cache 回收内存

4.6 central cache 回收内存

4.7 page cache 回收内存

4.8 代码优化

4.8.1 大于256KB的大块内存申请问题

4.8.2 使用定长内存池配合脱离使用new

4.8.3 释放对象时优化为不传对象大小

4.8.4 多线程环境下对比malloc测试

4.9 打包成静态库

5. 项目总结


1. 项目介绍

1.1 背景及概述

当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc和free、new和delete)。现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,而tcmalloc就是在多线程高并发的场景下更胜一筹。

tcmalloc是全球大厂google开源的,可以认为当时顶尖的C++高手写出来的,他的知名度也是非常高的,不少公司都在用它,Go语言直接用它做了自己内存分配器。

本项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个mini版本的高并发内存池,目的就是学习tcamlloc的精华,mini_tcmalloc

1.2 开发环境

开发环境Visual Studio 2019
主要技术C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等
实现的功能高并发内存池,实现在多线程条件下比malloc、free更高效的获取和回收堆空间(提高多线程下的效率),并且能够在一定程度上解决内存碎片问题。

1.3 池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。

在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。

1.3.1 内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

1.3.2 内存碎片

内存碎片分为外碎片和内碎片。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。

外碎片如下所示:

1.4 源码赏析tcmalloc

Github:standard-project/ConcurrentMemoryPool at main · chenlong-cxy/standard-project · GitHub

2. 主要技术点

2.1 TLS机制

TSL机制(thread local storage):线程局部存储机制,是为了保证某些全局变量是某个线程才能访问,保持了数据的线程独立性。

比如,一个进程中全局变量是每个线程共享的,但是TSL机制可以让全局变量对特定的线程有不同的意义。这样,多线程在创建thread cache时,避免了加锁。这样就可以通过定义一个全局变量thread cache,而让每个线程都获取自己的thread cache。

定义方式,在变量前面加上thread。eg:

_declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

p就是ThreadCache类型的指针,并且定义成了线程局部变量。其中,_declspec(thread)的前缀是Microsoft添加给Visual C++编译器的一个修改符。它告诉编译器,对应的变量应该放入可执行文件或DLL文件中它的自己的节中。

_declspec(thread)后面的变量必须声明为一个全局变量或静态变量,不能声明为一个类型的局部变量。

2.2 基数树(radix tree)

1. 基数树(radix tree)是将long整数与指针键值相关联的机制,它存储有效率,并且可快速查询,用于整数值与指针的映射,对于长整型数据的映射,如何解决Hash冲突和Hash表大小的设计是一个很头疼的问题,利用radix树可以根据一个长整型(比如一个长ID)快速查找到其对应的对象指针。这比用hash映射来的简单,也更节省空间,使用hash映射hash函数难以设计,不恰当的hash函数可能增大冲突,或浪费空间。

2. 基数树的空间使用更加灵活,只有当需要用到某节点时才会去创建它,可以参考tcmalloc的pagemap(key是释放内存的pageid,value是该pageid对应的span)以及内核的页高速缓存中的基数树(key是相对文件起始位置的第几页,value是对应的页描述符)。

本项目中使用了基数树代替map/unordered_map来建立pageid和span指针之间的映射关系,提高了效率。因为读取radix tree中的映射关系时,不需要加锁。原因如下:

        当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的。因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的结构都有可能会发生变化。比如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。

  而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在page cache进行的。也就是说,读取映射时读取的都是对应span的_useCount不等于0的页,而建立映射时建立的都是对应span的_useCount等于0的页,所以说我们不会同时对同一个页进行读取映射和建立映射的操作。

2.3 windows和Linux下如何直接向堆申请页为单位的大块内存?

  • VirtualAlloc
  • brk和mmap
inline static void* SystemAlloc(size_t kpage) {
#ifdef _WIN32
    void* ptr = VirtualAlloc(0, kpage*(1 << PAGE_SHIFT),
    MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
    //brk mmap等
#endif
    if (ptr == nullptr) throw std::bad_alloc();
    return ptr; 
}

inline static void SystemFree(void* ptr) {
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);
#else
    //sbrk unmmap等
#endif
}

3. 整体框架

本项目实现的内存池需要考虑以下几方面的问题

  • 1. 性能问题。
  • 2. 多线程环境下,锁竞争问题。
  • 3. 内存碎片问题。

3.1 项目框架

Concurrent memory pool主要由以下3个部分构成:

1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。

2. central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。

3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

每个线程都有一个属于自己的thread cache,也就意味着线程在thread cache申请内存时是不需要加锁的,而一次性申请大于256KB内存的情况是很少的,因此大部分情况下申请内存时都是无锁的,这也就是这个高并发内存池高效的地方。

每个线程的thread cache会根据自己的情况向central cache申请或归还内存,这就避免了出现单个线程的thread cache占用太多内存,而其余thread cache出现内存吃紧的问题。多线程的thread cache可能会同时找central cache申请内存,此时就会涉及线程安全的问题,因此在访问central cache时是需要加锁的,但central cache实际上是一个哈希桶的结构,只有当多个线程同时访问同一个桶时才需要加锁,所以这里的锁竞争也不会很激烈。

central cache主要起到一个居中调度的作用,每个线程的thread cache需要内存时从central cache获取,而当thread cache的内存多了就会将内存还给central cache,其作用类似于一个中枢,因此取名为中心缓存。

page cache就负责提供以页为单位的大块内存,当central cache需要内存时就会去向page cache申请,而当page cache没有内存了就会直接去找系统,也就是直接去堆上按页申请内存块。

3.2 代码部署

 主要编写了如上代码,其中

  1. ObjectPool.h  实现的是定长内存池
  2. Common.h 实现的是一些公共接口,包括管理小块对象的freeList、管理span的spanList等
  3. ThreadCache.h ThreadCache.cpp 实现的是三层缓存的第一层thread cache
  4. CentralCache.h CentralCache.cpp 实现的是三层缓存的第二层central cache
  5. PageCache.h PageCache.cpp  实现的是三层缓存的第三层page cache
  6. PageMap.h  实现的是基数树,用于代替map,实现page_ID与span地址之间的映射(radix tree避免了加锁,比map/unordered_map效率高)
  7. ConcurrentAlloc.h 提供对外的接口,ConcurrentAlloc和ConcurrentFree,类似malloc和free
  8. UnitTest.cpp 用于模块测试
  9. Benchmark.cpp 用于最终的性能测试

4. 项目实现

4.1定长内存池的实现

malloc其实就是一个通用的内存池,在什么场景下都可以使用,但这也意味着malloc在什么场景下都不会有很高的性能,因为malloc并不是针对某种场景专门设计的。定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们可以将其性能做到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。

我们可以通过实现定长内存池来熟悉一下对简单内存池的控制,其次,这个定长内存池后面会作为高并发内存池的一个基础组件。
定长内存池也叫做对象池,在创建对象池时,对象池可以根据传入的对象类型的大小来实现“定长”,因此我们可以通过使用模板参数来实现“定长”,比如创建定长内存池时传入的对象类型是int,那么该内存池就只支持4字节大小内存的申请和释放。

对于向堆申请到的大块内存,我们可以用一个指针来对其进行管理,但仅用一个指针肯定是不够的,我们还需要用一个变量来记录这块内存的长度。

其次,释放回来的定长内存块也需要被管理,我们可以将这些释放回来的定长内存块链接成一个链表,这里我们将管理释放回来的内存块的链表叫做自由链表,为了能找到这个自由链表,我们还需要一个指向自由链表的指针。

因此,定长内存池当中包含三个成员变量:

  • _memory:指向大块内存的指针。
  • _remainBytes:大块内存切分过程中剩余字节数。
  • _freeList:还回来过程中链接的自由链表的头指针。

(1)当我们申请对象时,内存池应该优先把还回来的内存块对象再次重复利用,因此如果自由链表当中有内存块的话,就直接从自由链表头删一个内存块进行返回即可。如果自由链表当中没有内存块,那么我们就在大块内存中切出定长的内存块进行返回,当内存块切出后及时更新_memory指针的指向,以及_remainBytes的值即可。

(2)对于还回来的定长内存块,我们可以用自由链表将其链接起来,但我们并不需要为其专门定义链式结构,我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台)作为指针,存储后面内存块的起始地址即可。

整体代码如下:

template<class T>
class ObjectPool
{
private:
	char* _memory = nullptr; // 指向大块内存的指针
	size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数
	void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针

public:
	T* New()
	{
		T* obj = nullptr;
		// 优先把还回来内存块对象,再次重复利用
		if (_freeList)
		{
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		}
		else
		{
			// 剩余内存不够一个对象大小时,则重新开大块空间
			if (_remainBytes < sizeof(T))
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes >> 13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();//抛异常机制
				}
			}

			obj = (T*)_memory;
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += objSize;//大块空间的指针往前移
			_remainBytes -= objSize;//剩余的大块空间的个数减少
		}

		new(obj)T;//定位new,显示调用T的构造函数初始化(为了给new出来的空间初始化)
		return obj;
	}

	void Delete(T* obj)
	{
		obj->~T();// 显示调用析构函数清理对象,这是清理对象里面的内容

		//将要删除的obj对象头插到freeList链表中
		*(void**)obj = _freeList;//obj头4个字节的位置存上freeList的地址
		_freeList = obj;
	}
};

性能对比测试:

//下面为定长内存池的性能测试代码
struct TreeNode
{
	int _val;
	TreeNode* _left;
	TreeNode* _right;
	TreeNode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};
void TestObjectPool()
{
	//(1)new
	// 申请释放的轮次
	const size_t Rounds = 5;
	// 每轮申请释放多少次
	const size_t N = 100000;
	std::vector<TreeNode*> v1;
	v1.reserve(N);
	size_t begin1 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode);
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}
	size_t end1 = clock();

	//(2)自己设计的定长内存池
	std::vector<TreeNode*> v2;
	v2.reserve(N);
	ObjectPool<TreeNode> TNPool;//自己设计的定长内存池的用法
	size_t begin2 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New());
		}
		for (int i = 0; i < N; ++i)
		{
			TNPool.Delete(v2[i]);
		}
		v2.clear();
	}
	size_t end2 = clock();

	cout << "new cost time: " << end1 - begin1 << endl;
	cout << "object pool cost time: " << end2 - begin2 << endl;//消耗的时间要少于上面的
}

 可以看到在这个过程中,定长内存池消耗的时间比malloc/free消耗的时间要短。这就是因为malloc是一个通用的内存池,而定长内存池是专门针对申请定长对象而设计的,因此在这种特殊场景下定长内存池的效率更高。

4.2 thread cache 申请内存

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

因此thread cache实际就是一个数组,数组中存储的就是一个个的自由链表,至于这个数组中到底存储了多少个自由链表,就需要看我们在进行字节数对齐时具体用的是什么映射对齐规则了。

其中对齐规则如下。但由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片

 申请内存的过程:

  1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
  2. 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
  3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。

注意:

每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?我们不能将这个thread cache创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度。
要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。

class ThreadCache
{
public:
	// 申请和释放内存对象
	void* Allocate(size_t size);
	void Deallocate(void* ptr, size_t size);

	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);

	// 释放对象时,链表过长时,回收内存回到中心缓存
	void ListTooLong(FreeList& list, size_t size);
private:
	FreeList _freeLists[NFREELIST];
};

// TLS thread local storage(线程局部存储、线程本地存储)
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
// thread 用于声明一个线程本地变量, _declspec(thread)的前缀是Microsoft添加给Visual C++编译器的一个修改符。
void* ThreadCache::Allocate(size_t size)
{
	assert(size <= MAX_BYTES);
	size_t alignSize = SizeClass::RoundUp(size);
	size_t index = SizeClass::Index(size);

	if (!_freeLists[index].Empty())
	{
		return _freeLists[index].Pop();
	}
	else
	{
		return FetchFromCentralCache(index, alignSize);
	}
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
	// 慢开始反馈调节算法
	// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
	// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
	// 3、size越大,一次向central cache要的batchNum就越小
	// 4、size越小,一次向central cache要的batchNum就越大
	size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (_freeLists[index].MaxSize() == batchNum)
	{
		_freeLists[index].MaxSize() += 1;
	}

	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	assert(actualNum > 0);

	if (actualNum == 1)
	{
		assert(start == end);
		return start;
	}
	else
	{
		_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
		return start;
	}
}

4.3 central cache 申请内存

central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。

thread cache是每个线程独享的,而central cache是所有线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是需要加锁的。但central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。

span的结构:

每个span管理的都是一个以页为单位的大块内存,且每个span中的页数不同。

//管理以页为单位的大块内存
struct Span
{
	PAGE_ID _pageId = 0;        //大块内存起始页的页号
	size_t _n = 0;              //页的数量

	Span* _next = nullptr;      //双链表结构
	Span* _prev = nullptr;

	size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;  //切好的小块内存的自由链表
};

根据起始页号及页的数量,可以计算span的起始地址和终止地址:
若页号为2,页数为n,则
span的起始地址为:start = 2 * 页大小(8192)
span的终止地址为:end = start + (页数n * 页大小)

申请内存的过程:

  1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
  2. central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
  3. central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count

central cache的结构:

central cache的映射规则和thread cache是一样的,因此central cache里面哈希桶的个数也是208,但central cache每个哈希桶中存储就是我们上面定义的双链表结构。
而central cache和page cache在整个进程中只有一个,对于这种只能创建一个对象的类,我们可以将其设置为单例模式。单例模式可以保证系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式相对较复杂,我们这里使用饿汉模式就足够了。

thread cache向central cache中申请的对象个数如何确定呢?用慢开始反馈调节算法

当thread cache向central cache申请内存时,如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。通过函数,可以将给出的对象个数控制到2~512个之间。但就算申请的是小对象,一次性给出512个也是比较多的,基于这个原因,我们可以在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1。此时当thread cache申请对象时,我们会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSize的值,那么还会将thread cache中该自由链表的_maxSize的值进行加一。(这有点像网络中拥塞控制的机制)

// 单例模式
class CentralCache
{
public:
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}
	// 获取一个非空的span
	Span* GetOneSpan(SpanList& list, size_t byte_size);
	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
	// 将一定数量的对象释放到span跨度
	void ReleaseListToSpans(void* start, size_t byte_size);
private:
	SpanList _spanLists[NFREELIST];
private:
	CentralCache()
	{}
	CentralCache(const CentralCache&) = delete;
	static CentralCache _sInst;
};
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
	// 查看当前的spanlist中是否有还有未分配对象的span
	Span* it = list.Begin();
	while (it != list.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
	list._mtx.unlock();

	// 走到这里说没有空闲span了,只能找page cache要
	PageCache::GetInstance()->_pageMtx.lock();
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	span->_isUse = true;
	span->_objSize = size;
	PageCache::GetInstance()->_pageMtx.unlock();

	// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span

	// 计算span的大块内存的起始地址和大块内存的大小(字节数)
	char* start = (char*)(span->_pageId << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;
	char* end = start + bytes;

	// 把大块内存切成自由链表链接起来
	// 1、先切一块下来去做头,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;
	int i = 1;
	while (start < end)
	{
		++i;
		NextObj(tail) = start;
		tail = NextObj(tail); // tail = start;
		start += size;
	}

	NextObj(tail) = nullptr;

	// 切好span以后,需要把span挂到桶里面去的时候,再加锁
	list._mtx.lock();
	list.PushFront(span);

	return span;
}

// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();

	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span);
	assert(span->_freeList);

	// 从span中获取batchNum个对象
	// 如果不够batchNum个,有多少拿多少
	start = span->_freeList;
	end = start;
	size_t i = 0;
	size_t actualNum = 1;
	while (i < batchNum - 1 && NextObj(end) != nullptr)
	{
		end = NextObj(end);
		++i;
		++actualNum;
	}
	span->_freeList = NextObj(end);
	NextObj(end) = nullptr;
	span->_useCount += actualNum;

	 条件断点
	int j = 0;
	void* cur = start;
	while (cur)
	{
		cur = NextObj(cur);
		++j;
	}

	if (j != actualNum)
	{
		int x = 0;
	}

	_spanLists[index]._mtx.unlock();
	return actualNum;
}

4.4 page cache 申请内存

page cache也是一个哈希桶结构,但是映射关系跟之前是不一样的。是按照span中页数来分类的。page cache的每个哈希桶中里挂的也是一个个的span,这些span也是按照双链表的结构链接起来的。

 

 申请内存:

  1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
  2. 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
  3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。

注意:

当每个线程的thread cache没有内存时都会向central cache申请,此时多个线程的thread cache如果访问的不是central cache的同一个桶,那么这些线程是可以同时进行访问的。这时central cache的多个桶就可能同时向page cache申请内存的,所以page cache也是存在线程安全问题的,因此在访问page cache时也必须要加锁。

但是在page cache这里我们不能使用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小后再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。

也就是说,在访问page cache时,我们可能需要访问page cache中的多个桶,如果page cache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此我们在访问page cache时使用没有使用桶锁,而是用一个大锁将整个page cache给锁住。

central cache向page cache申请span的页数如何确定?

可以根据具体所需对象的大小来决定,就像之前我们根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,现在我们是根据对象的大小计算出,central cache一次应该向page cache申请几页的内存块。

根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,然后将这个上限值乘以单个对象的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。

class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}
	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);
	// 释放空闲span回到Pagecache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);
	// 获取一个K页的span
	Span* NewSpan(size_t k);
	std::mutex _pageMtx;
private:
	SpanList _spanLists[NPAGES];
	ObjectPool<Span> _spanPool;
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	//std::map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
	PageCache()
	{}
	PageCache(const PageCache&) = delete;
	static PageCache _sInst;
};
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);

	// 大于128 page的直接向堆申请
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		//Span* span = new Span;
		Span* span = _spanPool.New();

		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;

		//_idSpanMap[span->_pageId] = span;
		_idSpanMap.set(span->_pageId, span);

		return span;
	}

	// 先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			//_idSpanMap[kSpan->_pageId + i] = kSpan;
			_idSpanMap.set(kSpan->_pageId + i, kSpan);
		}

		return kSpan;
	}

	// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			//Span* kSpan = new Span;
			Span* kSpan = _spanPool.New();

			// 在nSpan的头部切一个k页下来
			// k页span返回
			// nSpan再挂到对应映射的位置
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);
			// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
			// 进行的合并查找
			//_idSpanMap[nSpan->_pageId] = nSpan;
			//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
			_idSpanMap.set(nSpan->_pageId, nSpan);
			_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);

			// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			{
				//_idSpanMap[kSpan->_pageId + i] = kSpan;
				_idSpanMap.set(kSpan->_pageId + i, kSpan);
			}

			return kSpan;
		}
	}

	// 走到这个位置就说明后面没有大页的span了
	// 这时就去找堆要一个128页的span
	//Span* bigSpan = new Span;
	Span* bigSpan = _spanPool.New();
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

4.5 thread cache 回收内存

释放内存的过程:

  1. 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
  2. 当链表的长度过长,则回收一部分内存对象到central cache。
void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	// 找对映射的自由链表桶,对象插入进入
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	// 当链表长度大于一次批量申请的内存时就开始还一段list给central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);
	}
}

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	list.PopRange(start, end, list.MaxSize());

	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

4.6 central cache 回收内存

释放内存:

  1. 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时use_count--。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

如何根据对象的地址得到对象所在的页号?

当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。但是需要注意的是,还给central cache的这些对象不一定都是属于同一个span的。central cache中的每个哈希桶当中可能都不止一个span,因此当我们计算出还回来的对象应该还给central cache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个span。

某个页当中的所有地址除以页的大小都等该页的页号。比如我们这里假设一页的大小是100,那么地址0~99都属于第0页,它们除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1。

如何找到一个对象对应的span?

一个span管理的可能是多个页。为了解决这个问题,我们可以建立页号和span之间的映射。由于这个映射关系在page cache进行span的合并时也需要用到,因此我们直接将其存放到page cache里面。可以先用C++当中的unordered_map进行实现,后续需要优化。

void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();
	while (start)
	{
		void* next = NextObj(start);

		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		NextObj(start) = span->_freeList;
		span->_freeList = start;
		span->_useCount--;

		// 说明span的切分出去的所有小块内存都回来了
		// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
		if (span->_useCount == 0)
		{
			_spanLists[index].Erase(span);
			span->_freeList = nullptr;
			span->_next = nullptr;
			span->_prev = nullptr;

			// 释放span给page cache时,使用page cache的锁就可以了
			// 这时把桶锁解掉
			_spanLists[index]._mtx.unlock();

			PageCache::GetInstance()->_pageMtx.lock();
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock();

			_spanLists[index]._mtx.lock();
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock();
}

4.7 page cache 回收内存

释放内存:

  1. 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片

span的前后合并

合并的过程可以分为向前合并和向后合并。因此page cache在合并span时,是需要通过页号获取到对应的span的,这就是我们要把页号与span之间的映射关系存储到page cache的原因。但需要注意的是,当我们通过页号找到其对应的span时,这个span此时可能挂在page cache,也可能挂在central cache。而在合并时我们只能合并挂在page cache的span,因为挂在central cache的span当中的对象正在被其他线程使用。鉴于此,我们可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时我们默认该span是没有被使用的。

由于在合并page cache当中的span时,需要通过页号找到其对应的span,而一个span是在被分配给central cache时,才建立的各个页号与span之间的映射关系,因此page cache当中的span也需要建立页号与span之间的映射关系。与central cache中的span不同的是,在page cache中,只需建立一个span的首尾页号与该span之间的映射关系。因为当一个span在尝试进行合并时,如果是往前合并,那么只需要通过一个span的尾页找到这个span,如果是向后合并,那么只需要通过一个span的首页找到这个span。

需要注意的是,在向前或向后进行合并的过程中:

  • 如果没有通过页号获取到其对应的span,说明对应到该页的内存块还未申请,此时需要停止合并。
  • 如果通过页号获取到了其对应的span,但该span处于被使用的状态,那我们也必须停止合并。
  • 如果合并后大于128页则不能进行本次合并,因为page cache无法对大于128页的span进行管理。
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	// 大于128 page的直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);

		return;
	}

	// 对span前后的页,尝试进行合并,缓解内存碎片问题
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		//auto ret = _idSpanMap.find(prevId);
		 前面的页号没有,不合并了
		//if (ret == _idSpanMap.end())
		//{
		//	break;
		//}
		auto ret = (Span*)_idSpanMap.get(prevId);
		if (ret == nullptr)
		{
			break;
		}
		// 前面相邻页的span在使用,不合并了
		Span* prevSpan = ret;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		// 合并出超过128页的span没办法管理,不合并了
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;
		_spanLists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);
	}

	// 向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		/*auto ret = _idSpanMap.find(nextId);
		if (ret == _idSpanMap.end())
		{
			break;
		}*/
		auto ret = (Span*)_idSpanMap.get(nextId);
		if (ret == nullptr)
		{
			break;
		}
		Span* nextSpan = ret;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		span->_n += nextSpan->_n;
		_spanLists[nextSpan->_n].Erase(nextSpan);
		//delete nextSpan;
		_spanPool.Delete(nextSpan);
	}

	_spanLists[span->_n].PushFront(span);
	span->_isUse = false;
	//_idSpanMap[span->_pageId] = span;
	//_idSpanMap[span->_pageId+span->_n-1] = span;
	_idSpanMap.set(span->_pageId, span);
	_idSpanMap.set(span->_pageId + span->_n - 1, span);
}

4.8 代码优化

4.8.1 大于256KB的大块内存申请问题

4.8.2 使用定长内存池配合脱离使用new

        tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不能调用malloc函数的,我们当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc。

  为了完全脱离掉malloc函数,此时我们之前实现的定长内存池就起作用了,代码中使用new时基本都是为Span结构的对象申请空间,而span对象基本都是在page cache层创建的,因此我们可以在PageCache类当中定义一个_spanPool,用于span对象的申请和释放。

4.8.3 释放对象时优化为不传对象大小

        当我们使用malloc函数申请内存时,需要指明申请内存的大小;而当我们使用free函数释放内存时,只需要传入指向这块内存的指针即可。

  而我们目前实现的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小。如果我们也想做到,在释放对象时不用传入对象的大小,那么我们就需要建立对象地址与对象大小之间的映射。由于现在可以通过对象的地址找到其对应的span,而span的自由链表中挂的都是相同大小的对象。因此我们可以在Span结构中再增加一个_objSize成员,该成员代表着这个span管理的内存块被切成的一个个对象的大小。

4.8.4 多线程环境下对比malloc测试

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc(16));
					//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

此时我们发现,本项目实现的内存池比原生的malloc和free的效率要低不少。通过在VS编译器中带有的性能分析的工具的分析,我们发现,原因主要是MapObjectToSpan函数中的锁导致了性能低下。

因此当前项目的瓶颈点就在锁竞争上面,需要解决调用MapObjectToSpan函数访问映射关系时的加锁问题。tcmalloc当中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁。

基数树实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等。单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储数组中在以该页号为下标的位置。

优化以后性能要高于malloc。

4.9 打包成静态库

实际Google开源的tcmalloc是会直接用于替换malloc的,不同平台替换的方式不同。比如基于Unix的系统上的glibc,使用了weak alias的方式替换;而对于某些其他平台,需要使用hook的钩子技术来做。

对于我们当前实现的项目,可以考虑将其打包成静态库或动态库。

5. 项目总结

下面记录了在本项目的过程中,遇到的问题以及是如何解决的?

1. Windows下如何直接申请大块内存?
直接向堆申请内存空间,在Windows下,可以调用VirtualAlloc函数;
在Linux下,可以调用brk或mmap函数。

2. 自由链表结构的设计?
并不需要为其专门定义链式结构,我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台)作为指针,存储后面内存块的起始地址即可。
那么是否需要采用条件编译来区分32位机器和64位机器呢?

不需要。我们得知道,32位平台下指针的大小是4个字节,64位平台下指针的大小是8个字节。而指针指向数据的类型,决定了指针解引用后能向后访问的空间大小,因此我们这里需要的是一个指向指针的指针,这里使用二级指针就行了。
当我们需要访问一个内存块的前4/8个字节时,我们就可以先该内存块的地址先强转为二级指针,由于二级指针存储的是一级指针的地址,二级指针解引用能向后访问一个指针的大小,因此在32位平台下访问的就是4个字节,在64位平台下访问的就是8个字节,此时我们访问到了该内存块的前4/8个字节。

void*& NextObj(void* ptr)
{
	return (*(void**)ptr);
}

3. 哈希桶的映射规则?

thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。

让这些字节数按照某种规则进行对齐,例如我们让这些字节数都按照8字节进行向上对齐,那么thread cache的结构就是下面这样的,此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。

但如果所有的字节数都按照8字节进行对齐的话,那么我们就需要建立256 × 1024 ÷ 8 = 32768 个桶,这个数量还是比较多的,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐(具体见上文),可以将空间浪费率控制到百分之十左右。

4. 慢开始反馈调节算法

当thread cache向central cache申请内存时,central cache应该给出多少个对象呢?如果central cache给的太少,那么thread cache在短时间内用完了又会来申请;但如果一次性给的太多了,可能thread cache用不完也就浪费了。如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。

但就算申请的是小对象,一次性给出512个也是比较多的,基于这个原因,我们可以在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1。此时当thread cache申请对象时,我们会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSize的值,那么还会将thread cache中该自由链表的_maxSize的值进行加一。

5. 根据小块对象地址-》页号-》span

(1)通过地址除以页大小等于页号。某个页当中的所有地址除以页的大小都等该页的页号。比如我们这里假设一页的大小是100,那么地址0~99都属于第0页,它们除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1。

(2)采用map建立页号和span之间的映射,但是访问时需要频繁加锁,使用radix tree优化,避免了加锁访问。

6. span回收的时候,不单单是回收span插入到pageCache的桶的链表中去

(1)需要进行前后合并(寻找其他空闲的span),有助于减少外部碎片。

(2)内部碎片是由于对齐数决的,无法避免。

7. 大于256KB的对象内存的申请

对于大于256KB的内存,我们可以考虑直接向page cache申请,但page cache中最大的页也就只有128页,因此如果是大于128页的内存申请,就只能直接向堆申请了。

8. 配合定长内存池来脱离使用new

tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不能调用malloc函数的,我们当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc。

为了完全脱离掉malloc函数,此时我们之前实现的定长内存池就起作用了,代码中使用new时基本都是为Span结构的对象申请空间,而span对象基本都是在page cache层创建的,因此我们可以在PageCache类当中定义一个_spanPool,用于span对象的申请和释放。

9. 释放对象时传入对象的大小

目前实现的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小。原因是:

  • 如果释放的是大于256KB的对象,需要根据对象的大小来判断这块内存到底应该还给page cache,还是应该直接还给堆。
  • 如果释放的是小于等于256KB的对象,需要根据对象的大小计算出应该还给thread cache的哪一个哈希桶

要想做到在释放对象时不用传入对象的大小,那么我们就需要建立对象地址与对象大小之间的映射。因此我们可以在Span结构中再增加一个_objSize成员,该成员代表着这个span管理的内存块被切成的一个个对象的大小。

10. TLS机制优化

每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?我们不能将这个thread cache创建为全局的(前提时threadCache是单例模式),因为全局变量是所有线程共享的,这样就需要锁来控制,增加了代码复杂度,是程序运行效率降低。

要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。

相关文章:

  • 173.CI/CD(一):gitlab配置,jenkins的安装配置,jenkins实现基础的CI/CD,Sonarqube代码质量检测,Harbor镜像仓库
  • FastAPI 学习之路(二十七)安全校验
  • office32位和64位有什么区别
  • 猿创征文|python求解四位数 青少年编程电子学会python编程等级考试三级真题解析2021年03月
  • 当事人胜诉后,所预交受理费是否应予退还
  • uniapp的拨打电话,下拉和上划
  • 如何确定 RMAN 的多路复用级别
  • Java excel poi 读取已有文件 ,动态插入一列数据
  • Java项目:SSM企业工资管理系统
  • Python基于Django的汽车销售网站
  • 花好月圆时,邀你一起来读诗!
  • 在Slicer中添加点、直线和曲线
  • Win7下安装Docker(虚拟机win7)
  • JavaWeb对于JSP内置对象及其作用域的深入理解与运用
  • HarmonyOS(鸿蒙系统)物联网开发教程——环境搭建
  • 收藏网友的 源程序下载网
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • 2019年如何成为全栈工程师?
  • Angular4 模板式表单用法以及验证
  • css的样式优先级
  • GraphQL学习过程应该是这样的
  • Java 9 被无情抛弃,Java 8 直接升级到 Java 10!!
  • MQ框架的比较
  • MYSQL 的 IF 函数
  • mysql中InnoDB引擎中页的概念
  • Promise面试题,控制异步流程
  • react 代码优化(一) ——事件处理
  • Redis的resp协议
  • Spring Security中异常上抛机制及对于转型处理的一些感悟
  • Vue 重置组件到初始状态
  • 不用申请服务号就可以开发微信支付/支付宝/QQ钱包支付!附:直接可用的代码+demo...
  • 大数据与云计算学习:数据分析(二)
  • 观察者模式实现非直接耦合
  • 警报:线上事故之CountDownLatch的威力
  • 面试遇到的一些题
  • 使用agvtool更改app version/build
  • 一文看透浏览器架构
  • gunicorn工作原理
  • ​​​​​​​sokit v1.3抓手机应用socket数据包: Socket是传输控制层协议,WebSocket是应用层协议。
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • #控制台大学课堂点名问题_课堂随机点名
  • #我与虚拟机的故事#连载20:周志明虚拟机第 3 版:到底值不值得买?
  • $NOIp2018$劝退记
  • %check_box% in rails :coditions={:has_many , :through}
  • (13)[Xamarin.Android] 不同分辨率下的图片使用概论
  • (3)选择元素——(17)练习(Exercises)
  • (4)(4.6) Triducer
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)
  • (八)五种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (八十八)VFL语言初步 - 实现布局
  • (附源码)springboot家庭装修管理系统 毕业设计 613205
  • (规划)24届春招和25届暑假实习路线准备规划
  • (论文阅读40-45)图像描述1
  • (三)c52学习之旅-点亮LED灯
  • (转)Google的Objective-C编码规范