当前位置: 首页 > news >正文

golang工程组件——redigo使用(redis协议,基本命令,管道,事务,发布订阅,stream)

redisgo

redis 与 client 之间采用请求回应模式,一个请求包对应一个回应包;但是也有例外,pub/sub 模 式下,client 发送 subscribe 命令并收到回应包后,之后被动接收 redis 的发布包;所以若需要使 用 pub/sub 模式,那么需要在 client 下创建额外一条连接与 redis 交互;

在这里插入图片描述

Redis 协议图

在这里插入图片描述

redis 协议采用特殊字符( \r\n )来分割有意义的数据,redis 使用的是二进制安全字符串(用长 度来标识字符串的长度),所以 redis 可以用来存储二进制数据(比如经过压缩算法加密的数 据);

例如

set key val
# "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\nval\r\n"

命令执行

Do(commandName string, args ...interface{}) (reply interface{}, err error)

redisgo参数转换

Redis TypeGo Type
errorredis.Error
integerint64
simple stringstring
bulk string[]byte or nil if value not present
array[]interface{} or nil if value not present

参数转换处理

  • 单个string[]byte直接传递;
  • intfloat需要转成 string
  • 也可用redis.Args类型来处理,提供了方法 Add 添加单个元素;提供了方法 AddFlat添加了 对 map[interface{}]interface{}以及结构体的处理;

返回值转换处理

redisgo返回值有多种

  • 接收单个返回值处理
  • 接收多个不同类型的返回值处理

redis.Values+redis.Scan

  • 接收单个结构体

redis.Values+redis.ScanStruct

  • 接收多个结构体

redis.Values+redis.ScanSlice

案例

链接认证

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}}

set,get,list操作

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}if false {conn.Do("set", "test_hello", 1)rpy, err := redis.Int(conn.Do("get", "test_hello"))if err != nil {panic(err)}fmt.Println(rpy, reflect.TypeOf(rpy))}if false {args := redis.Args{"test_list"}.Add("test1").Add("test2")conn.Do("lpush", args...)res, _ := redis.Strings(conn.Do("lrange", "test_list", 0, -1))fmt.Println(res, reflect.TypeOf(res))}if false {//conn.Do("del", "test_list")// 阻塞popvals, err := redis.Strings(conn.Do("brpop", redis.Args{}.Add("test_list").Add(20)...))if err != redis.ErrNil {fmt.Println(vals, err)}}if false {// 按map形式返回vals, err := redis.StringMap(conn.Do("brpop", redis.Args{}.Add("test_list").Add(20)...))if err != redis.ErrNil {fmt.Println(vals, err)}}
}

变量读取

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}// 取元素到对应的变量上if false { // redis.Values + redis.Scanconn.Do("del", "list")conn.Do("lpush", "list", "aaabb", 100)vals, _ := redis.Values(conn.Do("lrange", "list", 0, -1))var name stringvar score intredis.Scan(vals, &score, &name)fmt.Println(name, score)}}

