[pgsql]postgresql利用pgq进行同步数据

Skytools包含三个组件:pgq、londiste、walmgr。

Pgq提供SQL API,由异步处理机制去灵活调用。用于解决实时事务的异步批处理问题。

Pgq由producer、ticker、consumer组成。Producer将events推送到queue中,ticker负责对批量queue制定相应处理规则,consumer从queue中获取events。

Londiste是基于pgq的事件传输功能的一个数据库复制工具,由python语言编写。

Walmgr是一个包含了WAL归档、基础备份及数据库运行时备份恢复功能的脚本。由python语言编写。

#安装python
wget https://www.python.org/ftp/python/3.4.3/Python-3.4.3.tar.xz
tar xf Python-3.4.3.tar.xz
cd Python-3.4.3
./configure --prefix=/usr/local/python
make
make install   # 或者使用 make altinstall

+++++++++++++++++++++++++++++++++++++++++++++++++++++

tar xf Python-2.7.9.tar.xz
cd Python-2.7.9
./configure --prefix=/usr/local/python27
make
make install
(make altinstall)
+++++++++++++++++++++++++++++++++++++++++++++++++++++

#安装psycopg2
cd /usr/local/
tar xvzf psycopg2-2.6.tar.gz
cd psycopg2-2.6
export C_INCLUDE_PATH=/usr/local/pgsql/include
export LIBRARY_PATH=/usr/local/pgsql/lib

PATH=$PATH:/usr/local/pgsql/bin/

/usr/local/python/bin/python3 setup.py build_ext -R /usr/local/pgsql/lib -I /usr/local/pgsql/include --pg-config /usr/local/pgsql/bin/pg_config

/usr/local/python/bin/python3 setup.py install build_ext -R /usr/local/pgsql/lib -I /usr/local/pgsql/include --pg-config /usr/local/pgsql/bin/pg_config

running install_egg_info
Writing /usr/local/python/lib/python3.4/site-packages/psycopg2-2.6-py3.4.egg-info
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
cd /usr/local/
tar xvzf psycopg2-2.6.tar.gz
cd psycopg2-2.6
/usr/local/python27/bin/python2.7 setup.py build_ext -R /usr/local/pgsql/lib -I /usr/local/pgsql/include --pg-config=/usr/local/pgsql/bin/pg_config

/usr/local/python27/bin/python2.7 setup.py install build_ext -R /usr/local/pgsql/lib -I /usr/local/pgsql/include --pg-config=/usr/local/pgsql/bin/pg_config

running install_egg_info
Writing /usr/local/python27/lib/python2.7/site-packages/psycopg2-2.6-py2.7.egg-info

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

在包含自 psycopg/psycopgmodule.c:27 的文件中:
./psycopg/psycopg.h:30:20: 错误:Python.h:没有那个文件或目录

#rm -rf /usr/bin/python
#ln -s /usr/local/python/bin/python3 /usr/bin/python

#安装skytools(python3不成功)
tar xvzf skytools-3.2.tar.gz
cd skytools-3.2

./configure --with-python=/usr/local/python27/bin/python2 --with-pgconfig=/usr/local/pgsql/bin/pg_config --prefix=/usr/local/skytools
make
make install
ls /usr/local/skytools
mkdir -p /usr/local/skytools/londiste3/pid
mkdir -p /usr/local/skytools/londiste3/log
chown postgres:postgres /usr/local/skytools/londiste3

export PYTHONPATH=/usr/local/skytools/lib/python2.7/site-packages:$PYTHONPATH
#配置master
#创建provider进程配置文件
cd /usr/local/skytools/londiste3
cat db_p.ini

[londiste3]
job_name = l3_db_p
db = host=192.168.232.234 port=5432 user=postgres password=postgres dbname=db_p
queue_name = replika
logfile = /usr/local/skytools/londiste3/log/l3_db_p.log
pidfile = /usr/local/skytools/londiste3/pid/l3_db_p.pid

#londiste3 error with import pkgloader
export PYTHONPATH=/usr/local/skytools/lib/python2.7/site-packages:$PYTHONPATH

/usr/local/skytools/bin/londiste3 db_p.ini create-root node1 'host=192.168.232.234 port=5432 user=postgres password=postgres dbname=db_p'

