(Abusing) Elasticsearch as a Framework

Or how Crate uses Elasticsearch.

Most people who know Elasticsearch think of it as a search engine, and they’re probably correct. But we at Crate think about it a bit differently and use it as a framework.

In this post I’ll try to explain how that works.

A short Elasticsearch intro

Elasticsearch is a clustered search engine. One can put documents into it and then run queries to find these documents and retrieve them. Being clustered means that it can run on one or more machines and the documents that are stored in Elasticsearch will be distributed among those machines.

There are two ways to communicate with it: via HTTP, or via a Java client that uses something called the transport protocol.

This transport protocol is also used for the communication between the machines within a cluster.

The indexing and search capabilities are powered by Lucene, a “high performance, full-featured Information Retrieval library”.

In a nutshell:

  • Elasticsearch does clustering (including all the tricky stuff that sane people don’t want to worry about: Discovery, master election, replication, dealing with net splits and race conditions)

  • Lucene does search and indexing (which Elasticsearch distributes among multiple machines)

A short Crate intro

Crate is a distributed SQL database that leverages Elasticsearch and Lucene. In its early days it parsed SQL statements and translated them into Elasticsearch queries. It was basically a layer on top of Elasticsearch.

If you wrote something like

select * from users

it would be translated into (roughly)

POST /users/default/_search -d '{"query": {"match_all": {}}}'
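To make the idea of a thin translation layer concrete, here is a minimal sketch in Python. It is not Crate’s actual parser: the function name translate_select_all and the regular expression are assumptions made up for illustration, and it only understands the exact statement shown above.

```python
import json
import re

# Hypothetical pattern: matches only "select * from <table>", nothing else.
_SELECT_ALL = re.compile(r"^\s*select\s+\*\s+from\s+(\w+)\s*;?\s*$", re.IGNORECASE)


def translate_select_all(sql):
    """Turn "select * from <table>" into (method, path, body) of a search request."""
    match = _SELECT_ALL.match(sql)
    if match is None:
        raise ValueError("this sketch only supports 'select * from <table>'")
    table = match.group(1)
    # The table name becomes the index; "default" is the type used in the post.
    path = "/{}/default/_search".format(table)
    body = {"query": {"match_all": {}}}
    return "POST", path, json.dumps(body)


method, path, body = translate_select_all("select * from users")
print(method, path, body)
```

Anything beyond this one statement shape (filters, joins, aggregations) would need a real SQL parser, which is where such a simple mapping stops being enough.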

Those were the early days. Since then it has evolved a lot. It got its own execution engine with its own DSL. Internally the same statement from before is now turned into something like this:

"fetchPhase": {
    "executionNodes": ["TLMh2zg0SRiC79mYxZj5uw"],
    "phaseType": "FETCH",
    "fetchRefs": ["doc.users._doc['name']"],
    "id": 1
},
"planType": "QueryThenFetch",
"subPlan": {
    "planType": "CollectAndMerge",
    "collectPhase": {
        "projections": [...],
        "phaseType": "COLLECT",
        "toCollect": ["doc.users._docid"],
        "id": 0,
        "distribution": {"distributedByColumn": 0, "type": "BROADCAST"},
        "routing": {
            "TLMh2zg0SRiC79mYxZj5uw": {
                "users": [ 0, 1, 2, 3, 4 ]
            }
        }
    }
},
"localMerge": {
    "projections": [...],
    "executionNodes": [],
    "phaseType": "MERGE",
    "id": 2
}

(Don’t worry if that doesn’t make sense to you. You may worry if it does)

This was done to implement features in Crate that do not exist in Elasticsearch.

But this whole SQL execution engine also uses Elasticsearch in some way. I’ll try to explain how.

Elasticsearch as a web framework

Since many developers are familiar with web frameworks I’ll try to go with that. It actually fits with how parts of Elasticsearch work quite well.

In a web framework one can usually register a route to a handler. Like, if a browser points to /foo/bar, then FooBarHandler.get should be called.

The get implementation of the FooBarHandler class receives some sort of request object, executes some business logic, and finally generates a response, which the get function returns.

Meanwhile the web framework does all the magic to produce the request object, call the appropriate function and deliver the response object in a sensible form back over the wire to the client.
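A minimal sketch of that pattern in Python, assuming a made-up TinyFramework class that stands in for a real web framework. Only FooBarHandler and the /foo/bar route come from the description above; everything else is hypothetical.

```python
class FooBarHandler:
    """Handler whose `get` method holds the business logic."""

    def get(self, request):
        return {"status": 200, "body": "GET " + request["path"]}


class TinyFramework:
    """Hypothetical stand-in for the routing layer of a web framework."""

    def __init__(self):
        self._routes = {}

    def register(self, path, handler):
        # Map a route to the handler object responsible for it.
        self._routes[path] = handler

    def dispatch(self, method, path):
        handler = self._routes.get(path)
        if handler is None:
            return {"status": 404, "body": "no route for " + path}
        # Look up the handler method named after the HTTP verb (get, post, ...).
        handle = getattr(handler, method.lower(), None)
        if handle is None:
            return {"status": 405, "body": method + " not allowed"}
        # The framework, not the handler, builds the request object.
        request = {"method": method, "path": path}
        return handle(request)


app = TinyFramework()
app.register("/foo/bar", FooBarHandler())
print(app.dispatch("GET", "/foo/bar"))
```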

Elasticsearch includes its own HTTP server and is in a sense also a web framework. Or rather: it does what web frameworks do.

Back to the example from earlier:

POST /users/default/_search -d '{"query": {"match_all": {}}}'

An HTTP request like that causes a handler registered to /{index}/{type}/_search to be fired (RestSearchAction, to be specific, in case you want to take a look at the source).

In Elasticsearch, HTTP requests are always translated into their transport request equivalents, and doing that translation is the purpose of this handler.

In this case it becomes a SearchRequest. The same kind of SearchRequest a user of the Java API would create to execute the same query.

This SearchRequest is then sent to a TransportRequestHandler that is registered under a name. (The name plays a role similar to a route like /foo/bar in the earlier example.)

These TransportRequestHandler implementations are mostly part of a class called Transport<name>Action, which contains the actual logic for processing a request.

To sum up:

  • Elasticsearch has two network services: HTTP (9200) and Transport (9300).
  • Internally routes/URLs are mapped to handlers just like in web frameworks (E.g. /url/x/y maps to handleRequest in class XY).
  • HTTP requests are converted to transport requests.
  • Transport requests are sent to TransportRequestHandler classes registered under a name.
  • Transport<name>Action does the actual work. (Like making more requests to other nodes to gather data)
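The flow summarized above can be sketched in a few lines of Python. This is a toy model, not Elasticsearch code: the TransportService class, the action name "example/search" and the function names are assumptions chosen to mirror the roles of RestSearchAction, SearchRequest and a Transport<name>Action.

```python
from dataclasses import dataclass, field


@dataclass
class SearchRequest:
    """Toy counterpart of a transport-level search request."""
    index: str
    doc_type: str
    query: dict = field(default_factory=dict)


class TransportService:
    """Hypothetical registry: action name -> request handler."""

    def __init__(self):
        self._handlers = {}

    def register_handler(self, action_name, handler):
        self._handlers[action_name] = handler

    def send(self, action_name, request):
        # In a real cluster this may cross the network; here it is a local call.
        return self._handlers[action_name](request)


def transport_search_action(request):
    """Stands in for the Transport<name>Action that does the actual work."""
    return {"index": request.index, "type": request.doc_type, "query": request.query}


def rest_search_action(transport, path, body):
    """Stands in for the REST handler: converts HTTP input into a transport request."""
    parts = path.strip("/").split("/")
    if len(parts) != 3 or parts[2] != "_search":
        raise ValueError("expected /{index}/{type}/_search")
    index, doc_type, _ = parts
    request = SearchRequest(index, doc_type, body.get("query", {}))
    return transport.send("example/search", request)  # action name is made up


transport = TransportService()
transport.register_handler("example/search", transport_search_action)
print(rest_search_action(transport, "/users/default/_search",
                         {"query": {"match_all": {}}}))
```

A Java API user would skip rest_search_action entirely and hand a SearchRequest to the transport layer directly, which is why both entry points end up in the same handler.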

Transport protocol

The transport protocol is the binary protocol used to send objects between nodes in an Elasticsearch (or Crate) cluster.

Most web APIs nowadays accept requests containing JSON payloads. A client can send any JSON document as long as it is valid JSON. It doesn’t matter what keys or values it contains. The server will be able to read the whole request and parse the JSON. Whether it can then do anything useful with that payload is another matter.

With the transport protocol things are a bit different. Requests and responses are kind of static. The fields and their types have to be pre-defined. A TransportAction can only ever receive one type of request.

This has the advantage that it is faster because the requests and responses don’t have to include type and length information or field names.

(If you want to see some real code, take a look at the readFrom and writeTo implementations of the SQLRequest. Notice how the two methods mirror each other: fields are read in the same order in which they were written.)
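As a rough illustration of the fixed-layout idea, here is a sketch in Python using the standard struct module. It is not the Elasticsearch wire format: ExampleRequest, its three fields and the byte layout are assumptions invented for this example. The point is only that sender and receiver agree on field order and types up front, so no field names travel over the wire.

```python
import json
import struct
from dataclasses import asdict, dataclass

# Agreed-upon layout: big-endian int64, int32, bool. No names, no type tags.
_LAYOUT = struct.Struct(">qi?")


@dataclass
class ExampleRequest:
    job_id: int
    limit: int
    include_source: bool

    def write_to(self):
        """Serialize the fields in a fixed order."""
        return _LAYOUT.pack(self.job_id, self.limit, self.include_source)

    @classmethod
    def read_from(cls, payload):
        """Deserialize by reading the fields in the same order they were written."""
        job_id, limit, include_source = _LAYOUT.unpack(payload)
        return cls(job_id, limit, include_source)


request = ExampleRequest(job_id=42, limit=100, include_source=True)
binary = request.write_to()
as_json = json.dumps(asdict(request)).encode("utf-8")

print(len(binary), "bytes in the fixed layout")
print(len(as_json), "bytes as JSON")
print(ExampleRequest.read_from(binary) == request)
```

The trade-off is rigidity: if write_to and read_from ever disagree about order or types, the receiver silently reads garbage, which is why the two methods have to be kept in lockstep.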

This transport infrastructure is one huge part of Elasticsearch that Crate uses heavily.

But there is more. If it were just about the communication, we could have gone with Netty and something like Google Protobuf. (The transport service is itself based on Netty.)

A cluster: Routing, shard allocation, replication and recovery

Elasticsearch provides us with a cluster. And this isn’t just about discovering other machines and then connecting them with each other. This includes much more.

Shard allocation and Routing

In Elasticsearch documents are stored within an index, or rather within a shard that is part of an index.

One Elasticsearch index consists of one or more shards. How many is defined when an index is created.

The terminology can be a bit confusing as there are two kinds of indices. “Index” might refer to an Elasticsearch index that has multiple shards, or it might refer to a Lucene index, and a Lucene index is a shard. Yes, that’s right: an index consists of (Lucene) indices.

A shard is the smallest unit that can be distributed among nodes. It cannot be further split for distribution. If there are two nodes in a cluster and an index has only 1 shard then one node will stay empty.
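The consequence of shards being indivisible can be shown with a toy allocator in Python. The round-robin strategy and the name allocate_round_robin are assumptions for illustration; the real allocation logic in Elasticsearch weighs many more factors.

```python
def allocate_round_robin(node_ids, shard_count):
    """Assign whole shards to nodes in turn; a shard is never split."""
    allocation = {node_id: [] for node_id in node_ids}
    for shard in range(shard_count):
        node_id = node_ids[shard % len(node_ids)]
        allocation[node_id].append(shard)
    return allocation


nodes = ["node-a", "node-b"]  # made-up node names

# One shard, two nodes: the second node has nothing to hold.
print(allocate_round_robin(nodes, 1))

# Five shards, two nodes: both nodes get work.
print(allocate_round_robin(nodes, 5))
```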

If you’re not confused enough already let’s make it a bit more complicated:

In Crate the terminology is table and shards. A shard is the same thing as in Elasticsearch, but a table can be 1 or more Elasticsearch indices:

table > (ES) index > shard (lucene index)

Why bring this up? Because Crate uses Elasticsearch to do the shard allocation. Elasticsearch services decide which shard should reside on which node.

Crate uses the information provided by Elasticsearch in order to make requests to the correct nodes.

Remember the Crate execution DSL from before? Especially this part:

"routing": {
    "TLMh2zg0SRiC79mYxZj5uw": {
        "users": [ 0, 1, 2, 3, 4 ]
    }
}

It tells us that shards 0-4 of the users table are on a node with the cryptic id TLMh2zg0SRiC79mYxZj5uw.
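A small Python sketch of how such a routing structure could be turned into per-node requests. The routing dictionary is copied from the plan above; the function plan_node_requests and the shape of its result are hypothetical and not taken from Crate’s code.

```python
# Routing information as shown in the execution plan: node id -> table -> shard ids.
routing = {
    "TLMh2zg0SRiC79mYxZj5uw": {
        "users": [0, 1, 2, 3, 4],
    },
}


def plan_node_requests(routing):
    """Produce one request description per (node, table) pair."""
    requests = []
    for node_id, tables in sorted(routing.items()):
        for table, shards in sorted(tables.items()):
            requests.append({"node": node_id, "table": table, "shards": list(shards)})
    return requests


for request in plan_node_requests(routing):
    print(request)
```

With more nodes in the cluster the dictionary would simply have more top-level keys, and each node would only be asked about the shards it actually holds.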

Replication and Recovery

Elasticsearch has replication and recovery built in, and so does Crate.

Replication means that if a document is inserted into a table/index that has a replica configured, the document is put into one shard and a copy is put into a replica of that shard.

This replica is by default on another machine and cannot be allocated on the same machine as the primary shard. If one machine goes poof the data is still available.
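The placement rule and its payoff can be sketched like this in Python. Both functions and the node names are made up for illustration; they model only the single constraint described above, not the actual Elasticsearch allocation or recovery code.

```python
def place_replica(primary_node, node_ids):
    """Pick a node for the replica that differs from the primary's node."""
    for node_id in node_ids:
        if node_id != primary_node:
            return node_id
    return None  # single-node cluster: the replica stays unassigned


def nodes_with_copy(primary_node, replica_node, failed_nodes=()):
    """Return the nodes that still hold a copy of the shard after failures."""
    copies = [n for n in (primary_node, replica_node) if n is not None]
    return [n for n in copies if n not in failed_nodes]


nodes = ["node-a", "node-b"]
replica = place_replica("node-a", nodes)
print(replica)

# The machine holding the primary goes poof; the replica still has the data.
print(nodes_with_copy("node-a", replica, failed_nodes={"node-a"}))
```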

Crate hooks into the replication/recovery mechanism to also support replication and recovery for its blob tables. (A special form of storage for binary objects that isn’t based on Elasticsearch or Lucene.)

Friday, March 18, 2016 » Crate programming