Push Down Aggregation Operations

Aggregation pushdown is an optimization technique that moves aggregation operations closer to the data source. SynxDB Elastic supports aggregation pushdown by executing the aggregation operator before the join operator.

In appropriate scenarios, aggregation pushdown can significantly reduce the size of the input set for the join or aggregation operator, thereby improving the operator’s execution performance.

Note

  • In the native PostgreSQL optimizer, the aggregation operations in a query are always performed after all join operations have completed (subqueries excluded). SynxDB Elastic therefore introduces aggregation pushdown, which lets the optimizer perform aggregation earlier in suitable scenarios.

  • To check whether the optimizer has applied aggregation pushdown to an execution plan, look at the relative positions of the Aggregate and Join nodes in the plan tree. If the plan first performs a Partial Aggregate, then a Join, and finally a Finalize Aggregate, the optimizer has applied aggregation pushdown.
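
The check described in the note can be sketched as a small script. This is only a heuristic that assumes the indented text form of EXPLAIN output shown in this document. The function name uses_agg_pushdown and the second sample plan are hypothetical and are not part of SynxDB Elastic.

```python
def uses_agg_pushdown(plan_text: str) -> bool:
    """Heuristic: a Partial aggregate below a Join that is below a Finalize aggregate."""
    stack = []  # (indent, label) of the ancestors of the current plan node
    for line in plan_text.splitlines():
        stripped = line.lstrip()
        if not stripped.startswith("->"):
            continue  # root node or a property line such as "Group Key: ..."
        indent = len(line) - len(stripped)
        label = stripped[2:].strip()
        # Drop nodes that are not ancestors of the current node.
        while stack and stack[-1][0] >= indent:
            stack.pop()
        if "Partial" in label and "Aggregate" in label:
            seen_join = False
            for _, ancestor in reversed(stack):  # nearest ancestor first
                if "Join" in ancestor or "Nested Loop" in ancestor:
                    seen_join = True
                elif seen_join and "Finalize" in ancestor and "Aggregate" in ancestor:
                    return True
        stack.append((indent, label))
    return False


# Plan shape taken from the usage example in this document.
PUSHED_DOWN_PLAN = """\
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Finalize GroupAggregate
         Group Key: t1.id
         ->  Sort
               Sort Key: t1.id
               ->  Hash Join
                     Hash Cond: (t2.id = t1.id)
                     ->  Seq Scan on t2
                     ->  Hash
                           ->  Partial HashAggregate
                                 Group Key: t1.id
                                 ->  Seq Scan on t1
"""

# Hand-written shape for contrast (not real EXPLAIN output):
# both aggregation phases sit above the join.
TWO_PHASE_PLAN = """\
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Finalize HashAggregate
         ->  Redistribute Motion 3:3  (slice2; segments: 3)
               ->  Partial HashAggregate
                     ->  Hash Join
                           ->  Seq Scan on t2
                           ->  Hash
                                 ->  Seq Scan on t1
"""

print(uses_agg_pushdown(PUSHED_DOWN_PLAN))  # True
print(uses_agg_pushdown(TWO_PHASE_PLAN))    # False
```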

Usage example

Before using this optimization, you need to manually enable the GUC parameter gp_enable_agg_pushdown.

Additionally, you need to manually set optimizer=off to disable the GPORCA optimizer, as this optimization currently only works in the PostgreSQL optimizer.

Here is an example of using aggregation pushdown optimization.

-- Creates two tables, t1 and t2.
CREATE TABLE t1(id INT, val1 INT);
CREATE TABLE t2(id INT, val2 INT);

SET OPTIMIZER=OFF; -- Disable the GPORCA optimizer
SET gp_enable_agg_pushdown=ON; -- Enable the GUC parameter

-- Executes a query with aggregation and join operations.
EXPLAIN (COSTS OFF) SELECT id, SUM(val1) FROM t1 NATURAL JOIN t2 GROUP BY id;
                     QUERY PLAN
-----------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Finalize GroupAggregate
         Group Key: t1.id
         ->  Sort
               Sort Key: t1.id
               ->  Hash Join
                     Hash Cond: (t2.id = t1.id)
                     ->  Seq Scan on t2
                     ->  Hash
                           ->  Partial HashAggregate
                                 Group Key: t1.id
                                 ->  Seq Scan on t1
 Optimizer: Postgres query optimizer
(13 rows)

The execution plan above shows that, before performing the Hash Join, SynxDB Elastic first aggregates the t1 table on the id column (the Partial HashAggregate node). This pre-aggregation does not change the result of the statement, and it is likely to reduce the number of rows entering the Hash Join, which improves the statement’s execution efficiency.
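
To see why the pre-aggregation preserves the result, the two evaluation orders can be compared on a few rows. The following is a minimal sketch that uses Python's sqlite3 module as a stand-in engine; it does not run on SynxDB Elastic, and the sample rows and the manual rewrite are assumptions made for illustration (JOIN ... USING (id) is used in place of NATURAL JOIN, with the same effect for these tables).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE t1(id INT, val1 INT);
    CREATE TABLE t2(id INT, val2 INT);
    INSERT INTO t1 VALUES (1, 10), (1, 20), (2, 5), (3, 7);
    INSERT INTO t2 VALUES (1, 0), (1, 0), (2, 0), (4, 0);
""")

# Join first, then aggregate (the order used without pushdown).
join_then_agg = conn.execute("""
    SELECT id, SUM(val1)
    FROM t1 JOIN t2 USING (id)
    GROUP BY id ORDER BY id
""").fetchall()

# Partial aggregate on t1, then join, then final aggregate
# (a manual rewrite that mimics the pushed-down plan shape).
agg_then_join = conn.execute("""
    SELECT id, SUM(partial_sum)
    FROM (SELECT id, SUM(val1) AS partial_sum FROM t1 GROUP BY id) AS pre
         JOIN t2 USING (id)
    GROUP BY id ORDER BY id
""").fetchall()

print(join_then_agg)  # [(1, 60), (2, 5)]
print(agg_then_join)  # [(1, 60), (2, 5)]
```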

Applicable scenarios

Using aggregation pushdown in the following scenarios is expected to yield significant query performance improvements.

Scenario 1

Scenario description: Each record in one table corresponds to multiple records in another table, and the two tables need to be joined for a grouped aggregation.

For example, you need to join an order_tbl with an order_line_tbl and calculate the total amount for each order by summing the prices of its corresponding order items, i.e., SUM(price):

SELECT o.order_id, SUM(price)
  FROM order_tbl o, order_line_tbl ol
  WHERE o.order_id = ol.order_id
  GROUP BY o.order_id;

  • Execution in the native PostgreSQL optimizer: The native PostgreSQL optimizer can only join the two tables first and then perform the aggregation. Because every order item in order_line_tbl must have a corresponding order in order_tbl, the Join operator does not filter out any rows.

  • With aggregation pushdown: The database first aggregates the data in order_line_tbl by order_id. Assuming each order contains an average of 10 order items, this reduces the number of rows passed to the Join operator by roughly a factor of 10, which significantly lowers the cost of the Join. The corresponding execution plan is as follows:

    EXPLAIN SELECT o.order_id, SUM(price)
    FROM order_tbl o, order_line_tbl ol
    WHERE o.order_id = ol.order_id
    GROUP BY o.order_id;
                                            QUERY PLAN
    -----------------------------------------------------------------------------------------------
    Gather Motion 3:1  (slice1; segments: 3)  (cost=712.89..879.56 rows=10000 width=12)
      ->  Finalize HashAggregate  (cost=712.89..746.23 rows=3333 width=12)
            Group Key: o.order_id
            ->  Hash Join  (cost=617.00..696.23 rows=3333 width=12)
                  Hash Cond: (ol.order_id = o.order_id)
                  ->  Partial HashAggregate  (cost=538.00..571.38 rows=3338 width=12)
                        Group Key: ol.order_id
                        ->  Seq Scan on order_line_tbl ol  (cost=0.00..371.33 rows=33333 width=8)
                  ->  Hash  (cost=37.33..37.33 rows=3333 width=4)
                        ->  Seq Scan on order_tbl o  (cost=0.00..37.33 rows=3333 width=4)
    Optimizer: Postgres query optimizer
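
The factor-of-10 reduction can be illustrated with a small data set. The sketch below again uses sqlite3 as a stand-in engine; the table contents (5 orders with 10 order items each) are made up for illustration, and the counts show the number of order_line_tbl rows that would reach the join, not measured costs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE order_tbl(order_id INT)")
conn.execute("CREATE TABLE order_line_tbl(order_id INT, price INT)")

orders = range(1, 6)  # 5 orders, 10 order items each (prices 1..10)
conn.executemany("INSERT INTO order_tbl VALUES (?)", [(o,) for o in orders])
conn.executemany(
    "INSERT INTO order_line_tbl VALUES (?, ?)",
    [(o, price) for o in orders for price in range(1, 11)],
)

# Rows from order_line_tbl that enter the join without pushdown.
rows_without = conn.execute("SELECT COUNT(*) FROM order_line_tbl").fetchone()[0]

# Rows that enter the join after pre-aggregating by order_id.
rows_with = conn.execute("""
    SELECT COUNT(*)
    FROM (SELECT order_id, SUM(price) AS partial_sum
          FROM order_line_tbl GROUP BY order_id)
""").fetchone()[0]

# The per-order totals are the same either way.
totals = conn.execute("""
    SELECT o.order_id, SUM(pre.partial_sum)
    FROM order_tbl o,
         (SELECT order_id, SUM(price) AS partial_sum
          FROM order_line_tbl GROUP BY order_id) AS pre
    WHERE o.order_id = pre.order_id
    GROUP BY o.order_id ORDER BY o.order_id
""").fetchall()

print(rows_without, rows_with)  # 50 5
print(totals[0])                # (1, 55)
```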
    

A similar scenario is joining a project table with an experiment table to calculate the total experiment cost for each project over the past year. The reference SQL statement is as follows:

SELECT proj_name, sum(cost)
    FROM experiment e, project p
    WHERE e.e_pid = p.p_pid AND e.start_time > now() - interval '1 year'
    GROUP BY proj_name;

For this query, with aggregation pushdown enabled, SynxDB Elastic will pre-aggregate the experiment table by the e_pid column, grouping information for the same project first.

However, if this query also filters the project table heavily, most of the pre-aggregated groups from the experiment table may be discarded by the join, so the work spent on pre-aggregation is largely wasted and execution becomes inefficient. Aggregation pushdown is therefore not currently suitable for this situation.

Scenario 2

Scenario description: The Join operator in the query significantly expands the result set, which ultimately needs to be grouped for calculation.

For example, joining a person_1 table with a person_2 table to find out how many different pairs can be formed for each common name between the two tables:

SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;

In this example, if a name appears X times in person_1 and Y times in person_2, that name appears X*Y times in the join result. If a large amount of data fits this pattern, the result set after the join can be very large.

In the example above, if the aggregation is pushed down to either the p1 or p2 side, each name will appear at most once after aggregation on that side. This effectively reduces the cost of the Join operator and the size of the input set for the subsequent Aggregation operator. The corresponding execution plan is as follows:

EXPLAIN SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;
                                       QUERY PLAN
-----------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=1758.62..1925.23 rows=9997 width=12)
   ->  Finalize HashAggregate  (cost=1758.62..1791.94 rows=3332 width=12)
         Group Key: p1.name
         ->  Hash Join  (cost=762.93..1592.17 rows=33290 width=12)
               Hash Cond: (p2.name = p1.name)
               ->  Seq Scan on p2  (cost=0.00..371.33 rows=33333 width=4)
               ->  Hash  (cost=637.97..637.97 rows=9997 width=12)
                     ->  Partial HashAggregate  (cost=538.00..637.97 rows=9997 width=12)
                           Group Key: p1.name
                           ->  Seq Scan on p1  (cost=0.00..371.33 rows=33333 width=4)
 Optimizer: Postgres query optimizer
(11 rows)
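
The X*Y expansion and the effect of pre-aggregating one side can be checked on a few rows. This is a minimal sketch with sqlite3 as a stand-in engine; the names and row counts are invented for illustration, and the manual rewrite only mimics the shape of the plan above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE person_1(name TEXT)")
conn.execute("CREATE TABLE person_2(name TEXT)")
# 'ann' appears 3 times in person_1 and 4 times in person_2; 'bob' 2 and 1.
conn.executemany("INSERT INTO person_1 VALUES (?)",
                 [("ann",)] * 3 + [("bob",)] * 2)
conn.executemany("INSERT INTO person_2 VALUES (?)",
                 [("ann",)] * 4 + [("bob",)] + [("cy",)])

PRE = "(SELECT name, COUNT(name) AS cnt FROM person_1 GROUP BY name) AS pre"

# Join output size without pushdown: 3*4 + 2*1 rows.
join_rows = conn.execute(
    "SELECT COUNT(*) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name"
).fetchone()[0]

# Join output size when person_1 is pre-aggregated: one row per person_2 match.
pushed_rows = conn.execute(
    f"SELECT COUNT(*) FROM {PRE}, person_2 p2 WHERE pre.name = p2.name"
).fetchone()[0]

original = conn.execute("""
    SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2
    WHERE p1.name = p2.name GROUP BY p1.name ORDER BY p1.name
""").fetchall()

# Final aggregate sums the partial counts instead of counting joined rows.
pushed = conn.execute(f"""
    SELECT pre.name, SUM(pre.cnt) FROM {PRE}, person_2 p2
    WHERE pre.name = p2.name GROUP BY pre.name ORDER BY pre.name
""").fetchall()

print(join_rows, pushed_rows)  # 14 5
print(original)                # [('ann', 12), ('bob', 2)]
print(pushed)                  # [('ann', 12), ('bob', 2)]
```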

Usage limitations

This section describes some limitations of the aggregation pushdown feature, including cases where this optimization is logically inapplicable and cases that are not yet supported by the engineering implementation.

Limitation 1

Limitation description: Aggregation pushdown cannot be applied when the join condition or a later filter references columns of the pre-aggregated table other than the GROUP BY columns. Consider the following SQL query:

SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id AND t1.val > t2.val GROUP BY t1.id;

In the example above, assume there are two tuples A and B from the t1 table with id = 100, and a tuple C from the t2 table also with id = 100.

When A and B are joined with C, there is no guarantee that both pass or both fail the t1.val > t2.val filter condition, even though A and B have the same id. Pre-aggregating val by id would sum the val of A and B into a single value, after which the filter can no longer be evaluated for each tuple separately. This would lead to incorrect results.
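
A concrete instance of tuples A, B, and C makes the problem visible. The sketch below uses sqlite3 as a stand-in engine with made-up values (A.val = 5, B.val = 15, C.val = 10); the "naive" query is a hypothetical manual rewrite that pre-aggregates t1 and then applies the filter to the sum, which is what an incorrect pushdown would amount to.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE t1(id INT, val INT);
    CREATE TABLE t2(id INT, val INT);
    INSERT INTO t1 VALUES (100, 5), (100, 15);  -- tuples A and B
    INSERT INTO t2 VALUES (100, 10);            -- tuple C
""")

# Correct evaluation: only B (15 > 10) passes the filter.
correct = conn.execute("""
    SELECT t1.id, SUM(t1.val) FROM t1, t2
    WHERE t1.id = t2.id AND t1.val > t2.val
    GROUP BY t1.id
""").fetchall()

# Naive pushdown: A and B are merged into one row (sum 20) before the filter,
# so the filter can no longer reject A on its own.
naive = conn.execute("""
    SELECT pre.id, SUM(pre.sum_val)
    FROM (SELECT id, SUM(val) AS sum_val FROM t1 GROUP BY id) AS pre, t2
    WHERE pre.id = t2.id AND pre.sum_val > t2.val
    GROUP BY pre.id
""").fetchall()

print(correct)  # [(100, 15)]
print(naive)    # [(100, 20)]
```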

In contrast, the following similar query example can apply aggregation pushdown:

SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id AND t1.id < t2.id_thre GROUP BY t1.id;

This example also considers the same three tuples A, B, and C as in the previous example. Because the additional filter condition only uses the id column from t1, when the two tuples A and B with the same id are joined with tuple C, they will either both pass the filter or both fail. Therefore, the val of tuples A and B can be summed up in advance through an aggregation operation.
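
The same comparison can be run for the filter on the grouping column. This is a sketch under the same assumptions as before (sqlite3 as a stand-in engine, invented rows, a manual rewrite in place of the optimizer's plan); here the pre-aggregated query returns the same result as the original.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE t1(id INT, val INT);
    CREATE TABLE t2(id INT, id_thre INT);
    INSERT INTO t1 VALUES (100, 5), (100, 15), (200, 7);  -- A, B, and one more
    INSERT INTO t2 VALUES (100, 150), (200, 150);         -- C and one more
""")

original = conn.execute("""
    SELECT t1.id, SUM(t1.val) FROM t1, t2
    WHERE t1.id = t2.id AND t1.id < t2.id_thre
    GROUP BY t1.id ORDER BY t1.id
""").fetchall()

# The filter only needs t1.id, which survives the pre-aggregation unchanged,
# so it can still be evaluated after A and B have been merged.
pushed = conn.execute("""
    SELECT pre.id, SUM(pre.sum_val)
    FROM (SELECT id, SUM(val) AS sum_val FROM t1 GROUP BY id) AS pre, t2
    WHERE pre.id = t2.id AND pre.id < t2.id_thre
    GROUP BY pre.id ORDER BY pre.id
""").fetchall()

print(original)  # [(100, 20)]
print(pushed)    # [(100, 20)]
```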

Limitation 2

Limitation description: Pushing down aggregations to both sides of a Join at the same time is not supported. Consider the following SQL query:

SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.id;

We can actually rewrite the statement to get an equivalent one:

SELECT AT1.id, sum1 * cnt2 FROM
    (SELECT id, SUM(val) FROM t1 GROUP BY id) AT1(id, sum1),
    (SELECT id, COUNT(*) FROM t2 GROUP BY id) AT2(id, cnt2)
WHERE AT1.id = AT2.id;

In the rewritten statement, the aggregation operation is pushed down to both sides of the Join. For all tuples in t1 with id = 100, the val values are pre-aggregated into the corresponding sum1.

During the join, each tuple in t2 with id = 100 is joined with the tuple containing sum1 to produce one result tuple. This means that sum1 is counted once in the final sum for every tuple in t2 with id = 100. Therefore, t2 can also be pre-aggregated to count the cnt2 tuples with id = 100, and the final result is sum1 * cnt2.
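
The equivalence of the two statements can be checked on sample rows. The sketch below uses sqlite3 as a stand-in engine with invented data. SQLite does not accept a column list after a table alias, so the derived-table columns are named with AS inside the subqueries instead of AT1(id, sum1) and AT2(id, cnt2).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE t1(id INT, val INT);
    CREATE TABLE t2(id INT);
    INSERT INTO t1 VALUES (100, 5), (100, 15), (200, 7);
    INSERT INTO t2 VALUES (100), (100), (100), (200), (300);
""")

original = conn.execute("""
    SELECT t1.id, SUM(t1.val) FROM t1, t2
    WHERE t1.id = t2.id
    GROUP BY t1.id ORDER BY t1.id
""").fetchall()

# Aggregation pushed to both sides: per-id sum from t1, per-id count from t2.
both_sides = conn.execute("""
    SELECT AT1.id, sum1 * cnt2
    FROM (SELECT id, SUM(val) AS sum1 FROM t1 GROUP BY id) AS AT1,
         (SELECT id, COUNT(*) AS cnt2 FROM t2 GROUP BY id) AS AT2
    WHERE AT1.id = AT2.id
    ORDER BY AT1.id
""").fetchall()

print(original)    # [(100, 60), (200, 7)]
print(both_sides)  # [(100, 60), (200, 7)]
```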

Because this scenario involves relatively complex statement and expression rewriting, it is not currently supported in the product.