Cross-Cluster Federated Query
SynxDB Elastic has supported cross-cluster federated queries, allowing you to query data across multiple PostgreSQL database clusters. This feature enables cross-cluster data pushdown, allowing queries to be executed locally in multiple clusters, and then the results are aggregated. The pushdown capability effectively reduces data transfer and improves query performance and processing efficiency.
This feature is implemented through the MPP foreign data wrapper postgres_fdw
, which allows remote PostgreSQL databases to be treated as shards in SynxDB Elastic, enabling data sharing and federated queries across homogenous systems. Each remote PostgreSQL table can be viewed as an external table, and multiple remote tables can be added to SynxDB Elastic for operations. In this way, you can access and query data from other clusters within one cluster without complex data import or migration operations. This provides great convenience for scenarios that involve handling similar data across multiple clusters.
User scenarios
Data integration and analysis: In some companies, data is often spread across multiple database clusters of the same type. This feature helps companies integrate data across clusters for unified analysis and processing. The ability to perform cross-cluster federated queries allows businesses to quickly merge data from different units and generate unified reports and analysis results. This integration reduces the complexity of manual data importing and processing, while improving the real-time aspect of data analysis.
Simplified data management: The cross-cluster query feature reduces the need for duplicating data across databases, enabling data in each cluster to be directly involved in queries, avoiding the maintenance of redundant data. By treating remote databases as external shards, you can easily manage and access these shards without worrying about data synchronization. At the same time, this also reduces the potential risks and maintenance costs associated with cross-cluster data synchronization.
Efficient distributed querying: By pushing down queries to reduce network traffic and bottlenecks caused by centralized processing, this feature is especially suited for large-scale distributed data analyses. When handling big data, distributed querying can significantly reduce the computing and communication load, enhancing the system’s overall processing capacity. Especially when performing complex analytical calculations across multiple clusters, pushdown queries effectively utilize the computing resources of each cluster, improving processing efficiency.
Usages
This section explains how to perform cross-cluster federated queries using the postgres_fdw
extension.
Prerequisites
Ensure that all clusters involved in the federated query can communicate with each other. This includes network connectivity and secure access configurations to make sure data can be transmitted smoothly between the clusters without restrictions.
Step 1. Create a foreign data wrapper
Load the
postgres_fdw
foreign data wrapper in SynxDB Elastic.CREATE EXTENSION postgres_fdw;
Create a remote server object, for example,
testserver
.CREATE SERVER testserver FOREIGN DATA WRAPPER postgres_fdw (host '<remote_server_IP>', dbname '<remote_database_name>', port '<remote_port_number>');
You can create multiple server objects as needed, for example, creating
mpps1
,mpps2
, andmpps3
for the remote databasesfdw1
,fdw2
, andfdw3
:CREATE SERVER mpps1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'fdw1', port '7000'); CREATE SERVER mpps2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'fdw2', port '7000'); CREATE SERVER mpps3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'fdw3', port '7000');
These commands allow you to define different server objects for different remote databases, making it easier to create external tables and perform queries later.
Step 2. Create a user mapping
Create a user mapping for each server. You need to fill in the actual database username and password in the OPTIONS
section.
CREATE USER MAPPING FOR CURRENT_USER SERVER mpps1 OPTIONS(user 'postgres', password 'xxx');
CREATE USER MAPPING FOR CURRENT_USER SERVER mpps2 OPTIONS (user 'postgres', password 'xxx');
CREATE USER MAPPING FOR CURRENT_USER SERVER mpps3 OPTIONS (user 'postgres', password 'xxx');
User mapping links the current database user with the remote database user, allowing access to remote data when running queries. Depending on your needs, you can also specify different access permissions for each server.
Step 3. Create a foreign table and add shards
Create a foreign table fs1
, and add the table t1
from the remote server mpps1
as a foreign shard:
CREATE FOREIGN TABLE fs1 (
a int,
b text
) SERVER mpps1 OPTIONS (schema_name 'public', table_name 't1', mpp_execute 'all segments');
ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs1;
With these commands, you can add one or more tables from remote databases as foreign shards to an external table. The data from these shards automatically participates in queries. This allows you to access and manage remote shard data just like you would with local tables.
Step 4. Perform Cross-Cluster Federated Query
Single table query
For single table queries, you can directly query the foreign table fs1
. The database system distributes the query to the remote cluster for execution.
Check the query plan.
EXPLAIN (COSTS OFF) SELECT * FROM fs1;
The execution plan shows
Foreign Scan on fs1
, which indicates that the query operation has been distributed to the remote server for execution.QUERY PLAN --- Gather Motion 1:1 (slice1; segments: 1) -> Foreign Scan on fs1 Optimizer: Postgres query optimizer (3 rows)
Run the query.
SELECT * FROM fs1;
The returned data is from the remote table
t1
.a | b ---+------ 1 | fdw1 (1 row)
From the output, you can see that the data is only from the
mpps1
server.Add other remote servers as shards.
ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs1; ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs1;
Check the query plan again.
EXPLAIN (COSTS OFF) SELECT * FROM fs1;
QUERY PLAN --- Gather Motion 3:1 (slice1; segments: 3) -> Foreign Scan on fs1 Optimizer: Postgres query optimizer (3 rows)
Run the query again.
SELECT * FROM fs1;
Returned result:
a | b ---+------ 1 | fdw2 1 | fdw1 1 | fdw3 (3 rows)
The result shows that the newly added shards
mpps2
andmpps3
have successfully participated in the query.
Multi-table join
For multi-table join operations, the optimizer automatically generates an appropriate execution plan based on the number of segments in the tables.
Create the foreign table
fs2
.CREATE FOREIGN TABLE fs2 ( a int, b text ) SERVER mpps1 OPTIONS (schema_name 'public', table_name 't2', mpp_execute 'all segments');
Add segments.
ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs2; ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs2; ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs2;
Execute the join query between the two tables.
EXPLAIN (COSTS OFF) SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a;
The query plan shows that the optimizer has chosen
Hash Join
, and the data from both tables is redistributed based on key values, allowing the join operation to be performed across different nodes.QUERY PLAN --- Gather Motion 3:1 (slice1; segments: 3) -> Hash Join Hash Cond: (fs1.a = fs2.a) -> Redistribute Motion 3:3 (slice2; segments: 3) Hash Key: fs1.a -> Foreign Scan on fs1 -> Hash -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: fs2.a -> Foreign Scan on fs2 Optimizer: Postgres query optimizer (11 rows)
Run the actual join query.
SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a;
a | b | a | b ---+------+---+------ 1 | fdw1 | 1 | fdw2 1 | fdw1 | 1 | fdw1 1 | fdw1 | 1 | fdw3 1 | fdw2 | 1 | fdw2 1 | fdw2 | 1 | fdw1 1 | fdw2 | 1 | fdw3 1 | fdw3 | 1 | fdw2 1 | fdw3 | 1 | fdw1 1 | fdw3 | 1 | fdw3 (9 rows)
The returned results show that all matching rows between
fs1
andfs2
are successfully joined.
Join pushdown with gp_foreign_server condition
To further optimize query performance, you can add the gp_foreign_server
condition in the join to enable join pushdown.
Check the query plan.
EXPLAIN (COSTS OFF) SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a AND fs1.gp_foreign_server = fs2.gp_foreign_server;
The query plan shows that the
Foreign Scan
operation has pushed the join down to the remote server, reducing the burden on local computing.QUERY PLAN --- Gather Motion 3:1 (slice1; segments: 3) -> Foreign Scan Relations: (fs1) INNER JOIN (fs2) Optimizer: Postgres query optimizer (4 rows)
Run the query.
SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a AND fs1.gp_foreign_server = fs2.gp_foreign_server;
The returned result:
a | b | a | b ---+------+---+------ 1 | fdw3 | 1 | fdw3 1 | fdw1 | 1 | fdw1 1 | fdw2 | 1 | fdw2 (3 rows)
By adding the
gp_foreign_server
condition, the query is pushed down to the remote server, making fewer matching rows and greatly improving query efficiency.
Aggregate pushdown
Aggregate queries can be pushed down to the remote server to greatly reduce data transfer, improving query efficiency.
Run an aggregate query.
SELECT count(*) FROM fs1, fs2 WHERE fs1.a = fs2.a;
The returned result:
count --- 9 (1 row)
Check the query plan.
EXPLAIN (COSTS OFF) SELECT count(*) FROM fs1, fs2 WHERE fs1.a = fs2.a AND fs1.gp_foreign_server = fs2.gp_foreign_server;
The query plan shows that the aggregate operation has been pushed down to the remote server, reducing the aggregation load on the local node.
QUERY PLAN --- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Foreign Scan Relations: Aggregate on ((fs1) INNER JOIN (fs2)) Optimizer: Postgres query optimizer (5 rows)
By pushing the aggregation operation to the remote server, the database system can leverage the remote server’s computing power to perform part of the aggregation work, thus improving overall query performance.