当前位置: 首页 > news >正文

MogDB逻辑解码与pg_recvlogical

MogDB逻辑解码与pg_recvlogical

概述

谈到逻辑解码需要先从流复制开始说起。流复制的最重要的一个用途就是实现数据库的热备,数据库的主备同步方式是物理级别的数据同步。但实际应用场景中仅仅通过物理赋值是无法满足业务需求的,因此提供了逻辑复制的功能。

逻辑复制主要解决了以物理赋值无法解决的一些问题,例如:

  • 指定库或部分表的复制需求

  • 将多个数据库实例的数据汇聚到同一个目标库

  • 将一个库的数据分发到多个不同的库

  • 不同的版本之间的复制

  • 不同库名之间的表同步

逻辑复制的关键是将WAL日志的内容进行逻辑解码成特定的格式,如json,SQL等。pg_recvlogical 客户端工具就是逻辑解码的一种典型应用,它将WAL日志解码为json格式,保存在指定文件或标准输出stdout中。

逻辑复制约束

需要修改wal_level,在MogDB中wal_level有如下取值:

  • minimal
    • 优点:一些重要操作(包括创建表、创建索引、簇操作和表的复制)都能安全的跳过,这样就可以使操作变得更快。
    • 缺点:WAL仅提供从数据库服务器崩溃或者紧急关闭状态恢复时所需要的基本信息,无法用WAL归档日志恢复数据。
  • archive
    • 这个参数增加了WAL归档需要的日志信息,从而可以支持数据库的归档恢复。
  • hot_standby
    • 这个参数进一步增加了在备机上运行的SQL查询的信息,这个参数只能在数据库服务重新启动后生效。
    • 为了在备机上开启只读查询,wal_level必须在主机上设置成hot_standby ,并且备机必须打开hot_standby参数。
  • logical
    • 这个参数表示WAL日志支持逻辑复制。
      默认值: minimal

须知

  • 如果需要启用WAL日志归档和主备机的数据流复制,必须将此参数设置为archive或者hot_standby。
  • 如果此参数设置为minimal,archive_mode必须设置为off,hot_standby必须设置为off,max_wal_senders参数设置为0,且需为单机环境,否则将导致数据库无法启动。
  • 如果此参数设置为archive,hot_standby必须设置为off,否则将导致数据库无法启动。但是,hot_standby在双机环境中不能设置为off,具体参见hot_standby参数说明。

注意:

  • 与PostgreSQL的wal_level略有不同,PostgreSQL只包括3种级别:minimal, replica, or logical
  • PostgreSQL中wal_level的默认值是replica,它会写入足够的数据以支持WAL归档和复制,包括在后备服务器上运行只读查询。minimal会去掉除从崩溃或者立即关机中进行恢复所需的信息之外的所有记录。最后,logical会增加支持逻辑解码所需的信息。每个层次包括所有更低层次记录的信息。这个参数只能在服务器启动时设置。

postgres.conf配置

wal_level = logical                     # minimal, archive, hot_standby or logical
                                        # (change requires restart)
max_wal_senders = 10            # max number of walsender processes
                                # (change requires restart)
wal_keep_segments = 16          # in logfile segments, 16MB each; 0 disables
max_replication_slots = 8       # max number of replication slots.i

注:修改配置后需要重启MogDB

创建replication用户

MogDB=#CREATE ROLE pub_sub_user WITH SYSADMIN REPLICATION LOGIN PASSWORD 'pub_sub@123';
NOTICE:  The encrypted password contains MD5 ciphertext, which is not secure.
CREATE ROLE
MogDB=#

注:需要有SYSADMIN 和REPLICATION 权限

pg_hba.conf配置

# replication privilege.
#local   replication     omm                                trust
#host    replication     omm        127.0.0.1/32            trust
#host    replication     omm        ::1/128                 trust

host all all 0.0.0.0/0 md5
host replication all 0.0.0.0/0 md5

场景限制

  • 目前不支持DDL解析,只能解析DML(INSERT、UPDATE、DELETE,TRUNCATE);
  • TEMPORARY表和UNLOGGED表不会被复制;
  • 表必须有主键或唯一约束,否则像update或delete操作无法被复制;
  • 序列不被复制;
  • 大对象不被复制;
  • 新增加的表,不会自动加入订阅,需要在订阅端进行刷新;

