PostgreSQL: getting incremental database changes by parsing the log (pg_recvlogical)

Time: 2023-09-27 14:26:06

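1. To look at the log changes with this tool, first build and install the test_decoding plugin (go into the contrib directory, then compile and install it). Logical decoding also requires wal_level = logical in postgresql.conf.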

 

Create a slot:

SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');

 

Generate some changes, then read the change log recorded by the slot:

postgres=# insert into test values(1, 'test', now());

INSERT 0 1

postgres=# insert into test values(1, 'test', now());

INSERT 0 1

 

Peek at the changes without consuming them from the slot:

postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);

    lsn    | xid |                                                             data

-----------+-----+-------------------------------------------------------------------------------------------------------------------------------

 0/16637F8 | 572 | BEGIN 572

 0/16637F8 | 572 | table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 11:52:08.147527'

 0/1663B20 | 572 | COMMIT 572

 0/1663B58 | 573 | BEGIN 573

 0/1663B58 | 573 | table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 11:52:09.67556'

 0/1663BD8 | 573 | COMMIT 573

(6 rows)
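The data column is plain text, so a consumer has to parse it before it can do anything with the values. Below is a minimal sketch of such a parser, assuming the line layout seen in the output above (`table <schema.table>: <OP>: name[type]:value ...`). The names `parse_change`, `HEADER_RE` and `COLUMN_RE` are made up for this example, and the regular expressions are inferred from the sample output only: quoted identifiers containing spaces and array types such as `integer[]` are not handled.

```python
import re

# "table public.test: INSERT: <columns>" (layout inferred from the sample output)
HEADER_RE = re.compile(
    r"^table (?P<table>\S+): (?P<op>INSERT|UPDATE|DELETE): (?P<rest>.*)$"
)
# name[type]:value, where value is either a single-quoted string
# (with '' as an escaped quote) or a bare token such as a number.
COLUMN_RE = re.compile(
    r"(?P<name>[^\s\[]+)\[(?P<type>[^\]]+)\]:(?P<value>'(?:[^']|'')*'|\S+)"
)


def parse_change(line):
    """Parse one test_decoding row-change line; return None for other lines."""
    match = HEADER_RE.match(line.strip())
    if match is None:
        return None  # BEGIN / COMMIT lines, blank lines, etc.
    columns = {}
    for col in COLUMN_RE.finditer(match.group("rest")):
        raw = col.group("value")
        if raw.startswith("'"):
            # Drop the surrounding quotes and undo the '' escaping.
            raw = raw[1:-1].replace("''", "'")
        columns[col.group("name")] = (col.group("type"), raw)
    return {
        "table": match.group("table"),
        "op": match.group("op"),
        "columns": columns,
    }
```

Values are kept as strings together with their type name; converting them to Python types is left to the caller, since that depends on which column types the table uses.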

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          572 | 0/16637C0   | 0/16637F8

(1 row)

 

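You can see that catalog_xmin is still 572 after the peek, so the same changes can be queried again. After the changes have been consumed, catalog_xmin and confirmed_flush_lsn advance, and pg_logical_slot_get_changes returns no rows: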

 

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          573 | 0/1663B20   | 0/1663CF0

(1 row)

 

postgres=# select pg_logical_slot_get_changes('regression_slot', NULL, null);

 pg_logical_slot_get_changes

-----------------------------

(0 rows)

 

 

2. Use the pg_recvlogical tool from a remote host to stream the changes:

 

Create a slot (an existing one can of course be used instead):

pg_recvlogical --create-slot -S test_logical -h pg36 -d postgres -p 5433

 

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          574 | 0/1663CB8   | 0/1663CF0

 test_logical    | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          574 | 0/1663CF0   | 0/1663D28

(2 rows)

 

-bash-4.1$ pg_recvlogical  -S test_logical -h pg36 -d postgres -p 5433 --start -f -

Password:

BEGIN 574

table public.test: DELETE: (no-tuple-data)

table public.test: DELETE: (no-tuple-data)

table public.test: DELETE: (no-tuple-data)

COMMIT 574

 

BEGIN 575

table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 14:03:02.985599'

COMMIT 575
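The stream arrives as BEGIN / change / COMMIT groups, one group per transaction. A small sketch of how a consumer might regroup the lines per transaction is shown here. The function name `group_transactions` is hypothetical, and the code assumes that transaction ids are printed after BEGIN, as in the output above.

```python
def group_transactions(lines):
    """Yield (xid, [change lines]) for every BEGIN ... COMMIT pair.

    Assumes the test_decoding text format shown above, with the xid
    printed after BEGIN. A transaction whose COMMIT line has not
    arrived yet is not yielded.
    """
    xid = None
    changes = []
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # blank separator lines between transactions
        if line.startswith("BEGIN"):
            xid = int(line.split()[1])
            changes = []
        elif line.startswith("COMMIT"):
            if xid is not None:
                yield xid, changes
            xid = None
            changes = []
        elif xid is not None:
            changes.append(line)


if __name__ == "__main__":
    sample = [
        "BEGIN 574",
        "table public.test: DELETE: (no-tuple-data)",
        "COMMIT 574",
    ]
    for xid, changes in group_transactions(sample):
        print(xid, len(changes))
```

The `(no-tuple-data)` marker on the DELETE lines means test_decoding had no old row image to print, which is what one would expect when the table has no primary key or other replica identity.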

 

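If the slot has already been created and the remote pg_recvlogical stops, the database changes made while it is stopped are not consumed. The slot keeps them, and once pg_recvlogical is started again it receives all of them.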
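Because the slot remembers how far the consumer got, restarting the consumer is just a matter of running the same command again. The following is a rough sketch of wrapping pg_recvlogical in a script, not a finished tool. The helper names `recvlogical_argv` and `stream_lines` are invented for this example. It uses the same flags as the command above plus `--no-password`, on the assumption that credentials come from `~/.pgpass` or `PGPASSWORD` so the process does not block on the password prompt.

```python
import subprocess


def recvlogical_argv(slot, host, port, dbname):
    """Build the pg_recvlogical command line used in this article."""
    return [
        "pg_recvlogical",
        "-S", slot,
        "-h", host,
        "-d", dbname,
        "-p", str(port),
        "--no-password",  # assumption: password supplied via ~/.pgpass or PGPASSWORD
        "--start",
        "-f", "-",        # write the decoded changes to stdout
    ]


def stream_lines(argv):
    """Run the command and yield its stdout line by line until it exits."""
    with subprocess.Popen(argv, stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")


# Usage (needs a reachable server and an existing slot):
#
#   argv = recvlogical_argv("test_logical", "pg36", 5433, "postgres")
#   for line in stream_lines(argv):
#       print(line)
```

If the process is killed and the script is run again later, the changes that accumulated in the slot in the meantime should show up first, followed by new ones.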

 

3. You can choose the position from which to start receiving the changes:

postgres=# select pg_current_wal_lsn();

 pg_current_wal_lsn

--------------------

 0/1664FB8

(1 row)

 

postgres=# delete from test where id = 1;

DELETE 7

postgres=# select txid_current();

 txid_current

--------------

          585

(1 row)

 

postgres=# select txid_current();

 txid_current

--------------

          586

(1 row)

 

postgres=# insert into test values(1, 'test 2', now());

INSERT 0 1

postgres=# select pg_current_wal_lsn();

 pg_current_wal_lsn

--------------------

 0/16652C0

(1 row)

 

postgres=# insert into test values(1, 'test 3', now());

INSERT 0 1

 

-bash-4.1$ pg_recvlogical  -S test_logical -h pg36 -d postgres -p 5433 --startpos=0/16652C0 --start -f -

Password:

BEGIN 588

table public.test: INSERT: id[integer]:1 info[text]:'test 3' crt_time[timestamp without time zone]:'2019-05-27 14:37:31.246539'

COMMIT 588

 

As you can see, only the changes from 0/16652C0 onward are received.
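An LSN such as 0/16652C0 is a 64-bit WAL position written as two hexadecimal numbers: the high 32 bits, a slash, then the low 32 bits. Converting it to an integer makes it easy to check which of two positions comes first, which is why the 'test 2' insert (written before 0/16652C0 was read) does not show up in the output above. The helper names below are made up for this sketch.

```python
def lsn_to_int(lsn):
    """Convert an LSN string like '0/16652C0' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value):
    """Convert a 64-bit integer back to the 'X/X' text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


if __name__ == "__main__":
    before = lsn_to_int("0/1664FB8")   # position read before the DELETE
    start = lsn_to_int("0/16652C0")    # position passed to --startpos
    print(before < start)              # the earlier position sorts first
    print(start - before)              # bytes of WAL between the two positions
```

On the server side the same difference can be computed with pg_wal_lsn_diff, so this kind of helper is only useful inside a client script.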

 

 

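4. Drop the replication slots when they are no longer needed (an unused slot keeps the server from recycling WAL):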

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          574 | 0/1663CB8   | 0/1663CF0

 test_logical    | test_decoding | logical   |  13237 | postgres | f         | t      |      13155 |      |          589 | 0/1665420   | 0/1665458

(2 rows)

 

postgres=# select pg_drop_replication_slot('regression_slot');

 pg_drop_replication_slot

--------------------------

 

(1 row)

 

postgres=# select pg_drop_replication_slot('test_logical');

 pg_drop_replication_slot

--------------------------

 

(1 row)

 

postgres=# select * from pg_replication_slots ;

 slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

(0 rows)

 

5. The wal2json plugin can also be used to decode the log; the examples above use test_decoding.
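With wal2json the slot would be created with 'wal2json' as the plugin name instead of 'test_decoding', and each transaction comes out as a JSON document rather than text lines. The sketch below turns such a document into plain row dictionaries. It assumes wal2json's original (version 1) layout, with a top-level "change" array whose entries carry "kind", "schema", "table", "columnnames" and "columnvalues". The function name `rows_from_wal2json` and the sample payload are written by hand for illustration, not captured from a server.

```python
import json


def rows_from_wal2json(payload):
    """Turn one wal2json (format version 1) document into a list of rows."""
    doc = json.loads(payload)
    rows = []
    for change in doc.get("change", []):
        # Deletes carry no columnnames/columnvalues, hence the defaults.
        names = change.get("columnnames", [])
        values = change.get("columnvalues", [])
        rows.append({
            "kind": change["kind"],
            "table": f'{change["schema"]}.{change["table"]}',
            "row": dict(zip(names, values)),
        })
    return rows


if __name__ == "__main__":
    # Hand-written sample in the assumed format, not real server output.
    sample = json.dumps({
        "change": [{
            "kind": "insert",
            "schema": "public",
            "table": "test",
            "columnnames": ["id", "info"],
            "columntypes": ["integer", "text"],
            "columnvalues": [1, "test"],
        }]
    })
    print(rows_from_wal2json(sample))
```

Compared with the test_decoding text, the values arrive already typed by JSON, so no regular expressions are needed on the consumer side.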