2015-05-21 11:36:41,535 2057 INFO plpgsql is installed
2015-05-21 11:36:41,537 2057 INFO Installing pgq
2015-05-21 11:36:41,551 2057 INFO Reading from /usr/local/skytools/share/skytools3/pgq.sql
2015-05-21 11:36:42,384 2057 INFO pgq.get_batch_cursor is installed
2015-05-21 11:36:42,385 2057 INFO Installing pgq_ext
2015-05-21 11:36:42,386 2057 INFO Reading from /usr/local/skytools/share/skytools3/pgq_ext.sql
2015-05-21 11:36:42,555 2057 INFO Installing pgq_node
2015-05-21 11:36:42,556 2057 INFO Reading from /usr/local/skytools/share/skytools3/pgq_node.sql
2015-05-21 11:36:42,732 2057 INFO Installing londiste
2015-05-21 11:36:42,733 2057 INFO Reading from /usr/local/skytools/share/skytools3/londiste.sql
2015-05-21 11:36:42,987 2057 INFO londiste.global_add_table is installed
2015-05-21 11:36:43,130 2057 INFO Initializing node
2015-05-21 11:36:43,134 2057 INFO Location registered
2015-05-21 11:36:43,278 2057 INFO Node "node1" initialized for queue "replika" with type "root"
2015-05-21 11:36:43,283 2057 INFO Done

/usr/local/pgsql/bin/psql -U postgres db_p

db_p=# \dn
List of schemas
Name | Owner
----------+----------
londiste | postgres
pgq | postgres
pgq_ext | postgres
pgq_node | postgres
public | postgres

db_p=# set search_path to londiste,pgq,pgq_ext,pgq_node;
db_p=# \d+
List of relations
Schema | Name | Type | Owner | Size | Description
----------+-------------------------+----------+----------+------------+-------------
londiste | applied_execute | table | postgres | 8192 bytes |
londiste | pending_fkeys | table | postgres | 8192 bytes |
londiste | seq_info | table | postgres | 8192 bytes |
londiste | seq_info_nr_seq | sequence | postgres | 8192 bytes |
londiste | table_info | table | postgres | 8192 bytes |
londiste | table_info_nr_seq | sequence | postgres | 8192 bytes |
pgq | batch_id_seq | sequence | postgres | 8192 bytes |
pgq | consumer | table | postgres | 16 kB |
pgq | consumer_co_id_seq | sequence | postgres | 8192 bytes |
pgq | event_1 | table | postgres | 8192 bytes |
pgq | event_1_0 | table | postgres | 8192 bytes |
pgq | event_1_1 | table | postgres | 8192 bytes |
pgq | event_1_2 | table | postgres | 8192 bytes |
pgq | event_1_id_seq | sequence | postgres | 8192 bytes |
pgq | event_1_tick_seq | sequence | postgres | 8192 bytes |
pgq | event_template | table | postgres | 8192 bytes |
pgq | queue | table | postgres | 16 kB |
pgq | queue_queue_id_seq | sequence | postgres | 8192 bytes |
pgq | retry_queue | table | postgres | 8192 bytes |
pgq | subscription | table | postgres | 8192 bytes |
pgq | subscription_sub_id_seq | sequence | postgres | 8192 bytes |
pgq | tick | table | postgres | 16 kB |
pgq_ext | completed_batch | table | postgres | 8192 bytes |
pgq_ext | completed_event | table | postgres | 8192 bytes |
pgq_ext | completed_tick | table | postgres | 8192 bytes |
pgq_ext | partial_batch | table | postgres | 8192 bytes |
pgq_node | local_state | table | postgres | 16 kB |
pgq_node | node_info | table | postgres | 16 kB |
pgq_node | node_location | table | postgres | 16 kB |
pgq_node | subscriber_info | table | postgres | 8192 bytes |
(30 rows)

#启动worker
/usr/local/skytools/bin/londiste3 -d db_p.ini worker
ps -ef | grep lond

root 2145 1 0 11:42 ? 00:00:00 /usr/local/python27/bin/python2 /usr/local/skytools/bin/londiste3 -d db_p.ini worker

#配置pgq ticker

cat pgqd.ini

[pgqd]
logfile = /usr/local/skytools/londiste3/log/pgqd.log
pidfile = /usr/local/skytools/londiste3/pid/pgqd.pid
database_list = db_p

#启动ticker daemon
#export LD_LIBRARY_PATH=/usr/local/pgsql/lib:$LD_LIBRARY_PATH

#配置文件丢失[pgqd]
#ERROR load_init_file: value without section: logfile
#FATAL @pgqd.c:77 in function load_config(): failed to read config

