Group By query execution in CrateDB
In this post we’ll look at how CrateDB distributes GROUP BY queries. This is the fourth post in a series about CrateDB internals.
I assume that you’ve read the first and second post of the series. You may also be interested in the third post, but it is not necessary to read it to follow this article.
The Query
Suppose we have a table that stores measurements sent from sensors with a schema like this:
CREATE TABLE metrics (
    id text PRIMARY KEY,
    value double NOT NULL,
    sensor_type text NOT NULL,
    ts timestamp with time zone DEFAULT now() NOT NULL
) CLUSTERED INTO 4 SHARDS WITH (number_of_replicas = 0);
And we want to know how many metrics we stored per sensor type. We can use a GROUP BY query to get the results:
SELECT
    sensor_type,
    count(*)
FROM
    metrics
GROUP BY
    sensor_type
ORDER BY
    count(*) DESC
How does CrateDB execute this query? Let us first have a look at the logical plan:
EXPLAIN
SELECT
    sensor_type,
    count(*)
FROM
    metrics
GROUP BY
    sensor_type
ORDER BY
    count(*) DESC;
+----------------------------------------------------+
| EXPLAIN                                            |
+----------------------------------------------------+
| OrderBy[count(*) DESC]                             |
|   └ GroupHashAggregate[sensor_type | count(*)]     |
|     └ Collect[doc.metrics | [sensor_type] | true]  |
+----------------------------------------------------+
We see three operators, from top to bottom:
OrderBy
: This operator takes a relation as input and outputs the relation sorted by some attributes. Here there is a single attribute, count(*), and the sorting order is descending.

GroupHashAggregate
: This operator takes a relation as input, groups the incoming rows by one or more keys, and optionally computes aggregations on the grouped rows. Here there is a single grouping key, sensor_type, and a single aggregation function, count(*).

Collect
: This operator retrieves the values from a table. The first article explained this operator in more detail.
If you’re familiar with other SQL databases, this may look fairly familiar.
One thing to point out here is that both GroupHashAggregate and OrderBy are operators that we refer to as pipeline breakers. A pipeline breaker needs to consume its entire input before it can produce a single output row. To produce the correct result for count(*), the aggregation needs to process each incoming row. Assuming that the input is unsorted, there is no way to short-circuit the operation and yield a row early.
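To make this concrete, here is a minimal, hypothetical Java sketch of a hash-based group-by (not CrateDB's actual implementation): the per-group counts only become final once the loop has consumed every input row.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of why a hash aggregation is a pipeline breaker:
// nothing can be emitted before the loop finishes, because any later row
// might still increment an existing group.
class HashGroupBySketch {

    static Map<String, Long> countBySensorType(List<String> sensorTypes) {
        Map<String, Long> counts = new HashMap<>();
        for (String sensorType : sensorTypes) {
            counts.merge(sensorType, 1L, Long::sum);
        }
        // Only now are the per-group counts final.
        return counts;
    }

    public static void main(String[] args) {
        // Prints {AA=2, AB=1, BB=1} (map order may vary)
        System.out.println(countBySensorType(List.of("AA", "AB", "AA", "BB")));
    }
}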
The same holds true for the OrderBy
operator. Suppose we have the following three rows in the input:
'AA' | 10
'AB' | 18
'AC' | 21
Then the OrderBy
operator needs to see all rows to determine that 21
is the highest number and should be the first output row.
Given these constraints, how can CrateDB run the Group By query distributed?
Physical execution plan
As elaborated in the first article, the data of a table can be distributed across several shards and each node in a cluster can hold one or more shards. The data is sharded based on the values of the CLUSTERED BY
routing column. This column defaults to the primary key.
Suppose we have the following records in the table:
| id   | sensor_type |
| ---- | ----------- |
| ID1  | AA          |
| ID2  | AB          |
| ID3  | AA          |
| ID4  | BB          |
| ID5  | BB          |
| ID6  | AA          |
| ID7  | BB          |
| ID8  | AA          |
| ID9  | AB          |
| ID10 | BB          |
The record with ID1 could be in a different shard than the record with ID3, but both have the same sensor_type value. The two records can even end up on two different nodes. To work around this problem, CrateDB redistributes the data using a modulo operation. But we’re getting ahead of ourselves. Let’s step back for a moment and look at the execution plan.
The Collect
operator will turn into a RoutedCollectPhase
:
RoutedCollectPhase:
  toCollect: [Ref{sensor_type}]
  routing:
    node-0:
      metrics: [0, 1]
    node-1:
      metrics: [2, 3]
  distributionInfo:
    distributionType=MODULO
    distributeByColumn=0
  projections: [
    GroupProjection,
  ]
This is similar to the RoutedCollectPhase we saw in the second article. It specifies that the execution engine needs to collect the sensor_type attribute from shards 0 and 1 of the metrics table on node-0 and from shards 2 and 3 of the metrics table on node-1.
There are also two properties here that are new: distributionInfo and projections. They’re strictly speaking part of every RoutedCollectPhase, but I omitted them in the second article because they were not relevant at that point. But here they are. The distributionInfo indicates that the result must be distributed using a modulo operation on column zero, and the projections property lists the transformations that the engine must apply to the result.
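As a mental model only, and not CrateDB's actual interfaces, you can think of a projection as a transformation over a batch of rows, and of the projections property as a chain of such transformations applied in order:

import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: a projection maps a batch of rows to another batch of
// rows; a phase applies its projections one after another.
interface Projection extends Function<List<Object[]>, List<Object[]>> {}

class ProjectionChainSketch {

    static List<Object[]> applyAll(List<Object[]> rows, List<Projection> projections) {
        for (Projection projection : projections) {
            rows = projection.apply(rows);
        }
        return rows;
    }
}

In this picture, a GroupProjection and an OrderedTopNProjection would simply be two implementations of such a transformation.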
In more detail, the GroupProjection looks like this:
GroupProjection
  keys: [
    "INPUT(0)"
  ]
  values: [
    "count()"
  ]
  mode: "ITER_PARTIAL"
  outputs: [
    "INPUT(0)"
    "count()"
  ]
  requiredGranularity: "SHARD"
Let’s walk through the properties:
- keys lists the grouping keys. Here we have INPUT(0) instead of sensor_type because at this level the GroupProjection isn’t aware anymore of what concrete attributes it is operating on. The INPUT(0) construct specifies that the key’s value will be in the first column of the incoming rows.
- values lists the aggregations to apply.
- mode influences how the aggregations are evaluated. In CrateDB it is possible to fully evaluate an aggregation or to partially evaluate it. Partial evaluation creates an intermediate result that you can merge together with another intermediate result. This allows us to merge the results from more than one shard or more than one node. An example aggregation function where this difference between full and partial evaluation is important is the average aggregation, avg. The average is calculated by summing up all values and keeping track of the number of values seen. The intermediate state contains both: the running sum and the running count. To finalize the aggregation and produce the final result, the sum is divided by the count. It is possible to merge two averages only if you have both the sum and the count; you cannot merge two finalized averages. ITER_PARTIAL indicates that the projection should output partially evaluated aggregations (see the sketch right after this list).
- outputs: These are the outputs of the projection; in the GroupProjection case this is keys + values.
- requiredGranularity: SHARD here indicates that the projection can be run in parallel per shard.
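To illustrate the avg example from the mode bullet above, here is a small, hypothetical sketch of a partial aggregation state. CrateDB's actual aggregation framework differs in its details, but the idea is the same: the intermediate state keeps the running sum and count so that two partial states can be merged, and only the final step divides them.

// Hypothetical sketch of partial evaluation for avg.
class AvgState {
    double sum;
    long count;

    // ITER: consume one input row.
    void iterate(double value) {
        sum += value;
        count += 1;
    }

    // PARTIAL: merge another intermediate state, e.g. from another shard or node.
    void merge(AvgState other) {
        sum += other.sum;
        count += other.count;
    }

    // FINAL: produce the final result. Merging two already finalized averages
    // would be incorrect, because the counts are lost at that point.
    double terminate() {
        return count == 0 ? Double.NaN : sum / count;
    }
}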
In addition to the RoutedCollectPhase
, there are two MergePhases
:
MergePhase
  name: "distributed merge"
  executionNodes: [node-0, node-1]
  numUpstreams: 2
  projections: [
    GroupProjection
      mode: "PARTIAL_FINAL"
      requiredGranularity: "CLUSTER"
    OrderedTopNProjection
  ]
The first MergePhase has two upstreams, runs on both nodes, and applies two projections. The MergePhase itself doesn’t declare what its two upstreams are, but based on the full execution plan it is possible to infer that they are node-0 and node-1, which also run the RoutedCollectPhase.
We also see another GroupProjection; I omitted its properties except for the two that changed. The mode is now PARTIAL_FINAL, indicating that the incoming rows are intermediate (partial) results and that the projection must produce the final results in its output. requiredGranularity changed to CLUSTER, which in this case just means that no further parallelization is possible.
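For count(*), the PARTIAL_FINAL step essentially sums up the partial counts that arrive from the upstream collect phases. A minimal, hypothetical sketch (the per-node splits below are made up; only the totals match the example table):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartialFinalMergeSketch {

    // Each upstream node sends group key -> partial count; the merge phase
    // adds them up per key to produce the final counts.
    static Map<String, Long> mergePartials(List<Map<String, Long>> partialResults) {
        Map<String, Long> finalCounts = new HashMap<>();
        for (Map<String, Long> partial : partialResults) {
            partial.forEach((key, count) -> finalCounts.merge(key, count, Long::sum));
        }
        return finalCounts;
    }

    public static void main(String[] args) {
        Map<String, Long> fromNode0 = Map.of("AA", 3L, "AB", 1L);
        Map<String, Long> fromNode1 = Map.of("AA", 1L, "AB", 1L);
        // Prints {AA=4, AB=2} (map order may vary)
        System.out.println(mergePartials(List.of(fromNode0, fromNode1)));
    }
}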
The second projection in the list is an OrderedTopNProjection. It results in an ordering operation and contains the information by which columns the result has to be sorted.
Together, the RoutedCollectPhase
and MergePhase
will result in an execution that operates like this:
┌─────────────────────┐        ┌─────────────────────┐
│ node-0              │        │ node-1              │
│ CollectPhase        │        │ CollectPhase        │
│   └ GroupProjection │        │   └ GroupProjection │
└─────────────────────┘        └─────────────────────┘
     │         └───────────────────┼───┐      │
     │      ┌──────────────────────┘   │      │
     ▼      ▼                          ▼      ▼
┌──────────────────────────┐    ┌──────────────────────────┐
│ node-0                   │    │ node-1                   │
│ MergePhase               │    │ MergePhase               │
│   └ GroupProjection      │    │   └ GroupProjection      │
│   └ OrderedTopNProjection│    │   └ OrderedTopNProjection│
└──────────────────────────┘    └──────────────────────────┘
The CollectPhase running on node-0 sends data to both the MergePhase running on node-0 and the MergePhase running on node-1; the same is true for the CollectPhase running on node-1. Each merge node receives a subset of the full data. The data is split using the modulo operation mentioned earlier.
Suppose we have the records mentioned earlier; this is how the data is split:
| id  | sensor_type |
| --- | ----------- |
| ID1 | AA          |   hash(AA) % 2 → 0
| ID2 | AB          |   hash(AB) % 2 → 1
| ID3 | AA          |   hash(AA) % 2 → 0
                                   ▲   ▲
                                   │   │
                                   │   └ The downstream node to
                                   │     receive this record
                                   │
                                   └ This is the number of nodes used for the merge
We call this process bucketing. The modulo operation is applied to a hash of the group key value. This ensures that each downstream node will see all rows for a given key.
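A minimal sketch of the bucketing idea, assuming two downstream merge nodes. The concrete hash function CrateDB uses differs, but the modulo step works the same way:

class BucketingSketch {

    // The downstream node is picked from a hash of the group key value, so all
    // rows and partial results for a given key end up on the same merge node.
    static int downstreamNode(String groupKeyValue, int numMergeNodes) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(groupKeyValue.hashCode(), numMergeNodes);
    }

    public static void main(String[] args) {
        for (String sensorType : new String[] {"AA", "AB", "BB"}) {
            System.out.println(sensorType + " -> node-" + downstreamNode(sensorType, 2));
        }
    }
}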
I mentioned that there are two MergePhase
instances. Here is the second one:
MergePhase
  name: "mergeOnHandler"
  executionNodes: [node-0]
  numUpstreams: 2
  projections: []
  positionalOrderBy:
    indices=[1]
    reverseFlags=[true]
    nullsFirst=[true]
This MergePhase indicates that the result must be merged on node-0 - in this case the handler node. The phase doesn’t contain any further projections, but it contains a positionalOrderBy property. This property indicates that the results it receives are already sorted, and that it should preserve that order by applying a merge sort when merging them.
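Because the node-level results are already sorted by count(*) descending, the handler only needs a merge step in the spirit of merge sort rather than a full re-sort. A small, hypothetical sketch with two pre-sorted inputs (the group totals come from the example table; which keys end up on which node is assumed):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SortedMergeSketch {

    // Merges two inputs that are already sorted by the given comparator into a
    // combined, still sorted output, without re-sorting everything.
    static <T> List<T> merge(List<T> left, List<T> right, Comparator<T> order) {
        List<T> out = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            if (order.compare(left.get(i), right.get(j)) <= 0) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        while (i < left.size()) {
            out.add(left.get(i++));
        }
        while (j < right.size()) {
            out.add(right.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        // Rows of (sensor_type, count), each list already sorted by count descending.
        List<Object[]> fromNode0 = new ArrayList<>();
        fromNode0.add(new Object[] {"AA", 4L});
        fromNode0.add(new Object[] {"AB", 2L});
        List<Object[]> fromNode1 = new ArrayList<>();
        fromNode1.add(new Object[] {"BB", 4L});

        Comparator<Object[]> byCountDesc =
            Comparator.comparingLong((Object[] row) -> (Long) row[1]).reversed();
        for (Object[] row : merge(fromNode0, fromNode1, byCountDesc)) {
            System.out.println(row[0] + " | " + row[1]);
        }
    }
}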
The final, full execution graph:
┌─────────────────────┐        ┌─────────────────────┐
│ node-0              │        │ node-1              │
│ CollectPhase        │        │ CollectPhase        │
│   └ GroupProjection │        │   └ GroupProjection │
└─────────────────────┘        └─────────────────────┘
     │         └───────────────────┼───┐      │
     │      ┌──────────────────────┘   │      │
     ▼      ▼                          ▼      ▼
┌──────────────────────────┐    ┌──────────────────────────┐
│ node-0                   │    │ node-1                   │
│ MergePhase               │    │ MergePhase               │
│   └ GroupProjection      │    │   └ GroupProjection      │
│   └ OrderedTopNProjection│    │   └ OrderedTopNProjection│
└──────────────────────────┘    └──────────────────────────┘
      │                                      │
      ▼                                      │
┌─────────────┐                              │
│ node-0      │◄─────────────────────────────┘
│ MergePhase  │
└─────────────┘
Wrap Up
I hope this article gave you an overview of how CrateDB executes GROUP BY queries. I again omitted lots of details and focused on the execution plan and the distributed execution aspects. There are also cases where CrateDB will generate different plans. For example, if the grouping keys match the routing column, it is possible to avoid the re-distribution step - unless the table is also partitioned.