Friday, June 4, 2021

Group By query execution in CrateDB

In this post we’ll look at how CrateDB distributes GROUP BY queries. This is the fourth post in a series about CrateDB internals.

I assume that you’ve read the first and second posts of the series. You may also be interested in the third post, but it is not necessary to read it to follow this article.

The Query

Suppose we have a table that stores measurements sent from sensors with a schema like this:

CREATE TABLE metrics (
    id text PRIMARY KEY,
    value double NOT NULL,
    sensor_type text NOT NULL,
    ts timestamp with time zone DEFAULT now() NOT NULL
) CLUSTERED INTO 4 SHARDS WITH (number_of_replicas = 0);

And we want to know how many metrics we stored per sensor type. We can use a GROUP BY query to get the results:

SELECT
    sensor_type,
    count(*)
FROM
    metrics
GROUP BY
    sensor_type
ORDER BY
    count(*) DESC

How does CrateDB execute this query? Let us first have a look at the logical plan:

EXPLAIN
SELECT
    sensor_type,
    count(*)
FROM
    metrics
GROUP BY
    sensor_type
ORDER BY
    count(*) DESC;
+---------------------------------------------------+
| EXPLAIN                                           |
+---------------------------------------------------+
| OrderBy[count(*) DESC]                            |
|   └ GroupHashAggregate[sensor_type | count(*)]    |
|     └ Collect[doc.metrics | [sensor_type] | true] |
+---------------------------------------------------+

We see three operators, from top to bottom:

  • OrderBy: This operator takes a relation as input and outputs the relation sorted by some attributes. Here there is a single attribute, count(*), and the sorting order is descending.
  • GroupHashAggregate: This operator takes a relation as input, groups the incoming rows by one or more keys and optionally computes aggregations on the grouped rows. Here there is a single group key, sensor_type, and a single aggregation function, count(*).
  • Collect: This operator retrieves the values from a table. The first article explained this operator in more detail.

If you’ve worked with other SQL databases, this plan should look familiar.

One thing to point out here is that both GroupHashAggregate and OrderBy are operators that we refer to as pipeline breakers. A pipeline breaker needs to consume its entire input before it can produce a single output row. To produce the correct result for count(*), the aggregation needs to process each incoming row. Assuming that the input is unsorted, there is no way to short-circuit the operation and yield a row early.

The same holds true for the OrderBy operator. Suppose we have the following three rows in the input:

'AA' | 10
'AB' | 18
'AC' | 21

Then the OrderBy operator needs to see all rows to determine that 21 is the highest number and should be the first output row.
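To make the pipeline breaker property concrete, here is a minimal Python sketch. It is not CrateDB's implementation; the function names group_count and order_by_desc are made up for illustration. Both functions are generators, yet neither can yield its first row before it has read all of its input.

```python
from collections import Counter
from typing import Iterable, Iterator, Tuple


def group_count(keys: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Hypothetical stand-in for GroupHashAggregate with count(*)."""
    counts: Counter = Counter()
    for key in keys:
        # Every input row has to be consumed first ...
        counts[key] += 1
    # ... before the first group can be emitted.
    yield from counts.items()


def order_by_desc(rows: Iterable[Tuple[str, int]]) -> Iterator[Tuple[str, int]]:
    """Hypothetical stand-in for OrderBy on the second column, descending."""
    # sorted() materializes the whole input: the largest value could be last.
    yield from sorted(rows, key=lambda row: row[1], reverse=True)


rows = [("AA", 10), ("AB", 18), ("AC", 21)]
first = next(order_by_desc(rows))  # ("AC", 21), known only after all rows were read
```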

Given these constraints, how can CrateDB run the GROUP BY query in a distributed fashion?

Physical execution plan

As elaborated in the first article, the data of a table can be distributed across several shards and each node in a cluster can hold one or more shards. The data is sharded based on the values of the CLUSTERED BY routing column. This column defaults to the primary key.

Suppose we have the following records in the table:

id sensor_type
ID1 AA
ID2 AB
ID3 AA
ID4 BB
ID5 BB
ID6 AA
ID7 BB
ID8 AA
ID9 AB
ID10 BB

The record with ID1 could be in a different shard than the record with ID3, even though both have the same sensor_type value. The two records could even be on different nodes. To work around this problem, CrateDB redistributes the data using a modulo operation. But we’re getting ahead of ourselves. Let’s step back for a moment and look at the execution plan.

The Collect operator will turn into a RoutedCollectPhase:

RoutedCollectPhase:
  toCollect: [Ref{sensor_type}]
  routing:
    node-0:
      metrics: [0, 1]
    node-1:
      metrics: [2, 3]

  distributionInfo:
    distributionType=MODULO
    distributeByColumn=0

  projections: [
    GroupProjection,
  ]

This is similar to the RoutedCollectPhase we saw in the second article. It specifies that the execution engine needs to collect the sensor_type attribute from shards 0 and 1 of the metrics table on node-0 and from shards 2 and 3 of the metrics table on node-1.

There are also two new properties: distributionInfo and projections. Strictly speaking, they’re part of every RoutedCollectPhase, but I omitted them in the second article because they were not relevant at that point. Here they matter. The distributionInfo indicates that the result must be distributed using a modulo operation on column zero, and projections lists the transformations that the engine must apply to the result.

The GroupProjection in more detail looks like this:

GroupProjection
  keys: [
    "INPUT(0)"
  ]
  values: [
    "count()"
  ]
  mode: "ITER_PARTIAL"
  outputs: [
    "INPUT(0)"
    "count()"
  ]
  requiredGranularity: "SHARD"

Let’s walk through the properties:

  • keys lists the grouping keys. Here we have an INPUT(0) instead of sensor_type because at this level the GroupProjection is no longer aware of which concrete attributes it is operating on. The INPUT(0) construct specifies that the key’s value will be in the first column of the incoming rows.
  • values lists the aggregations to apply.
  • mode influences how the aggregations are evaluated. In CrateDB it is possible to fully evaluate an aggregation or to partially evaluate it. Partial evaluation creates an intermediate result that can be merged with another intermediate result. This allows us to merge the results from more than one shard or more than one node. An aggregation function where the difference between full and partial evaluation matters is the average, avg. The average is calculated by summing up all values and keeping track of the number of values seen. The intermediate state contains both the running sum and the running count. To finalize the aggregation and produce the final result, the sum is divided by the count. Two averages can only be merged if both the sum and the count are available; two finalized averages cannot be merged. ITER_PARTIAL indicates that the projection should output partially evaluated aggregations.
  • outputs: These are the outputs of the projections, in the GroupProjection case this is keys + values.
  • requiredGranularity: SHARD here indicates that the projection can be run in parallel per shard.
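The difference between partial and full evaluation of avg can be sketched in a few lines of Python. This is an illustration only: the class AvgState and its methods are hypothetical names, not CrateDB's API.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class AvgState:
    """Intermediate (partial) state of avg: running sum and running count."""
    total: float = 0.0
    count: int = 0

    def add(self, value: float) -> None:
        self.total += value
        self.count += 1

    def merge(self, other: "AvgState") -> "AvgState":
        # Two partial states combine by adding sums and counts.
        return AvgState(self.total + other.total, self.count + other.count)

    def finalize(self) -> float:
        # Assumes at least one value was added (count > 0).
        return self.total / self.count


def partial_avg(values: Iterable[float]) -> AvgState:
    """Partial evaluation: return the state instead of the final average."""
    state = AvgState()
    for value in values:
        state.add(value)
    return state


shard_a = partial_avg([1.0, 2.0, 3.0])  # sum 6.0, count 3
shard_b = partial_avg([10.0])           # sum 10.0, count 1

merged = shard_a.merge(shard_b).finalize()             # 16.0 / 4 = 4.0
naive = (shard_a.finalize() + shard_b.finalize()) / 2  # (2.0 + 10.0) / 2 = 6.0
```

Averaging the two finalized averages gives 6.0, while merging the partial states first gives the correct 4.0.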

In addition to the RoutedCollectPhase, there are two MergePhases:

MergePhase
  name: "distributed merge"
  executionNodes: [node-0, node-1]
  numUpstreams: 2
  projections: [
    GroupProjection,
      mode: "PARTIAL_FINAL"
      requiredGranularity: "CLUSTER"
    OrderedTopNProjection
  ]

The first MergePhase has two upstreams, runs on both nodes and applies two projections. The MergePhase itself doesn’t declare who the two upstreams are, but based on the full execution plan it is possible to infer that they are node-0 and node-1, which also run the RoutedCollectPhase.

We also see another GroupProjection. I omitted all of its properties except the two that changed. The mode is now PARTIAL_FINAL, indicating that the incoming rows are intermediate (partial) results and that the projection must produce the final results in the output. requiredGranularity changed to CLUSTER, which in this case just means that there is no further parallelization possible.

The second projection is an OrderedTopNProjection. It results in an ordering operation and contains information on which columns it has to sort the result by.

Together, the RoutedCollectPhase and MergePhase will result in an execution that operates like this:

    ┌──────────────────┐            ┌──────────────────┐
    │   node-0         │            │   node-1         │
    │CollectPhase      │            │CollectPhase      │
    │ └ GroupProjection│            │ └ GroupProjection│
    └──────────────────┘            └──────────────────┘
                │        ┌──────────────────────│
                │────────│────────────┐         │
                ▼        ▼            ▼         ▼
    ┌────────────────────────┐      ┌────────────────────────┐
    │   node-0               │      │   node-1               │
    │MergePhase              │      │MergePhase              │
    │ ├ GroupProjection      │      │ ├ GroupProjection      │
    │ └ OrderedTopNProjection│      │ └ OrderedTopNProjection│
    └────────────────────────┘      └────────────────────────┘

The CollectPhase running on node-0 sends data to both MergePhase instances: the one running on node-0 and the one running on node-1. The CollectPhase on node-1 does the same. Each node receives a subset of the full data. The data is split using the modulo operation mentioned earlier.

Given the records mentioned earlier, this is how the data is split:

| `id` | `sensor_type` |
| ---  | -----------   |
| ID1  | AA            |  hash(AA) % 2 → 0
| ID2  | AB            |  hash(AB) % 2 → 1
| ID3  | AA            |  hash(AA) % 2 → 0
                                     ▲   ▲
                                     │   │
                                     │   └ The downstream node to
                                     │     receive this record
                                     │
                         This is the number of nodes used for the merge

We call this process bucketing. The modulo operation is applied to a hash of the group key value. This ensures that each downstream node will see all rows for a given key.
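A minimal sketch of bucketing in Python, under stated assumptions: the article does not say which hash function CrateDB uses, so zlib.crc32 stands in here because it is deterministic across runs. The names bucket_for and split_into_buckets are hypothetical, and the bucket numbers it produces need not match the 0 and 1 shown in the illustration above.

```python
import zlib
from typing import List, Tuple

Row = Tuple[str, int]  # (group key, partial count)


def bucket_for(key: str, num_buckets: int) -> int:
    # Assumption: crc32 is only a stand-in for the real hash function.
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def split_into_buckets(rows: List[Row], num_buckets: int) -> List[List[Row]]:
    """Assign each row to the downstream node chosen by hash(key) % num_buckets."""
    buckets: List[List[Row]] = [[] for _ in range(num_buckets)]
    for key, partial_count in rows:
        buckets[bucket_for(key, num_buckets)].append((key, partial_count))
    return buckets


# Partial results of two collect phases (the values are made up).
from_node0 = split_into_buckets([("AA", 2), ("AB", 1), ("BB", 2)], 2)
from_node1 = split_into_buckets([("AA", 2), ("AB", 1), ("BB", 2)], 2)
```

Because the bucket depends only on the key, the AA rows from both collect phases end up in the same bucket index, and therefore on the same downstream node.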

I mentioned that there are two MergePhase instances. Here is the second one:

MergePhase
  name: "mergeOnHandler"
  executionNodes: [node-0]
  numUpstreams: 2
  projections: []
  positionalOrderBy:
    indices=[1]
    reverseFlags=[true]
    nullsFirst=[true]

This MergePhase indicates that the result must be merged on node-0 - in this case the handler node. The phase doesn’t contain any further projections, but it contains a positionalOrderBy property. This property indicates that the results it receives are sorted, and that it should preserve the order by applying a merge sort when merging the results.
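The order-preserving merge can be sketched with Python's heapq.merge, which lazily merges inputs that are already sorted. This only illustrates the idea and is not CrateDB's code. The key function reads column index 1 and reverse=True requests descending order, mirroring indices=[1] and reverseFlags=[true]; nullsFirst is not modeled. Which node holds which final group is an assumption made for the example.

```python
import heapq

# Final, already sorted results of the two upstream MergePhases
# (the assignment of groups to nodes is assumed for illustration).
from_node0 = [("AA", 4)]
from_node1 = [("BB", 4), ("AB", 2)]

# Merge without re-sorting: only the heads of the inputs are compared.
merged = list(
    heapq.merge(from_node0, from_node1, key=lambda row: row[1], reverse=True)
)
counts = [count for _, count in merged]  # descending: 4, 4, 2
```

Since each input is already in descending order, the handler never has to buffer and sort the complete result.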

The final, full execution graph:

    ┌──────────────────┐            ┌──────────────────┐
    │   node-0         │            │   node-1         │
    │CollectPhase      │            │CollectPhase      │
    │ └ GroupProjection│            │ └ GroupProjection│
    └──────────────────┘            └──────────────────┘
                │        ┌──────────────────────│
                │────────│────────────┐         │
                ▼        ▼            ▼         ▼
    ┌────────────────────────┐      ┌────────────────────────┐
    │   node-0               │      │   node-1               │
    │MergePhase              │      │MergePhase              │
    │ ├ GroupProjection      │      │ ├ GroupProjection      │
    │ └ OrderedTopNProjection│      │ └ OrderedTopNProjection│
    └────────────────────────┘      └────────────────────────┘
                          │             │
                          ▼             │
                         ┌───────────┐  │
                         │   node-0  │◄─┘
                         │ MergePhase│
                         └───────────┘
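The whole graph can be simulated on the ten sample records in a short Python sketch. Everything here is an assumption made for illustration: which records live on which node, the use of zlib.crc32 as the hash, and all names. It shows the data flow only and is not how CrateDB is implemented.

```python
import heapq
import zlib
from collections import Counter

NUM_NODES = 2

# Assumption: sensor_type values of ID1..ID5 on node 0, ID6..ID10 on node 1.
rows_per_node = {
    0: ["AA", "AB", "AA", "BB", "BB"],
    1: ["AA", "BB", "AA", "AB", "BB"],
}


def bucket_for(key: str) -> int:
    # Stand-in hash; CrateDB's actual hash function is not specified here.
    return zlib.crc32(key.encode("utf-8")) % NUM_NODES


# Collect phase + GroupProjection (partial): count per node, then bucket.
inbox = {node: [] for node in range(NUM_NODES)}
for node, keys in rows_per_node.items():
    for key, partial_count in Counter(keys).items():
        inbox[bucket_for(key)].append((key, partial_count))


def merge_and_sort(partials):
    """Distributed merge: finalize the counts, then order by count DESC."""
    final = Counter()
    for key, partial_count in partials:
        final[key] += partial_count
    return sorted(final.items(), key=lambda row: row[1], reverse=True)


sorted_per_node = [merge_and_sort(inbox[node]) for node in range(NUM_NODES)]

# Merge on the handler: order-preserving merge of the sorted inputs.
result = list(heapq.merge(*sorted_per_node, key=lambda row: row[1], reverse=True))
```

Whatever bucket each key falls into, the final counts are AA: 4, BB: 4 and AB: 2, in descending order of count.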

Wrap Up

I hope this article gave you an overview of how CrateDB executes GROUP BY queries. I again omitted lots of details and focused on the execution plan and the distributed execution aspect. There are also cases where CrateDB will generate different plans. For example, if the keys match the routing column, it is possible to avoid the redistribution step - unless the table is also partitioned.