Push Down Aggregation Operations

Aggregation pushdown is an optimization technique that moves the aggregation operation closer to the data source. SynxDB supports pushing down aggregation operations, which means computing the aggregation operator before the join operator.

In suitable scenarios, aggregation pushdown can significantly reduce the size of the input set for the join or aggregation operator, thereby improving the operator’s execution performance.

Note

  • In the optimizer logic of the native PostgreSQL kernel, aggregation operations in each query are always performed after all join operations are completed (excluding subqueries). Therefore, SynxDB introduces the aggregation pushdown feature, allowing it to choose to perform aggregation operations in advance in suitable scenarios.

  • To determine if the execution plan chosen by the optimizer has applied aggregation pushdown optimization, observe the positional relationship between Aggregation and Join in the execution plan tree. If an execution plan first performs a Partial Aggregation and then a Join operation, and finally a Final Aggregation, it indicates that the optimizer has applied aggregation pushdown.

Usage example

Before using this aggregation pushdown optimization, you need to manually enable the GUC parameter gp_enable_agg_pushdown.

In addition, you need to manually set optimizer=off to disable the GPORCA optimizer, as this optimization is currently effective only in the PostgreSQL optimizer.

The following is a usage example of aggregation pushdown optimization.

-- Create two tables, t1 and t2
CREATE TABLE t1(id INT, val1 INT);
CREATE TABLE t2(id INT, val2 INT);

SET OPTIMIZER=OFF; -- Disable the GPORCA optimizer
SET gp_enable_agg_pushdown=ON; -- Enable the GUC parameter

-- Execute a query with aggregation and join operations
EXPLAIN (COSTS OFF) SELECT id, SUM(val1) FROM t1 NATURAL JOIN t2 GROUP BY id;
                     QUERY PLAN
-----------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Finalize GroupAggregate
         Group Key: t1.id
         ->  Sort
               Sort Key: t1.id
               ->  Hash Join
                     Hash Cond: (t2.id = t1.id)
                     ->  Seq Scan on t2
                     ->  Hash
                           ->  Partial HashAggregate
                                 Group Key: t1.id
                                 ->  Seq Scan on t1
Optimizer: Postgres query optimizer
(13 rows)

From the execution plan result in the example above, you can see that before performing the HashJoin operation, SynxDB performs an aggregation operation on the t1 table based on the id column in advance. This aggregation operation does not compromise the correctness of the statement’s execution result and is likely to reduce the amount of data entering the HashJoin, thereby improving the efficiency of the statement’s execution.

Use cases

A significant query performance improvement is expected when using aggregation pushdown in the following scenarios.

Scenario one

Scenario description: Each piece of data in one table corresponds to multiple pieces of data in another table, and the two tables need to be joined for group aggregation.

For example, you need to join the order table (order_tbl) and the order line item table (order_line_tbl) and sum the prices of the corresponding order items for each order, that is, calculate the total amount SUM(price) for each order:

SELECT o.order_id, SUM(price)
  FROM order_tbl o, order_line_tbl ol
  WHERE o.order_id = ol.order_id
  GROUP BY o.order_id;
  • Execution method in the native PostgreSQL optimizer: The native PostgreSQL optimizer can only perform the aggregation operation after joining the two tables. Because each order item in the order_line_tbl table must have corresponding order information in the order_tbl table, this Join operator will not filter out any data.

  • Execution method in SynxDB: Assuming each order contains an average of 10 order items, the data volume is expected to decrease by a factor of 10 after aggregation by the Aggregation operator. With aggregation pushdown enabled, the database will first aggregate the data in order_line_tbl by order_id. This reduces the amount of data passed to the Join operator by a factor of 10, thus significantly reducing the overhead of the Join operator. The corresponding execution plan is as follows:

    EXPLAIN SELECT o.order_id, SUM(price)
    FROM order_tbl o, order_line_tbl ol
    WHERE o.order_id = ol.order_id
    GROUP BY o.order_id;
                                            QUERY PLAN
    -----------------------------------------------------------------------------------------------
    Gather Motion 3:1  (slice1; segments: 3)  (cost=712.89..879.56 rows=10000 width=12)
    ->  Finalize HashAggregate  (cost=712.89..746.23 rows=3333 width=12)
            Group Key: o.order_id
            ->  Hash Join  (cost=617.00..696.23 rows=3333 width=12)
                Hash Cond: (ol.order_id = o.order_id)
                ->  Partial HashAggregate  (cost=538.00..571.38 rows=3338 width=12)
                        Group Key: ol.order_id
                        ->  Seq Scan on order_line_tbl ol  (cost=0.00..371.33 rows=33333 width=8)
                ->  Hash  (cost=37.33..37.33 rows=3333 width=4)
                        ->  Seq Scan on order_tbl o  (cost=0.00..37.33 rows=3333 width=4)
    Optimizer: Postgres query optimizer
    

