Group By query execution in CrateDB

  Friday, June 4, 2021 » CrateDB

In this post we’ll look at how CrateDB distributes GROUP BY queries. This is the fourth post in a series about CrateDB internals.

I assume that you’ve read the first and second post of the series. You may also be interested in the third post, but it is not necessary to read it to follow this article.

The Query

Suppose we have a table that stores measurements sent by sensors, with a schema like this:

CREATE TABLE metrics (
    id text PRIMARY KEY,
    value double NOT NULL,
    sensor_type text NOT NULL,
    ts timestamp with time zone DEFAULT now() NOT NULL
) CLUSTERED INTO 4 SHARDS WITH (number_of_replicas = 0);

And we want to know how many metrics we stored per sensor type. We can use a GROUP BY query to get the results:

SELECT
    sensor_type,
    count(*)
FROM
    metrics
GROUP BY
    sensor_type
ORDER BY
    count(*) DESC

How does CrateDB execute this query? Let us first have a look at the logical plan:

EXPLAIN
SELECT
    sensor_type,
    count(*)
FROM
    metrics
GROUP BY
    sensor_type
ORDER BY
    count(*) DESC;
+---------------------------------------------------+
| EXPLAIN                                           |
+---------------------------------------------------+
| OrderBy[count(*) DESC]                            |
|   └ GroupHashAggregate[sensor_type | count(*)]    |
|     └ Collect[doc.metrics | [sensor_type] | true] |
+---------------------------------------------------+

We see three operators, from top to bottom:

- OrderBy: sorts the result by count(*) in descending order.
- GroupHashAggregate: groups the rows by sensor_type and computes count(*) for each group.
- Collect: reads the sensor_type values from the doc.metrics table.

If you’re familiar with other SQL databases, this may look fairly familiar.

One thing to point out here is that both GroupHashAggregate and OrderBy are operators that we refer to as pipeline breakers. A pipeline breaker needs to consume its entire input before it can produce a single output row. To produce the correct result for count(*), the aggregation needs to process each incoming row. Assuming that the input is unsorted, there is no way to short-circuit the operation and yield a row early.

The same holds true for the OrderBy operator. Suppose we have the following three rows in the input:

'AA' | 10
'AB' | 18
'AC' | 21

Then the OrderBy operator needs to see all rows to determine that 21 is the highest number and should be the first output row.
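
To make the pipeline breaker idea concrete, here is a minimal sketch in plain Java (not CrateDB code) of such a sort operator: it has to drain its upstream completely before it can hand the first row downstream.

// A minimal sketch of a pipeline-breaking sort operator:
// it consumes the entire upstream before emitting the first row.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class OrderByOperator {
    static Iterator<Map.Entry<String, Long>> sortDescending(Iterator<Map.Entry<String, Long>> upstream) {
        List<Map.Entry<String, Long>> buffer = new ArrayList<>();
        upstream.forEachRemaining(buffer::add);                          // drain the upstream completely
        buffer.sort((a, b) -> Long.compare(b.getValue(), a.getValue())); // sort by the numeric column, descending
        return buffer.iterator();                                        // only now can rows flow downstream
    }

    public static void main(String[] args) {
        var rows = List.of(Map.entry("AA", 10L), Map.entry("AB", 18L), Map.entry("AC", 21L));
        sortDescending(rows.iterator()).forEachRemaining(System.out::println);
        // 'AC' with 21 comes out first, which is only knowable after seeing every input row.
    }
}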

Given these constraints, how can CrateDB run the Group By query distributed?

Physical execution plan

As elaborated in the first article, the data of a table can be distributed across several shards and each node in a cluster can hold one or more shards. The data is sharded based on the values of the CLUSTERED BY routing column. This column defaults to the primary key.
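
As a rough illustration of that routing, here is a small Java sketch. It uses String.hashCode() as a stand-in for CrateDB’s real routing hash, so the concrete shard numbers it prints are made up; the point is only that the shard is derived from the routing column, not from sensor_type.

// Simplified sketch of shard routing; String.hashCode() is NOT the hash CrateDB uses.
import java.util.List;

class ShardRouting {
    static int shardFor(String routingValue, int numberOfShards) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(routingValue.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        // the metrics table was created with CLUSTERED INTO 4 SHARDS, routed by id
        for (String id : List.of("ID1", "ID2", "ID3", "ID4")) {
            System.out.println(id + " -> shard " + shardFor(id, 4));
        }
        // Because the shard depends only on id, rows that share a sensor_type
        // value can easily end up in different shards.
    }
}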

Suppose we have the following records in the table:

| id   | sensor_type |
| ---- | ----------- |
| ID1  | AA          |
| ID2  | AB          |
| ID3  | AA          |
| ID4  | BB          |
| ID5  | BB          |
| ID6  | AA          |
| ID7  | BB          |
| ID8  | AA          |
| ID9  | AB          |
| ID10 | BB          |

The record with ID1 could be in a different shard than the record with ID3, even though both have the same sensor_type value. The two records could even end up on different nodes. To work around this, CrateDB redistributes the data using a modulo operation. But we’re getting ahead of ourselves. Let’s step back for a moment and look at the execution plan.

The Collect operator will turn into a RoutedCollectPhase:

RoutedCollectPhase:
  toCollect: [Ref{sensor_type}]
  routing:
    node-0:
      metrics: [0, 1]
    node-1:
      metrics: [2, 3]

  distributionInfo:
    distributionType=MODULO
    distributeByColumn=0

  projections: [
    GroupProjection,
  ]

This is similar to the RoutedCollectPhase we saw in the second article. It specifies that the execution engine needs to collect the sensor_type attribute from shards 0 and 1 of the metrics table on node-0 and from shards 2 and 3 on node-1.

There are also two properties that are new: distributionInfo and projections. Strictly speaking, they are part of every RoutedCollectPhase, but I omitted them in the second article because they were not relevant at that point. The distributionInfo indicates that the result must be distributed using a modulo operation on column zero, and projections lists the transformations that the engine must apply to the result.

In more detail, the GroupProjection looks like this:

GroupProjection
  keys: [
    "INPUT(0)"
  ]
  values: [
    "count()"
  ]
  mode: "ITER_PARTIAL"
  outputs: [
    "INPUT(0)"
    "count()"
  ]
  requiredGranularity: "SHARD"

Let’s walk through the properties:

- keys: the expressions to group by. INPUT(0) refers to the first input column, here sensor_type.
- values: the aggregations to compute, here count().
- mode: ITER_PARTIAL means the projection consumes raw rows and produces intermediate (partial) aggregation results.
- outputs: the columns the projection emits, the group key followed by the aggregation.
- requiredGranularity: SHARD means the projection can already be applied independently on each shard.

In addition to the RoutedCollectPhase, there are two MergePhases:

MergePhase
  name: "distributed merge"
  executionNodes: [node-0, node-1]
  numUpstreams: 2
  projections: [
    GroupProjection,
      mode: "PARTIAL_FINAL"
      requiredGranularity: "CLUSTER"
    OrderedTopNProjection
  ]

The first MergePhase has two upstreams, runs on both nodes and applies two projections. The MergePhase itself doesn’t declare who the two upstreams are, but based on the full execution plan it is possible to infer that they are node-0 and node-1, which also run the RoutedCollectPhase.

We also see another GroupProjection. I omitted most of its properties except the two that changed. The mode is now PARTIAL_FINAL, indicating that the incoming rows are intermediate (partial) results and that the projection must produce the final results in its output. requiredGranularity changed to CLUSTER, which in this case just means that no further parallelization is possible.
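
To illustrate the two modes for count(*), here is a simplified Java sketch, not CrateDB’s implementation. It assumes, purely for the example, that ID1 to ID5 are stored in the shards on node-0 and ID6 to ID10 in the shards on node-1, and it ignores the bucketing that normally sits between the partial and the final projection.

// Simplified sketch of the two GroupProjection modes for count(*).
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartialFinalCount {
    // mode ITER_PARTIAL: raw sensor_type values in, partial counts per key out
    static Map<String, Long> iterPartial(Iterable<String> sensorTypes) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : sensorTypes) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // mode PARTIAL_FINAL: partial counts from the upstreams in, final counts out
    static Map<String, Long> partialFinal(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> fromNode0 = iterPartial(List.of("AA", "AB", "AA", "BB", "BB")); // ID1 to ID5
        Map<String, Long> fromNode1 = iterPartial(List.of("AA", "BB", "AA", "AB", "BB")); // ID6 to ID10
        System.out.println(partialFinal(List.of(fromNode0, fromNode1)));
        // final counts: AA=4, AB=2, BB=4 (map iteration order is unspecified)
    }
}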

The second projection is an OrderedTopNProjection. It results in an ordering operation and specifies the columns by which the result has to be sorted.

Together, the RoutedCollectPhase and MergePhase will result in an execution that operates like this:

    ┌──────────────────┐            ┌──────────────────┐
    │   node-0         │            │   node-1         │
    │CollectPhase      │            │CollectPhase      │
    │ └ GroupProjection│            │ └ GroupProjection│
    └──────────────────┘            └──────────────────┘
                │        ┌──────────────────────│
                │────────│────────────┐         │
                ▼        ▼            ▼         ▼
    ┌────────────────────────┐      ┌────────────────────────┐
    │   node-0               │      │   node-1               │
    │MergePhase              │      │MergePhase              │
    │ ├ GroupProjection      │      │ ├ GroupProjection      │
    │ └ OrderedTopNProjection│      │ └ OrderedTopNProjection│
    └────────────────────────┘      └────────────────────────┘

The CollectPhase running on node-0 sends data to both MergePhase instances, the one on node-0 and the one on node-1, and the same holds for the CollectPhase on node-1. Each node receives a subset of the full data; the data is split using the modulo operation mentioned earlier.

Given the records mentioned earlier, this is how the data is split:

| `id` | `sensor_type` |           
| ---  | -----------   |           
| ID1  | AA            |  hash(AA) % 2 → 0
| ID2  | AB            |  hash(AB) % 2 → 1
| ID3  | AA            |  hash(AA) % 2 → 0
                                     ▲   ▲
                                     │   │
                                     │   └ The downstream node to
                                     │     receive this record
                                     │
                         This is the number of nodes used for the merge

We call this process bucketing. The modulo operation is applied to a hash of the group key value. This ensures that each downstream node will see all rows for a given key.
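
A minimal sketch of this bucketing step, with String.hashCode() standing in for the hash function CrateDB actually uses and with the two merge nodes from our plan, could look like this:

// Sketch of bucketing: each collected row is assigned to a downstream merge node
// based on hash(group key) modulo the number of downstream nodes.
import java.util.List;

class Bucketing {
    // distributeByColumn=0 refers to the group key, here sensor_type
    static int downstreamNodeFor(String groupKey, int numDownstreamNodes) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(groupKey.hashCode(), numDownstreamNodes);
    }

    public static void main(String[] args) {
        int mergeNodes = 2; // node-0 and node-1 both run the distributed MergePhase
        for (String sensorType : List.of("AA", "AB", "AA", "BB")) {
            System.out.println(sensorType + " -> node-" + downstreamNodeFor(sensorType, mergeNodes));
        }
        // All rows with the same sensor_type end up on the same downstream node,
        // so that node can compute the final count for that key on its own.
    }
}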

I mentioned that there are two MergePhase instances. Here is the second one:

MergePhase
  name: "mergeOnHandler"
  executionNodes: [node-0],
  numUpstreams: 2
  projections: []
  positionalOrderBy:
    indices=[1]
    reverseFlags=[true]
    nullsFirst=[true]

This MergePhase indicates that the result must be merged on node-0 - in this case the handler node. The phase doesn’t contain any further projections, but it has a positionalOrderBy property. This property indicates that the results it receives are already sorted, and that it should preserve that order by applying a merge sort when combining them.
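
Conceptually, that merge looks like the following Java sketch (not CrateDB code, and ignoring the null handling encoded in nullsFirst): two inputs that are already sorted by count(*) DESC are combined into one stream without re-sorting everything.

// Sketch of merging two result streams that are already sorted descending:
// repeatedly pick the larger head element to preserve the global order.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SortedMerge {
    static List<Map.Entry<String, Long>> mergeSortedDesc(List<Map.Entry<String, Long>> left,
                                                         List<Map.Entry<String, Long>> right) {
        List<Map.Entry<String, Long>> merged = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            // take whichever head has the larger count
            if (left.get(i).getValue() >= right.get(j).getValue()) {
                merged.add(left.get(i++));
            } else {
                merged.add(right.get(j++));
            }
        }
        while (i < left.size())  merged.add(left.get(i++));   // drain the leftover input
        while (j < right.size()) merged.add(right.get(j++));
        return merged;
    }

    public static void main(String[] args) {
        // hypothetical, already sorted outputs of the two distributed MergePhases
        var fromNode0 = List.of(Map.entry("AA", 4L), Map.entry("AB", 2L));
        var fromNode1 = List.of(Map.entry("BB", 4L));
        System.out.println(mergeSortedDesc(fromNode0, fromNode1)); // [AA=4, BB=4, AB=2]
    }
}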

The final, full execution graph:

    ┌──────────────────┐            ┌──────────────────┐
    │   node-0         │            │   node-1         │
    │CollectPhase      │            │CollectPhase      │
    │ └ GroupProjection│            │ └ GroupProjection│
    └──────────────────┘            └──────────────────┘
                │        ┌──────────────────────│
                │────────│────────────┐         │
                ▼        ▼            ▼         ▼
    ┌────────────────────────┐      ┌────────────────────────┐
    │   node-0               │      │   node-1               │
    │MergePhase              │      │MergePhase              │
    │ ├ GroupProjection      │      │ ├ GroupProjection      │
    │ └ OrderedTopNProjection│      │ └ OrderedTopNProjection│
    └────────────────────────┘      └────────────────────────┘
                          │             │
                          ▼             │
                         ┌───────────┐  │
                         │   node-0  │◄─┘
                         │ MergePhase│
                         └───────────┘

Wrap Up

I hope this article gave you an overview of how CrateDB executes GROUP BY queries. I again omitted lots of details and focused on the execution plan and the distributed execution aspects. There are also cases where CrateDB will generate different plans. For example, if the GROUP BY keys match the routing column, it is possible to avoid the redistribution step, unless the table is also partitioned.