结构体读取

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}if true {var p1, p2 struct {Name string `redis:"name"`Age  string `redis:"age"`Sex  string `redis:"sex"`}p1.Age = "18"p1.Name = "chaochaoyu"p1.Sex = "male"// age 18 name mark sex maleargs1 := redis.Args{}.Add("role:10001").AddFlat(&p1)if _, err := conn.Do("hmset", args1...); err != nil {fmt.Println(err)return}m := map[string]string{"name": "quxiansen","age":  "20","sex":  "female",}args2 := redis.Args{}.Add("role:10002").AddFlat(m)if _, err := conn.Do("hmset", args2...); err != nil {fmt.Println(err)return}for _, id := range []string{"role:10001", "role:10002"} {v, err := redis.Values(conn.Do("HGETALL", id))if err != nil {fmt.Println(err)return}if err := redis.ScanStruct(v, &p2); err != nil {fmt.Println(err)return}fmt.Printf("%+v\n", p2)}}

reidsgo—管道以及事务

管道

redis pipeline 是一个客户端提供的,而不是服务端提供的;一次发送多条命令,减少与 redis-server 之间的网络交互;

在这里插入图片描述

type Conn interface {// Close closes the connection.Close() error// Err returns a non-nil value when the connection is not usable.Err() error// Do sends a command to the server and returns the received reply.Do(commandName string, args ...interface{}) (reply interface{}, errerror)// Do = Send + Flush + Receive// Send writes the command to the client's output buffer.Send(commandName string, args ...interface{}) error// Flush flushes the output buffer to the Redis server.Flush() error// Receive receives a single reply from the Redis serverReceive() (reply interface{}, err error)
}
代码使用
// 批量发送,批量接收
c.Send(cmd1, ...)
c.Send(cmd2, ...)
c.Send(cmd3, ...)
c.Flush() // 将上面的三个命令发送出去
c.Receive() // cmd1 的返回值
c.Receive() // cmd2 的返回值
c.Receive() // cmd3 的返回值
// 如果不需要关注返回值
c.Send(cmd1, ...)
c.Send(cmd2, ...)
c.Send(cmd3, ...)
c.Do("")
// 如果只关注最后一个命令的返回值
c.Send(cmd1, ...)
c.Send(cmd2, ...)
c.Do(cmd3, ...)
reids 网络事件处理

r在这里插入图片描述
edis 是单线程处理逻辑;网络事件处理以及命令处理都是在这个线程当中进行的; 每条连接都对应着一个读缓冲区,线程需要轮询每条连接,从连接的读缓冲区中分割出一个个有意义的数据包,每条连接的读缓冲区相当于一个队列;线程会交错执行活跃连接的命令

客户端批量发送测试
package mainimport ("fmt""github.com/garyburd/redigo/redis""math/rand""time"
)func main() {c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")c.Close()})()///paas.237// 密码认证if _, authErr := c.Do("AUTH", "paas.237"); authErr != nil {fmt.Println("Redis auth error", authErr)return}if false {c.Send("del", "set", "list", "zset")c.Send("sadd", "set", "aa", "bb", "cc")c.Send("lpush", "list", 10001, 10002, 10003)c.Send("smembers", "set")c.Send("lrange", "list", 0, -1)c.Flush()c.Receive() // delc.Receive() // saddc.Receive() // lpushmbrs, err := redis.Strings(c.Receive()) // smembersif err != redis.ErrNil {fmt.Println(mbrs)}lsts, err := redis.Ints(c.Receive()) // lrangeif err != redis.ErrNil {fmt.Println(lsts)}}if false {c.Send("del", "set", "list", "zset")c.Send("sadd", "set", "aa", "bb", "cc")c.Send("lpush", "list", 10001, 10002, 10003)// do里面有flush和所有receivec.Do("")}if true {rand.Seed(time.Now().UnixNano())c.Send("del", "set", "list", "zset")c.Send("sadd", "set", "aa", "bb", "cc"){args := redis.Args{}.Add("zset")args = args.Add(rand.Intn(100)).Add("xiaoming")args = args.Add(rand.Intn(100)).Add("xiaohong")args = args.Add(rand.Intn(100)).Add("xiaohuang")c.Send("zadd", args...)}{args := redis.Args{}.Add("zset")args = args.Add(0).Add(-1).Add("withscores")// 只关注最后一个的返回值vals, err := redis.Values(c.Do("zrange", args...))fmt.Printf("vals:%v\n", vals)if err != nil {panic(err)}//返回值是反过来的,name要放前面var rets []struct {Name  stringScore int}if err = redis.ScanSlice(vals, &rets); err != nil {panic(err)}fmt.Println(rets)}}
}

事务

虽然redis是单线程,对于一条链接请求队列是线性执行的,如果有多条链接,那么redis线程执行命令的顺序是根据各个队列中先后顺序来的。如果要实现事务,需要保证执行事务包含的命令中间时不插入别的会干扰需要操作数据的命令。

redis事务操作

MULTI 开启事务,事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行;

MULTI

开启事务

EXEC

提交事务

DISCARD

取消事务

WATCH

检测key的变动,若在事务请求时,key变动则取消事务;在事务开启前调用,乐观锁实现(cas); 若被取消则事务返回 nil ;

WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC
reids ACID特性
  • 原子性:事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis 不支持回滚;即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直 到将事务队列中的所有命令都执行完毕为止。
  • 一致性:事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一致性(有命令出错后续命令也会执行)而不是异常后的一致性;所以redis也不满足;
  • 隔离性:事务的操作不被其他用户操作所打断;redis命令执行是串行的,redis事务天然具备隔 离性;
  • 持久性:redis只有在 aof 持久化策略的时候,并且需要在 redis.conf 中 appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;

lua实现原子性操作

lua 脚本实现原子性;

redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当 某个脚本正在执行的时候,不会有其他命令或者脚本被执行;

lua 脚本当中的命令会直接修改数据状态;

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local key = KEYS[1];local s = redis.call("get",key);redis.call("set", key, s*2);return s*2'
"8f7d021dcc386a422e0febe38befdc6084357610"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "8f7d021dcc386a422e0febe38befdc6084357610"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.
执行lua脚本
EVAL
# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]
EVALSHA
# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

eg:

script load 'local key = KEYS[1];local s = redis.call("get",key);redis.call("set", key, s*2);return s*2'
"dbb7d4a2a615df353820f35ffa710a45fa1c4ec0"
set score 100
# 有一个参数
evalsha dbb7d4a2a615df353820f35ffa710a45fa1c4ec0 1 score
应用
# 1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load);
# 2: 项目中若需要热更新,通过redis-cli script flush;然后可以通过订阅发布功能通知所有服务器重新加载lua脚本;
# 3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;

事务建议用lua脚本

go操作reids执行lua脚本

double.lua

local key = KEYS[1]
local default = ARGV[1]
if redis.call("exists", key) == 0 thenredis.call("set", key, default)
end
local val = redis.call("get", key)
redis.call("set", key, val*2)
return val*2

script.go

package mainimport ("fmt""github.com/garyburd/redigo/redis""io/ioutil"
)func main() {c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")c.Close()})()///ssr-paas.237// 密码认证if _, authErr := c.Do("AUTH", "sss-paas.237"); authErr != nil {fmt.Println("Redis auth error", authErr)return}var data []bytedata, err = ioutil.ReadFile("script/double.lua")if err != nil {fmt.Println("load double.lua error")return}// 设置一个参数script := redis.NewScript(1, string(data))script.Load(c)if true {c.Send("set", "score", 1000)rpy, _ := redis.Int(script.Do(c, "score"))fmt.Println(rpy)}if true {rpy, _ := redis.Int(script.Do(c, "bbb", 500))fmt.Println(rpy)}}

发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块;

生产者生产一次消息,由redis负责将消息复制到多个消息队列中,每个消息队列由相应的消费者进行消费;

它是分布式系统中常用的一种解耦方式,用于将多个消费者的逻辑进行拆分。多个消费者的逻辑就可以放到不同的子系统中完成;

在这里插入图片描述

# 订阅频道
subscribe 频道
psubscribe new.car
# 订阅模式频道
psubscribe 频道
psubscribe new.*
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容
subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'aaa bbb ccc'

go订阅频道实现

package mainimport ("fmt""github.com/garyburd/redigo/redis"
)func main() {sp, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))if err != nil {return}defer sp.Close()spc := redis.PubSubConn{Conn: sp}if false {// c.Do  =  c.Send + c.Flush + c.Receivespc.Subscribe("news.it")  // send + flushfor {switch v := spc.Receive().(type) {case redis.Message:fmt.Printf("%s: message: %s\n", v.Channel, v.Data)case redis.Subscription: // 是否注册成功的消息fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)case error:return}}}if false {// it showbz carsspc.PSubscribe("news.*")for {switch v := spc.Receive().(type) {case redis.PMessage:fmt.Printf("%s: pmessage: %s\n", v.Channel, v.Data)case redis.Subscription: // 是否注册成功的消息fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)case error:return}}}
}

注意

发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接用于处理发布订阅;

在这里插入图片描述

缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失了;

另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;

stream

*在这里插入图片描述
多播可持久化队列。

  • 一个消息链表将加入的消息都串起来,每个消息都有一个唯一的消息ID和对应的内容;消息都是持久化的,redis 重启后,内容还在;

  • 每个 stream 对象通过一个 key 来唯一索引;每个 stream 都可以挂多个消费组(consumergroup),每个消费组会有个游标 last_delivered_id 在 stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。

  • stream 在第一次使用 xadd 命令后自动创建;而消费组不会自动创建,需要通过命令 xgroup create 进行创建,并且需要指定从 stream 的某个消息 ID 开始消费;

  • 每个消费组都是相互独立的,互相不受影响;也就是同一份 stream 内部的消息会被每个消费组都消费到;

  • 同一个消费组可以挂接多个消费者,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标往前移动;

  • 消费者内部会有一个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息。当客户端 ack 一条消息后,pending_ids 将会删除该消息 ID;它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没有被处理;

命令
# 向 stream 追加消息, ID可以填* 让redis生成全局唯一
XADD key ID field string [field string ...]
# 从 stream 中删除消息
XDEL key ID [ID ...]
# 获取 stream 中消息列表,会自动过滤已经删除的消息
XRANGE key start end [COUNT count]
# 获取 stream 消息长度
XLEN key
# 删除 stream 中所有消息
DEL key
# 独立消费
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 创建消费者
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
# 消费消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# > 意味着消费者希望只接收从未发送给任何其他消费者的消息。最新的消息
# 任意其他id 发送待处理的消息
# 确认消费消息
XACK key group ID [ID ...]
#创建stream_test stream 并由redis生成id
XADD stream_test * message "hello word"
"1626511386012-o"
#创建消费组g1 从0开始消费
XGROUP CREATE stream g1 0-0XLEN stream
(integer) 1
xrange stream - +#生成一个消费者,消费一个
xreadgroup group g1 consumer1 count 1 streams stream_test 0 # > 是获取最新消息, 0代表是任意ID# ACK该消息
xack stream_test g1 "1626511386012-o"

相关文章:

  • Spring-AOP不生效之内部方法调用
  • 【Android】画面卡顿优化列表流畅度一
  • uboot - 驱动开发 - dw watchdog
  • 无梯度强化学习:使用遗传算法进化代理
  • 【服务配置文件详解】补充rsyslog服务的配置文件翻译解读
  • 【hcie-cloud】【5】华为云Stack规划设计之华为云Stack标准化配置、缩略语【下】
  • 前端Vue 结合xlxs库实现解析excel文件,并动态组装表头!
  • CMakeFiles文件夹有什么用
  • Xmake v2.8.5 发布,支持链接排序和单元测试
  • 5G毫米波通信中的关键技术
  • 3D物理模拟和视觉特效软件SideFX Houdini mac中文介绍
  • 什么是记忆能力与泛化能力
  • Mac电脑Visio文件编辑查看软件推荐Visio Viewer for Mac
  • C++多态特性
  • 揭秘南卡开放式耳机创新黑科技,核心技术剑指用户痛点
  • #Java异常处理
  • 【技术性】Search知识
  • Apache Spark Streaming 使用实例
  • ESLint简单操作
  • GraphQL学习过程应该是这样的
  • httpie使用详解
  • Java 23种设计模式 之单例模式 7种实现方式
  • java 多线程基础, 我觉得还是有必要看看的
  • JavaScript设计模式之工厂模式
  • java小心机(3)| 浅析finalize()
  • Mysql数据库的条件查询语句
  • OSS Web直传 (文件图片)
  • RxJS 实现摩斯密码(Morse) 【内附脑图】
  • 机器学习中为什么要做归一化normalization
  • 前端工程化(Gulp、Webpack)-webpack
  • 设计模式(12)迭代器模式(讲解+应用)
  • 应用生命周期终极 DevOps 工具包
  • 用Node EJS写一个爬虫脚本每天定时给心爱的她发一封暖心邮件
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • ​软考-高级-信息系统项目管理师教程 第四版【第14章-项目沟通管理-思维导图】​
  • ​中南建设2022年半年报“韧”字当头,经营性现金流持续为正​
  • # Python csv、xlsx、json、二进制(MP3) 文件读写基本使用
  • #QT(智能家居界面-界面切换)
  • (12)Hive调优——count distinct去重优化
  • (8)STL算法之替换
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (Mirage系列之二)VMware Horizon Mirage的经典用户用例及真实案例分析
  • (ZT)一个美国文科博士的YardLife
  • (博弈 sg入门)kiki's game -- hdu -- 2147
  • (简单) HDU 2612 Find a way,BFS。
  • (每日持续更新)jdk api之FileReader基础、应用、实战
  • (三)Honghu Cloud云架构一定时调度平台
  • (四)模仿学习-完成后台管理页面查询
  • (译) 函数式 JS #1:简介
  • *上位机的定义
  • .Net 8.0 新的变化
  • .net 程序发生了一个不可捕获的异常
  • .NET 动态调用WebService + WSE + UsernameToken
  • @data注解_一枚 架构师 也不会用的Lombok注解,相见恨晚
  • @JoinTable会自动删除关联表的数据