/usr/local/skytools/bin/pgqd -d pgqd.ini
2015-05-21 14:00:13.934 3876 LOG Starting pgqd 3.2

tail -f /usr/local/skytools/londiste3/log/l3_db_p.log
tail -f /usr/local/skytools/londiste3/log/pgqd.log

#配置slave
#创建subscriber进程配置文件
cd /usr/local/skytools/londiste3
cat db_s.ini

[londiste3]
job_name = l3_db_s
db = host=192.168.232.235 port=5432 user=postgres password=postgres dbname=db_s
queue_name = replika
logfile = /usr/local/skytools/londiste3/log/l3_db_s.log
pidfile = /usr/local/skytools/londiste3/pid/l3_db_s.pid

{注:queue_name必须一致}

export PYTHONPATH=/usr/local/skytools/lib/python2.7/site-packages:$PYTHONPATH
/usr/local/skytools/bin/londiste3 db_s.ini create-leaf node2 'host=192.168.232.235 port=5432 user=postgres password=postgres dbname=db_s' --provider='host=192.168.232.234 port=5432 user=postgres password=postgres dbname=db_p'

/usr/local/pgsql/bin/psql -U postgres db_s
db_s=# \dn
db_s=# set search_path to londiste,pgq,pgq_ext,pgq_node;
db_s=# \d+
#启动worker
/usr/local/skytools/bin/londiste3 -d db_s.ini worker

/usr/local/skytools/bin/londiste3 db_s.ini status
Queue: replika Local node: node2

node1 (root)
| Tables: 0/0/0
| Lag: 2h54m7s, Tick: 1
+--: node2 (leaf)
Tables: 0/0/0
Lag: 2h54m7s, Tick: 1

/usr/local/skytools/bin/londiste3 db_s.ini members
Member info on node2@replika:
node_name dead node_location
--------------- --------------- --------------------------------------------------------------------------
node1 False host=192.168.232.234 port=5432 user=postgres password=postgres dbname=db_p
node2 False host=192.168.232.235 port=5432 user=postgres password=postgres dbname=db_s
#测试
【londiste1】
/usr/local/pgsql/bin/psql -U postgres db_p
db_p=# create table t1 (id int primary key,name varchar(20));

/usr/local/skytools/bin/londiste3 db_p.ini add-table public.t1
/usr/local/skytools/bin/londiste3 db_p.ini tables
Tables on node
table_name merge_state table_attrs
--------------- --------------- ---------------
public.t1 ok

db_p=# \d t1
Table "public.t1"
Column | Type | Modifiers
--------+-----------------------+-----------
id | integer | not null
name | character varying(20) |
Indexes:
"t1_pkey" PRIMARY KEY, btree (id)
Triggers:
_londiste_replika AFTER INSERT OR DELETE OR UPDATE ON t1 FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('replika')
_londiste_replika_truncate AFTER TRUNCATE ON t1 FOR EACH STATEMENT EXECUTE PROCEDURE pgq.sqltriga('replika')

{此时,同步表会自动添加两个触发器}

【londiste2】
/usr/local/pgsql/bin/psql -U postgres db_s
db_s=# create table t1 (id int primary key,name varchar(20));
/usr/local/skytools/bin/londiste3 db_s.ini add-table public.t1
/usr/local/skytools/bin/londiste3 db_s.ini tables

Tables on node
table_name merge_state table_attrs
--------------- --------------- ---------------
public.t1 None

{开始时为None,同步完成后变为ok}

【londiste1】

db_p=# insert into t1 values (1,'lsk');

INSERT 0 1

【londiste2】

db_s=# select * from t1 ;

id | name

----+------

1 | lsk

(1 row)

{此时可以看到数据已经同步。不过这种方式的同步有一定延迟:主库端的变更不会立刻在备库端体现,需要等待一段时间}

同步不成功时的排查(若出现下面的 role "root" does not exist 错误,启动前先执行 export PGUSER=postgres)
2015-05-21 15:12:07.190 4821 ERROR connection error: PQconnectPoll
2015-05-21 15:12:07.190 4821 ERROR libpq: FATAL: role "root" does not exist
2015-05-21 15:17:40.678 5029 ERROR crm: ERROR: function pgq.version() does not exist
LINE 1: select pgq.version()
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.

解决方法(针对上面的 pgq.version() 错误):删除crm库中的pgq模式
【级联复制模式】
londiste3 db1.ini create-root node1 'host=192.168.100.30 port=5432 user=postgres password=highgo dbname=db1'

