1. citus介绍
citus是PostgreSQL数据库中的一种轻量级的分库分表解决方案。citus不是一个单独的程序,它是PostgreSQL数据库中的一个插件,可以使用create extension安装此插件。
每个citus集群有多个PostgreSQL数据库实例组成,数据库实例分为两类:
- master节点,通常有一台。master节点只存储分库分表的元数据,不存储实际的数据。
- worker节点,通常有多台。worker节点存储实际的分片数据(shard)。
用户只连接master节点,即用户把SQL发到master节点,然后master节点解析SQL,把SQL分解成各个子SQL发送到底层的worker节点,完成SQL的处理。
例如用户发一条如下的SQL到master节点:
SELECT reponse_type, avg(reponse_time) as responsetime_avg
FROM events
WHERE reponse_timestamp > '2013-11-01 23:20:00'
AND reponse_timestamp < '2013-11-01 23:30:00'
GROUP BY response_type
ORDER BY reponsetime_avg DESC;
master节点会把这条SQL分解成如下的子SQL发到底层的各个worker上:
SELECT reponse_type, sum(reponse_time), count(response_time)
FROM events_XXX
WHERE reponse_timestamp > '2013-11-01 23:20:00'
AND reponse_timestamp < '2013-11-01 23:30:00'
GROUP BY response_type
ORDER BY reponsetime_avg DESC;
上面的SQL中的“events_XXX”中的“XXX”代表分片号。当各个workder把数据返给master之后,master再做一次聚合运算,然后把结果返回用户。
一些更多的信息可以见citus的官方网站为:https://www.citusdata.com/
2. 安装citus
2.1 从二进制包安装
citus是一个插件,在ubuntu下只需要使用apt-get安装即可:
aptitude install postgresql-9.6 postgresql-9.6-citus
装完之后,可以检查一下安装上的包:
root@vds01:~# dpkg -l |grep postgresql-9.6
ii postgresql-9.6 9.6.2-1.pgdg14.04+1 amd64 object-relational SQL database, version 9.6 server
ii postgresql-9.6-citus 6.0.1.PGDG-1.pgdg14.04+1 amd64 sharding and distributed joins for PostgreSQL
2.2 编译安装
如果只是编译安装citus,而PostgreSQL数据库软件是通过操作系统包安装的,这时需要安装PostgreSQL开发包:
aptitude install postgresql-server-dev-9.6
然后从github下载源代码:
git clone https://github.com/citusdata/citus.git
编译源代码:
cd citus
./configure
make
在make时会报错:
/usr/include/postgresql/9.6/server/libpq/libpq-be.h:36:27: fatal error: gssapi/gssapi.h: No such file or directory
#include <gssapi/gssapi.h>
^
compilation terminated.
make[1]: *** [commands/transmit.o] Error 1
make[1]: Leaving directory `/data/citus/src/backend/distributed'
make: *** [extension] Error 2
安装libkrb5-dev包解决上面的问题:
aptitude install libkrb5-dev
然后再编译就没有问题了。
3. 创建citus集群
3.1 集群规划
因为本次安装只是做一个演示,所以我们把所有的数据库实例都建在一台机器上。本次创建三个实例:
实例名称 | 操作系统用户名 | 数据目录 | 端口 |
---|---|---|---|
实例01 | pg01 | /data/pg01 | 5432 |
实例02 | pg02 | /data/pg02 | 9702 |
实例03 | pg03 | /data/pg03 | 9703 |
3.2 建操作系统用户
groupadd -g 501 pg01
useradd -g pg01 -m -s /bin/bash -u 501 pg01
groupadd -g 502 pg02
useradd -g pg02 -m -s /bin/bash -u 502 pg02
groupadd -g 503 pg03
useradd -g pg03 -m -s /bin/bash -u 503 pg03
在pg01用户的.profile文件中添加如下内容:
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
export PGHOST=/tmp
export PGDATA=/data/pg01
export PGPORT=5432
在pg02用户的.profile文件中添加如下内容:
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
export PGHOST=/tmp
export PGDATA=/data/pg02
export PGPORT=9702
在pg03用户的.profile文件中添加如下内容:
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
export PGHOST=/tmp
export PGDATA=/data/pg03
export PGPORT=9703
3.3 建数据库实例
分别在三个操作系统用户pg01、pg02、pg03下建三个数据库实例。
建的方法为:
initdb
然后分别修改各个实例下的postgresql.conf配置文件。
其中pg01需要修改的配置项如下:
listen_addresses = '*'
port = 5432
unix_socket_directories = '/tmp'
shared_preload_libraries = 'citus'
logging_collector = on
其中pg02需要修改的配置项如下:
listen_addresses = '*'
port = 9702
unix_socket_directories = '/tmp'
shared_preload_libraries = 'citus'
logging_collector = on
其中pg03需要修改的配置项如下:
listen_addresses = '*'
port = 9703
unix_socket_directories = '/tmp'
shared_preload_libraries = 'citus'
logging_collector = on
注意上面配置项中的“shared_preload_libraries = ‘citus’”,这一行就是为了装载citus插件。
创建完成后,就可以启动这三个数据库实例了:
启动pg01实例,在root用户下:
su - pg01
pg_ctl start
启动pg02实例,在root用户下:
su - pg02
pg_ctl start
启动pg03实例,在root用户下:
su - pg03
pg_ctl start
3.4 配置citus
在pg01、pg02、pg03这三个实例中分别执行下面的命令创建citus extension:
create extension citus;
为了方便后续的操作,创建一个用户citusr,以后就会用citusr用户测试citus的功能:
create user citusr superuser;
上面是为了方便,所以把用户创建成超级用户。实际上也可以创建成普通用户。
然后在pg01即master节点上,添加两个worker节点(即pg02和pg03):
SELECT * from master_add_node('localhost', 9702);
SELECT * from master_add_node('localhost', 9702);
增加完之后可以用下面的命令看是否增加成功:
postgres=# SELECT * FROM master_get_active_worker_nodes();
node_name | node_port
-----------+-----------
localhost | 9703
localhost | 9702
(2 rows)
做完以上操作之后,就可以创建表以及试用citus的功能了。
3.5 创建表以及测试citus的功能
在citus中有两类表:
- Distributed Tables:即分片表。表的内容通过hash分在各个worker节点中
- Reference Tables:即广播表,在每个分片中都复制一份。
下面我们创建两个分片表,注意这时使用我们之前建的用户citusr连接pg01:
pg01@vds01:~$ psql -Ucitusr postgres
psql (9.6.2)
Type "help" for help.
postgres=#
先按照与单机PostgreSQL数据库相同的方式建这两张表:
create table t01(id int, id2 int, t text);
create table t02(id int, id2 int, t text);
然后用下面的语句把这两张表定义为分片表:
select create_distributed_table('t01', 'id2');
select create_distributed_table('t02', 'id2', colocate_with=>'t01');
用下面的语句造一些临时数据,发现不支持:
postgres=# insert into t01 select id, id, lpad(id::text, 5, id::text) from generate_series(1,100) as t(id);
ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
从上面知道citus不支持insert 语句后面的SELECT另一张表的这种语法。
不过我们可以通过下面的方法来造数据:
copy (select id, id, lpad(id::text, 5, id::text) from generate_series(1,10000) as t(id)) to '/tmp/t01.txt';
copy t01 from '/tmp/t01.txt';
copy t02 from '/tmp/t01.txt';
从上面知道citus是支持copy命令的。
我们使用explain查看执行计划:
postgres=# explain select a.t, b.t from t01 a, t02 b where a.id2=b.id2 and a.id2=5;
QUERY PLAN
-------------------------------------------------------------------------------
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=9702 dbname=postgres
-> Nested Loop (cost=0.00..11.61 rows=1 width=12)
-> Seq Scan on t01_102050 a (cost=0.00..5.80 rows=1 width=10)
Filter: (id2 = 5)
-> Seq Scan on t02_102082 b (cost=0.00..5.80 rows=1 width=10)
Filter: (id2 = 5)
(11 rows)
发现上面的执行计划与原生的PostgreSQL数据库有所不同。
如果我们join时不使用分布键,会发生什么? 试一试:
postgres=# explain select a.t, b.t from t01 a, t02 b where a.id=b.id2 and a.id2=5;
ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker".
提示在“real time”执行器下不支持跨节点join,改成“task-tracker”可以支持,试一试:
postgres=# Set citus.task_executor_type to "task-tracker"; SET
postgres=# select a.t, b.t from t01 a, t02 b where a.id=b.id2 and a.id2=5;
t | t
-------+-------
55555 | 55555
(1 row)
Time: 9843.828 ms
从上面可以看出,可以执行,就是有些慢。
我们建两张广播表:
create table t03(id int, id2 int, t text);
create table t04(id int, id2 int, t text);
select create_reference_table('t03');
select create_reference_table('t04');
postgres=# copy t03 from '/tmp/t01.txt';
COPY 10000
Time: 34.356 ms
postgres=# copy t04 from '/tmp/t01.txt';
COPY 10000
Time: 30.163 ms
两张广播表join一下:
postgres=# select a.t, b.t from t03 a, t04 b where a.id=b.id2 and a.id2=5;
t | t
-------+-------
55555 | 55555
(1 row)
Time: 3.432 ms
postgres=# explain select a.t, b.t from t03 a, t04 b where a.id=b.id2 and a.id2=5;
QUERY PLAN
---------------------------------------------------------------------------------------
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=9703 dbname=postgres
-> Hash Join (cost=180.01..372.52 rows=1 width=12)
Hash Cond: (b.id2 = a.id)
-> Seq Scan on t04_102237 b (cost=0.00..155.00 rows=10000 width=10)
-> Hash (cost=180.00..180.00 rows=1 width=10)
-> Seq Scan on t03_102236 a (cost=0.00..180.00 rows=1 width=10)
Filter: (id2 = 5)
(12 rows)
Time: 13.071 ms
两张广播表join没有问题。
试一试,分片表与广播表join一下:
postgres=# select a.t, b.t from t01 a, t03 b where a.id=b.id2 and a.id2=5;
t | t
-------+-------
55555 | 55555
(1 row)
Time: 4.964 ms
postgres=# explain select a.t, b.t from t01 a, t03 b where a.id=b.id2 and a.id2=5;
QUERY PLAN
-------------------------------------------------------------------------------------
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=9702 dbname=postgres
-> Hash Join (cost=5.81..198.32 rows=1 width=12)
Hash Cond: (b.id2 = a.id)
-> Seq Scan on t03_102236 b (cost=0.00..155.00 rows=10000 width=10)
-> Hash (cost=5.80..5.80 rows=1 width=10)
-> Seq Scan on t01_102178 a (cost=0.00..5.80 rows=1 width=10)
Filter: (id2 = 5)
(12 rows)
Time: 15.099 ms
分片表与广播表join没有问题。
试一下分片表的聚合函数:
postgres=# select id, avg(id2) from t01 group by id limit 5;
id | avg
------+-----------------------
2848 | 2848.0000000000000000
251 | 251.0000000000000000
3565 | 3565.0000000000000000
2026 | 2026.0000000000000000
6158 | 6158.0000000000000000
(5 rows)
Time: 1108.943 ms
postgres=# explain select id, avg(id2) from t01 group by id limit 5;
QUERY PLAN
-----------------------------------------------------------------------------------
Distributed Query into pg_merge_job_0056
Executor: Task-Tracker
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9702 dbname=postgres
-> HashAggregate (cost=7.55..10.72 rows=317 width=20)
Group Key: id
-> Seq Scan on t01_102172 t01 (cost=0.00..5.17 rows=317 width=8)
Master Query
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: intermediate_column_56_0
-> Seq Scan on pg_merge_job_0056 (cost=0.00..0.00 rows=0 width=0)
(14 rows)
Time: 17.838 ms
也是没有问题。
好的,先试到这儿,祝大家愉快。
不错