How CrateDB plans and executes queries
This post is the first of a series that explains how CrateDB executes queries. This first post focuses on the optimizer, logical plan and execution plan.
Relational algebra
SQL is a declarative language. As a user you declare what data you want and leave the choice of how exactly to retrieve that data up to the system. Databases like CrateDB contain a component - usually called the optimizer - that is supposed to come up with the most efficient way to retrieve the requested data.
SQL is based on relational algebra. A table is a relation, and
various operators can transform an input relation into an output
relation. Take the query
SELECT name FROM users WHERE name like 'A%' as an
example.
With relational algebra you could view the query as a series of operations which are applied to relations in order:
- users is a table or relation, your initial source.
- WHERE takes as input this source relation, applies a filter - in relational algebra called Selection - and outputs a new reduced relation.
- SELECT name is another operation. It takes as input a relation, reduces the set of attributes to name and outputs a new relation. In relational algebra this operation is called Projection.
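The three steps above can be sketched in a few lines of Python. This is only an illustration of the relational algebra view, not how CrateDB implements its operators. The function names selection and projection and the sample rows are assumptions made up for this example.

```python
from typing import Callable, Iterable, Iterator

Row = dict[str, object]


def selection(rows: Iterable[Row], predicate: Callable[[Row], bool]) -> Iterator[Row]:
    """Selection (WHERE): keep only the rows that satisfy the predicate."""
    return (row for row in rows if predicate(row))


def projection(rows: Iterable[Row], attributes: list[str]) -> Iterator[Row]:
    """Projection (SELECT list): keep only the named attributes of each row."""
    return ({attr: row[attr] for attr in attributes} for row in rows)


# Hypothetical sample data standing in for the users relation.
users = [
    {"id": 1, "name": "Arthur"},
    {"id": 2, "name": "Trillian"},
    {"id": 3, "name": "Marvin"},
]

# SELECT name FROM users WHERE name LIKE 'A%'
# The source relation flows through Selection first, then Projection.
filtered = selection(users, lambda row: str(row["name"]).startswith("A"))
result = list(projection(filtered, ["name"]))
```

Each operator takes a relation as input and returns a new relation, which is what allows them to be stacked into a tree.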
Most state of the art optimizers create something similar to a tree
of operators and then apply optimization rules to the tree or to a
subset of the tree with the goal to re-write the tree into a variant
that produces the same result but is cheaper to execute. CrateDB does
that as well. You can get some insights into operators that exist using
the EXPLAIN statement. I’m going to walk through some
examples in a moment.
As a side note: Knowing that SQL statements are internally transformed
into a series of operators can explain some of the semantics of a SQL
database. The operators have to be applied in order, and an operator can
only process the data of its input. WHERE is for example
one of the innermost operators, and SELECT is applied
almost at the end. This explains why in many database engines it is not
possible to use aliases within the WHERE clause. It also
explains why HAVING can refer to the result of
aggregations, and why WHERE can’t.
A logical plan
Let’s have a look at some query plans using the EXPLAIN
statement. In recent versions of CrateDB this statement by default
outputs the logical plan of a query.
cr> EXPLAIN SELECT name FROM users;
+------------------------------------+
| EXPLAIN                            |
+------------------------------------+
| Collect[doc.users | [name] | true] |
+------------------------------------+
You can see a single operator called Collect. doc.users is the name of the table, name is the attribute being collected, and true is the query expression, which here matches every row.
Even if we add a filter, the explain output still shows a single operator:
cr> EXPLAIN SELECT name FROM users WHERE name = 'name';
+-----------------------------------------------+
| EXPLAIN                                       |
+-----------------------------------------------+
| Collect[doc.users | [name] | (name = 'name')] |
+-----------------------------------------------+
This is because in CrateDB, the implementation of the Collect operator is quite powerful and there is currently no distinction between table scans or index lookup at the logical planner level. There are also some optimization rules doing their work. You’ll learn more about them in a moment.
Suppose you wrap the SELECT into another
SELECT:
cr> EXPLAIN 
      SELECT name FROM (SELECT name FROM users WHERE name = 'name') n 
      WHERE n.name = 'foo';
+------------------------------------------------------------------------+
| EXPLAIN                                                                |
+------------------------------------------------------------------------+
| Rename[name] AS n                                                      |
|   └ Collect[doc.users | [name] | ((name = 'foo') AND (name = 'name'))] |
+------------------------------------------------------------------------+
You see that the virtual table results in a Rename operator. What’s interesting is that the inner WHERE clause and the outer WHERE clause got merged into (name = 'foo') AND (name = 'name') and the expression is part of the Collect operation. This happened due to optimization rules that try to move filters as far down the tree as possible.
Theoretically there could be another rule that figured out that it is
not possible for both name = 'foo' and
name = 'name' to be true and could optimize this to
false, but such a rule is not present at the time of this
writing.
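To make the idea of rewriting an operator tree more concrete, here is a minimal sketch of two such rules in Python. It is a toy model under several assumptions: the classes Collect, Rename and Filter and the optimize function are invented for this example, conditions are plain strings, and the rule that moves a filter beneath a Rename skips the column re-mapping (n.name to name) that a real implementation would need. The rule names merely echo the session settings shown in this post and are not CrateDB's actual code.

```python
from dataclasses import dataclass, replace
from typing import Optional, Union


@dataclass(frozen=True)
class Collect:
    table: str
    columns: tuple
    where: str = "true"


@dataclass(frozen=True)
class Rename:
    alias: str
    source: "Plan"


@dataclass(frozen=True)
class Filter:
    condition: str
    source: "Plan"


Plan = Union[Collect, Rename, Filter]


def move_filter_beneath_rename(plan: Plan) -> Optional[Plan]:
    """Filter(Rename(x)) becomes Rename(Filter(x))."""
    if isinstance(plan, Filter) and isinstance(plan.source, Rename):
        rename = plan.source
        return Rename(rename.alias, Filter(plan.condition, rename.source))
    return None


def merge_filter_and_collect(plan: Plan) -> Optional[Plan]:
    """Filter(Collect) becomes a single Collect carrying the combined condition."""
    if isinstance(plan, Filter) and isinstance(plan.source, Collect):
        collect = plan.source
        if collect.where == "true":
            where = plan.condition
        else:
            where = f"({plan.condition} AND {collect.where})"
        return Collect(collect.table, collect.columns, where)
    return None


RULES = (move_filter_beneath_rename, merge_filter_and_collect)


def optimize(plan: Plan) -> Plan:
    """Optimize the child first, then apply rules at this node until none matches."""
    if not isinstance(plan, Collect):
        plan = replace(plan, source=optimize(plan.source))
    for rule in RULES:
        rewritten = rule(plan)
        if rewritten is not None:
            return optimize(rewritten)
    return plan


# The unoptimized tree for the nested SELECT from the example above.
unoptimized = Filter(
    "(name = 'foo')",
    Rename("n", Filter("(name = 'name')", Collect("doc.users", ("name",)))),
)
optimized = optimize(unoptimized)
```

In this toy model, optimized ends up as a Rename on top of a single Collect whose condition combines both filters, which mirrors the shape of the EXPLAIN output above.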
If you’re using a client using the PostgreSQL protocol - which has
stateful sessions - it is possible to turn off optimization rules using
SET SESSION statements:
crate=> set optimizer_merge_filter_and_collect = false;
Time: 1.584 ms
crate=> set optimizer_move_filter_beneath_rename = false;
Time: 1.869 ms
With these two optimizations disabled, the query plan looks quite different:
crate=> EXPLAIN 
        SELECT name FROM 
          (SELECT name FROM doc.users WHERE name = 'name') n 
        WHERE n.name = 'foo';
                  EXPLAIN
--------------------------------------------
 Filter[(name = 'foo')]                    +
   └ Rename[name] AS n                     +
     └ Filter[(name = 'name')]             +
       └ Collect[doc.users | [name] | true]
We now have two additional Filter operators, and running this plan is much slower. With only 1 million rows it takes almost four seconds on my machine:
crate=> SELECT name FROM (SELECT name FROM doc.users WHERE name = 'name') n 
        WHERE n.name = 'foo';
 name
------
(0 rows)
Time: 3827.194 ms (00:03.827)
Why? Because Collect would load all records without using an index, and the first Filter would apply the name = 'name' expression against each row.
Once we re-activate the optimization rules, things look better:
crate=> SELECT name FROM (SELECT name FROM doc.users WHERE name = 'name') n 
        WHERE n.name = 'foo';
 name
------
(0 rows)
Time: 25.516 ms
If the expression is part of the Collect operator, CrateDB will use the Lucene index structures if feasible. In this case it will look for the terms foo and name in an inverted index. This is a lot cheaper than loading each row and then computing name = 'foo' against the concrete name value of each row. The inverted index maps terms to sets of document ids. In this case there are no entries that match, so nothing is returned.
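The difference between scanning rows and consulting such an index can be illustrated with a small Python sketch. It is a deliberately simplified model: Lucene's real data structures are far more elaborate, and the helper names build_inverted_index and lookup_all as well as the sample documents are assumptions made for this example.

```python
from collections import defaultdict


def build_inverted_index(docs: dict[int, str]) -> dict[str, set[int]]:
    """Map each term (here: the whole name value) to the ids of the documents containing it."""
    index: dict[str, set[int]] = defaultdict(set)
    for doc_id, name in docs.items():
        index[name].add(doc_id)
    return dict(index)


def lookup_all(index: dict[str, set[int]], terms: list[str]) -> set[int]:
    """Return the ids of documents that match every term (an AND of term lookups)."""
    postings = [index.get(term, set()) for term in terms]
    return set.intersection(*postings) if postings else set()


# Hypothetical documents: document id -> value of the name column.
names = {0: "Arthur", 1: "Trillian", 2: "Marvin", 3: "Arthur"}
index = build_inverted_index(names)

# name = 'foo' AND name = 'name': neither term exists, so no document is loaded at all.
matches = lookup_all(index, ["foo", "name"])

# name = 'Arthur': one dictionary lookup instead of comparing every row.
arthur_ids = lookup_all(index, ["Arthur"])
```

The lookup cost depends on the number of terms in the query, not on the number of rows in the table, which is why the merged plan is so much faster.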
In CrateDB the operations listed in EXPLAIN are the
logical operators. Before CrateDB can execute a query it needs to
transform the logical plan into a physical execution plan.
Physical execution plan
You may have noticed that the logical plan doesn’t contain any information about the distribution of the data. In CrateDB the data is sharded. That means a table can be split into many parts - so-called shards. A shard is like a subset of the table. Shards can be independently replicated and moved from one node to another. The number of shards a table has is fixed at the time the table is created. There are more advanced features that give some flexibility - partitioned tables, splitting or shrinking operations - but let’s focus on the simpler case for now.
Suppose you have a users table:
CREATE TABLE users (
    id text DEFAULT gen_random_text_uuid() PRIMARY KEY,
    name text
) clustered INTO 2 shards;
The table is clustered into two shards. That means that the primary copies of the shards can be distributed across at most two nodes. If you insert records, each record is assigned to a shard by applying a modulo operation to the hash of the value in the CLUSTERED BY column. The CLUSTERED BY column defaults to the primary key.
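The hash-and-modulo assignment can be sketched as follows. This only demonstrates the principle: the function shard_for is a made-up helper, and SHA-256 is used purely as a stand-in for a deterministic hash. The hash function CrateDB actually uses is not covered in this post, so real shard numbers will differ.

```python
import hashlib


def shard_for(routing_value: str, num_shards: int) -> int:
    """Pick a shard by hashing the routing value and taking it modulo the shard count."""
    # Assumption: SHA-256 stands in for whatever hash function the database really uses.
    digest = hashlib.sha256(routing_value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Hypothetical primary key values, since the real ids are generated UUIDs.
ids = ["id-arthur", "id-trillian", "id-marvin"]
assignment = {record_id: shard_for(record_id, num_shards=2) for record_id in ids}
```

Because the result depends on num_shards, changing the shard count would move existing records to different shards. That is one way to see why the number of shards is fixed when the table is created.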
Suppose we insert three records:
INSERT INTO users (name) VALUES ('Arthur'), ('Trillian'), ('Marvin');
We can learn more about the distribution of the data by querying the sys.shards table:
cr> SELECT id, num_docs FROM sys.shards WHERE table_name = 'users';
+----+----------+
| id | num_docs |
+----+----------+
|  0 |        1 |
|  1 |        2 |
+----+----------+
A physical execution plan takes into consideration where the data is located and how it is supposed to arrive at the node with which the client communicates. We refer to this node as handler node because it handles the communication with the client, and because it initiates the execution of the query. The nodes from which data is gathered are called collect nodes, and sometimes there are also merge nodes involved, which merge the results from collect nodes.
Depending on the exact query, the execution plans can look quite different, but most of the execution plans share the same building blocks.
An execution plan consists of one or more execution phases. Each logical operator contains logic to either create an execution phase, or to add additional transformations to an execution phase created by a child operator.
For example, the Collect operator can create a
CollectPhase - the CollectPhase is a
source, which means that it is supposed to output rows by
reading them from disk or - in the case of system tables - from memory.
The Collect operator creates this CollectPhase
by combining the information it holds itself with information from the
cluster state. The cluster state is a snapshot that represents the
current state of the cluster. This includes information like which
tables exist, what columns they have and where they’re currently located
(or at least supposed to be, the state can be outdated by the time the
query is executed). This cluster state is updated whenever the schema
changes, a node joins or leaves, or when a shard is relocated - for
example because of disk pressure on one node. More about how that works
in detail would blow up the scope of this post, so we’ll leave it at
that.
What’s important for the purpose of this post is that the cluster state contains the information where the shards of a table are (supposedly) located. This information is part of a structure called the routing table.
The routing table is used to compute the routing for the
CollectPhase. The routing includes the shard ids of the
tables or partitions which should be queried and on which nodes they
reside. Whether primary copies or replica copies of the shards are used
is randomized for regular SELECT queries - taking into account
additional constraints like allocation awareness attributes.
In our users table example, the routing could
look like this:
{
  "node-id-1": {
    "users": [0]
  },
  "node-id-2": {
    "users": [1]
  }
}
Or if there is only a single node in the cluster, it would look as follows:
{
  "node-id-1": {
    "users": [0, 1]
  }
}
The CollectPhase will also contain a list of columns or expressions to retrieve, an optional filter expression and an optional ordering expression.
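How a routing like the ones above could be derived from shard locations can be sketched in Python. This is a simplified assumption of the process: compute_routing and the shape of shard_copies are invented for this example, and constraints such as allocation awareness attributes are ignored.

```python
import random
from collections import defaultdict
from typing import Optional


def compute_routing(
    table: str,
    shard_copies: dict[int, list[str]],
    rng: Optional[random.Random] = None,
) -> dict[str, dict[str, list[int]]]:
    """Pick one copy per shard at random and group the shard ids by node.

    shard_copies maps a shard id to the ids of all nodes holding a copy of it
    (primary or replica). This structure is a stand-in for the routing table.
    """
    rng = rng or random.Random()
    routing: dict[str, dict[str, list[int]]] = defaultdict(lambda: defaultdict(list))
    for shard_id in sorted(shard_copies):
        node_id = rng.choice(shard_copies[shard_id])
        routing[node_id][table].append(shard_id)
    return {node_id: dict(tables) for node_id, tables in routing.items()}


# Two nodes, one copy of each shard: the result matches the first routing above.
two_nodes = compute_routing("users", {0: ["node-id-1"], 1: ["node-id-2"]})

# A single node holding both shards: the result matches the second routing above.
single_node = compute_routing("users", {0: ["node-id-1"], 1: ["node-id-1"]})
```

With replicas, a shard would list several node ids and repeated calls could yield different routings, which spreads read load across the copies.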
Other operators can then add other execution plan elements on top.
Once the planner/optimizer finishes creating the physical execution plan, CrateDB executes it.
The execution
The execution layer looks at the routing information of the execution plan to figure out which nodes are involved in the query, and then sends each node the phases of the execution plan that it has to execute as part of a JobRequest.
Each node contains a handler that is responsible for accepting and processing these JobRequests. To process a request, the handler looks at the phases of the plan and initiates their corresponding operations.
For a CollectPhase this could include creating a Lucene
query out of the filter expression, acquiring a Lucene reader for each
shard and iterating over the matching documents while applying all the
transformations that are part of the phase before sending the result
back to the handler node.
Depending on the exact plan there could be more phases involved, and the results might be pushed to merge nodes before the final result is sent to the handler node.
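The flow described in this section can be imitated with a small in-process simulation. It is a rough sketch under stated assumptions: there is no networking, no Lucene and no merge node, and the names CollectPhase (as a Python dataclass), run_on_node and execute are stand-ins invented for this example rather than CrateDB's internal API.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class CollectPhase:
    table: str
    columns: list[str]
    predicate: Callable[[dict], bool]        # stand-in for the filter expression
    routing: dict[str, dict[str, list[int]]]  # node id -> table -> shard ids


def run_on_node(node_id: str, phase: CollectPhase, shards_on_node: dict[int, list[dict]]) -> list[dict]:
    """What a node does with its part of the job: scan its shards, filter, project."""
    rows = []
    for shard_id in phase.routing[node_id][phase.table]:
        for doc in shards_on_node[shard_id]:
            if phase.predicate(doc):
                rows.append({column: doc[column] for column in phase.columns})
    return rows


def execute(phase: CollectPhase, cluster: dict[str, dict[int, list[dict]]]) -> list[dict]:
    """Handler node: dispatch the phase to every node in the routing and gather the rows."""
    result = []
    for node_id in phase.routing:
        result.extend(run_on_node(node_id, phase, cluster[node_id]))
    return result


# Hypothetical cluster contents: node id -> shard id -> documents.
cluster = {
    "node-id-1": {0: [{"name": "Arthur"}]},
    "node-id-2": {1: [{"name": "Trillian"}, {"name": "Marvin"}]},
}
phase = CollectPhase(
    table="users",
    columns=["name"],
    predicate=lambda doc: doc["name"].startswith("A"),
    routing={"node-id-1": {"users": [0]}, "node-id-2": {"users": [1]}},
)
rows = execute(phase, cluster)
```

In a real cluster the nodes work in parallel and stream their results back, so the order in which rows arrive at the handler is not fixed unless the plan includes an ordering.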
Wrap up
This was a first introduction elaborating how CrateDB creates logical plans, optimizes them and how it finally creates an execution plan which can be executed. I made some simplifications to keep the scope of the article small and focused. Later posts might highlight various aspects that I skipped over or neglected - like failure handling, or walk through some specific execution strategies in more detail.