londiste3 db2.ini create-branch node2 'host=192.168.100.31 port=5432 user=postgres password=highgo dbname=db2' --provider='host=192.168.100.30 port=5432 user=postgres password=highgo dbname=db1'

londiste3 db3.ini create-branch node3 'host=192.168.100.24 port=5432 user=postgres password=highgo dbname=db3' --provider='host=192.168.100.30 port=5432 user=postgres password=highgo dbname=db1'

londiste3 db4.ini create-branch node4 'host=192.168.100.25 port=5432 user=postgres password=highgo dbname=db4' --provider='host=192.168.100.31 port=5432 user=postgres password=highgo dbname=db2'

londiste3 db5.ini create-branch node5 'host=192.168.100.20 port=5432 user=postgres password=highgo dbname=db5' --provider='host=192.168.100.24 port=5432 user=postgres password=highgo dbname=db3'

londiste3 db1.ini status

Queue: replika Local node: node1

node1 (root)

| Tables: 0/0/0

| Lag: 24s, Tick: 17, NOT UPTODATE

+--: node2 (branch)

| | Tables: 0/0/0

| | Lag: 4h3m29s, Tick: 1, NOT UPTODATE

| +--: node4 (branch)

| Tables: 0/0/0

| Lag: 4h1m7s, Tick: 1, NOT UPTODATE

+--: node3 (branch)

| Tables: 0/0/0

| Lag: 4h3m29s, Tick: 1, NOT UPTODATE

+--: node5 (branch)

Tables: 0/0/0

Lag: 3h25m5s, Tick: 1, NOT UPTODATE

将node4的provider更改为node3,如下:
londiste3 db4.ini change-provider --provider=node3

takeover
使node3接管node5,如下:
londiste3 db3.ini takeover node5

子节点接管root节点,如下:
londiste3 db2.ini takeover node1

【合并复制模式】
同一台服务器,三个数据库 part1,part2,full1
三个role:root1,root2,full

#创建数据库
create database full1;
create database part1;
create database part2;

#配置ticker
cat pgqd-full-part.ini
[pgqd]
database_list = part1,part2,full1
logfile = /usr/local/skytools/londiste3/log/pgqd.log
pidfile = /usr/local/skytools/londiste3/pid/pgqd.pid

#数据库连接进程配置
vim part1.ini

[londiste3]
job_name = l3_part1
db = dbname=part1
queue_name = l3_part1_q
logfile = /usr/local/skytools/londiste3/log/%(job_name)s.log
pidfile = /usr/local/skytools/londiste3/pid/%(job_name)s.pid

vim part2.ini

[londiste3]
job_name = l3_part2
db = dbname=part2
queue_name = l3_part2_q
logfile = /usr/local/skytools/londiste3/log/%(job_name)s.log
pidfile = /usr/local/skytools/londiste3/pid/%(job_name)s.pid
vim part1_full1.ini

[londiste3]
job_name = l3_part1_full1
db = dbname=full1
queue_name = l3_part1_q
logfile = /usr/local/skytools/londiste3/log/%(job_name)s.log
pidfile = /usr/local/skytools/londiste3/pid/%(job_name)s.pid
vim part2_full1.ini

[londiste3]
job_name = l3_part2_full1
db = dbname=full1
queue_name = l3_part2_q
logfile = /usr/local/skytools/londiste3/log/%(job_name)s.log
pidfile = /usr/local/skytools/londiste3/pid/%(job_name)s.pid

#创建root节点1
/usr/local/skytools/bin/londiste3 part1.ini create-root part1_root dbname=part1
#创建root节点2
/usr/local/skytools/bin/londiste3 part2.ini create-root part2_root dbname=part2

#创建leaf节点1
/usr/local/skytools/bin/londiste3 part1_full1.ini create-leaf merge_part1_full1 dbname=full1 --provider=dbname=part1

#创建leaf节点2
/usr/local/skytools/bin/londiste3 part2_full1.ini create-leaf merge_part2_full1 dbname=full1 --provider=dbname=part2

#启动ticker
/usr/local/skytools/bin/pgqd -d pgqd-full-part.ini

2015-05-21 17:08:38.771 8299 LOG Starting pgqd 3.2

#启动worker
/usr/local/skytools/bin/londiste3 -d part1_full1.ini worker
/usr/local/skytools/bin/londiste3 -d part2_full1.ini worker

#测试
/usr/local/pgsql/bin/psql -d "part1" -c "create table mydata (id int4 primary key, data text)"
/usr/local/pgsql/bin/psql -d "part2" -c "create table mydata (id int4 primary key, data text)"