pg_recvlogical工作原理

pg_recvlogical可以作为观察wal日志具体变化的工具。

整体架构

image.png

如上图:

  1. pg_recvlogical通过libpq与MogDB server建立链接
  2. 通过CREATE_REPLICATION_SLOT 建立逻辑复制槽
  3. 运行START_REPLICATION ,循环处理wal流,并进行解码
  4. 将解码数据写入文件或者在stdout屏幕数据
  5. pg_logical_slot_get_changes、pg_logical_slot_peek_changes可以查看修改情况
  6. DROP_REPLICATION_SLOT 销毁复制槽

pg_recvlogical主流程

image.png

  • 设置插件
    • pg_recvlogical默认使用 mppdb_decoding
plugin = pg_strdup("mppdb_decoding");
  • pg_recvlogical也可以通过命令行参数 --plugin 设置插件 如:–plugin = wal2json

image.png

通过ouput/decode插件处理pg_recvlogical可以将wal变更内容输出为特定格式(TXT、JISON)。

  • 命令行处理
pg_recvlogical receives logical change stream.

Usage:
  pg_recvlogical [OPTION]...

Options:
  -f, --file=FILE        receive log into this file. - for stdout
  -n, --no-loop          do not loop on connection lost
  -v, --verbose          output verbose messages
  -V, --version          output version information, then exit
  -?, --help             show this help, then exit

Connection options:
  -d, --dbname=DBNAME    database to connect to
  -h, --host=HOSTNAME    database server host or socket directory
  -p, --port=PORT        database server port number
  -U, --username=NAME    connect as specified database user
  -w, --no-password      never prompt for password
  -W, --password         force password prompt (should happen automatically)

Replication options:
  -F  --fsync-interval=INTERVAL
                         frequency of syncs to the output file (in seconds, defaults to 10)
  -o, --option=NAME[=VALUE]
                         Specify option NAME with optional value VAL, to be passed
                         to the output plugin
  -P, --plugin=PLUGIN    use output plugin PLUGIN (defaults to mppdb_decoding)
  -s, --status-interval=INTERVAL
                         time between status packets sent to server (in seconds, defaults to 10)
  -S, --slot=SLOT        use existing replication slot SLOT instead of starting a new one
  -I, --startpos=PTR     Where in an existing slot should the streaming start
  -r, --raw              parallel decoding output raw results without converting to text format

Action to be performed:
      --create           create a new replication slot (for the slotname see --slot)
      --start            start streaming in a replication slot (for the slotname see --slot)
      --drop             drop the replication slot (for the slotname see --slot)
  • CREATE_REPLICATION_SLOT
  • DROP_REPLICATION_SLOT
  • START_REPLICATION

以上3个命令通过词法和语法解析src/backend/replication/repl_scanner.lsrc/gausskernel/storage/replication/repl_gram.y

image.png

/* CREATE_REPLICATION_SLOT SLOT slot [%X/%X] */
 create_replication_slot:
			/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL [init_slot_lsn] */
 			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL RECPTR
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_PHYSICAL;
 					cmd->slotname = $2;
 					cmd->init_slot_lsn = $4;
 					$$ = (Node *) cmd;
 				}
			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
 				{
					CreateReplicationSlotCmd *cmd;
					cmd = makeNode(CreateReplicationSlotCmd);
					cmd->kind = REPLICATION_KIND_LOGICAL;
					cmd->slotname = $2;
					cmd->plugin = $4;
					$$ = (Node *) cmd;
 				}
 			;
 
 /* DROP_REPLICATION_SLOT slot */
 drop_replication_slot:
 			K_DROP_REPLICATION_SLOT IDENT
 				{
 					DropReplicationSlotCmd *cmd;
 					cmd = makeNode(DropReplicationSlotCmd);
 					cmd->slotname = $2;
 					$$ = (Node *) cmd;
 				}
 			;
  • 加载插件

output插件都要实现_PG_output_plugin_init 函数,和必要的回调函数。

image.png

回调函数集合:

/*
 * Output plugin callbacks
 */
typedef struct OutputPluginCallbacks {
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeAbortCB abort_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
} OutputPluginCallbacks;

