Push Down Aggregation Operations
Aggregation pushdown is an optimization technique that moves aggregation operations closer to the data source. SynxDB Elastic supports pushing down aggregations by performing the aggregation operator before the join operator.
In appropriate scenarios, aggregation pushdown can significantly reduce the size of the input set for the join or aggregation operator, thereby improving the operator’s execution performance.
Note
In the native PostgreSQL optimizer logic, aggregation operations in a query are always performed after all join operations have been completed (subqueries aside). SynxDB Elastic therefore introduces the aggregation pushdown feature, which lets the optimizer perform aggregation earlier in suitable scenarios.
To determine whether the execution plan chosen by the optimizer has applied the aggregation pushdown optimization, you can observe the positional relationship between Aggregation and Join in the execution plan tree. If an execution plan first performs a Partial Aggregation, then a Join, and finally a Final Aggregation, it means the optimizer has applied aggregation pushdown.
Usage example
Before using this optimization, you need to manually enable the GUC parameter gp_enable_agg_pushdown. You also need to manually set optimizer=off to disable the GPORCA optimizer, because this optimization currently works only with the PostgreSQL optimizer.
Here is an example of using aggregation pushdown optimization.
-- Creates two tables, t1 and t2.
CREATE TABLE t1(id INT, val1 INT);
CREATE TABLE t2(id INT, val2 INT);
SET OPTIMIZER=OFF; -- Disable the GPORCA optimizer
SET gp_enable_agg_pushdown=ON; -- Enable the GUC parameter
-- Executes a query with aggregation and join operations.
EXPLAIN (COSTS OFF) SELECT id, SUM(val1) FROM t1 NATURAL JOIN t2 GROUP BY id;
                     QUERY PLAN
-----------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Finalize GroupAggregate
         Group Key: t1.id
         ->  Sort
               Sort Key: t1.id
               ->  Hash Join
                     Hash Cond: (t2.id = t1.id)
                     ->  Seq Scan on t2
                     ->  Hash
                           ->  Partial HashAggregate
                                 Group Key: t1.id
                                 ->  Seq Scan on t1
 Optimizer: Postgres query optimizer
(13 rows)
From the execution plan in the example above, you can see that before performing the Hash Join operation, SynxDB Elastic first aggregates the t1 table on the id column. This aggregation does not compromise the correctness of the statement's result and is likely to reduce the amount of data entering the Hash Join, thus improving the statement's execution efficiency.
Applicable scenarios
Using aggregation pushdown in the following scenarios is expected to yield significant query performance improvements.
Scenario 1
Scenario description: Each record in one table corresponds to multiple records in another table, and the two tables need to be joined for a grouped aggregation.
For example, you need to join an order_tbl with an order_line_tbl and calculate the total amount for each order by summing the prices of its order items, i.e., SUM(price):
SELECT o.order_id, SUM(price)
FROM order_tbl o, order_line_tbl ol
WHERE o.order_id = ol.order_id
GROUP BY o.order_id;
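For reference, a minimal schema consistent with this example might look like the following; the column definitions here are illustrative assumptions, not a prescribed schema.

-- Illustrative schema assumed by this example.
CREATE TABLE order_tbl      (order_id INT, customer_id INT);
CREATE TABLE order_line_tbl (order_id INT, price NUMERIC);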
Execution in the native PostgreSQL optimizer: The native PostgreSQL optimizer can only join the two tables first and then perform the aggregation. Because every order item in order_line_tbl must have a corresponding order in order_tbl, the Join operator will not filter out any data.

With aggregation pushdown: Assuming each order contains an average of 10 order items, the data volume is expected to decrease by a factor of 10 after the Aggregation operator. With aggregation pushdown enabled, the database first aggregates the data in order_line_tbl by order_id. This reduces the amount of data passed to the Join operator by a factor of 10, significantly lowering the cost of the Join. The corresponding execution plan is as follows:

EXPLAIN SELECT o.order_id, SUM(price)
FROM order_tbl o, order_line_tbl ol
WHERE o.order_id = ol.order_id
GROUP BY o.order_id;
                                           QUERY PLAN
-----------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=712.89..879.56 rows=10000 width=12)
   ->  Finalize HashAggregate  (cost=712.89..746.23 rows=3333 width=12)
         Group Key: o.order_id
         ->  Hash Join  (cost=617.00..696.23 rows=3333 width=12)
               Hash Cond: (ol.order_id = o.order_id)
               ->  Partial HashAggregate  (cost=538.00..571.38 rows=3338 width=12)
                     Group Key: ol.order_id
                     ->  Seq Scan on order_line_tbl ol  (cost=0.00..371.33 rows=33333 width=8)
               ->  Hash  (cost=37.33..37.33 rows=3333 width=4)
                     ->  Seq Scan on order_tbl o  (cost=0.00..37.33 rows=3333 width=4)
 Optimizer: Postgres query optimizer
A similar scenario is joining a project table with an experiment table to calculate the total experiment cost for each project over the past year. The reference SQL statement is as follows:
SELECT proj_name, sum(cost)
FROM experiment e, project p
WHERE e.e_pid = p.p_pid AND e.start_time > now() - interval '1 year'
GROUP BY proj_name;
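A minimal schema consistent with this query might be the following (again, illustrative assumptions only):

-- Illustrative schema assumed by this example.
CREATE TABLE project    (p_pid INT, proj_name TEXT);
CREATE TABLE experiment (e_pid INT, cost NUMERIC, start_time TIMESTAMP);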
For this query, with aggregation pushdown enabled, SynxDB Elastic will pre-aggregate the experiment table by the e_pid column, grouping the data for each project first.
However, if this query also applies significant filtering to the project table, many of the pre-aggregated experiment groups would be discarded by the join, making the early aggregation wasted work and the execution inefficient. Aggregation pushdown is therefore not currently suitable for that situation.
Scenario 2
Scenario description: The Join operator in the query significantly expands the result set, which ultimately needs to be grouped for calculation.
For example, joining a person_1 table with a person_2 table to find out how many different pairs can be formed for each common name between the two tables:
SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;
In this example, if a name appears X times in the p1 table and Y times in the p2 table, that name will appear X*Y times in the join result. If a large amount of data fits this pattern, the result set after the join could be very large, as the toy example below illustrates.
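The blow-up is easy to reproduce with a small hypothetical data set (shown only to illustrate the X*Y expansion):

-- Hypothetical tables and data illustrating the X*Y blow-up.
CREATE TABLE person_1(name TEXT);
CREATE TABLE person_2(name TEXT);
INSERT INTO person_1 SELECT 'Alice' FROM generate_series(1, 3);  -- X = 3
INSERT INTO person_2 SELECT 'Alice' FROM generate_series(1, 4);  -- Y = 4
-- The join emits X * Y = 12 rows for 'Alice'; the aggregation then
-- collapses them back into a single row with count 12.
SELECT p1.name, COUNT(p1.name)
FROM person_1 p1, person_2 p2
WHERE p1.name = p2.name
GROUP BY p1.name;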
In the example above, if the aggregation is pushed down to either the p1 or p2 side, each name appears at most once after aggregation on that side. This effectively reduces the cost of the Join operator and the size of the input set for the subsequent Aggregation operator. The corresponding execution plan is as follows:
EXPLAIN SELECT p1.name, COUNT(p1.name) FROM person_1 p1, person_2 p2 WHERE p1.name = p2.name GROUP BY p1.name;
                                        QUERY PLAN
-----------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=1758.62..1925.23 rows=9997 width=12)
   ->  Finalize HashAggregate  (cost=1758.62..1791.94 rows=3332 width=12)
         Group Key: p1.name
         ->  Hash Join  (cost=762.93..1592.17 rows=33290 width=12)
               Hash Cond: (p2.name = p1.name)
               ->  Seq Scan on p2  (cost=0.00..371.33 rows=33333 width=4)
               ->  Hash  (cost=637.97..637.97 rows=9997 width=12)
                     ->  Partial HashAggregate  (cost=538.00..637.97 rows=9997 width=12)
                           Group Key: p1.name
                           ->  Seq Scan on p1  (cost=0.00..371.33 rows=33333 width=4)
 Optimizer: Postgres query optimizer
(11 rows)
Scenarios where it is not recommended
Aggregation pushdown is unlikely to provide performance benefits in the following scenarios and is not recommended.
Not recommended scenario 1
Scenario description: The aggregation does not significantly reduce the data volume.

Contrary to the applicable scenarios above, if performing the aggregation early does not reduce the data volume, it cannot shrink the input set for subsequent calculations, and the extra early aggregation only adds overhead. In this case, the Join operator should be executed first.
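As a hypothetical illustration, suppose id is nearly unique in t1. The partial aggregate below then emits roughly one row per input row, so running it below the join saves nothing:

-- With a (nearly) unique id, this produces about as many rows as t1 has,
-- so the join input does not shrink and the early aggregation is pure overhead.
SELECT id, SUM(val) FROM t1 GROUP BY id;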
Not recommended scenario 2
Scenario description: The join key is different from the grouping key. In this case, pushing the aggregation down forces the grouping key to change, and the rewritten aggregation may no longer reduce the data volume, making the pushdown ineffective:
SELECT t1.value, COUNT(*) FROM t1, t2 WHERE t1.key = t2.key GROUP BY t1.value;
For the query example above, directly pushing the aggregation down to the t1 side would lead to incorrect results, similar to the situation in Limitation 1. To ensure the correctness of the calculation, the actual grouping key for the pushed-down aggregation would be equivalent to GROUP BY t1.key, t1.value, as the sketch below illustrates.
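Conceptually, the pushed-down plan behaves like the following hand-written rewrite (a sketch for illustration, not the optimizer's literal output):

-- The partial aggregate must group by both t1.key and t1.value
-- to keep the final result correct.
SELECT a.value, SUM(a.cnt)
FROM (SELECT key, value, COUNT(*) AS cnt
      FROM t1
      GROUP BY key, value) a, t2
WHERE a.key = t2.key
GROUP BY a.value;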
In this case, if the key and value columns in the t1 table are completely unrelated, each group might contain only a single tuple, so this aggregation pushdown will not have any positive effect. However, if key and value are strongly correlated, or if the same key always corresponds to the same value, the grouping effectiveness will not be affected.
In the example above, grouping by t1.value alone was originally effective. But after aggregation pushdown, the grouping key becomes t1.key, t1.value. If key and value have a weak correlation, this aggregation will not produce a significant effect.
Usage limitations
This section describes some limitations of the aggregation pushdown feature, including cases where this optimization is logically inapplicable and cases that are not yet supported by the engineering implementation.
Limitation 1
Limitation description: Aggregation pushdown cannot be applied when the join or subsequent calculations filter on columns other than the GROUP BY columns. Consider the following SQL query:
SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id AND t1.val > t2.val GROUP BY t1.id;
In the example above, assume there are two tuples A and B from the t1 table with id = 100, and a tuple C from the t2 table, also with id = 100.
During the join of A and B with C, even though A and B have the same id, they are not guaranteed to both pass or both fail the t1.val > t2.val filter against C at the same time. In this situation, pre-aggregating val by id would inevitably sum the val of A and B together. Because they do not necessarily pass or fail the filter condition simultaneously, this would lead to incorrect results, as the worked example below shows.
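The problem is easy to see with concrete data. The sketch below assumes fresh tables t1(id, val) and t2(id, val), with A, B, and C as described:

-- Hypothetical data: A and B in t1 share id = 100; C is the t2 tuple.
CREATE TABLE t1(id INT, val INT);
CREATE TABLE t2(id INT, val INT);
INSERT INTO t1 VALUES (100, 5), (100, 9);  -- tuples A and B
INSERT INTO t2 VALUES (100, 7);            -- tuple C
-- Correct answer: only B (val = 9) satisfies t1.val > t2.val, so the sum is 9.
SELECT t1.id, SUM(t1.val)
FROM t1, t2
WHERE t1.id = t2.id AND t1.val > t2.val
GROUP BY t1.id;
-- Pre-aggregating t1 first folds A and B into a single row (id = 100, sum = 14).
-- The per-tuple filter can no longer tell A and B apart, so no join against
-- this row can reproduce the correct answer of 9.
SELECT id, SUM(val) FROM t1 GROUP BY id;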
In contrast, the following similar query example can apply aggregation pushdown:
SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id AND t1.id < t2.id_thre GROUP BY t1.id;
This example considers the same three tuples A, B, and C as the previous example. Because the additional filter condition only uses the id column from t1, when the two tuples A and B with the same id are joined with tuple C, they will either both pass the filter or both fail it. Therefore, the val of tuples A and B can safely be summed in advance through an aggregation operation, as the sketch below shows.
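Continuing the same toy data, a hypothetical t2_thre table (standing in for t2 with an id_thre column) shows why the pre-aggregated sum stays correct here:

-- Hypothetical variant of t2 with a threshold column instead of val.
CREATE TABLE t2_thre(id INT, id_thre INT);
INSERT INTO t2_thre VALUES (100, 200);  -- tuple C
-- A and B evaluate the same condition (100 < 200), so they pass or fail
-- together, and the pre-aggregated sum 5 + 9 = 14 is safe to compute early.
SELECT t1.id, SUM(t1.val)
FROM t1, t2_thre
WHERE t1.id = t2_thre.id AND t1.id < t2_thre.id_thre
GROUP BY t1.id;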
Limitation 2
Limitation description: Pushing down aggregations to both sides of a Join at the same time is not supported. Consider the following SQL query:
SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.id;
We can actually rewrite the statement to get an equivalent one:
SELECT AT1.id, sum1 * cnt2 FROM
(SELECT id, SUM(val) FROM t1 GROUP BY id) AT1(id, sum1),
(SELECT id, COUNT(*) FROM t2 GROUP BY id) AT2(id, cnt2)
WHERE AT1.id = AT2.id;
In this example, the aggregation operation is pushed down to both sides of the Join. For all tuples in t1 with id = 100, SynxDB Elastic pre-aggregates their val to get the corresponding sum1.

During the actual join process, each tuple in t2 with id = 100 joins with the tuple containing sum1 to produce a corresponding result tuple. This means that for every id = 100 tuple in t2, sum1 appears once in the final summation. Therefore, SynxDB Elastic can pre-aggregate t2 to calculate that there are a total of cnt2 tuples with id = 100, and finally compute the result as sum1 * cnt2.
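A quick sanity check of this equivalence with hypothetical data (assuming empty tables t1(id, val) and t2(id, val)):

INSERT INTO t1 VALUES (100, 3), (100, 4);  -- sum1 = 7
INSERT INTO t2 VALUES (100, 0), (100, 1);  -- cnt2 = 2
-- Original form: each t1 tuple matches both t2 tuples, so the result is
-- (3 + 4) * 2 = 14.
SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.id;
-- Rewritten form: sum1 * cnt2 = 7 * 2 = 14, the same result.
SELECT AT1.id, sum1 * cnt2 FROM
(SELECT id, SUM(val) FROM t1 GROUP BY id) AT1(id, sum1),
(SELECT id, COUNT(*) FROM t2 GROUP BY id) AT2(id, cnt2)
WHERE AT1.id = AT2.id;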
Because this scenario involves relatively complex statement and expression rewriting, it is not currently supported in the product.