#root节点加入同步表
/usr/local/skytools/bin/londiste3 part1.ini add-table mydata
/usr/local/skytools/bin/londiste3 part2.ini add-table mydata

/usr/local/pgsql/bin/psql -d "full1" -c "select * from londiste.table_info order by queue_name"
nr | queue_name | table_name | local | merge_state | custom_snapshot | dropped_ddl | table_attrs | dest_table
----+------------+---------------+-------+-------------+-----------------+-------------+-------------+------------
1 | l3_part1_q | public.mydata | f | | | | |
2 | l3_part2_q | public.mydata | f | | | | |
(2 rows)
{看到两个queue已经添加}

#插入测试数据
/usr/local/pgsql/bin/psql part1
part1=# INSERT INTO mydata VALUES (1,'lianshunke1');
part1=# \c part2
part2=# INSERT INTO mydata VALUES (2,'lianshunke2');

#在full1中创建并合并同步表
/usr/local/skytools/bin/londiste3 part1_full1.ini add-table mydata --create --merge-all

/usr/local/pgsql/bin/psql -d "full1" -c "select * from londiste.table_info order by queue_name"
nr | queue_name | table_name | local | merge_state | custom_snapshot | dropped_ddl | table_attrs | dest_table
----+------------+---------------+-------+-------------+-----------------+------------------------------------------------------+-------------+------------
1 | l3_part1_q | public.mydata | t | catching-up | 49228:49228: | ALTER TABLE public.mydata ADD CONSTRAINT mydata_pkey+| |
| | | | | | PRIMARY KEY (id); | |
2 | l3_part2_q | public.mydata | t | in-copy | | | |
(2 rows)
#拓扑情况
/usr/local/skytools/bin/londiste3 part1.ini status
Queue: l3_part1_q Local node: part1_root

part1_root (root)

| Tables: 1/0/0

| Lag: 1m0s, Tick: 33, NOT UPTODATE

+--: merge_part1_full1 (leaf)

Tables: 1/0/0

Lag: 1m0s, Tick: 33

/usr/local/skytools/bin/londiste3 part2.ini status

Queue: l3_part2_q Local node: part2_root

part2_root (root)
| Tables: 1/0/0
| Lag: 13s, Tick: 79, NOT UPTODATE
+--: merge_part2_full1 (leaf)
Tables: 1/0/0
Lag: 28m11s, Tick: 21
ERR: l3_part2_full1: [ev_id=9,ev_txid=49748] duplicate key value violates unique constraint "mydata_pkey"

#同步表状态
/usr/local/skytools/bin/londiste3 part1.ini tables

#node状态
/usr/local/skytools/bin/londiste3 part1.ini members

#同步状态比较
/usr/local/skytools/bin/londiste3 part1.ini compare

【分割复制模式】
同一台服务器,三个数据库 part_root,part_part0,part_part1
三个role root,leaf1,leaf2

#创建数据库
#创建配置模式与配置表
【part_part0】
part_part0=# create schema partconf;
part_part0=# CREATE TABLE partconf.conf (part_nr integer,max_part integer,db_code bigint,is_primary boolean,max_slot integer,cluster_name text);
part_part0=# insert into partconf.conf(part_nr, max_part) values(0,1);
【part_part1】
part_part1=# create schema partconf;
part_part1=# CREATE TABLE partconf.conf (part_nr integer,max_part integer,db_code bigint,is_primary boolean,max_slot integer,cluster_name text);
part_part1=# insert into partconf.conf(part_nr, max_part) values(1,1);

【part_root】
part_root=# create schema partconf;

#创建函数
cd /usr/local/pgsql/share/extension/
/usr/local/pgsql/bin/psql part_root < hashlib--1.0.sql
cd /usr/local/skytools-3.2
/usr/local/python27/bin/python2 setup_pkgloader.py build
/usr/local/python27/bin/python2 setup_pkgloader.py install
/usr/local/python27/bin/python2 setup_skytools.py build
/usr/local/python27/bin/python2 setup_skytools.py install

参考资料:
https://wiki.postgresql.org/wiki/Londiste_Tutorial#The_ticker_daemon
http://initd.org/psycopg/
http://www.cnblogs.com/top5/archive/2009/11/06/1597156.html
http://my.oschina.net/lianshunke/blog/201558

发表评论

电子邮件地址不会被公开。 必填项已用*标注