主要的几个回调函数包括(mppdb_decoding为例):

  • pg_decode_begin_txn --开始事务
  • pg_decode_change --增、删、改、truncate
  • pg_decode_commit_txn --提交事务

演示

创建复制槽

 pg_recvlogical --create -S test_slot -d postgres

image.png

启动recvlogical

pg_recvlogical --start -S test_slot -d postgres -f -
-- “-f -” 表示输出到stdout(屏幕)

image.png

注:pg_recvlogical会不停地每个5s检查复制槽是否有更新

插入数据

MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
 location | xid | data
----------+-----+------
(0 rows)

MogDB=#insert into pub_sub (name) values('test pg_recvlogical 01');
INSERT 0 1
MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
 location  |  xid  |                                                                                                                 data

-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------
 0/48C60D8 | 63062 | BEGIN 63062
 0/48C60D8 | 63062 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["23"
,"'test pg_recvlogical 01'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
 0/48C62B8 | 63062 | COMMIT 63062 (at 2022-09-01 11:14:24.539429+00) CSN 47645
(3 rows)

MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
 location | xid | data
----------+-----+------
(0 rows)

MogDB=#insert into pub_sub (name) values('test pg_recvlogical 02');
INSERT 0 1
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
ERROR:  replication slot "test_slot" is already active
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
 location  |  xid  |                                                                                                                 data

-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------
 0/48C6548 | 63063 | BEGIN 63063
 0/48C6548 | 63063 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24"
,"'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
 0/48C6728 | 63063 | COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646
(3 rows)

image.png

pg_recvlogical输出如下

BEGIN 63063
{"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24","'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646

因为pg_recvlogical默认使用mppdb_decoding插件,mppdb_decoding输出格式为json。

格式化后的json:

image.png

销毁复制槽

pg_recvlogical --drop -S test_slot -d postgres

image.png

换插件

  • wal2json
$ pg_recvlogical --create -S test_slot -d postgres --plugin=wal2json
$ pg_recvlogical --start -S test_slot -d postgres -f -

wal2json 没有显示BEGIN和COMMIT

image.png

警告:有概率会报一下错误,疑似BUG。

image.png

  • pgoutput
$ pg_recvlogical --create -S test_slot -d postgres --plugin=pgoutput
$ pg_recvlogical --start -S test_slot -d postgres -f -

更换为pgoutput 后回报版本错误:FATAL: client sent proto_version=0 but we only support protocol 1 or higher

image.png

SQL方式演示

MogDB=#SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'mppdb_decoding');
    slotname     | xlog_position
-----------------+---------------
 regression_slot | 0/48D3488
(1 row)


MogDB=#select * from pg_replication_slots;
    slot_name    |     plugin     | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn | dummy_standby
-----------------+----------------+-----------+--------+----------+--------+------+--------------+-------------+---------------
 user_sub        | pgoutput       | logical   |  15016 | postgres | t      |      |        63071 | 0/48D3438   | f
 wal2json        | wal2json       | logical   |  15016 | postgres | f      |      |        61212 | 0/40EBA88   | f
 test_slot       | mppdb_decoding | logical   |  15016 | postgres | f      |      |        61212 | 0/48C9A90   | f
 regression_slot | mppdb_decoding | logical   |  15016 | postgres | f      |      |        63071 | 0/48D3408   | f
(4 rows)

MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location | xid | data
----------+-----+------
(0 rows)

MogDB=#CREATE TABLE data(id serial primary key, data text);
NOTICE:  CREATE TABLE will create implicit sequence "data_id_seq" for serial column "data.id"
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "data_pkey" for table "data"
CREATE TABLE


MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location  |  xid  |                           data
-----------+-------+-----------------------------------------------------------
 0/48C9D30 | 63070 | BEGIN 63070
 0/48D32E8 | 63070 | COMMIT 63070 (at 2022-09-01 11:51:40.602993+00) CSN 47650
(2 rows)

MogDB=#BEGIN;
BEGIN
MogDB=#INSERT INTO data(data) VALUES('1');
INSERT 0 1
MogDB=#INSERT INTO data(data) VALUES('2');
INSERT 0 1
MogDB=#COMMIT
#;
COMMIT
MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location  |  xid  |                                                                                               data

-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------
 0/48E0CD0 | 63073 | BEGIN 63073
 0/48E0CD0 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["1","'1'"],"old_key
s_name":[],"old_keys_type":[],"old_keys_val":[]}
 0/48E1090 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["2","'2'"],"old_key
s_name":[],"old_keys_type":[],"old_keys_val":[]}
 0/48E11E0 | 63073 | COMMIT 63073 (at 2022-09-01 11:58:19.799037+00) CSN 47653
(4 rows)


MogDB=#SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
--------------------------

(1 row)


总结

逻辑复制/解码相对于物理复制更加灵活,也可以根据实际业务需要开发对应的逻辑解码插件,甚至可以当做ETL来使用。总的来说逻辑解码是对用户非常友好的接口。以上通过对MogDB逻辑复制、逻辑解码的原理和部分代码进行分析,利用pg_recvlogical和SQL演示逻辑解码的过程,希望对大家理解MogDB逻辑解码有所帮助。

相关文章:

  • OS--学习笔记:进程管理
  • CentOS8 安装Yapi
  • Git 详细教程之四: Git 对 GitHub 的配置和基本操作
  • 海外众筹:产品出海kickstarter海外众筹流程
  • JVM阶段(4)-回收策略
  • 万字长文保姆级教你制作自己的多功能QQ机器人
  • 365天深度学习 | 第7周:咖啡豆识别
  • 深入剖析JavaScript(二)——异步编程
  • 工业智能网关BL110应用之七: 支持 Modbus ,MQTT,opc 等协议,上传到阿里华为云等LOT
  • c和指针-struct结构
  • 计算机网络 二、网络协议
  • 容器编排工具鉴赏- docker-compose 、Kubernetes、OpenShift、Docker Swarm
  • 【论文笔记】—低光图像增强—Supervised—URetinex-Net—2022-CVPR
  • .NET Entity FrameWork 总结 ,在项目中用处个人感觉不大。适合初级用用,不涉及到与数据库通信。
  • 12c++呵呵老师【变量,定时器和事件】
  • 【翻译】Mashape是如何管理15000个API和微服务的(三)
  • 【剑指offer】让抽象问题具体化
  • Android单元测试 - 几个重要问题
  • Bootstrap JS插件Alert源码分析
  • Fastjson的基本使用方法大全
  • GitUp, 你不可错过的秀外慧中的git工具
  • Javascript弹出层-初探
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • 产品三维模型在线预览
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 前端之Sass/Scss实战笔记
  • 山寨一个 Promise
  • 线上 python http server profile 实践
  • 说说我为什么看好Spring Cloud Alibaba
  • ​ssh免密码登录设置及问题总结
  • ​软考-高级-信息系统项目管理师教程 第四版【第14章-项目沟通管理-思维导图】​
  • ###STL(标准模板库)
  • #传输# #传输数据判断#
  • (1综述)从零开始的嵌入式图像图像处理(PI+QT+OpenCV)实战演练
  • (4)事件处理——(2)在页面加载的时候执行任务(Performing tasks on page load)...
  • (cos^2 X)的定积分,求积分 ∫sin^2(x) dx
  • (搬运以学习)flask 上下文的实现
  • (定时器/计数器)中断系统(详解与使用)
  • (机器学习-深度学习快速入门)第三章机器学习-第二节:机器学习模型之线性回归
  • (三分钟)速览传统边缘检测算子
  • (心得)获取一个数二进制序列中所有的偶数位和奇数位, 分别输出二进制序列。
  • (转)大型网站的系统架构
  • (自用)learnOpenGL学习总结-高级OpenGL-抗锯齿
  • .apk 成为历史!
  • .gitignore文件---让git自动忽略指定文件
  • .Net CoreRabbitMQ消息存储可靠机制
  • .NET Core日志内容详解,详解不同日志级别的区别和有关日志记录的实用工具和第三方库详解与示例
  • .NET 应用架构指导 V2 学习笔记(一) 软件架构的关键原则
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...
  • .NET下的多线程编程—1-线程机制概述
  • .NET中GET与SET的用法
  • [2010-8-30]
  • [20150707]外部表与rowid.txt
  • [20160902]rm -rf的惨案.txt
  • [android]-如何在向服务器发送request时附加已保存的cookie数据