Saturday, May 15, 2021

How CrateDB plans and executes queries

This post is the first in a series that explains how CrateDB executes queries. It focuses on the optimizer, the logical plan and the execution plan.

Relational algebra

SQL is a declarative language. As a user, you declare what data you want and leave the choice of how exactly to retrieve that data up to the system. Databases like CrateDB contain a component, usually called the optimizer, that is supposed to come up with the most efficient way to retrieve the requested data.

SQL is based on relational algebra. A table is a relation, and various operators can transform an input relation into an output relation. Take the query SELECT name FROM users WHERE name LIKE 'A%' as an example.

With relational algebra you could view the query as a series of operations which are applied to relations in order:

  • users is a table or relation, your initial source.
  • WHERE takes this source relation as input, applies a filter (in relational algebra called Selection) and outputs a new, reduced relation.
  • SELECT name is another operation. It takes a relation as input, reduces the set of attributes to name and outputs a new relation. In relational algebra this operation is called Projection.
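To make the order of operations concrete, here is a minimal Python sketch that models a relation as a list of dictionaries and implements Selection and Projection as plain functions. The helper names and sample rows are hypothetical, and this is only an illustration of the relational algebra, not how CrateDB represents relations internally.

```python
from typing import Callable

# A relation is modelled as a list of dicts, one dict per tuple (illustrative only).
Row = dict[str, object]
Relation = list[Row]


def selection(relation: Relation, predicate: Callable[[Row], bool]) -> Relation:
    """Selection (WHERE): keep only the tuples that satisfy the predicate."""
    return [row for row in relation if predicate(row)]


def projection(relation: Relation, attributes: list[str]) -> Relation:
    """Projection (SELECT list): keep only the given attributes of each tuple."""
    return [{attr: row[attr] for attr in attributes} for row in relation]


users: Relation = [
    {"id": 1, "name": "Arthur"},
    {"id": 2, "name": "Trillian"},
    {"id": 3, "name": "Marvin"},
]

# SELECT name FROM users WHERE name LIKE 'A%'
# The selection runs first on the source relation, the projection runs on its output.
result = projection(selection(users, lambda row: str(row["name"]).startswith("A")), ["name"])
print(result)  # [{'name': 'Arthur'}]
```

Swapping the two calls would not work here in general: once a projection has dropped an attribute, a later selection can no longer refer to it.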

Most state-of-the-art optimizers create something similar to a tree of operators and then apply optimization rules to the tree, or to a subtree, with the goal of rewriting it into a variant that produces the same result but is cheaper to execute. CrateDB does that as well. You can get some insight into the operators CrateDB uses with the EXPLAIN statement. I’m going to walk through some examples in a moment.

As a side note: knowing that SQL statements are internally transformed into a series of operators explains some of the semantics of a SQL database. The operators have to be applied in order, and an operator can only process the data of its input. WHERE, for example, is one of the innermost operators, while SELECT is applied almost at the end. This explains why many database engines don’t allow you to use aliases defined in the SELECT list within the WHERE clause. It also explains why HAVING can refer to the results of aggregations and WHERE can’t: HAVING is applied after the aggregation, WHERE before it.

A logical plan

Let’s have a look at some query plans using the EXPLAIN statement. In recent versions of CrateDB this statement by default outputs the logical plan of a query.

cr> EXPLAIN SELECT name FROM users;
| EXPLAIN                            |
| Collect[doc.users | [name] | true] |

You can see a single operator called Collect. doc.users is the name of the table, name is the attribute being collected, and true is the query expression: without a WHERE clause, every row matches.

Even if we add a filter, the explain output still shows a single operator:

cr> EXPLAIN SELECT name FROM users WHERE name = 'name';
| EXPLAIN                                       |
| Collect[doc.users | [name] | (name = 'name')] |

This is because in CrateDB the implementation of the Collect operator is quite powerful, and there is currently no distinction between table scans and index lookups at the logical planner level. There are also some optimization rules at work; you’ll learn more about them in a moment.

Suppose you wrap the SELECT in another SELECT:

cr> EXPLAIN SELECT name FROM (SELECT name FROM users WHERE name = 'name') n
    WHERE name = 'foo';
| EXPLAIN                                                                |
| Rename[name] AS n                                                      |
|   └ Collect[doc.users | [name] | ((name = 'foo') AND (name = 'name'))] |

The subquery (a virtual table) results in a Rename operator. What’s interesting is that the inner and outer WHERE clauses got merged into (name = 'foo') AND (name = 'name'), and that this expression is part of the Collect operator. This happened due to optimization rules that try to move filters as far down the tree as possible.
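The two rules involved can be sketched in Python as rewrites on a small operator tree. This is a simplified model, not CrateDB’s implementation: the function names only mirror the session settings shown further down, conditions are plain strings, and the sketch assumes the Rename does not change column names, so a condition can be moved beneath it unchanged.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Optional, Union


@dataclass(frozen=True)
class Collect:
    table: str
    outputs: tuple[str, ...]
    where: str  # "true" means: no filter


@dataclass(frozen=True)
class Rename:
    alias: str
    source: Plan


@dataclass(frozen=True)
class Filter:
    condition: str
    source: Plan


Plan = Union[Collect, Rename, Filter]


def move_filter_beneath_rename(plan: Plan) -> Optional[Plan]:
    """Filter(c, Rename(a, src)) -> Rename(a, Filter(c, src)).

    Simplification: assumes the Rename keeps column names unchanged.
    """
    if isinstance(plan, Filter) and isinstance(plan.source, Rename):
        rename = plan.source
        return Rename(rename.alias, Filter(plan.condition, rename.source))
    return None


def merge_filter_and_collect(plan: Plan) -> Optional[Plan]:
    """Filter(c, Collect(t, o, w)) -> Collect(t, o, c AND w)."""
    if isinstance(plan, Filter) and isinstance(plan.source, Collect):
        collect = plan.source
        if collect.where == "true":
            where = plan.condition
        else:
            where = f"({plan.condition} AND {collect.where})"
        return replace(collect, where=where)
    return None


RULES = (move_filter_beneath_rename, merge_filter_and_collect)


def optimize(plan: Plan) -> Plan:
    # Optimize the children first (bottom-up) ...
    if isinstance(plan, (Filter, Rename)):
        plan = replace(plan, source=optimize(plan.source))
    # ... then try each rule on this node; after a rewrite, optimize the result again.
    for rule in RULES:
        rewritten = rule(plan)
        if rewritten is not None:
            return optimize(rewritten)
    return plan


# The unoptimized plan, as shown when both rules are disabled.
unoptimized = Filter(
    "(name = 'foo')",
    Rename("n", Filter("(name = 'name')", Collect("doc.users", ("name",), "true"))),
)
optimized = optimize(unoptimized)
print(optimized)
```

The result is a Rename on top of a single Collect whose condition is ((name = 'foo') AND (name = 'name')), the same shape EXPLAIN reports above.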

In theory, another rule could figure out that name = 'foo' and name = 'name' can’t both be true and simplify the expression to false, but no such rule exists at the time of this writing.
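For illustration only, such a rule might look like the following Python sketch. It is hypothetical (CrateDB has no such rule, as noted above) and only handles conjunctions of column = literal equalities, represented as (column, literal) pairs.

```python
def contradicts(conjuncts: list[tuple[str, object]]) -> bool:
    """Return True if the AND-ed equalities require one column to equal two different literals."""
    required: dict[str, object] = {}
    for column, literal in conjuncts:
        if column in required and required[column] != literal:
            return True  # the column can't equal two different literals at once
        required[column] = literal
    return False


def simplify_where(conjuncts: list[tuple[str, object]]) -> str:
    """Hypothetical rule: replace an unsatisfiable conjunction with the literal false."""
    if contradicts(conjuncts):
        return "false"
    return " AND ".join(f"({column} = {literal!r})" for column, literal in conjuncts)


print(simplify_where([("name", "foo"), ("name", "name")]))  # false
print(simplify_where([("name", "foo")]))  # (name = 'foo')
```

Replacing the filter with false is safe for a WHERE clause: the conjunction can only evaluate to false or NULL, and a WHERE clause drops rows in both cases.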

If you’re using a client that speaks the PostgreSQL wire protocol, which has stateful sessions, you can turn off individual optimization rules with SET SESSION statements:

crate=> set optimizer_merge_filter_and_collect = false;
Time: 1.584 ms

crate=> set optimizer_move_filter_beneath_rename = false;
Time: 1.869 ms

With these two optimizations disabled, the query plan looks quite different:

crate=> EXPLAIN 
        SELECT name FROM 
          (SELECT name FROM doc.users WHERE name = 'name') n 
        WHERE name = 'foo';
 Filter[(name = 'foo')]                    +
   └ Rename[name] AS n                     +
     └ Filter[(name = 'name')]             +
       └ Collect[doc.users | [name] | true]

There are now two additional Filter operators, and running this is much slower. With only 1 million rows, it takes almost four seconds on my machine:

crate=> SELECT name FROM (SELECT name FROM doc.users WHERE name = 'name') n 
        WHERE name = 'foo';
(0 rows)

Time: 3827.194 ms (00:03.827)

Why? Because Collect now loads all records without using an index, and the first Filter evaluates the name = 'name' expression against each row.

Once we re-activate the optimization rules, things look better:

crate=> SELECT name FROM (SELECT name FROM doc.users WHERE name = 'name') n 
        WHERE name = 'foo';
(0 rows)

Time: 25.516 ms

If the expression is part of the Collect operator, CrateDB uses the Lucene index structures where feasible. In this case it looks up the terms foo and name in an inverted index, which maps each term to a set of document ids. This is a lot cheaper than loading each row and evaluating name = 'foo' against its concrete name value. In this case there are no matching entries, so nothing is returned.
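A toy inverted index in Python shows why the lookup is cheap: answering the AND of two term queries only touches the posting sets of those two terms, never the other rows. This is a sketch under simplifying assumptions: each name value is indexed as a single untokenized term, and Python sets stand in for Lucene’s postings lists.

```python
from collections import defaultdict


def build_inverted_index(docs: dict[int, str]) -> dict[str, set[int]]:
    """Map each term to the set of ids of the documents that contain it.

    Assumes each name value is indexed as one untokenized term.
    """
    index = defaultdict(set)
    for doc_id, name in docs.items():
        index[name].add(doc_id)
    return dict(index)


def match_all_terms(index: dict[str, set[int]], terms: list[str]) -> set[int]:
    """Evaluate term_1 AND term_2 AND ... by intersecting the posting sets."""
    postings = [index.get(term, set()) for term in terms]
    if not postings:
        raise ValueError("at least one term is required")
    return set.intersection(*postings)


docs = {0: "Arthur", 1: "Trillian", 2: "Marvin"}
index = build_inverted_index(docs)
print(match_all_terms(index, ["foo", "name"]))  # set()
print(match_all_terms(index, ["Arthur"]))  # {0}
```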

In CrateDB the operations listed in EXPLAIN are the logical operators. Before CrateDB can execute a query it needs to transform the logical plan into a physical execution plan.

Physical execution plan

You may have noticed that the logical plan doesn’t contain any information about the distribution of the data. In CrateDB the data is sharded: a table can be split into many parts, so-called shards. A shard is a subset of the table. Shards can be independently replicated and moved from one node to another. The number of shards of a table is fixed when the table is created. Some more advanced features give additional flexibility (partitioned tables, and splitting or shrinking operations), but let’s focus on the simpler case for now.

Suppose you have a users table:

CREATE TABLE users (
    id text DEFAULT gen_random_text_uuid() PRIMARY KEY,
    name text
) CLUSTERED INTO 2 SHARDS;

The table is clustered into two shards. That means that the primary copies of the shards can be distributed across at most two nodes. When you insert records, each record is assigned to a shard by taking the hash of its CLUSTERED BY column modulo the number of shards. The CLUSTERED BY column defaults to the primary key.
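The hash-modulo assignment can be sketched in a few lines of Python. This is a sketch, not CrateDB’s routing code: zlib.crc32 stands in for the hash function (CrateDB’s actual hash function differs, so concrete shard numbers won’t match a real cluster), and uuid4 values stand in for the ids generated by gen_random_text_uuid().

```python
import uuid
import zlib
from collections import Counter


def shard_for(routing_value: str, number_of_shards: int) -> int:
    """Assign a row to a shard: hash of the clustered-by value modulo the shard count.

    zlib.crc32 is a stand-in hash; CrateDB uses a different hash function.
    """
    return zlib.crc32(routing_value.encode("utf-8")) % number_of_shards


# Stand-in for the primary key values generated by DEFAULT gen_random_text_uuid().
ids = [str(uuid.uuid4()) for _ in ("Arthur", "Trillian", "Marvin")]
docs_per_shard = Counter(shard_for(row_id, 2) for row_id in ids)
print(docs_per_shard)  # the distribution varies from run to run, since the ids are random
```

Because the same value always hashes to the same shard, a lookup by primary key only needs to touch one shard.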

Suppose we insert three records:

INSERT INTO users (name) VALUES ('Arthur'), ('Trillian'), ('Marvin');

We can learn more about the distribution of the data by querying the sys.shards table:

cr> SELECT id, num_docs FROM sys.shards WHERE table_name = 'users';
| id | num_docs |
|  0 |        1 |
|  1 |        2 |

A physical execution plan takes into consideration where the data is located and how it gets to the node the client communicates with. We refer to this node as the handler node, because it handles the communication with the client and initiates the execution of the query. The nodes from which data is gathered are called collect nodes. Sometimes merge nodes are involved as well; they merge the results from the collect nodes.

Depending on the exact query, the execution plans can look quite different, but most of the execution plans share the same building blocks.

An execution plan consists of one or more execution phases. Each logical operator contains logic to either create an execution phase, or to add additional transformations to an execution phase created by a child operator.
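A Python sketch of that division of labour: a source operator creates a phase, and operators above it append transformations to the phase their child returned. The class names reuse the operator names from the EXPLAIN output, but the classes, their fields and the string-based transformations are hypothetical simplifications; in particular, only one phase type exists here.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class CollectPhase:
    """A source phase: reads rows of `table`, then applies `transformations` in order."""
    table: str
    outputs: list[str]
    where: str
    transformations: list[str] = field(default_factory=list)


class LogicalOperator:
    def build_phase(self) -> CollectPhase:
        raise NotImplementedError


@dataclass
class Collect(LogicalOperator):
    table: str
    outputs: list[str]
    where: str = "true"

    def build_phase(self) -> CollectPhase:
        # A source operator creates a new execution phase.
        return CollectPhase(self.table, list(self.outputs), self.where)


@dataclass
class Filter(LogicalOperator):
    condition: str
    source: LogicalOperator

    def build_phase(self) -> CollectPhase:
        # A non-source operator extends the phase created by its child.
        phase = self.source.build_phase()
        phase.transformations.append(f"filter {self.condition}")
        return phase


@dataclass
class Rename(LogicalOperator):
    alias: str
    source: LogicalOperator

    def build_phase(self) -> CollectPhase:
        phase = self.source.build_phase()
        phase.transformations.append(f"rename to {self.alias}")
        return phase


# Roughly the unoptimized inner part of the earlier example.
plan = Rename("n", Filter("(name = 'name')", Collect("doc.users", ["name"])))
phase = plan.build_phase()
print(phase)
```

The transformations end up in bottom-up order, which is the order in which they must be applied to the collected rows.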

For example, the Collect operator can create a CollectPhase. The CollectPhase is a source, which means that it outputs rows by reading them from disk or, in the case of system tables, from memory. The Collect operator creates this CollectPhase by combining the information it holds itself with information from the cluster state. The cluster state is a snapshot of the current state of the cluster. It includes information like which tables exist, what columns they have and where they are currently located (or at least where they are supposed to be; the state can be outdated by the time the query is executed). The cluster state is updated whenever the schema changes, a node joins or leaves, or a shard is relocated, for example because of disk pressure on one node. Explaining how that works in detail is beyond the scope of this post, so we’ll leave it at that.

What’s important for the purpose of this post is that the cluster state contains information about where the shards of a table are (supposedly) located. This information is part of a structure called the routing table.

The routing table is used to compute the routing for the CollectPhase. The routing includes the ids of the shards of the tables or partitions that should be queried, and the nodes on which they reside. For regular SELECT queries, whether primary or replica copies of the shards are used is randomized, taking into account additional constraints like allocation awareness attributes.

In our users table example, the routing could look like this:

{
  "node-id-1": {
    "users": [0]
  },
  "node-id-2": {
    "users": [1]
  }
}

Or if there is only a single node in the cluster, it would look as follows:

{
  "node-id-1": {
    "users": [0, 1]
  }
}
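The routing computation described above can be sketched in Python: pick one copy of each shard at random, then group the shard ids by node. The sketch assumes a simplified stand-in for the routing table (shard id mapped to the nodes holding a copy) and ignores constraints such as allocation awareness attributes.

```python
import random
from collections import defaultdict


def compute_routing(
    table: str,
    shard_copies: dict[int, list[str]],
    rng: random.Random,
) -> dict[str, dict[str, list[int]]]:
    """Pick one copy (primary or replica) per shard at random and group shard ids by node.

    shard_copies maps a shard id to the nodes holding a copy of that shard,
    a simplified stand-in for the routing table in the cluster state.
    """
    routing = defaultdict(lambda: defaultdict(list))
    for shard_id in sorted(shard_copies):
        node = rng.choice(shard_copies[shard_id])
        routing[node][table].append(shard_id)
    # Convert the nested defaultdicts into plain dicts.
    return {node: dict(tables) for node, tables in routing.items()}


rng = random.Random(42)
# Two nodes, no replicas: each shard has exactly one copy.
print(compute_routing("users", {0: ["node-id-1"], 1: ["node-id-2"]}, rng))
# {'node-id-1': {'users': [0]}, 'node-id-2': {'users': [1]}}

# A single node holding both shards.
print(compute_routing("users", {0: ["node-id-1"], 1: ["node-id-1"]}, rng))
# {'node-id-1': {'users': [0, 1]}}
```

With replicas, each shard id still appears exactly once in the routing; only the node that serves it varies between queries.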

The CollectPhase will also contain a list of columns or expressions to retrieve, an optional filter expression and an optional ordering expression.

Operators further up the tree can then add their own transformations to this phase, or create additional execution phases on top of it.

Once the planner/optimizer has finished creating the physical execution plan, the plan is executed.

The execution

The execution layer looks at the routing information of the execution plan to figure out which nodes are involved in the query, and then sends each node, as part of a JobRequest, the phases of the execution plan that it has to execute.
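A minimal Python sketch of that grouping step: every phase lists the nodes that must run it, and the phases are collected into one request per node. JobRequest is the name used above, but its fields here, the Phase class and the phase kinds are hypothetical simplifications.

```python
from dataclasses import dataclass, field


@dataclass
class Phase:
    phase_id: int
    kind: str
    nodes: list[str]  # nodes that must run this phase, derived from the routing


@dataclass
class JobRequest:
    node: str
    phases: list[Phase] = field(default_factory=list)


def create_job_requests(phases: list[Phase]) -> dict[str, JobRequest]:
    """Group the phases of a plan into one request per involved node."""
    requests: dict[str, JobRequest] = {}
    for phase in phases:
        for node in phase.nodes:
            if node not in requests:
                requests[node] = JobRequest(node)
            requests[node].phases.append(phase)
    return requests


plan = [
    Phase(1, "collect", ["node-id-1", "node-id-2"]),
    Phase(2, "merge", ["node-id-1"]),  # assume node-id-1 is the handler node
]
requests = create_job_requests(plan)
for node, request in requests.items():
    print(node, [phase.kind for phase in request.phases])
# node-id-1 ['collect', 'merge']
# node-id-2 ['collect']
```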

Each node contains a handler that is responsible for accepting and processing these JobRequests. To process a JobRequest, the handler looks at the phases of the plan and initiates the corresponding operations.

For a CollectPhase this could include creating a Lucene query out of the filter expression, acquiring a Lucene reader for each shard and iterating over the matching documents while applying all the transformations that are part of the phase before sending the result back to the handler node.

Depending on the exact plan there could be more phases involved, and the results might be pushed to merge nodes before the final result is sent to the handler node.
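The data flow of a simple plan (collect on every involved node, then merge) can be sketched in Python. This is a simplification: CrateDB turns the filter expression into a Lucene query, while this sketch evaluates a Python predicate per document, and which name lives in which shard is chosen arbitrarily.

```python
from typing import Callable

Doc = dict[str, str]


def run_collect_phase(
    shards: list[list[Doc]],
    predicate: Callable[[Doc], bool],
    outputs: list[str],
) -> list[tuple[str, ...]]:
    """Runs on a collect node: iterate the matching docs of each local shard
    and apply the phase's transformations (here only an output projection)."""
    rows = []
    for shard_docs in shards:
        for doc in shard_docs:
            if predicate(doc):
                rows.append(tuple(doc[column] for column in outputs))
    return rows


def merge(results_per_node: list[list[tuple[str, ...]]]) -> list[tuple[str, ...]]:
    """Runs on the handler (or a merge) node: combine the results of all collect nodes."""
    merged: list[tuple[str, ...]] = []
    for rows in results_per_node:
        merged.extend(rows)
    return merged


# Which name lives in which shard is arbitrary here.
local_shards = {
    "node-id-1": [[{"name": "Arthur"}]],                        # shard 0
    "node-id-2": [[{"name": "Trillian"}, {"name": "Marvin"}]],  # shard 1
}
results = [
    run_collect_phase(shards, lambda doc: doc["name"] != "Marvin", ["name"])
    for shards in local_shards.values()
]
final = merge(results)
print(final)  # [('Arthur',), ('Trillian',)]
```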

Wrap up

This post was a first introduction to how CrateDB creates logical plans, optimizes them, and finally turns them into an execution plan that can be executed. I made some simplifications to keep the scope of the article small and focused. Later posts might cover aspects I skipped over, such as failure handling, or walk through specific execution strategies in more detail.