当前位置: 首页 > news >正文

go-redis源码解析:连接池原理

1. 执行命令的入口方法

redis也是通过hook执行命令,initHooks时,会将redis的hook放在第一个

img

通过hook调用到process方法,process方法内部再调用_process

img

2. 线程池初始化

redis在新建单客户端、sentinel客户端、cluster客户端等,都会newConnPool初始化线程池

2.1.1. NewClient方式初始化连接池

// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {opt.init()c := Client{baseClient: &baseClient{opt: opt,},}c.init()// 初始化线程池c.connPool = newConnPool(opt, c.dialHook)return &c
}

2.1.2. NewFailoverClient方式初始化连接池

// NewFailoverClient returns a Redis client that uses Redis Sentinel
// for automatic failover. It's safe for concurrent use by multiple
// goroutines.
// zhmark 2024/6/13 NewFailoverClient
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {if failoverOpt.RouteByLatency {panic("to route commands by latency, use NewFailoverClusterClient")}if failoverOpt.RouteRandomly {panic("to route commands randomly, use NewFailoverClusterClient")}sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))copy(sentinelAddrs, failoverOpt.SentinelAddrs)// todo:2024/6/26 有问题,每次都是换成1、3、2// 将 sentinelAddrs 切片中的元素顺序随机打乱,实现随机化效果rand.Shuffle(len(sentinelAddrs), func(i, j int) {//交换 sentinelAddrs 中第 i 个和第 j 个元素sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]})failover := &sentinelFailover{opt:           failoverOpt,sentinelAddrs: sentinelAddrs,}opt := failoverOpt.clientOptions()// 初始化赋值连接建立函数opt.Dialer = masterReplicaDialer(failover)opt.init()var connPool *pool.ConnPoolrdb := &Client{baseClient: &baseClient{opt: opt,},}rdb.init()
// 初始化线程池connPool = newConnPool(opt, rdb.dialHook)rdb.connPool = connPoolrdb.onClose = failover.Closefailover.mu.Lock()// 关闭老的有问题的地址连接//如:发现新读取的主节点地址和本地保存的不一样,将之前和老的主节点连接断开// addr是新的master地址failover.onFailover = func(ctx context.Context, addr string) {_ = connPool.Filter(func(cn *pool.Conn) bool {// 如果连接的远程地址与 addr 不同,则返回 true,表示要关闭此连接;否则返回 false,表示保留该连接return cn.RemoteAddr().String() != addr})}failover.mu.Unlock()return rdb
}

2.1. NewClusterClient方式初始化线程池

cluster模式和上面的NewClient、NewFailoverClient不一样。cluster模式new的时候不会初始化连接池,而是等执行命令时,获取所有节点,每个节点新建一个redisClient,每个client单独一个连接池

2.1.1. 初始化NewClusterClient时不会新建连接池

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {// 初始化opt,其中会初始化NewClient方法opt.init()c := &ClusterClient{opt:   opt,nodes: newClusterNodes(opt),}// 获取所有主从节点信息,并保存在本地c.state = newClusterStateHolder(c.loadState)// 保存命令详情c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)c.cmdable = c.Processc.initHooks(hooks{dial:       nil,process:    c._process,pipeline:   c.processPipeline,txPipeline: c.processTxPipeline,})return c
}

2.1.2. 执行命令时,通过cmdNode执行到NewClient,初始化线程池

img

通过clOpt的NewClient方法,初始化client,进而初始化线程池

img

2.1.3. 然而clOpt的NewClient方法什么时候初始化赋值的呢

在NewClusterClient方法的opt.init()中

img

img

3. 如何新建连接

总览图

img

3.1.1. 第一次执行命令时,go-redis会先通过cmdNode方法,获取所有的节点信息

img

3.1.2. 底层调用到ClusterSlots方法,触发redis.go中_process方法,内部调用_withConn方法,通过getConn方法获取可用连接

img

