Query Then Fetch execution in CrateDB
This is the third post in a series about CrateDB internals. This time I’ll introduce the Query Then Fetch execution strategy. Similar to the previous posts, this one focuses on the distribution aspect.
Make sure to read the first and second post of the series.
A SELECT with LIMIT
In the previous post I used SELECT name FROM users as an example. In this post we need a slightly more complicated query to see a different execution plan: we extend the query with a LIMIT, and to make the example a bit more realistic we also add a WHERE clause and select an additional column. The query now looks like this:
SELECT name, details FROM users WHERE name LIKE 'A%' LIMIT 50
Let’s use the EXPLAIN statement to see the logical query plan:
cr> EXPLAIN SELECT name, details FROM users WHERE name LIKE 'A%' LIMIT 50;
+----------------------------------------------------------+
| EXPLAIN                                                  |
+----------------------------------------------------------+
| Fetch[name, details]                                     |
|   └ Limit[50::bigint;0]                                  |
|     └ Collect[doc.users | [_fetchid] | (name LIKE 'A%')] |
+----------------------------------------------------------+
Starting from top to bottom, we see:

- The Fetch operator. Fetch takes a relation as input and expects it to contain one or more _fetchid columns. It uses these _fetchid values to retrieve the values of other attributes; in this example it retrieves name and details.
- The Limit operator. Limit takes a relation and limits it to at most 50 records.
- The Collect operator. The first post of the series already touched on Collect. The Collect operator in this example indicates that an attribute called _fetchid should be retrieved from the doc.users table. The operator also includes the name LIKE 'A%' query expression, indicating that only records matching this expression should be included. _fetchid is a system column which can be used by the Fetch operator. CrateDB uses Lucene to store data, and Lucene can uniquely address individual documents within a reader with a docId. A reader is a construct that Lucene uses to access data within an index¹. The _fetchid contains both this docId and a readerId, which makes it possible for the Fetch operation to uniquely identify a single document during the lifetime of a Lucene reader (see the sketch after this list).
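To make this concrete, here is a minimal Java sketch of how a readerId and a docId could be packed into a single 64-bit _fetchid value. The bit layout shown is an illustrative assumption for this post, not necessarily the exact encoding CrateDB uses:

// Sketch: pack a readerId and a Lucene docId into one 64-bit value.
// The concrete bit layout in CrateDB is an implementation detail;
// this only illustrates the idea of a combined identifier.
final class FetchIdSketch {

    static long encode(int readerId, int docId) {
        return ((long) readerId << 32) | (docId & 0xFFFF_FFFFL);
    }

    static int readerId(long fetchId) {
        return (int) (fetchId >>> 32); // upper 32 bits
    }

    static int docId(long fetchId) {
        return (int) fetchId;          // lower 32 bits
    }

    public static void main(String[] args) {
        long fetchId = encode(3, 42);
        System.out.println("readerId=" + readerId(fetchId) + ", docId=" + docId(fetchId));
        // prints: readerId=3, docId=42
    }
}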
Why does CrateDB create a plan like that?
Loading the values of a record is relatively expensive, so CrateDB tries to avoid it unless it has to.
The data in CrateDB is distributed across several shards and nodes, and CrateDB cannot know up-front exactly how many records each shard holds or how many of those records will match the name LIKE 'A%' expression. Because of that, CrateDB has to fetch up to 50 records from each node and then merge them together, stopping once the limit is hit.
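The merge step itself is conceptually simple: concatenate the partial results and stop as soon as the limit is reached. Here is a rough Java sketch of that idea; the names are purely illustrative, and the real implementation streams results incrementally rather than materializing lists:

import java.util.ArrayList;
import java.util.List;

// Sketch: each node already applied the limit locally; the handler node
// concatenates the partial results and cuts them off again at the limit.
final class MergeLimitSketch {

    static <T> List<T> mergeWithLimit(List<List<T>> perNodeResults, int limit) {
        List<T> merged = new ArrayList<>(limit);
        for (List<T> partial : perNodeResults) {
            for (T row : partial) {
                if (merged.size() == limit) {
                    return merged; // stop once the limit is hit
                }
                merged.add(row);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> perNode = List.of(List.of("Alice", "Anna"), List.of("Abe"));
        System.out.println(mergeWithLimit(perNode, 2)); // [Alice, Anna]
    }
}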
CrateDB could use a logical plan like the following:
+-------------------------------------------------------------+
| EXPLAIN                                                     |
+-------------------------------------------------------------+
| Limit[50::bigint;0]                                         |
|   └ Collect[doc.users | [name, details] | (name LIKE 'A%')] |
+-------------------------------------------------------------+
Which would translate to an execution strategy that looks roughly like this:
+-----------------+   +-----------------+   +-----------------+
| N1              |   | N2              |   | N3              |
| Collect         |   | Collect         |   | Collect         |
| name, details   |   | name, details   |   | name, details   |
| Limit 50        |   | Limit 50        |   | Limit 50        |
+-----------------+   +-----------------+   +-----------------+
         \                     |                  /
          \                    |                 /
           +-------------------+----------------+
                               |
                               |
                               |
                         +------------+
                         | N1         |
                         | Merge      |
                         | Limit 50   |
                         +------------+
In this hypothetical execution plan, each node loads up to 50 records with name and details and sends them to the handler node N1, where the result is merged and reduced to at most 50 records.
Using an execution plan like this would mean loading the name and details values for up to 50 records per node, up to 150 in total across three nodes, even though at most 50 of them make it into the final result. The logical plan we saw with the first EXPLAIN statement avoids that:
+-----------------+   +-----------------+   +-----------------+
| N1              |   | N2              |   | N3              |
| Collect         |   | Collect         |   | Collect         |
| _fetchid        |   | _fetchid        |   | _fetchid        |
| Limit 50        |   | Limit 50        |   | Limit 50        |
+-----------------+   +-----------------+   +-----------------+
         \                     |                  /
          \                    |                 /
           +-------------------+----------------+
                               |
                               v
                         +------------+
                         | Merge      |
                         | Limit 50   |
                         +------------+
                               |
                               v
                +-------------------------------+
                | Fetch                         |
                | _fetchid → [name, details]    |
                +-------------------------------+
Here each node collects up to 50 records with _fetchid values and sends those to the handler node, where the records are merged and reduced to at most 50. The Fetch operation then retrieves the actual name and details values:
                           +-------------------------------------+
                           |                                     |
                           v                                     |
+-----------------+   +-----------------+   +-----------------+  |
| N1              |   | N2              |   | N3              |  |
| Collect         |   | Collect         |   | Collect         |  |
| _fetchid        |   | _fetchid        |   | _fetchid        |  |
| Limit 50        |   | Limit 50        |   | Limit 50        |  |
+-----------------+   +-----------------+   +-----------------+  |
   ^                                                   ^         |
   |                                                   |         |
   |      [Merge from first step omitted]              |         |
   |                                                   |         |
   |      +-------------------------------+            |         |
   |      | Fetch                         |            |         |
   +----->| _fetchid → [name, details]    |<-----------+---------+
          +-------------------------------+
You can think of the _fetchid as a placeholder for the actual values. The Fetch operation uses the readerId that is encoded into the _fetchid to figure out which node it has to contact to get the actual values. The whole Fetch operation works in batches, so multiple records are fetched at a time; it works a bit like an asynchronous flat-map. The major advantage of this approach is that each of the three nodes only has to load the name and details values that are actually required. N1 might load 16 records, N2 another 17, and N3 the remaining 17, making up the total of 50.
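Here is a rough Java sketch of that routing step, reusing the hypothetical bit layout from the encoding sketch above; all names are illustrative, not CrateDB’s actual classes. The fetch ids are grouped by the node that owns the corresponding reader, so each node receives one batched request for exactly the records it has to load:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: route collected _fetchid values to the nodes that own the
// readers they point to, so the column values can be fetched in one
// batched request per node. Names and layout are illustrative only.
final class FetchRoutingSketch {

    static Map<String, List<Long>> groupByNode(List<Long> fetchIds,
                                               Map<Integer, String> readerToNode) {
        Map<String, List<Long>> byNode = new HashMap<>();
        for (long fetchId : fetchIds) {
            int readerId = (int) (fetchId >>> 32); // same hypothetical layout as above
            String nodeId = readerToNode.get(readerId);
            byNode.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(fetchId);
        }
        return byNode;
    }

    public static void main(String[] args) {
        Map<Integer, String> readerToNode = Map.of(0, "N1", 1, "N2", 2, "N3");
        List<Long> fetchIds = List.of(0L << 32 | 7, 1L << 32 | 3, 1L << 32 | 9);
        // groups to something like {N1=[7], N2=[4294967299, 4294967305]}
        System.out.println(groupByNode(fetchIds, readerToNode));
    }
}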
This execution strategy works with both the direct response mode and the incremental paging mode outlined in the second post.
Wrap up
In this post you learned about the Query Then Fetch execution strategy in CrateDB and how it operates at a high level.
The next post in the series will look at the execution of GROUP BY queries.
¹ Index here is referring to a Lucene index. In CrateDB terminology that would be a shard: one CrateDB shard maps to one Lucene index. The Lucene index in turn can contain segments and various structures, like the stored data or an inverted index for efficient lookup of documents based on terms.