A similar scenario is: joining the project table (project) and the experiment table (experiment) to calculate the total experiment cost for each project project over the past year. The corresponding reference SQL statement is as follows:

SELECT proj_name, sum(cost)
    FROM experiment e, project p
    WHERE e.e_pid = p.p_pid AND e.start_time > now() - interval '1 year'
    GROUP BY proj_name;

For this query, with aggregation pushdown enabled, SynxDB will pre-aggregate the experiment table by the e_pid column, gathering the information for the same project first.

However, if this query also has many filters on the project table, it might currently cause the Join’s selectivity to be too high, leading to inefficient execution. Therefore, aggregation pushdown is temporarily not suitable for this situation.

Scenario two

Scenario description: The Join operator in the query statement significantly increases the result set size, and the final result needs to be calculated by grouping.

For example, to join the person_1 table with the person_2 table to find out how many different pairs each identical name can form between the two tables, the SQL query is as follows:

SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;

In this example, if a certain name appears X times in the p1 table and Y times in the p2 table, then this name will appear X*Y times in the final result. If a large amount of data is in this situation, the final result set after the Join might be very large.

In the example above, if the aggregation operation is pushed down to the p1 or p2 side in advance, each name will appear at most once on that side after aggregation, which can effectively reduce the overhead of the Join operator and the size of the input set that the subsequent Aggregation operator needs to process. The corresponding execution plan is as follows:

EXPLAIN SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;
                                       QUERY PLAN
-----------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=1758.62..1925.23 rows=9997 width=12)
   ->  Finalize HashAggregate  (cost=1758.62..1791.94 rows=3332 width=12)
         Group Key: p1.name
         ->  Hash Join  (cost=762.93..1592.17 rows=33290 width=12)
               Hash Cond: (p2.name = p1.name)
               ->  Seq Scan on p2  (cost=0.00..371.33 rows=33333 width=4)
               ->  Hash  (cost=637.97..637.97 rows=9997 width=12)
                     ->  Partial HashAggregate  (cost=538.00..637.97 rows=9997 width=12)
                           Group Key: p1.name
                           ->  Seq Scan on p1  (cost=0.00..371.33 rows=33333 width=4)
 Optimizer: Postgres query optimizer
(11 rows)

Usage restrictions

This section describes some restrictions of the aggregation pushdown feature, including cases where this optimization cannot be applied logically and cases that are not yet supported in the engineering implementation.

Restriction one

Restriction description: Aggregation pushdown cannot be applied when filtering is done on columns other than the GROUP BY column during the join and subsequent calculations. Consider the following SQL query:

SELECT id, SUM(val) FROM t1, t2 WHERE t1.id = t2.id AND t1.val > t2.val GROUP BY id;

In the example above, assume two tuples A and B from the t1 table with an id of 100, and a tuple C from the t2 table also with an id of 100.

During the join of AB and C, although A and B have the same id, it cannot be guaranteed that they will both pass or fail the filter condition AB.val > C.val simultaneously. In this case, summing the val based on id in advance will inevitably add the val of A and B together. However, because they do not necessarily pass or fail the filter condition at the same time, this will lead to incorrect results.

In contrast, the following similar query example can apply aggregation pushdown:

SELECT id, SUM(val) FROM t1, t2 WHERE t1.id = t2.id AND t1.id < t2.id_thre GROUP BY id;

In this example, considering the same three tuples ABC as in the previous example, because the additional filter condition only uses the id column from t1, the two tuples AB with the same id, when joined with a given tuple C, will either both pass or both fail the filter. Therefore, the val of tuples AB can be summed in advance through an aggregation operation.

Restriction two

Restriction description: Pushing down aggregation to both sides of a Join simultaneously is not supported. Consider the following SQL query:

SELECT id, SUM(val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY id;

We can actually rewrite the statement to get an equivalent one:

SELECT id, sum1 * cnt2 FROM
    (SELECT id, SUM(val) FROM t1 GROUP BY id) AT1(id, sum1),
    (SELECT id, COUNT(*) FROM t2 GROUP BY id) AT2(id, cnt2)
WHERE AT1.id = AT2.id GROUP BY id;

In this example, the aggregation operation is pushed down to both sides of the Join. For all tuples in the t1 table with an id of 100, SynxDB sums their val in advance to get the corresponding sum1.

During the actual join process, for each tuple in the t2 table with an id of 100, it will be joined with the tuple to which sum1 belongs, and a corresponding tuple will be obtained. This means that for every id of 100 in t2, sum1 will appear once in the final summation result. Therefore, SynxDB can pre-aggregate t2 to calculate that there are a total of cnt2 tuples with id of 100, and finally calculate the final result through sum1 * cnt2.

Because this scenario involves relatively complex statement and expression rewriting, it is not currently supported in the product.