3.1.3. getConn方法内部发现无可用连接,则会调用newConn

3.1.4. newConn内部,调用连接池的dialConn方法触发调用

img

3.1.5. dialConn调用配置项的Dialer方法

img

3.1.6. p.cfg.Dialer在newConnPool时候初始化的,通过Dialer方法,触发dialer

img

3.1.7. 而dialer是newClient时传入的dialhook,至此直接触发了dialhook

img

3.1.8. sentinel模式也是在NewFailoverClient时传入的dialhook

img

3.1.9. redis自己的dialHook内部,执行的是opt的Dialer方法

img

3.1.10. 此Dialer方法是在NewClient中opt.init()初始化方法中赋值的,如果没有自定义,就用默认的建连方法

img

3.1.11. 默认的建连方法很简单,调用go底层的net建立连接

img

3.1.12. sentinel模式不一样,NewFailoverClient方法有自定义建连方法

img

3.1.13. 里面实现了读写分离

img

4. 闲置连接如何关闭

看是否有配置MinIdleConnsMaxIdleConns。如果有配置了MinIdleConns,那么在NewConnPool、popIdle、removeConn时,都会调用checkMinIdleConns补充创建最低闲置连接数

// Minimum number of idle connections which is useful when establishing
// new connection is slow.
// Default is 0. the idle connections are not closed by default.
MinIdleConns int
// Maximum number of idle connections.
// Default is 0. the idle connections are not closed by default.
MaxIdleConns int

img

每次执行完方法,会释放连接

img

img

5. 如何控制闲置连接数大小

6. 如何控制总连接数

poolSize:控制最大并发量

turn可能为0,闲置连接数为最大poolSize

img

img

img

7. 如何保持连接池内的连接健康

每次Get连接时,会检查连接是否健康

func (p *ConnPool) Get(ctx context.Context, isReadCmd bool) (*Conn, error) {if p.closed() {return nil, ErrClosed}// 排队if err := p.waitTurn(ctx); err != nil {return nil, err}for {p.connsMu.Lock()// 获取一个可用的连接cn, err := p.popIdle(isReadCmd)p.connsMu.Unlock()if err != nil {p.freeTurn()return nil, err}if cn == nil {break}// 读请求走replica,只是多一层保护if p.cfg.ReadMode == _const.READ_MODE_REPLICA {if isReadCmd && cn.remoteType != REMOTE_TYPE_REPLICA {continue}// 写请求不走replicaif !isReadCmd && cn.remoteType == REMOTE_TYPE_REPLICA {continue}}if !p.isHealthyConn(cn) {_ = p.CloseConn(cn)continue}atomic.AddUint32(&p.stats.Hits, 1)return cn, nil}atomic.AddUint32(&p.stats.Misses, 1)// zhmark 2024/6/18 如果连接池里没有可用的连接,那么新建连接newcn, err := p.newConn(ctx, true, isReadCmd)if err != nil {p.freeTurn()return nil, err}return newcn, nil
}

img

7.1. isHealthyConn内方法解析

// zhmark 2024/7/8 连接关键检查,维护连接池连接健康
func (p *ConnPool) isHealthyConn(cn *Conn) bool {now := time.Now()// ConnMaxLifetime 默认为0if p.cfg.ConnMaxLifetime > 0 && now.Sub(cn.createdAt) >= p.cfg.ConnMaxLifetime {return false}// ConnMaxIdleTime Default is 30 minutes. -1 disables idle timeout checkif p.cfg.ConnMaxIdleTime > 0 && now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {return false}if connCheck(cn.netConn) != nil {return false}cn.SetUsedAt(now)return true
}

7.1.1. 连接使用时长检验

    1. ConnMaxLifetime默认为0,如果配置了ConnMaxLifetime,那么如果当前时间离连接创建时间超过ConnMaxLifetime,则会判定连接为不健康,进而关闭连接

