Friday, May 28, 2021

Query Then Fetch execution in CrateDB

This is the third post in a series about CrateDB internals. This time I’ll introduce the Query Then Fetch execution strategy. Similar to the previous posts, this focuses on the distribution aspect.

Make sure to read the first and second posts of the series.

A SELECT with LIMIT

In the previous post I used SELECT name FROM users as an example. In this post we need a slightly more complicated query to see a different execution plan. We extend the query with a LIMIT, and to make the example a bit more realistic we also add a WHERE clause and select an additional column. The query now looks like this:

SELECT name, details FROM users WHERE name LIKE 'A%' LIMIT 50

Let’s use the EXPLAIN statement to see the logical query plan:

cr> EXPLAIN SELECT name, details FROM users WHERE name LIKE 'A%' LIMIT 50;
+----------------------------------------------------------+
| EXPLAIN                                                  |
+----------------------------------------------------------+
| Fetch[name, details]                                     |
|   └ Limit[50::bigint;0]                                  |
|     └ Collect[doc.users | [_fetchid] | (name LIKE 'A%')] |
+----------------------------------------------------------+

Starting from top to bottom, we see:

  • The Fetch operator. Fetch takes a relation as input and expects it to contain one or more _fetchid columns. It uses these _fetchid values to retrieve the values of other attributes. In this example it retrieves name and details.

  • The Limit operator. Limit takes a relation and limits it to at most 50 records.

  • The Collect operator. The first post of the series already touched on Collect. The Collect operator in this example indicates that an attribute called _fetchid should be retrieved from the doc.users table. The operator also includes the name LIKE 'A%' query expression, indicating that only records matching this expression should be included. _fetchid is a system column which can be used by the Fetch operator. CrateDB uses Lucene to store data, and Lucene can uniquely address individual documents within a reader with a docId. A reader is a construct that Lucene uses to access data within an index¹. The _fetchid contains both this docId and a readerId. This makes it possible for the Fetch operation to uniquely identify a single document during the lifetime of a Lucene reader.
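
To make the idea of a _fetchid that carries both a readerId and a docId more concrete, here is a minimal Python sketch. The bit layout (readerId in the upper 32 bits, docId in the lower 32 bits) and the function names encode_fetch_id and decode_fetch_id are assumptions made for illustration, not a description of CrateDB's actual code.

```python
from typing import Tuple

# Assumed layout for illustration: upper bits hold the readerId,
# the lower 32 bits hold the Lucene docId.
DOC_ID_BITS = 32
DOC_ID_MASK = (1 << DOC_ID_BITS) - 1


def encode_fetch_id(reader_id: int, doc_id: int) -> int:
    """Pack a reader id and a doc id into a single integer."""
    if reader_id < 0:
        raise ValueError("reader_id must be non-negative")
    if not 0 <= doc_id <= DOC_ID_MASK:
        raise ValueError("doc_id must fit into 32 bits")
    return (reader_id << DOC_ID_BITS) | doc_id


def decode_fetch_id(fetch_id: int) -> Tuple[int, int]:
    """Split a packed value back into (reader_id, doc_id)."""
    return fetch_id >> DOC_ID_BITS, fetch_id & DOC_ID_MASK


fetch_id = encode_fetch_id(reader_id=3, doc_id=42)
print(decode_fetch_id(fetch_id))  # (3, 42)
```

Packing both ids into one value means a single column is enough to carry the placeholder through Limit and Merge until Fetch needs it.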

Why is CrateDB creating a plan like that?

Loading the values of a record is relatively expensive, so CrateDB tries to avoid it unless it has to. The data in CrateDB is distributed across several shards and nodes, and CrateDB cannot accurately know up-front how many records each shard holds and how many of these records will match the name LIKE 'A%' expression. Because of that, CrateDB has to collect up to 50 records from each node and then merge them, stopping once the limit is reached.

CrateDB could use a logical plan like the following:

+---------------------------------------------------------------+
| EXPLAIN                                                       |
+---------------------------------------------------------------+
| Limit[50::bigint;0]                                           |
|   └ Collect[doc.users | [name, details] | (name LIKE 'A%')]   |
+---------------------------------------------------------------+

Which would translate to an execution strategy that looks roughly like this:

+-----------------+  +-----------------+  +-----------------+
|        N1       |  |       N2        |  |       N3        |
|     Collect     |  |     Collect     |  |     Collect     |
|  name, details  |  |  name, details  |  |  name, details  |
|   Limit 50      |  |   Limit 50      |  |   Limit 50      |
+-----------------+  +-----------------+  +-----------------+
         \                     |                  /
          \                    |                 /
           +-------------------+----------------+
                              |
                              |
                              |
                       +------------+
                       |    N1      |
                       |   Merge    |
                       |  Limit 50  |
                       +------------+
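
The diagram above can be mimicked with a small Python simulation that counts how many records have their values loaded. This is only a sketch under simplifying assumptions: the helpers make_rows, collect_with_values and merge are hypothetical, each node is a plain list of rows that already match the filter, and the merge simply takes rows in arrival order.

```python
from itertools import islice

LIMIT = 50


def make_rows(node_name, count):
    """Hypothetical stand-in for the rows on one node that match name LIKE 'A%'."""
    return [{"name": f"A-{node_name}-{i}", "details": {"row": i}} for i in range(count)]


def collect_with_values(rows, limit):
    """Node-side Collect + Limit: loads name and details for up to `limit` rows."""
    return list(islice(rows, limit))


def merge(per_node_results, limit):
    """Handler-side Merge + Limit: keeps the first `limit` rows that arrive."""
    all_rows = (row for result in per_node_results for row in result)
    return list(islice(all_rows, limit))


nodes = {name: make_rows(name, 200) for name in ("N1", "N2", "N3")}
per_node = [collect_with_values(rows, LIMIT) for rows in nodes.values()]

rows_loaded = sum(len(result) for result in per_node)
result = merge(per_node, LIMIT)

print(rows_loaded, len(result))  # 150 50
```

In this toy setup the three nodes load values for 150 records in total, while only 50 of them end up in the result.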

In this hypothetical execution plan, each node loads up to 50 records with name and details and sends them to the handler node N1. There the results are merged and reduced to at most 50 records. With three nodes, the cluster would load name and details for up to 150 records, only to discard up to 100 of them. The logical plan we saw with the first EXPLAIN statement avoids that:

+-----------------+  +-----------------+  +-----------------+
|        N1       |  |       N2        |  |       N3        |
|     Collect     |  |     Collect     |  |     Collect     |
|     _fetchid    |  |     _fetchid    |  |     _fetchid    |
|   Limit 50      |  |   Limit 50      |  |   Limit 50      |
+-----------------+  +-----------------+  +-----------------+
         \                     |                  /           
          \                    |                 /                 
           +-------------------+----------------+                  
                              |                                    
                              v                                    
                       +------------+                              
                       |   Merge    |                              
                       |  Limit 50  |                              
                       +------------+                              
                            |                                      
                            v                                      
           +-------------------------------+                       
           | Fetch                         |                       
           |  _fetchid → [name, details]   |
           +-------------------------------+

Here each node collects up to 50 records containing only _fetchid values and sends them to the handler node, where the records are merged and reduced to 50 records. The Fetch operation then retrieves the actual name and details values:

                            +-------------------------------------+
                            |                                     |
                            v                                     |
+-----------------+  +-----------------+  +-----------------+     |
|        N1       |  |       N2        |  |       N3        |     |
|     Collect     |  |     Collect     |  |     Collect     |     |
|     _fetchid    |  |     _fetchid    |  |     _fetchid    |     |
|   Limit 50      |  |   Limit 50      |  |   Limit 50      |     |
+-----------------+  +-----------------+  +-----------------+     |
    ^                                                   ^         |
    |                                                   |         |
    |            [Merge from first step omitted]        |         |
    |                                                   |         |
    |      +-------------------------------+            |         |
    |      | Fetch                         |            |         |
    +----->|  _fetchid → [name, details]   |<-----------+---------+
           +-------------------------------+

You can think of the _fetchid as a placeholder for the actual values. The Fetch operation uses the readerId that is encoded into the _fetchid to figure out which node it has to contact to get the actual values. The whole Fetch operation works in batches, so multiple records are fetched at a time. It works a bit like an asynchronous flat-map. The major advantage of this approach is that each of the three nodes now only has to load the name and details values that are actually required. N1 might load 16 records, N2 another 17 and N3 the remaining 17, making up the total of 50.
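
The two phases can be sketched as a small synchronous Python simulation. Everything here is an illustrative assumption rather than CrateDB's implementation: the Node class, the one-reader-per-node mapping, the round-robin merge, the (readerId, docId) tuples standing in for _fetchid values, and the batch size of 20. The real operation is asynchronous and distributed.

```python
from collections import defaultdict
from itertools import chain, islice, zip_longest

LIMIT = 50
BATCH_SIZE = 20  # hypothetical batch size, chosen only for illustration


class Node:
    """Hypothetical node that holds exactly one reader."""

    def __init__(self, reader_id, row_count):
        self.reader_id = reader_id
        # Every second row matches name LIKE 'A%'.
        self.rows = [
            {"name": f"{'A' if i % 2 == 0 else 'B'}-{reader_id}-{i}", "details": {"row": i}}
            for i in range(row_count)
        ]
        self.values_loaded = 0

    def collect(self, limit):
        """Query phase: return (reader_id, doc_id) placeholders only."""
        matching = (d for d, row in enumerate(self.rows) if row["name"].startswith("A"))
        return [(self.reader_id, doc_id) for doc_id in islice(matching, limit)]

    def fetch(self, doc_ids):
        """Fetch phase: load the values for the requested documents."""
        self.values_loaded += len(doc_ids)
        return {doc_id: self.rows[doc_id] for doc_id in doc_ids}


_MISSING = object()


def merge(per_node, limit):
    """Interleave the node results round-robin and stop at the limit."""
    interleaved = chain.from_iterable(zip_longest(*per_node, fillvalue=_MISSING))
    return list(islice((item for item in interleaved if item is not _MISSING), limit))


def fetch(placeholders, nodes_by_reader, batch_size):
    """Resolve placeholders batch by batch, keeping the merged order."""
    rows = []
    for start in range(0, len(placeholders), batch_size):
        batch = placeholders[start:start + batch_size]
        doc_ids_by_reader = defaultdict(list)
        for reader_id, doc_id in batch:
            doc_ids_by_reader[reader_id].append(doc_id)
        # One request per node and batch, covering only the needed documents.
        loaded = {
            reader_id: nodes_by_reader[reader_id].fetch(doc_ids)
            for reader_id, doc_ids in doc_ids_by_reader.items()
        }
        rows.extend(loaded[reader_id][doc_id] for reader_id, doc_id in batch)
    return rows


nodes = [Node(reader_id, 200) for reader_id in range(3)]
nodes_by_reader = {node.reader_id: node for node in nodes}

placeholders = merge([node.collect(LIMIT) for node in nodes], LIMIT)
result = fetch(placeholders, nodes_by_reader, BATCH_SIZE)

print(len(result), [node.values_loaded for node in nodes])  # 50 [17, 17, 16]
```

Each node still returns up to 50 placeholders in the query phase, but values are loaded for only 50 records across the whole cluster.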

This execution strategy works with both the direct response mode and the incremental paging mode outlined in the second post.

Wrap up

In this post you learned about the Query Then Fetch execution strategy in CrateDB and saw how it operates on a high level.

The next post in the series will look at the execution of GROUP BY queries.


¹ Index here refers to a Lucene index. In CrateDB terminology that would be a shard. One CrateDB shard maps to one Lucene index. The Lucene index in turn can contain segments and various structures, like the stored data or an inverted index for efficient lookup of documents based on terms.