Thursday, May 20, 2021

Distributed SELECT statement execution in CrateDB

This is the second post of a series about CrateDB internals. In the first post I introduced relational algebra and the optimizer, and outlined how the execution works at a very high level. In this post I’ll explore the execution of a simple SELECT statement in more detail, focusing on the distribution aspect of the query execution.

A simple select

Let’s start with the simplest query possible: a SELECT name FROM users. The query is supposed to return the name values for all records in the users table. In the first post I elaborated how the logical plan is created and later transformed into an execution plan. Let us resume from there. The execution plan for SELECT name FROM users would consist of a CollectPhase and a MergePhase. These phases would look roughly like this:

RoutedCollectPhase:
  toCollect: [Ref{name}]
  routing:
    node-0:
      users: [0, 1]
    node-1:
      users: [2, 3]


MergePhase:
  executionNodes: [node-0]
  numUpstreams: 2

The RoutedCollectPhase is one concrete implementation of a CollectPhase. The toCollect property tells us which attributes should be collected and the routing property tells us from where. In this case the executor has to retrieve the data from node-0 and node-1, because each of these nodes holds two shard copies of the table.

The MergePhase is used to indicate that a node needs to merge data from several upstream nodes. Usually this merge phase is assigned to the handler node. In this case that is node-0, and it expects results from two upstreams. Because node-0 is both the handler node and a collect node in this scenario, one of these upstreams is node-0 itself and the other is node-1.
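To make the relationship between the two phases concrete, here is a minimal sketch in Python. The class and function names mirror the plan output above but are hypothetical stand-ins, not CrateDB's actual Java classes. The only point is that numUpstreams of the merge phase follows from the number of nodes in the routing.

```python
from __future__ import annotations

from dataclasses import dataclass


# Hypothetical, simplified stand-ins for the plan structures shown above.
@dataclass
class RoutedCollectPhase:
    to_collect: list[str]
    # node id -> table name -> shard ids located on that node
    routing: dict[str, dict[str, list[int]]]

    def collect_nodes(self) -> list[str]:
        """Every node that appears in the routing has to run the collect phase."""
        return sorted(self.routing)


@dataclass
class MergePhase:
    execution_nodes: list[str]
    num_upstreams: int


def plan_select(routing, handler_node, columns):
    """Build both phases; the merge phase expects one upstream per collect node."""
    collect = RoutedCollectPhase(to_collect=columns, routing=routing)
    merge = MergePhase(
        execution_nodes=[handler_node],
        num_upstreams=len(collect.collect_nodes()),
    )
    return collect, merge


collect, merge = plan_select(
    routing={"node-0": {"users": [0, 1]}, "node-1": {"users": [2, 3]}},
    handler_node="node-0",
    columns=["name"],
)
print(collect.collect_nodes(), merge.num_upstreams)
```

Note that node-0 shows up in both places: as the execution node of the merge phase and as one of the collect nodes.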

There are two modes of operation for this particular query.

Direct response

CrateDB can opt to use a direct response mode. In this mode, the handler node will send job requests to the collect nodes, and it expects a response for each request. This response must contain the result for the query. node-0 has to collect the name attributes for users from shard 0 and 1 and send these records as part of a response back to the handler node. node-1 has to do the same, but with shard 2 and 3.

The handler node will have set up a merge operation based on the MergePhase that the optimizer assigned to it. In this case there are no explicit merge instructions, so the handler node can, glossing over some details, forward the results more or less directly to the client that invoked the query. We’ll look at some of the available merge operations in a later post.

   +--------+  1  +--------+   2   +--------+
   | client | --> | node-0 | ----> | node-1 |
   |        | <-- |        | <---- |        |
   +--------+  4  +--------+   3   +--------+
                   ^  ^        
                   |  |
                   +--+
                    2/3
  1. Client sends SELECT name FROM users to server
  2. node-0 sends a job request with the RoutedCollectPhase to itself and node-1
  3. node-0 and node-1 send the result in the response
  4. node-0 forwards the results to the client
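
The four steps above can be sketched as a small in-process simulation. This is an illustration under simplifying assumptions, not how CrateDB implements it: the shard contents are made up, the function names are hypothetical, and the "network" is a plain function call. What it does show is the defining property of the mode, namely that every collect node returns its complete result in a single response.

```python
# Made-up shard contents of the users table, keyed by shard id.
SHARDS = {
    0: ["Arthur"],
    1: ["Ford"],
    2: ["Trillian"],
    3: ["Zaphod", "Marvin"],
}

# Same routing as in the plan above: node id -> shard ids.
ROUTING = {"node-0": [0, 1], "node-1": [2, 3]}


def handle_job_request(shard_ids):
    """Collect node: read all assigned shards, answer with the full result."""
    rows = []
    for shard_id in shard_ids:
        rows.extend(SHARDS[shard_id])
    return rows  # the whole intermediate result travels in one response


def execute_direct_response(routing):
    """Handler node: one job request per collect node, one response each."""
    result = []
    for node, shard_ids in routing.items():
        response = handle_job_request(shard_ids)  # steps 2 and 3
        result.extend(response)  # no merge instructions, so just concatenate
    return result  # step 4: forwarded to the client


names = execute_direct_response(ROUTING)
print(names)
```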

Internal paging

The direct response mode has a limitation: each collect node must send its full result in a single response to the handler node. That means the entire intermediate result must be held in memory. Because of this limitation, CrateDB contains a second mode which allows it to page internally. Whether internal paging or direct response is used is decided based on some heuristics.

In this mode the handler node also sends job requests to each participating collect node, but instead of expecting the results as part of the job-request-response, it only expects an empty acknowledgment as response. Each collect node can then push a subset of the actual result (a page) to the handler node. Each collect node expects a response to this result push request. This response can indicate if more data is needed. This allows the handler node or merge node to request more data - another page - or to short-circuit the execution. In the SELECT name FROM users example it could forward the records received so far to the connected client and then respond to the collect node, telling it to send more data.

If the client uses HTTP, the result has to be buffered in memory on the handler node until it is complete. The reason is a combination of three things: HTTP clients use a request-response model, the value for the Content-Length header has to be computed, and the result is encoded as JSON. All this makes it challenging to stream individual rows. PostgreSQL clients, in contrast, have an option to receive results incrementally, as the PostgreSQL wire protocol is designed to support streaming of individual data rows.
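
The difference between the two client protocols can be illustrated with a short sketch. Both functions are hypothetical and heavily simplified. The JSON body only loosely resembles an HTTP result, and the streaming variant merely stands in for sending one row message at a time. The point is that the Content-Length value cannot be known before the last page has arrived, while a row-oriented protocol can hand over rows as they come in.

```python
import json


def buffer_http_response(pages):
    """HTTP-style: consume every page first, then build the body and header."""
    rows = [row for page in pages for row in page]  # everything held in memory
    body = json.dumps({"cols": ["name"], "rows": rows}).encode("utf-8")
    headers = {"Content-Length": str(len(body))}  # needs the complete body
    return headers, body


def stream_rows(pages):
    """Row-streaming style: pass each row on as soon as its page arrives."""
    for page in pages:
        for row in page:
            yield row


# Two pages of made-up rows, as they might arrive from the collect nodes.
incoming = [[["Arthur"], ["Ford"]], [["Trillian"]]]

headers, body = buffer_http_response(iter(incoming))
print(headers["Content-Length"], len(body))

for row in stream_rows(iter(incoming)):
    print(row)
```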

   +--------+  1  +--------+       +--------+
   | client | --> | node-0 | --2-> | node-1 |
   |        | <-- |        | <-3-- |        |
   +--------+  6  +--------+       +--------+
                     v  ^            v  ^
                     |  |            |  |
                     |  +----4-------+  |
                     |                  |
                     +-------5----------+
  1. Client sends SELECT name FROM users to server
  2. node-0 sends a job request with the RoutedCollectPhase to node-1
  3. node-1 sends an empty acknowledgment
  4. node-1 pushes a result to node-0
  5. node-0 sends a response, requesting more data or short-circuiting the operation.
  6. node-0 sends the result to the client

Steps 4 and 5 can repeat until the result is complete, and it is possible for step 6 to be interleaved with steps 4 and 5. This depends on the exact query, the merge operation and the client protocol in use.

node-0 also communicates with itself if it acts as both handler node and collect node. This is omitted here to simplify the illustration.
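
The push-and-respond loop of steps 4 and 5 can be sketched as follows. This is a simplified, single-process illustration with hypothetical names, not CrateDB's implementation. The limit parameter is an assumption added purely to show short-circuiting; the plain SELECT name FROM users example has no such limit and would always ask for more data until the collect node runs out of rows.

```python
from itertools import islice


def pages(rows, page_size):
    """Split the rows of a collect node into pages of at most page_size rows."""
    it = iter(rows)
    while True:
        page = list(islice(it, page_size))
        if not page:
            return
        yield page


class Handler:
    """Handler/merge side: receives pages and decides whether it needs more."""

    def __init__(self, limit=None):
        self.received = []
        self.limit = limit  # hypothetical: stop once this many rows arrived

    def on_page(self, page):
        """Called for each push (step 4); the return value is the response (step 5)."""
        self.received.extend(page)
        return self.limit is None or len(self.received) < self.limit


def run_collect_node(rows, page_size, handler):
    """Collect side: push page by page, stop when the handler says so."""
    pushed = 0
    for page in pages(rows, page_size):
        pushed += 1
        if not handler.on_page(page):
            break  # short-circuit: the remaining rows are never sent
    return pushed


handler = Handler(limit=3)
pushed = run_collect_node(["a", "b", "c", "d", "e"], page_size=2, handler=handler)
print(pushed, handler.received)
```

With a page size of 2 and a limit of 3, the collect node stops after the second page, so the fifth row never leaves it. In contrast to the direct response mode, only one page per collect node has to be in flight at any time.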

Wrap up

This post walked through another building block of the query execution in CrateDB. We focused on the distribution aspect and used a simple SELECT name FROM users query as an example. Later posts will walk through more complex queries.