7.1.2. 连接空闲时长检验

    1. ConnMaxIdleTime,默认为30分钟,如果连接超过ConnMaxIdleTime时间未使用,则会判定连接为不健康

7.1.3. 检查底层网络连接状态

func connCheck(conn net.Conn) error {// Reset previous timeout._ = conn.SetDeadline(time.Time{})sysConn, ok := conn.(syscall.Conn)if !ok {return nil}rawConn, err := sysConn.SyscallConn()if err != nil {return err}var sysErr errorif err := rawConn.Read(func(fd uintptr) bool {var buf [1]byten, err := syscall.Read(int(fd), buf[:])switch {case n == 0 && err == nil:sysErr = io.EOFcase n > 0:sysErr = errUnexpectedReadcase err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:sysErr = nildefault:sysErr = err}return true}); err != nil {return err}return sysErr
}

8. 如何实时监控连接池状态

PoolStats

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 请编写函数,判断一字符串是否是回文,若是回文函数返回值为1,否则返回值为0,回文是顺读和倒读都一样的字符串
  • 代码随想录算法训练营第30天 | 第八章 贪心算法05
  • kubernetes集群证书过期问题解决
  • 【二】Ubuntu24虚拟机在Mac OS的VMware Fusion下无法联网问题
  • C#用链表和数组分别实现堆栈
  • AE-图层
  • 数据泄露时代的安全之道:访问认证的重要性
  • [leetcode hot 150]第二十三题,合并K个升序链表
  • SMU Summer 2024 Contest Round 2
  • Redis 客户端命令大全
  • 力扣第一题
  • c++入门基础篇(上)
  • Day65 代码随想录打卡|回溯算法篇---组合总和II
  • 53-3 内网代理5 - frp搭建二级代理
  • Pytest中的钩子函数
  • (三)从jvm层面了解线程的启动和停止
  • 【笔记】你不知道的JS读书笔记——Promise
  • 2018天猫双11|这就是阿里云!不止有新技术,更有温暖的社会力量
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • JavaScript标准库系列——Math对象和Date对象(二)
  • Js基础——数据类型之Null和Undefined
  • magento 货币换算
  • Markdown 语法简单说明
  • MySQL的数据类型
  • 产品三维模型在线预览
  • 名企6年Java程序员的工作总结,写给在迷茫中的你!
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 我的面试准备过程--容器(更新中)
  • 我有几个粽子,和一个故事
  • 系统认识JavaScript正则表达式
  • 3月7日云栖精选夜读 | RSA 2019安全大会:企业资产管理成行业新风向标,云上安全占绝对优势 ...
  • ionic异常记录
  • ​​​【收录 Hello 算法】9.4 小结
  • ​如何使用ArcGIS Pro制作渐变河流效果
  • ​虚拟化系列介绍(十)
  • # Apache SeaTunnel 究竟是什么?
  • (2009.11版)《网络管理员考试 考前冲刺预测卷及考点解析》复习重点
  • (el-Date-Picker)操作(不使用 ts):Element-plus 中 DatePicker 组件的使用及输出想要日期格式需求的解决过程
  • (poj1.3.2)1791(构造法模拟)
  • (TOJ2804)Even? Odd?
  • (深入.Net平台的软件系统分层开发).第一章.上机练习.20170424
  • (实战篇)如何缓存数据
  • (学习日记)2024.02.29:UCOSIII第二节
  • (一)VirtualBox安装增强功能
  • (一)模式识别——基于SVM的道路分割实验(附资源)
  • (译) 函数式 JS #1:简介
  • (自用)网络编程
  • ****三次握手和四次挥手
  • *上位机的定义
  • .apk文件,IIS不支持下载解决
  • .a文件和.so文件
  • .net 7和core版 SignalR
  • .NET 解决重复提交问题
  • .net 写了一个支持重试、熔断和超时策略的 HttpClient 实例池
  • .NET/C# 获取一个正在运行的进程的命令行参数