In the context of databases, data aggregation includes operations such as:
Grouping rows by the value of a certain column, Calculating the sum, maximum, or minimum value of a certain column, Selecting distinct values of a column. To process such operations, the query plan tree uses aggregation nodes. This article explains the use-cases of different aggregation nodes and shows example query plans illustrating their use.
To follow along this article, it is necessary to already be familiar with more basic concepts, such as:
Aggregation operations are considered auxiliary operations. They work upon the output of more fundamental operations, like SELECT and JOIN. Thus, it is highly recommended that you know to read and understand query plans that involve:
Aggregation Nodes Aggregation operations use the GROUP BY, SUM, MAX and MIN, and SELECT DISTINCT clauses. These clauses are handled by aggregation nodes, as discussed below.
Aggregate The base Aggregate node is used for single-column aggregation operations, like COUNT, SUM, MAX, and MIN. These operations aggregate the values of a single column, and do not affect other columns.
HashAggregate The planner uses the HashAggregate node when the output of a GROUP BY clause doesn’t need to be sorted. This method scans each row in the table. While doing so, it builds a hash based on the value of the aggregation (GROUP BY) column. This hash is used to assign rows into buckets corresponding to different groups. Because it has to process the entire hash table, this method can have significant memory requirements. If the memory is insufficient, there is spillover onto disk - which can reduce performance.
If the planner uses this method, and if the hash spills over onto disk, its performance can be improved by increasing the working memory.
GroupAggregate GroupAggregate is commonly used when the resultset of the GROUP BY clause needs to be sorted. It accepts presorted rows and groups them into buckets. In some cases, it uses a Sort node to presort the input rows. If the child nodes are Index based scan nodes, a dedicated Sort node is not needed because the indexed data is implicitly sorted.
Because its input rows are presorted, GroupAggregate only has to process one group at a time. Thus, this operation has lower memory requirements than HashAggregate. If the planner uses this method, its performance can be improved by using the right index.
GroupAggregate or HashAggregate As discussed above, there are two approaches to aggregation - hashing and sorting. The choice of method depends on the query itself, the size and type of data, the availability of suitable indices, and the available working memory.
Even with sorted rows, GroupAggregate may not always be the most efficient when the number of groups is large relative to the dataset. In such cases, the planner can choose to use the HashAggregate method and then sort the resultset in a separate operation. Conversely, it can also choose to use GroupAggregate even if the output doesn’t need to be sorted. GroupAggregate can be more efficient than HashAggregate, especially if the available working memory is too small for the latter.
Parallelized Aggregation Nodes The above nodes can also be executed in parallel. The parallelization is handled by two sets of nodes - one that the worker processes execute in parallel, and another that the leader process uses to combine the results.
The nodes that the workers execute are:
Partial Aggregate Partial HashAggregate Partial GroupAggregate The corresponding nodes used by the leader process are:
Finalize Aggregate Finalize HashAggregate Finalize GroupAggregate The dataset to be aggregated is divided among two or more worker processes. Each worker runs a Partial aggregation node. The leader process runs a Finalize aggregate node. It receives and combines the output of the worker nodes. Note that the leader and workers do not need to run the same type of aggregation node. For example, if the workers use Partial HashAggregate, the leader can use either Finalize HashAggregate or Finalize GroupAggregate.
When the dataset to be aggregated is distributed among parallel workers, it is likely that there will be duplicates across workers. For example, consider a table with passenger names and ticket numbers. Each passenger can have many tickets. Suppose this table is to be grouped in parallel by passenger name. It is quite possible that the datasets of both workers have rows containing the same passenger name. Thus, those passengers that are present in both datasets are double-counted. The leader process eliminates these duplicates. It does this either by sorting or by hashing - i.e., using either the Finalize HashAggregate or the Finalize GroupAggregate nodes.
Unique The Unique node is used to get distinct values from a sorted list. It is commonly used when the query has a SELECT DISTINCT clause. Unique is not the only way to get distinct values. The HashAggregate and GroupAggregate operations discussed above can also be used to return unique values.
The next sections show a few examples to illustrate the concepts discussed so far. The examples in this article are based on the demo flight tickets mid-sized database . Earlier articles in the series also used this demo database. It is recommended to connect to this database, as explained in the introductory article on reading query plans , and to try out the examples in the following sections.
Single Column Aggregation To start, try a simple aggregation operation like COUNT on the ‘tickets’ table.
Example 1 =# EXPLAIN ANALYZE SELECT COUNT(*) FROM tickets ;
The query plan is shown below:
"Finalize Aggregate (cost=19208.29..19208.30 rows=1 width=8) (actual time=80.126..81.504 rows=1 loops=1)"
" -> Gather (cost=19208.08..19208.29 rows=2 width=8) (actual time=79.974..81.494 rows=3 loops=1)"
" Workers Planned: 2"
" Workers Launched: 2"
" -> Partial Aggregate (cost=18208.08..18208.09 rows=1 width=8) (actual time=77.021..77.023 rows=1 loops=3)"
" -> Parallel Seq Scan on tickets (cost=0.00..17344.46 rows=345446 width=0) (actual time=0.039..48.607 rows=276357 loops=3)"
"Planning Time: 0.070 ms"
"Execution Time: 81.541 ms"
In the above plan:
Since this is a simple single column aggregation, it uses the Aggregate node. In this case, it chooses to parallelize using the Partial Aggregate and Finalize Aggregate nodes. The output of the Partial Aggregate nodes is combined with a Gather node The Partial Aggregate nodes receive their data from child parallel sequential scan nodes. Grouping Results by a Single Column Consider an example where you need to find how many times each person appears in the tickets table. This might be useful to find something like the most frequent filers. To do this, use the GROUP BY clause to combine the rows of the table according to passenger names.
Example 2 =# EXPLAIN ANALYZE SELECT passenger_name, count(*) AS n FROM tickets GROUP BY passenger_name ;
To handle the GROUP BY clause, the planner needs either HashAggregate or GroupAggregate. The output query plan is shown below:
"Finalize HashAggregate (cost=22319.65..22421.83 rows=10218 width=24) (actual time=216.970..222.847 rows=27909 loops=1)"
" Group Key: passenger_name"
" Batches: 5 Memory Usage: 4145kB Disk Usage: 216kB"
" -> Gather (cost=20071.69..22217.47 rows=20436 width=24) (actual time=180.209..191.924 rows=60899 loops=1)"
" Workers Planned: 2"
" Workers Launched: 2"
" -> Partial HashAggregate (cost=19071.69..19173.87 rows=10218 width=24) (actual time=177.183..182.311 rows=20300 loops=3)"
" Group Key: passenger_name"
" Batches: 1 Memory Usage: 2577kB"
" Worker 0: Batches: 1 Memory Usage: 2833kB"
" Worker 1: Batches: 1 Memory Usage: 2577kB"
" -> Parallel Seq Scan on tickets (cost=0.00..17344.46 rows=345446 width=16) (actual time=0.043..48.760 rows=276357 loops=3)"
"Planning Time: 0.064 ms"
"Execution Time: 224.796 ms"
The planner has chosen to parallelize the aggregation operation. So:
The topmost node is a Finalize HashAggregate. This operation uses a hash table to combine rows for a “GROUP BY clause. The child node of the Finalize HashAggregate node is a Gather node. The Gather node combines the resultsets of the parallel workers The working memory is sufficient for hashing to be efficient. So the parallel workers choose Partial HashAggregate. Each worker node sequentially scans the underlying table to get the rows to build the hash. In this case, the parent Finalize node can use hashing to combine the outputs of the Partial HashAggregates. To do this, it needs to split the dataset in 5 batches. Had the working memory been less, it would have needed many more batches. This would have been inefficient. The next example shows the effect when working memory is too low for hashing to be efficient.
Effect of Working Memory To simulate the effect of insufficient memory, artificially reduce the working memory to 1 MB (from the default value of 4MB):
=# SET work_mem = '1 MB' ; Example 3 Re-run the same query used in Example 2:
=# EXPLAIN ANALYZE SELECT passenger_name, count(*) AS n FROM tickets GROUP BY passenger_name ;
Notice that the plan cost and execution time are now considerably higher. The query plan has also changed considerably.
The topmost node now uses GroupAggregate instead of HashAggregate (in Example 2). Hashing is a memory intensive operation. When working memory is insufficient, hashing works in batches and stashes intermediate results to disk - this is inefficient when the number of batches is high.
Thus, the planner chooses to first sort the output of the child nodes and then use Finalize GroupAggregate in the parent node. Even though sorting is processor intensive, it is still cheaper than hashing using too many batches.
The query plan is shown below:
"Finalize GroupAggregate (cost=44630.76..47219.48 rows=10218 width=24) (actual time=270.025..309.145 rows=27909 loops=1)"
" Group Key: passenger_name"
" -> Gather Merge (cost=44630.76..47015.12 rows=20436 width=24) (actual time=270.014..293.964 rows=61056 loops=1)"
" Workers Planned: 2"
" Workers Launched: 2"
" -> Sort (cost=43630.73..43656.28 rows=10218 width=24) (actual time=264.612..270.608 rows=20352 loops=3)"
" Sort Key: passenger_name"
" Sort Method: external merge Disk: 728kB"
" Worker 0: Sort Method: external merge Disk: 720kB"
" Worker 1: Sort Method: external merge Disk: 776kB"
" -> Partial HashAggregate (cost=39474.60..42950.27 rows=10218 width=24) (actual time=198.285..223.757 rows=20352 loops=3)"
" Group Key: passenger_name"
" Batches: 5 Memory Usage: 1073kB Disk Usage: 2312kB"
" Worker 0: Batches: 5 Memory Usage: 1073kB Disk Usage: 1760kB"
" Worker 1: Batches: 5 Memory Usage: 1073kB Disk Usage: 3400kB"
" -> Parallel Seq Scan on tickets (cost=0.00..17344.46 rows=345446 width=16) (actual time=0.053..52.217 rows=276357 loops=3)"
"Planning Time: 0.081 ms"
"Execution Time: 311.872 ms"
Given relatively large tables and a multithreaded system, the planner decides to use parallel workers.
Execution starts at the bottommost node with parallel sequential scans. The output of the sequential scans feeds into the partial hash aggregate nodes of the parallel workers. The output of the parallel HashAggregate nodes is explicitly sorted, using a Sort node, The sorted output of the workers is fed into the Finalize GroupAggregate node (which needs sorted inputs) which combines the resultsets of the workers. some textThe hashing and grouping by the partial hash aggregate nodes of the child workers has reduced the size of the resultset. On this smaller number of rows, it is more efficient to process sorted rows, than to process a large hash table in batches. Notice that the planner uses Sort nodes even though the query doesn’t ask for it. This example also illustrates that the planner can mix and match different types of aggregation nodes while parallelizing. In this case, child Partial HashAggregate nodes feed into a parent Finalize GroupAggregate node.
Example 4 - Exercise Recall that the above example used sorting (instead of hashing) to merge the output of parallel workers. Add an explicit sorting clause to the query used in Example 3. The sorting is according to the same column as the grouping.
=# EXPLAIN ANALYZE SELECT passenger_name, count(*) AS n FROM tickets GROUP BY passenger_name ORDER BY passenger_name ;
Notice that the query plan is identical. This is not surprising because it already has sorted the rows.
Before continuing further, ensure to reset the working memory to its default size:
Suggested Read: PostgreSQL Query Plans for Sorting Data
Aggregation with Sorting With the working memory reset to its default (higher) value, re-run the same query (using sorting) as in the previous example.
Example 5 =# EXPLAIN ANALYZE SELECT passenger_name, count(*) AS n FROM tickets GROUP BY passenger_name ORDER BY passenger_name ;
Below are selected lines from the output of EXPLAIN on this query:
"Sort (cost=23102.29..23127.84 rows=10218 width=24) (actual time=249.994..253.885 rows=27909 loops=1)"
" Sort Key: passenger_name"
" Sort Method: quicksort Memory: 2949kB"
" -> Finalize HashAggregate (cost=22319.65..22421.83 rows=10218 width=24) (actual time=206.050..214.283 rows=27909 loops=1)"
" Group Key: passenger_name"
" Batches: 5 Memory Usage: 4145kB Disk Usage: 216kB"
" -> Gather (cost=20071.69..22217.47 rows=20436 width=24) (actual time=157.395..173.091 rows=61147 loops=1)"
" Workers Planned: 2"
" Workers Launched: 2"
" -> Partial HashAggregate (cost=19071.69..19173.87 rows=10218 width=24) (actual time=154.389..160.904 rows=20382 loops=3)"
" Group Key: passenger_name"
" Batches: 1 Memory Usage: 2833kB"
" Worker 0: Batches: 1 Memory Usage: 2833kB"
" Worker 1: Batches: 1 Memory Usage: 2833kB"
" -> Parallel Seq Scan on tickets (cost=0.00..17344.46 rows=345446 width=16) (actual time=0.035..42.002 rows=276357 loops=3)"
"Planning Time: 0.068 ms"
"Execution Time: 255.919 ms"
As explained earlier, hashing is generally a very efficient operation, as long as there’s sufficient memory. So, the Finalize aggregation node is now a HashAggregate, instead of GroupAggregate (which was used when the working memory was low).
In the above plan:
The topmost node of the query plan is a Sort node. Since the aggregation has reduced the number of rows considerably, sorting is cheaper. some textThe number of rows in this table is around 830, 000. The number of unique passengers is around 28, 000. So, after aggregation, the Sort node has to work with only 28, 000 rows. Using GroupAggregate would have meant having to sort through 830, 000 rows. The rest of the plan is identical to the previous example which did not involve sorting Notice that even though the query requires sorting, the planner chose to use HashAggregate (and then sort explicitly) instead of using GroupAggregate, which directly produces sorted output.
Example 6 - Exercise To verify that HashAggregate (plus explicit sorting) is indeed cheaper than GroupAggregate in Example 5 above, disable hash aggregation:
=# SET enable_hashagg = 'off' ;
Re-run the query of Example 5:
=# EXPLAIN ANALYZE SELECT passenger_name, count(*) AS n FROM tickets GROUP BY passenger_name ORDER BY passenger_name ;
Notice that the query plan now uses GroupAggregate throughout and that it is considerably more expensive (and slower) than the plan in Example 5.
Before proceeding with further examples, make sure to re-enable hash aggregation:
=# RESET enable_hashagg ;
Effect of Indexing The previous queries do a SELECT on the passenger name column. Create an index on this column:
=# CREATE INDEX tickets_passenger_name ON tickets (passenger_name) ;
CREATE INDEX ;
=# VACUUM ANALYZE tickets ;
Example 7 To study how indexing affects aggregation, re-run the query which did a GROUP BY but did not use a ORDER BY clause, as in Examples 2 and 3:
=# EXPLAIN ANALYZE SELECT passenger_name, count(*) AS n FROM tickets GROUP BY passenger_name ;
Because there is now a relevant index:
The bottommost node can be an (parallel) index-only scan, instead of a (parallel) sequential scan Using the index-only scan returns sorted rows. So, it is efficient to use Partial GroupAggregate instead of HashAggregate, on the output of the parallel index-only scans. The topmost node is still a Finalize HashAggregate - because the output doesn’t need to be sorted. This node combines the resultsets of the child Partial GroupAggregates Notice that the cost of the query is now less than in Example 2. This is due to the index.
The query plan is shown below:
"Finalize HashAggregate (cost=15871.43..15973.61 rows=10218 width=24) (actual time=75.663..82.212 rows=27909 loops=1)"
" Group Key: passenger_name"
" Batches: 5 Memory Usage: 4145kB Disk Usage: 216kB"
" -> Gather (cost=1000.42..15769.25 rows=20436 width=24) (actual time=0.187..65.149 rows=28152 loops=1)"
" Workers Planned: 2"
" Workers Launched: 2"
" -> Partial GroupAggregate (cost=0.42..12725.65 rows=10218 width=24) (actual time=0.186..66.723 rows=9384 loops=3)"
" Group Key: passenger_name"
" -> Parallel Index Only Scan using tickets_passenger_name on tickets (cost=0.42..10896.24 rows=345446 width=16) (actual time=0.043..31.426 rows=276357 loops=3)"
" Heap Fetches: 0"
"Planning Time: 0.095 ms"
"Execution Time: 83.913 ms"
Example 8 - Exercise As in previous examples, add a sorting clause to the earlier query to sort the result according to the aggregation column:
=# EXPLAIN ANALYZE SELECT passenger_name, count(*) AS n FROM tickets GROUP BY passenger_name ORDER BY passenger_name ;
Observe that the new query plan is almost the same as Example 7 above. The only difference is that now the topmost node is a GroupAggregate (instead of HashAggregate) - this is because the output needs to be sorted. Furthermore, the output of the child Partial GroupAggregate nodes is combined using a Gather Merge (which preserves the sort order of the resultset), instead of Gather in Example 7.
SELECT DISTINCT When the query uses SELECT DISTINCT, the planner can choose to use the Unique node to filter out distinct values.
Example 9 Write a simple query to select unique values:
=# EXPLAIN ANALYZE SELECT DISTINCT passenger_name FROM tickets ;
The plan of this query is:
"Unique (cost=0.42..17805.17 rows=10266 width=16) (actual time=0.021..217.878 rows=27909 loops=1)"
" -> Index Only Scan using tickets_passenger_name on tickets (cost=0.42..15732.49 rows=829071 width=16) (actual time=0.019..111.271 rows=829071 loops=1)"
" Heap Fetches: 0"
"Planning Time: 0.085 ms"
"Execution Time: 219.934 ms"
Notice that the plan uses the Index created for Example 7 to do an index-only scan. The (sorted) resultset is passed to the Unique node, which returns the distinct values.
Example 10 The operation in Example 9 is efficient due to the use of the index-only scan. Recall that indexing leads to (implicitly) sorting the indexed column. Explicit sorting is an expensive operation. Test the effect of removing the index used in the query plan of Example 9.
=# DROP INDEX tickets_passenger_name ;
=# VACUUM ANALYZE tickets ;
Now re-run the same query from Example 9:
=# EXPLAIN ANALYZE SELECT DISTINCT passenger_name FROM tickets ;
The new query plan for the same query is:
"HashAggregate (cost=24253.39..24356.05 rows=10266 width=16) (actual time=372.509..376.856 rows=27909 loops=1)"
" Group Key: passenger_name"
" Batches: 1 Memory Usage: 3345kB"
" -> Seq Scan on tickets (cost=0.00..22180.71 rows=829071 width=16) (actual time=0.031..107.300 rows=829071 loops=1)"
"Planning Time: 0.048 ms"
"Execution Time: 378.363 ms"
Notice that the plan no longer uses the Unique node. Instead, it uses HashAggregate to get the distinct rows. Hashing works by creating a hash for each value (in this case, of the ‘passenger_name’ column). Repeated values lead to the same hash. Thus, hashing can also be used to return distinct values. Since there is no index to leverage, the starting node is a sequential scan. Hence, this plan is costlier than the one which used the index (in Example 9).
Example 11 - Exercise
The previous example used HashAggregate to get distinct values. As shown in Example 6, disable hash aggregation and re-run the query from Examples 10 above. In the new query plan, observe that:
The first step is sequentially scanning the ‘tickets’ table. Recall the index is deleted (in Example 10 above). The scanned data is explicitly sorted using the Sort node.some textRecall that the Unique node only works on sorted data. Observe that the sorting is the most expensive operation in this plan The topmost node is the Unique node which works on the sorted data. It starts returning unique rows as soon as it receives the sorted resultset. In a sorted dataset, duplicate values are all next to each other. So, it is easy to identify duplicates and return only unique copies of the data. Thus, in addition to hashing, sorting can also be used to return distinct values. As in Example 6, ensure to re-enable hash aggregation before moving further.
Conclusion This article discussed the nodes used in aggregation operations like SUM, MAX, GROUP BY, and so on. In addition to aggregation, sorting data is one of the most common operations used in both OLAP and OLTP queries. Some of the query plans in the above examples internally used the Sort node. The next article in the series looks deeper into how the planner handles queries involving explicit sorting using ORDER BY clauses.