Distributed SELECT statement execution in CrateDB
This is the second post of a series about CrateDB internals. In the first post I introduced relational algebra and the optimizer, and outlined how the execution works on a very high level. In this post I’ll explore the execution of a simple SELECT statement in more detail. This post focuses on the distribution aspect of the query execution.
A simple select
Let’s start with the simplest query possible: SELECT name FROM users. The query is supposed to return the name values for all records in the users table. In the first post I explained how the logical plan is created and later transformed into an execution plan. Let us resume from there. The execution plan for SELECT name FROM users would consist of a CollectPhase and a MergePhase. These phases would look roughly like this:
    RoutedCollectPhase:
      toCollect: [Ref{name}]
      routing:
        node-0:
          users: [0, 1]
        node-1:
          users: [2, 3]
    MergePhase:
      executionNodes: [node-0]
      numUpstreams: 2
The RoutedCollectPhase is one concrete implementation of a CollectPhase. The toCollect property tells us which attributes should be collected, and the routing property tells us where to collect them from. In this case the executor has to retrieve the data from node-0 and node-1, as each of these nodes holds copies of two of the table’s shards: shards 0 and 1 on node-0, shards 2 and 3 on node-1.
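To make the routing property more concrete, here is a minimal Python sketch that models the phase as a plain data structure. It answers the two questions the executor has to ask: which nodes need a job request, and which shards each of them reads. The class name mirrors RoutedCollectPhase, but the fields, the methods and the dict-based routing shape are assumptions for illustration. They are not CrateDB’s actual (Java) implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RoutedCollectPhase:
    # Attributes to collect, e.g. ["name"] for SELECT name FROM users.
    to_collect: List[str]
    # Hypothetical shape: node id -> table name -> shard ids read on that node.
    routing: Dict[str, Dict[str, List[int]]] = field(default_factory=dict)

    def collect_nodes(self) -> List[str]:
        """Nodes that must receive a job request for this phase."""
        return sorted(self.routing)

    def shards_for(self, node: str, table: str) -> List[int]:
        """Shard ids the given node has to read for the given table."""
        return self.routing.get(node, {}).get(table, [])


phase = RoutedCollectPhase(
    to_collect=["name"],
    routing={"node-0": {"users": [0, 1]}, "node-1": {"users": [2, 3]}},
)
print(phase.collect_nodes())                # ['node-0', 'node-1']
print(phase.shards_for("node-1", "users"))  # [2, 3]
```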
The MergePhase indicates that a node needs to merge data from several upstream nodes. Usually this merge phase is assigned to the handler node, which in this case is node-0 (executionNodes: [node-0]). It expects results from two upstreams (numUpstreams: 2). In this scenario node-0 is both the handler node and a collect node, so the two upstreams are node-0 itself and node-1.
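The following hypothetical sketch shows the bookkeeping a merge operation needs for numUpstreams: 2. The MergeBuffer class and its methods are invented for illustration. The only point it makes is that node-0’s own result counts as one of the two upstreams, exactly like node-1’s.

```python
from typing import List, Set, Tuple


class MergeBuffer:
    """Hypothetical merge operation for a MergePhase: it collects rows until
    num_upstreams distinct upstream nodes have delivered their result."""

    def __init__(self, num_upstreams: int) -> None:
        self.num_upstreams = num_upstreams
        self.rows: List[Tuple] = []
        self.finished: Set[str] = set()

    def receive(self, upstream: str, rows: List[Tuple]) -> None:
        # node-0 delivers its own collect result here as well; it counts
        # as an upstream exactly like node-1.
        self.rows.extend(rows)
        self.finished.add(upstream)

    def is_complete(self) -> bool:
        return len(self.finished) == self.num_upstreams


merge = MergeBuffer(num_upstreams=2)
merge.receive("node-0", [("alice",), ("bob",)])
print(merge.is_complete())  # False: node-1 has not delivered yet
merge.receive("node-1", [("carol",), ("dave",)])
print(merge.is_complete())  # True
```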
There are two modes of operation for this particular query.
Direct response
CrateDB can opt to use a direct response mode. In this mode, the handler node sends job requests to the collect nodes and expects a response for each request. This response must contain the result for the query. node-0 has to collect the name attributes for users from shards 0 and 1 and send these records back to the handler node as part of a response. node-1 has to do the same, but with shards 2 and 3.
The handler node will have set up some kind of merge operation based on the MergePhase that the optimizer assigned to it. In this case there are no explicit merge instructions. Glossing over some details, the handler node can therefore forward the results more or less directly to the client that invoked the query. We’ll look at some of the available merge operations in a later post.
    +--------+  1  +--------+   2   +--------+
    | client | --> | node-0 | ----> | node-1 |
    |        | <-- |        | <---- |        |
    +--------+  4  +--------+   3   +--------+
                      ^  ^
                      |  |
                      +--+
                      2/3
1. The client sends SELECT name FROM users to the server (node-0).
2. node-0 sends a job request with the RoutedCollectPhase to itself and to node-1.
3. node-0 and node-1 send the result in the response.
4. node-0 forwards the results to the client.
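The direct response flow can be simulated in a few lines. This is a minimal sketch that assumes in-memory sample shards. The function names and the data are hypothetical. The job requests are also issued sequentially here, whereas a real handler would send them over the network to each node.

```python
from typing import Dict, List, Tuple

# Hypothetical sample data: shard id -> rows stored in that shard of users.
SHARDS: Dict[int, List[dict]] = {
    0: [{"name": "alice"}],
    1: [{"name": "bob"}],
    2: [{"name": "carol"}],
    3: [{"name": "dave"}],
}
# Same routing as in the RoutedCollectPhase: node -> shards of users.
ROUTING: Dict[str, List[int]] = {"node-0": [0, 1], "node-1": [2, 3]}


def handle_job_request(shard_ids: List[int], to_collect: List[str]) -> List[Tuple]:
    """Collect node: read every assigned shard and return the complete
    result in a single response, so it is held entirely in memory."""
    return [
        tuple(row[col] for col in to_collect)
        for shard in shard_ids
        for row in SHARDS[shard]
    ]


def direct_response(to_collect: List[str]) -> List[Tuple]:
    """Handler node: one job request per collect node (node-0 included),
    then concatenate the responses, as there are no merge instructions."""
    result: List[Tuple] = []
    for node, shard_ids in ROUTING.items():
        # A real handler would send this request to `node` over the network.
        result.extend(handle_job_request(shard_ids, to_collect))
    return result  # forwarded to the client


print(direct_response(["name"]))
```

Note that each call to handle_job_request materializes its whole result before returning. That is the limitation which motivates the second mode.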
Internal paging
The direct response mode has a limitation: each collect node must send its full result to the handler node in a single response. That means the entire intermediate result must be held in memory. Because of this limitation, CrateDB contains a second mode that allows it to page internally. Whether internal paging or direct response is used is decided based on heuristics.
In this mode the handler node also sends job requests to each participating collect node. However, instead of expecting the results as part of the response to the job request, it only expects an empty acknowledgment. Each collect node can then push a subset of the actual result (a page) to the handler node, and it expects a response to each of these push requests. This response can indicate whether more data is needed, which allows the handler node or merge node to request more data (another page) or to short-circuit the execution. In the SELECT name FROM users example, the handler node could forward the records received so far to the connected client and then respond to the collect node, telling it to send more data.
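The push-and-respond loop of internal paging maps naturally onto a Python generator. The collect node yields a page, and the value the handler sends back is its response to that push. This is a sketch under stated assumptions: page_size and limit are hypothetical parameters, and a limit is only one possible reason for the handler to short-circuit.

```python
from typing import Generator, List, Optional, Tuple


def collect_node(rows: List, page_size: int) -> Generator[List, bool, None]:
    """Collect node: push one page at a time, then wait for the handler's
    response to that push. Stop early if the handler needs no more data."""
    for start in range(0, len(rows), page_size):
        need_more = yield rows[start:start + page_size]
        if not need_more:
            return  # short-circuited by the handler


def handler_node(collect_rows: List, page_size: int,
                 limit: Optional[int] = None) -> Tuple[List, int]:
    """Handler node: accept pushed pages and answer each push with
    need_more=True/False. Returns the rows kept and the number of pushes.
    collect_rows stands in for the collect node's local result."""
    received: List = []
    pushes = 0
    pusher = collect_node(collect_rows, page_size)
    try:
        page = next(pusher)  # first push, after the (omitted) empty acknowledgment
        while True:
            pushes += 1
            received.extend(page)  # a streaming client could be sent this page now
            need_more = limit is None or len(received) < limit
            page = pusher.send(need_more)  # respond to the push
    except StopIteration:
        pass  # collect node is exhausted or was told to stop
    return (received if limit is None else received[:limit]), pushes


print(handler_node(list(range(10)), page_size=3))           # ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 4)
print(handler_node(list(range(10)), page_size=3, limit=4))  # ([0, 1, 2, 3], 2)
```

With a limit of 4 and pages of 3 rows, the handler answers the second push with "no more". The collect node therefore never produces the remaining pages.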
If the client uses HTTP, the result has to be buffered in memory on the handler node until it is complete. The reason is a combination of factors: HTTP clients use a request-response model, the value of the Content-Length header has to be computed, and the response body uses the JSON format. All this makes it challenging to stream individual rows. PostgreSQL clients, on the other hand, can receive results incrementally, because the PostgreSQL wire protocol is designed to support streaming of individual data rows.
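The difference between the two client protocols shows up in how a response is framed. The sketch below contrasts a buffered JSON body, whose Content-Length is only known once every row is in it, with PostgreSQL DataRow messages. Each DataRow message carries its own length and can be written as soon as the row is available. The JSON shape with cols and rows is a simplified assumption. The DataRow layout follows the PostgreSQL protocol documentation, using the text format: a type byte 'D', an Int32 length that includes itself, an Int16 column count, then a length-prefixed value per column, with -1 for NULL.

```python
import json
import struct
from typing import Dict, List, Optional, Tuple


def http_response(cols: List[str], rows: List[list]) -> Tuple[Dict[str, str], bytes]:
    """Buffered HTTP/JSON response (simplified, assumed body shape): the
    Content-Length header is only known once every row is in the body."""
    body = json.dumps({"cols": cols, "rows": rows}).encode("utf-8")
    headers = {"Content-Type": "application/json", "Content-Length": str(len(body))}
    return headers, body


def pg_data_row(values: List[Optional[str]]) -> bytes:
    """One PostgreSQL DataRow message ('D') in text format, assuming values
    are already strings. Each row is self-contained, so it can be sent
    as soon as it exists."""
    payload = struct.pack("!h", len(values))  # Int16: number of column values
    for value in values:
        if value is None:
            payload += struct.pack("!i", -1)  # Int32 length -1 marks NULL
        else:
            data = value.encode("utf-8")
            payload += struct.pack("!i", len(data)) + data
    # The Int32 message length counts itself and the payload, not the type byte.
    return b"D" + struct.pack("!i", 4 + len(payload)) + payload


headers, body = http_response(["name"], [["alice"], ["bob"]])
print(headers["Content-Length"], body)
for row in [["alice"], [None]]:
    print(pg_data_row(row))
```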
    +--------+  1  +--------+       +--------+
    | client | --> | node-0 | --2-> | node-1 |
    |        | <-- |        | <-3-- |        |
    +--------+  6  +--------+       +--------+
                      v  ^             v  ^
                      |  |             |  |
                      |  +-----4-------+  |
                      |                   |
                      +--------5----------+
1. The client sends SELECT name FROM users to the server (node-0).
2. node-0 sends a job request with the RoutedCollectPhase to node-1.
3. node-1 sends an empty acknowledgment.
4. node-1 pushes a page of the result to node-0.
5. node-0 sends a response, requesting more data or short-circuiting the operation.
6. node-0 sends the result to the client.
Steps 4 and 5 can repeat until the result is complete, and step 6 may be interleaved with steps 4 and 5. Whether that happens depends on the exact query, the merge operation and the client protocol in use.
node-0 also communicates with itself if it acts as both handler node and collect node; this is omitted here to simplify the illustration.
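The ordering of steps 1 to 6 can be sketched as a message trace. This is a hypothetical model. It assumes the handler never short-circuits, omits node-0’s messages to itself, and uses a single streaming_client flag to stand in for the client protocol. A streaming client gets each page forwarded before the handler responds to the push, so step 6 is interleaved with steps 4 and 5. A buffering client gets the whole result at the end.

```python
from typing import List, Tuple


def paging_trace(pages: List[list], streaming_client: bool) -> List[Tuple[int, str]]:
    """Return (step, message) pairs for the internal paging mode, numbered
    like the steps above. Assumes the handler never short-circuits."""
    trace = [
        (1, "client -> node-0: SELECT name FROM users"),
        (2, "node-0 -> node-1: job request (RoutedCollectPhase)"),
        (3, "node-1 -> node-0: empty acknowledgment"),
    ]
    for page in pages:
        trace.append((4, f"node-1 -> node-0: push page of {len(page)} rows"))
        if streaming_client:
            # Forward what arrived so far before answering the push.
            trace.append((6, f"node-0 -> client: {len(page)} rows"))
        trace.append((5, "node-0 -> node-1: response to push"))
    if not streaming_client:
        # Buffering client (e.g. HTTP): one response once the result is complete.
        total = sum(len(page) for page in pages)
        trace.append((6, f"node-0 -> client: {total} rows"))
    return trace


for step, message in paging_trace([["alice", "bob"], ["carol"]], streaming_client=True):
    print(step, message)
```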
Wrap up
This post walked through another building block of the query execution in CrateDB. We focused on the distribution aspect and used a simple SELECT name FROM users query as an example. Later posts will walk through more complex queries.