PostgreSQL 的xlog逻辑解析补丁, 可以从xlog中解出数据库中执行的SQL, 但是从测试情况来看目前不支持DDL的解析, 只有DML的解析.
未来PostgreSQL可以基于此增加基于SQL复制的功能.
补丁比较多, 有些是单独提交的, 本文将简单的测试一下.
下载PostgreSQL源码, 注意最好使用补丁释放前一天的版本 :
下载补丁
打补丁
mkdir patch
cd patch
wget http://www.postgresql.org/message-id/attachment/29336/0001-Add-support-for-multiple-kinds-of-external-toast-dat.patch.gz
wget http://www.postgresql.org/message-id/attachment/29337/0002-wal_decoding-Add-pg_xlog_wait_remote_-apply-receive-.patch.gz
wget http://www.postgresql.org/message-id/attachment/29338/0003-wal_decoding-Add-a-new-RELFILENODE-syscache-to-fetch.patch.gz
wget http://www.postgresql.org/message-id/attachment/29339/0004-wal_decoding-Add-RelationMapFilenodeToOid-function-t.patch.gz
wget http://www.postgresql.org/message-id/attachment/29340/0005-wal_decoding-Add-pg_relation_by_filenode-to-lookup-u.patch.gz
wget http://www.postgresql.org/message-id/attachment/29341/0006-wal_decoding-Introduce-InvalidCommandId-and-declare-.patch.gz
wget http://www.postgresql.org/message-id/attachment/29342/0007-wal_decoding-Adjust-all-Satisfies-routines-to-take-a.patch.gz
wget http://www.postgresql.org/message-id/attachment/29343/0008-wal_decoding-Allow-walsender-s-to-connect-to-a-speci.patch.gz
wget http://www.postgresql.org/message-id/attachment/29344/0009-wal_decoding-Add-alreadyLocked-parameter-to-GetOldes.patch.gz
wget http://www.postgresql.org/message-id/attachment/29345/0010-wal_decoding-Log-xl_running_xact-s-at-a-higher-frequ.patch.gz
wget http://www.postgresql.org/message-id/attachment/29346/0011-wal_decoding-copydir-make-fsync_fname-public.patch.gz
wget http://www.postgresql.org/message-id/attachment/29347/0012-wal_decoding-Add-information-about-a-tables-primary-.patch.gz
wget http://www.postgresql.org/message-id/attachment/29348/0013-wal_decoding-Introduce-wal-decoding-via-catalog-time.patch.gz
wget http://www.postgresql.org/message-id/attachment/29349/0014-wal_decoding-test_decoding-Add-a-simple-decoding-mod.patch.gz
wget http://www.postgresql.org/message-id/attachment/29350/0015-wal_decoding-pg_receivellog-Introduce-pg_receivexlog.patch.gz
wget http://www.postgresql.org/message-id/attachment/29351/0016-wal_decoding-test_logical_decoding-Add-extension-for.patch.gz
wget http://www.postgresql.org/message-id/attachment/29352/0017-wal_decoding-design-document-v2.4-and-snapshot-build.patch.gz
wget http://www.postgresql.org/message-id/attachment/29390/0001-wal_decoding-mergme-Fix-pg_basebackup-makefile.patch
wget http://www.postgresql.org/message-id/attachment/29391/0002-wal_decoding-mergme-Fix-test_logical_decoding-Makefi.patch
wget http://www.postgresql.org/message-id/attachment/29564/0001-wal_decoding-mergme-Don-t-use-out-of-scope-local-var.patch
gunzip *.gz
打补丁
tar -zxvf postgresql-bab54e3.tar.gz
cd postgresql-bab54e3
patch -p1 < ../patch/0001-Add-support-for-multiple-kinds-of-external-toast-dat.patch
patch -p1 < ../patch/0002-wal_decoding-Add-pg_xlog_wait_remote_-apply-receive-.patch
patch -p1 < ../patch/0003-wal_decoding-Add-a-new-RELFILENODE-syscache-to-fetch.patch
patch -p1 < ../patch/0004-wal_decoding-Add-RelationMapFilenodeToOid-function-t.patch
patch -p1 < ../patch/0005-wal_decoding-Add-pg_relation_by_filenode-to-lookup-u.patch
patch -p1 < ../patch/0006-wal_decoding-Introduce-InvalidCommandId-and-declare-.patch
patch -p1 < ../patch/0007-wal_decoding-Adjust-all-Satisfies-routines-to-take-a.patch
patch -p1 < ../patch/0008-wal_decoding-Allow-walsender-s-to-connect-to-a-speci.patch
patch -p1 < ../patch/0009-wal_decoding-Add-alreadyLocked-parameter-to-GetOldes.patch
patch -p1 < ../patch/0010-wal_decoding-Log-xl_running_xact-s-at-a-higher-frequ.patch
patch -p1 < ../patch/0011-wal_decoding-copydir-make-fsync_fname-public.patch
patch -p1 < ../patch/0012-wal_decoding-Add-information-about-a-tables-primary-.patch
patch -p1 < ../patch/0013-wal_decoding-Introduce-wal-decoding-via-catalog-time.patch
patch -p1 < ../patch/0014-wal_decoding-test_decoding-Add-a-simple-decoding-mod.patch
patch -p1 < ../patch/0015-wal_decoding-pg_receivellog-Introduce-pg_receivexlog.patch
patch -p1 < ../patch/0016-wal_decoding-test_logical_decoding-Add-extension-for.patch
patch -p1 < ../patch/0017-wal_decoding-design-document-v2.4-and-snapshot-build.patch
patch -p1 < ../patch/0001-wal_decoding-mergme-Fix-pg_basebackup-makefile.patch
patch -p1 < ../patch/0002-wal_decoding-mergme-Fix-test_logical_decoding-Makefi.patch
patch -p1 < ../patch/0001-wal_decoding-mergme-Don-t-use-out-of-scope-local-var.patch
确保没有FAILED的输出.
编译安装
报错
cd postgresql-bab54e3
./configure --prefix=/home/pg94/pgsql9.4devel --with-pgport=1921 --with-perl --with-tcl --with-python --with-openssl --with-pam --without-ldap --with-libxml --with-libxslt --enable-thread-safety --with-wal-blocksize=16 && gmake && gmake install
报错
snapbuild.c:206: error: redefinition of typedef ‘SnapBuild’
../../../../src/include/replication/snapbuild.h:54: error: previous declaration of ‘SnapBuild’ was here
gmake[4]: *** [snapbuild.o] Error 1
修复错误
将src/backend/replication/logical/snapbuild.c中的
SnapBuild结构定义复制到
src/include/replication/snapbuild.h,
同时删除
src/backend/replication/logical/snapbuild.c中的
SnapBuild结构定义.
如下
注释src/include/replication/snapbuild.h中的 SnapBuild 定义 :
重新执行gmake, 正常.
初始化数据库
逻辑复制的相关参数的变化 :
typedef struct SnapBuild
{
/* how far are we along building our first full snapshot */
SnapBuildState state;
/* private memory context used to allocate memory for this module. */
MemoryContext context;
/* all transactions < than this have committed/aborted */
TransactionId xmin;
/* all transactions >= than this are uncommitted */
TransactionId xmax;
/*
* Don't replay commits from an LSN <= this LSN. This can be set
* externally but it will also be advanced (never retreat) from within
* snapbuild.c.
*/
XLogRecPtr transactions_after;
/*
* Don't start decoding WAL until the "xl_running_xacts" information
* indicates there are no running xids with a xid smaller than this.
*/
TransactionId initial_xmin_horizon;
/*
* Snapshot thats valid to see all currently committed transactions that
* see catalog modifications.
*/
Snapshot snapshot;
/*
* LSN of the last location we are sure a snapshot has been serialized to.
*/
XLogRecPtr last_serialized_snapshot;
ReorderBuffer *reorder;
/* variable length data */
/*
* Information about initially running transactions
*
* When we start building a snapshot there already may be transactions in
* progress. Those are stored in running.xip. We don't have enough
* information about those to decode their contents, so until they are
* finished (xcnt=0) we cannot switch to a CONSISTENT state.
*/
struct
{
/*
* As long as running.xcnt all XIDs < running.xmin and > running.xmax
* have to be checked whether they still are running.
*/
TransactionId xmin;
TransactionId xmax;
size_t xcnt; /* number of used xip entries */
size_t xcnt_space; /* allocated size of xip */
TransactionId *xip; /* running xacts array, xidComparator-sorted */
} running;
/*
* Array of transactions which could have catalog changes that committed
* between xmin and xmax
*/
struct
{
/* number of committed transactions */
size_t xcnt;
/* available space for committed transactions */
size_t xcnt_space;
/*
* Until we reach a CONSISTENT state, we record commits of all
* transactions, not just the catalog changing ones. Record when that
* changes so we know we cannot export a snapshot safely anymore.
*/
bool includes_all_transactions;
/*
* Array of committed transactions that have modified the catalog.
*
* As this array is frequently modified we do *not* keep it in
* xidComparator order. Instead we sort the array when building &
* distributing a snapshot.
*
* XXX: That doesn't seem to be good reasoning anymore. Everytime we
* add something here after becoming consistent will also require
* distributing a snapshot. Storing them sorted would potentially make
* it easier to purge as well (but more complicated wrt wraparound?).
*/
TransactionId *xip;
} committed;
} SnapBuild;
注释src/include/replication/snapbuild.h中的 SnapBuild 定义 :
// struct SnapBuild;
// typedef struct SnapBuild SnapBuild;
重新执行gmake, 正常.
cd postgresql-bab54e3
gmake
gmake install
cd contrib
gmake
gmake install
初始化数据库
initdb -E UTF8 -D $PGDATA --locale=C -W -U postgres
逻辑复制的相关参数的变化 :
postgresql.conf
将以上参数配置为 :
#wal_level = minimal # minimal, archive, logical or hot_standby
# (change requires restart)
#max_wal_senders = 0 # max number of walsender processes, including
# both physical and logical replication senders.
# (change requires restart)
#max_logical_slots = 0 # max number of logical replication sender
# and receiver processes. Logical senders
# (but not receivers) also consume a
# max_wal_senders slot.
# (change requires restart)
将以上参数配置为 :
wal_level = logical
max_wal_senders = 16
max_logical_slots = 8
逻辑复制新增的命令
pg_receivellog
新增的几个函数 :
新增的extension :
启动数据库
测试 :
pg_receivellog测试
pg_receivellog receives PostgreSQL logical change stream.
Usage:
pg_receivellog [OPTION]...
Options:
-f, --file=FILE receive log into this file. - for stdout
-n, --no-loop do not loop on connection lost
-v, --verbose output verbose messages
-V, --version output version information, then exit
-?, --help show this help, then exit
Connection options:
-d, --database=DBNAME database to connect to
-h, --host=HOSTNAME database server host or socket directory
-p, --port=PORT database server port number
-U, --username=NAME connect as specified database user
-w, --no-password never prompt for password
-W, --password force password prompt (should happen automatically)
Replication options:
-P, --plugin=PLUGIN use output plugin PLUGIN (defaults to test_decoding)
-s, --status-interval=INTERVAL
time between status packets sent to server (in seconds)
-S, --slot=SLOT use existing replication slot SLOT instead of starting a new one
Action to be performed:
--init initiate a new replication slot (for the slotname see --slot)
--start start streaming in a replication slot (for the slotname see --slot)
--stop stop the replication slot (for the slotname see --slot)
Report bugs to <pgsql-bugs@postgresql.org>.
新增的几个函数 :
digoal=# \df *.*replica*
List of functions
Schema | Name | Result data type | Argument data types |
Type
------------+--------------------------+------------------+-----------------------------------------------------------------------+-
-------
pg_catalog | init_logical_replication | record | slotname name, plugin name, OUT slotname text, OUT xlog_position text |
normal
pg_catalog | stop_logical_replication | integer | name |
normal
(2 rows)
新增的extension :
cat test_logical_decoding--1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_logical_decoding" to load this file. \quit
CREATE FUNCTION start_logical_replication (slotname name, pos text, VARIADIC options text[] DEFAULT '{}', OUT location text, OUT xid bigint, OUT data text) RETURNS SETOF record
AS 'MODULE_PATHNAME', 'start_logical_replication'
LANGUAGE C IMMUTABLE STRICT;
启动数据库
pg_ctl start
测试 :
pg94@db-192-168-100-216-> psql
psql (9.4devel)
Type "help" for help.
digoal=# create extension test_logical_decoding;
CREATE EXTENSION
digoal=# SELECT * FROM init_logical_replication('regression_slot', 'test_decoding');
slotname | xlog_position
-----------------+---------------
regression_slot | 0/18402B8
(1 row)
digoal=# CREATE TABLE foo(id serial primary key, data text);
CREATE TABLE
digoal=# INSERT INTO foo(data) VALUES(1);
INSERT 0 1
digoal=# UPDATE foo SET id = -id, data = ':'||data;
UPDATE 1
digoal=# DELETE FROM foo;
DELETE 1
digoal=# DROP TABLE foo;
DROP TABLE
digoal=# SELECT * FROM start_logical_replication('regression_slot', 'now', 'hide-xids', '0');
location | xid | data
-----------+------+--------------------------------------------------------------------------------
0/18402E8 | 1686 | BEGIN
0/18402E8 | 1686 | COMMIT
0/185EEB0 | 1687 | BEGIN
0/185EEB0 | 1687 | table "foo": INSERT: id[int4]:1 data[text]:1
0/185EEB0 | 1687 | COMMIT
0/185EFC8 | 1688 | BEGIN
0/185EFC8 | 1688 | table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[text]::1
0/185EFC8 | 1688 | COMMIT
0/185F0A0 | 1689 | BEGIN
0/185F0A0 | 1689 | table "foo": DELETE: id[int4]:-1
0/185F0A0 | 1689 | COMMIT
0/185F2A8 | 1690 | BEGIN
0/185F2A8 | 1690 | COMMIT
(13 rows)
digoal=# SELECT * FROM pg_stat_logical_decoding ;
slot_name | plugin | database | active | xmin | restart_decoding_lsn
-----------------+---------------+----------+--------+------+----------------------
regression_slot | test_decoding | 16384 | f | 1686 | 0/1840280
(1 row)
digoal=# SELECT * FROM stop_logical_replication('regression_slot');
stop_logical_replication
--------------------------
0
(1 row)
pg_receivellog测试
配置pg_hba.conf
查询pg_stat_replication
DDL未记录 :
DML有记录
[参考]
local replication postgres trust
host replication postgres 127.0.0.1/32 trust
pg_ctl reload
pg94@db-192-168-100-216-> pg_receivellog -f ./test.logical -d digoal -h $PGDATA -p 1921 -U postgres -P test_decoding -s 1 -S test --init
WARNING: Initiating logical rep from 0/1861E10
pg94@db-192-168-100-216-> pg_receivellog -f ./test.logical -d digoal -h $PGDATA -p 1921 -U postgres -P test_decoding -s 1 -S test --start
WARNING: Starting logical replication
查询pg_stat_replication
digoal=# select * from pg_stat_replication ;
pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | st
ate | sent_location | write_location | flush_location | replay_location | sync_priority | sync_state
-------+----------+----------+------------------+-------------+-----------------+-------------+-------------------------------+-----
------+---------------+----------------+----------------+-----------------+---------------+------------
30794 | 10 | postgres | pg_receivellog | | | -1 | 2013-07-30 16:38:56.100306+08 | stre
aming | 0/1865AF8 | | | | 0 | async
(1 row)
DDL未记录 :
digoal=# create table t(id int);
CREATE TABLE
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692
DML有记录
digoal=# insert into t values (1);
INSERT 0 1
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692
BEGIN 1693
table "t": INSERT: id[int4]:1
COMMIT 1693
digoal=# create table t1(id int);
CREATE TABLE
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692
BEGIN 1693
table "t": INSERT: id[int4]:1
COMMIT 1693
BEGIN 1694
COMMIT 1694
digoal=# insert into t1 values (1);
INSERT 0 1
pg94@db-192-168-100-216-> cat test.logical
BEGIN 1692
COMMIT 1692
BEGIN 1693
table "t": INSERT: id[int4]:1
COMMIT 1693
BEGIN 1694
COMMIT 1694
BEGIN 1695
table "t1": INSERT: id[int4]:1
COMMIT 1695
[参考]
3. 补丁介绍
By Andres Freund
Hi!
I am rather pleased to announce the next version of the changeset
extraction patchset. Thanks to help from a large number of people I
think we are slowly getting to the point where it is getting
committable.
Since the last submitted version
(20121115002746(dot)GA7692(at)awork2(dot)anarazel(dot)de) a large number of fixes and
the result of good amount of review has been added to the tree. All
bugs known to me have been fixed.
Fixes include:
* synchronous replication support
* don't peg the xmin for user tables, do it only for catalog ones.
* arbitrarily large transaction support by spilling large transactions
to disk
* spill snapshots to disk, so we can restart without waiting for a new
snapshot to be built
* Don't read all WAL from the establishment of a logical slot
* tests via SQL interface to changeset extraction
The todo list includes:
* morph the "logical slot" interface into being "replication slots" that
can also be used by streaming replication
* move some more code from snapbuild.c to decode.c to remove a largely
duplicated switch
* do some more header/comment cleanup & clarification
* move pg_receivellog into its own directory in src/bin or contrib/.
* user/developer level documentation
The patch series currently has two interfaces to logical decoding. One -
which is primarily useful for pg_regress style tests and playing around
- is SQL based, the other one uses a walsender replication connection.
A quick demonstration of the SQL interface (server needs to be started
with wal_level = logical and max_logical_slots > 0):
=# CREATE EXTENSION test_logical_decoding;
=# SELECT * FROM init_logical_replication('regression_slot', 'test_decoding');
slotname | xlog_position
-----------------+---------------
regression_slot | 0/17D5908
(1 row)
=# CREATE TABLE foo(id serial primary key, data text);
=# INSERT INTO foo(data) VALUES(1);
=# UPDATE foo SET id = -id, data = ':'||data;
=# DELETE FROM foo;
=# DROP TABLE foo;
=# SELECT * FROM start_logical_replication('regression_slot', 'now', 'hide-xids', '0');
location | xid | data
-----------+-----+--------------------------------------------------------------------------------
0/17D59B8 | 695 | BEGIN
0/17D59B8 | 695 | COMMIT
0/17E8B58 | 696 | BEGIN
0/17E8B58 | 696 | table "foo": INSERT: id[int4]:1 data[text]:1
0/17E8B58 | 696 | COMMIT
0/17E8CA8 | 697 | BEGIN
0/17E8CA8 | 697 | table "foo": UPDATE: old-pkey: id[int4]:1 new-tuple: id[int4]:-1 data[text]::1
0/17E8CA8 | 697 | COMMIT
0/17E8E50 | 698 | BEGIN
0/17E8E50 | 698 | table "foo": DELETE: id[int4]:-1
0/17E8E50 | 698 | COMMIT
0/17E9058 | 699 | BEGIN
0/17E9058 | 699 | COMMIT
(13 rows)
=# SELECT * FROM pg_stat_logical_decoding ;
slot_name | plugin | database | active | xmin | restart_decoding_lsn
-----------------+---------------+----------+--------+------+----------------------
regression_slot | test_decoding | 12042 | f | 695 | 0/17D58D0
(1 row)
=# SELECT * FROM stop_logical_replication('regression_slot');
stop_logical_replication
--------------------------
0
The walsender interface has the same calls
INIT_LOGICAL_REPLICATION 'slot' 'plugin';
START_LOGICAL_REPLICATION 'slot' restart_lsn [(option value)*];
STOP_LOGICAL_REPLICATION 'slot';
The only difference is that START_LOGICAL_REPLICATION can stream changes
and it can support synchronous replication.
The output seen in the 'data' column is produced by a so called 'output
plugin' which users of the facility can write to suit their needs. They
can be written by implementing 5 functions in the shared object that's
passed to init_logical_replication() above:
* pg_decode_init (optional)
* pg_decode_begin_txn
* pg_decode_change
* pg_decode_commit_txn
* pg_decode_cleanup (optional)
The most interesting function pg_decode_change get's passed a structure
containing old/new versions of the row, the 'struct Relation' belonging
to it and metainformation about the transaction.
The output plugin can rely on syscache lookups et al. to decode the
changed tuple in whatever fashion it wants.
I'd like to invite reviewers to first look at:
* the output plugin interface
* the walsender/SRF interface
* patch 12 which contains most of the code
When reading the code, the information flow during decoding might be
interesting:
---------------
+---------------+
| XLogReader |
+---------------+
|
XLOG Records
|
v
+---------------+
| decode.c |
+---------------+
| |
| |
v |
+---------------+ |
| snapbuild.c | HeapTupleData
+---------------+ |
| |
catalog snapshots |
| |
v v
+---------------+
|reorderbuffer.c|
+---------------+
|
HeapTuple & Metadata
|
v
+---------------+
| Output Plugin |
+---------------+
|
Whatever you want
|
v
+---------------+
| Output Handler|
| |
|WalSnd or SRF |
+---------------+
---------------
Overview of the attached patches:
0001: indirect toast tuples; required but submitted independently
0002: functions for testing; not required,
0003: (tablespace, filenode) syscache; required
0004: RelationMapFilenodeToOid: required, simple
0005: pg_relation_by_filenode() function; not required but useful
0006: Introduce InvalidCommandId: required, simple
0007: Adjust Satisfies* interface: required, mechanical,
0008: Allow walsender to attach to a database: required, needs review
0009: New GetOldestXmin() parameter; required, pretty boring
0010: Log xl_running_xact regularly in the bgwriter: required
0011: make fsync_fname() public; required, needs to be in a different file
0012: Relcache support for an Relation's primary key: required
0013: Actual changeset extraction; required
0014: Output plugin demo; not required (except for testing) but useful
0015: Add pg_receivellog program: not required but useful
0016: Add test_logical_decoding extension; not required, but contains
the tests for the feature. Uses 0014
0017: Snapshot building docs; not required
Greetings,
Andres Freund