Cross-Cluster Federated Query

SynxDB Elastic supports cross-cluster federated queries, which let you query data across multiple PostgreSQL database clusters. Queries are pushed down so that each cluster executes its part locally, and the partial results are then aggregated. Pushdown reduces data transfer and improves query performance.

This feature is implemented through the MPP foreign data wrapper postgres_fdw, which allows remote PostgreSQL databases to be treated as shards in SynxDB Elastic, enabling data sharing and federated queries across homogeneous systems. Each remote PostgreSQL table can be viewed as a foreign table, and multiple remote tables can be added to SynxDB Elastic as shards. You can therefore access and query data from other clusters within one cluster, without complex data import or migration. This is convenient when similar data is spread across multiple clusters.

User scenarios

  • Data integration and analysis: Companies often have data spread across multiple database clusters of the same type. Cross-cluster federated queries let them merge data from different business units and produce unified reports and analysis results. This reduces the effort of manually importing and processing data, and makes analysis results more timely.

  • Simplified data management: Because the data in each cluster participates in queries directly, you do not need to duplicate data across databases or maintain redundant copies. By treating remote databases as foreign shards, you can manage and access them without worrying about data synchronization, which also avoids the risks and maintenance costs of cross-cluster synchronization.

  • Efficient distributed querying: Pushing queries down reduces network traffic and avoids the bottleneck of centralized processing, which makes this feature well suited to large-scale distributed data analysis. For complex analytical calculations across multiple clusters, pushdown uses the computing resources of each cluster, which lowers the computing and communication load on the local cluster.

Usage

This section explains how to perform cross-cluster federated queries using the postgres_fdw extension.

Prerequisites

  • Ensure that all clusters involved in the federated query can communicate with each other. This includes network connectivity and secure access configurations to make sure data can be transmitted smoothly between the clusters without restrictions.

Step 1. Create a foreign data wrapper

  1. Load the postgres_fdw foreign data wrapper in SynxDB Elastic.

    CREATE EXTENSION postgres_fdw;
    
  2. Create a remote server object, for example, testserver.

    CREATE SERVER testserver FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '<remote_server_IP>', dbname '<remote_database_name>', port '<remote_port_number>');
    

    You can create multiple server objects as needed, for example, mpps1, mpps2, and mpps3 for the remote databases fdw1, fdw2, and fdw3:

    CREATE SERVER mpps1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'fdw1', port '7000');
    CREATE SERVER mpps2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'fdw2', port '7000');
    CREATE SERVER mpps3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'fdw3', port '7000');
    

    These commands define a separate server object for each remote database, which makes it easier to create foreign tables and run queries later.
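
To confirm that the server objects were registered as intended, you can inspect the catalog. The following is a minimal sketch that assumes SynxDB Elastic exposes the standard PostgreSQL catalog pg_foreign_server. The output depends on your environment and is not shown here.

```sql
-- List the foreign servers defined in the current database,
-- together with the connection options given in CREATE SERVER.
-- Assumption: the standard PostgreSQL catalog pg_foreign_server is available.
SELECT srvname, srvoptions
FROM pg_foreign_server
WHERE srvname IN ('mpps1', 'mpps2', 'mpps3');
```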

Step 2. Create a user mapping

Create a user mapping for each server. You need to fill in the actual database username and password in the OPTIONS section.

CREATE USER MAPPING FOR CURRENT_USER SERVER mpps1 OPTIONS (user 'postgres', password 'xxx');
CREATE USER MAPPING FOR CURRENT_USER SERVER mpps2 OPTIONS (user 'postgres', password 'xxx');
CREATE USER MAPPING FOR CURRENT_USER SERVER mpps3 OPTIONS (user 'postgres', password 'xxx');

A user mapping links the current database user to a remote database user, which allows access to remote data when running queries. Depending on your needs, you can also specify different access permissions for each server.
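
As an illustration of per-server mappings, the sketch below maps a different local role to a different remote account on one server, and then lists the existing mappings. The local role analyst and the remote account report_reader are hypothetical names that do not appear elsewhere in this document. pg_user_mappings is the standard PostgreSQL view and is assumed to be available in SynxDB Elastic.

```sql
-- Hypothetical: the local role "analyst" connects to mpps2
-- as the remote user "report_reader" (both names are placeholders).
CREATE USER MAPPING FOR analyst SERVER mpps2
    OPTIONS (user 'report_reader', password 'xxx');

-- List which local roles are mapped to which foreign servers.
SELECT srvname, usename
FROM pg_user_mappings
ORDER BY srvname, usename;
```

With a mapping like this, what the local role can read through mpps2 is determined by the privileges of the remote account it is mapped to.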

Step 3. Create a foreign table and add shards

Create a foreign table fs1, and add the table t1 from the remote server mpps1 as a foreign shard:

CREATE FOREIGN TABLE fs1 (
    a int,
    b text
) SERVER mpps1 OPTIONS (schema_name 'public', table_name 't1', mpp_execute 'all segments');

ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs1;

With these commands, you can add one or more tables from remote databases as foreign shards to a foreign table. The data from these shards automatically participates in queries, so you can access and manage remote shard data just as you would local tables.
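
The examples in this document assume that each remote database already contains the tables t1 and t2. Judging from the query results shown in Step 4, each table holds a single row whose b value names its database. A sketch of that setup for fdw1 might look like the following. These statements are an assumption inferred from the outputs and are run on the remote database, not in SynxDB Elastic.

```sql
-- Run on the remote database fdw1.
-- Repeat on fdw2 and fdw3, replacing 'fdw1' with 'fdw2' and 'fdw3'.
CREATE TABLE public.t1 (a int, b text);
CREATE TABLE public.t2 (a int, b text);

-- One row per table, tagged with the name of the database that holds it.
INSERT INTO public.t1 VALUES (1, 'fdw1');
INSERT INTO public.t2 VALUES (1, 'fdw1');
```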

Step 4. Perform a cross-cluster federated query

Single table query

For single table queries, you can directly query the foreign table fs1. The database system distributes the query to the remote cluster for execution.

  1. Check the query plan.

    EXPLAIN (COSTS OFF) SELECT * FROM fs1;
    

    The execution plan shows Foreign Scan on fs1, which indicates that the query operation has been distributed to the remote server for execution.

              QUERY PLAN
    ---
    Gather Motion 1:1  (slice1; segments: 1)
       ->  Foreign Scan on fs1
    Optimizer: Postgres query optimizer
    (3 rows)
    
  2. Run the query.

    SELECT * FROM fs1;
    

    The returned data is from the remote table t1.

    a |  b
    ---+------
    1 | fdw1
    (1 row)
    

    From the output, you can see that the data is only from the mpps1 server.

  3. Add other remote servers as shards.

    ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs1;
    ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs1;
    
  4. Check the query plan again.

    EXPLAIN (COSTS OFF) SELECT * FROM fs1;
    
                QUERY PLAN
    ---
    Gather Motion 3:1  (slice1; segments: 3)
       ->  Foreign Scan on fs1
    Optimizer: Postgres query optimizer
    (3 rows)
    
  5. Run the query again.

    SELECT * FROM fs1;
    

    Returned result:

    a |  b
    ---+------
    1 | fdw2
    1 | fdw1
    1 | fdw3
    (3 rows)
    

    The result shows that the newly added shards mpps2 and mpps3 have successfully participated in the query.
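
To see what is sent to each remote server, you can request a verbose plan. In standard PostgreSQL, postgres_fdw prints a Remote SQL line under the Foreign Scan node in EXPLAIN VERBOSE output. The sketch below assumes that SynxDB Elastic behaves the same way, and the filter on a is only an illustrative example.

```sql
-- Show the plan with per-node detail and look for the remote statement
-- under the Foreign Scan node. If the WHERE clause appears in that
-- remote statement, the filter was pushed down to the remote servers.
EXPLAIN (VERBOSE, COSTS OFF)
SELECT a, b FROM fs1 WHERE a = 1;
```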

Multi-table join

For multi-table join operations, the optimizer automatically generates an appropriate execution plan based on the number of segments in the tables.

  1. Create the foreign table fs2.

    CREATE FOREIGN TABLE fs2 (
       a int,
       b text
       )
    SERVER mpps1
    OPTIONS (schema_name 'public', table_name 't2', mpp_execute 'all segments');
    
  2. Add segments.

    ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs2;
    ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs2;
    ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs2;
    
  3. Execute the join query between the two tables.

    EXPLAIN (COSTS OFF) SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a;
    

    The query plan shows that the optimizer has chosen Hash Join, and the data from both tables is redistributed based on key values, allowing the join operation to be performed across different nodes.

                            QUERY PLAN
    ---
    Gather Motion 3:1  (slice1; segments: 3)
       ->  Hash Join
             Hash Cond: (fs1.a = fs2.a)
             ->  Redistribute Motion 3:3  (slice2; segments: 3)
                   Hash Key: fs1.a
                   ->  Foreign Scan on fs1
             ->  Hash
                   ->  Redistribute Motion 3:3  (slice3; segments: 3)
                         Hash Key: fs2.a
                         ->  Foreign Scan on fs2
    Optimizer: Postgres query optimizer
    (11 rows)
    
  4. Run the actual join query.

    SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a;
    
    a |  b   | a |  b
    ---+------+---+------
    1 | fdw1 | 1 | fdw2
    1 | fdw1 | 1 | fdw1
    1 | fdw1 | 1 | fdw3
    1 | fdw2 | 1 | fdw2
    1 | fdw2 | 1 | fdw1
    1 | fdw2 | 1 | fdw3
    1 | fdw3 | 1 | fdw2
    1 | fdw3 | 1 | fdw1
    1 | fdw3 | 1 | fdw3
    (9 rows)
    

    The result contains every matching pair of rows from fs1 and fs2. Because all rows have a = 1, each of the three rows in fs1 matches each of the three rows in fs2, which yields nine rows.

Join pushdown with gp_foreign_server condition

To further optimize query performance, you can add the gp_foreign_server condition to the join to enable join pushdown.

  1. Check the query plan.

    EXPLAIN (COSTS OFF) SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a AND fs1.gp_foreign_server = fs2.gp_foreign_server;
    

    The query plan shows that the Foreign Scan operation has pushed the join down to the remote server, reducing the burden on local computing.

                QUERY PLAN
    ---
    Gather Motion 3:1  (slice1; segments: 3)
       ->  Foreign Scan
             Relations: (fs1) INNER JOIN (fs2)
    Optimizer: Postgres query optimizer
    (4 rows)
    
  2. Run the query.

    SELECT * FROM fs1, fs2 WHERE fs1.a = fs2.a AND fs1.gp_foreign_server = fs2.gp_foreign_server;
    

    The returned result:

    a |  b   | a |  b
    ---+------+---+------
    1 | fdw3 | 1 | fdw3
    1 | fdw1 | 1 | fdw1
    1 | fdw2 | 1 | fdw2
    (3 rows)
    

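    The gp_foreign_server condition restricts the join to rows that come from the same remote server, so each remote server can execute the join locally. The result contains fewer rows (three instead of nine), and less data is transferred to the local cluster.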
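
The same join can be written with explicit JOIN ... ON syntax, which keeps the join key and the gp_foreign_server condition together. This is a sketch of an equivalent formulation in standard SQL. It is assumed, not verified here, that the optimizer produces the same pushed-down plan for it.

```sql
-- Equivalent inner join in explicit syntax: both conditions go in the ON clause.
EXPLAIN (COSTS OFF)
SELECT *
FROM fs1
JOIN fs2
  ON fs1.a = fs2.a
 AND fs1.gp_foreign_server = fs2.gp_foreign_server;
```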

Aggregate pushdown

Aggregate queries can be pushed down to the remote server to greatly reduce data transfer, improving query efficiency.

  1. Run an aggregate query.

    SELECT count(*) FROM fs1, fs2 WHERE fs1.a = fs2.a;
    

    The returned result:

    count
    ---
       9
    (1 row)
    
  2. Check the query plan of the aggregate query with the gp_foreign_server condition added, which enables pushdown.

    EXPLAIN (COSTS OFF) SELECT count(*) FROM fs1, fs2 WHERE fs1.a = fs2.a AND fs1.gp_foreign_server = fs2.gp_foreign_server;
    

    The query plan shows that the aggregate operation has been pushed down to the remote server, reducing the aggregation load on the local node.

                            QUERY PLAN
    ---
    Finalize Aggregate
       ->  Gather Motion 3:1  (slice1; segments: 3)
             ->  Foreign Scan
                   Relations: Aggregate on ((fs1) INNER JOIN (fs2))
    Optimizer: Postgres query optimizer
    (5 rows)
    

    By pushing the aggregation operation to the remote server, the database system can leverage the remote server’s computing power to perform part of the aggregation work, thus improving overall query performance.
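
To run the aggregate whose plan is shown above, execute the same statement without EXPLAIN. Because the gp_foreign_server condition limits the join to rows from the same server, the count should correspond to the three-row join result in the join pushdown section, not to the nine-row result of the unrestricted join.

```sql
-- Aggregate over the pushed-down join. According to the plan above, each
-- remote server computes part of the aggregate, and the local
-- Finalize Aggregate node combines the partial results.
SELECT count(*)
FROM fs1, fs2
WHERE fs1.a = fs2.a
  AND fs1.gp_foreign_server = fs2.gp_foreign_server;
```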