Failures are the bane of existence for distributed systems. The primary technique for masking failures is, of course, to replicate the state and functionality of the distributed system so that the service as a whole can continue to function even as some pieces may fail.
Historically, this kind of replication has been very difficult to implement correctly. While the first papers in this space appeared in the late 1980s [OL88, Lam98], it wasn't until the last decade that we got actual reusable components. The two most commonly used are Google's Chubby (based on Paxos) and its open-source counterpart, ZooKeeper (based on the ZAB protocol). Both systems provide a very similar interface: a filesystem API through which clients read and write replicated files.
When we were developing a distributed coordinator for HyperDex, we found that both systems suffer from a common shortcoming rooted in their API. Inspired by OpenReplica's approach to replicating Python objects, we decided to write a new open-source state machine replication library, called Replicant, that makes it easy to build high-performance replicated state machines specified as C/C++ code.
Let's first illustrate the API problem, and then explore how Replicant's improved API overcomes it and provides a better way to develop distributed systems.
Conceptually, the filesystem API is very easy to use. Applications can create files and directories, and they communicate using the filesystem hierarchy and the files' contents. In practice, this results in rather convoluted code for straightforward tasks. Here are the steps to acquire a lock in ZooKeeper (adapted from the ZooKeeper recipes documentation):
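1. Call create() with a pathname of "_locknode_/lock-" and the sequence and ephemeral flags set.
2. Call getChildren() on the lock node without setting the watch flag (this is important to avoid the herd effect).
3. If the pathname created in step 1 has the lowest sequence number suffix, the client holds the lock and exits the protocol.
4. Otherwise, the client calls exists() with the watch flag set on the path in the lock directory with the next lowest sequence number.
5. If exists() returns false, go to step 2. Otherwise, wait for a notification for that pathname before going to step 2.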
This recipe essentially maps a queue onto a directory in the filesystem, using the lexicographic ordering of files within the directory to enforce a FIFO order. Of course, it would be even better if we could just write a linked list, but ZooKeeper's API doesn't allow for such a structure.
As an exercise, work through the recipe above and try to understand the role Step 5 plays. Is Step 5 even necessary? What can go wrong if it is omitted?
Can you extend the above example to implement a semaphore that permits a configurable number of processes to enter the critical section? Despite the similarities in the two problems, the result of such an exercise would be a subtly different algorithm that is incompatible with the above protocol.
The filesystem abstraction is the wrong abstraction for building distributed systems.
There exist better abstractions.
Replication in distributed systems is a well-studied topic that spans decades of research. A good starting point is Fred Schneider's state machine replication tutorial.
The key concept in this space, around which Replicant is built, is that of a replicated state machine: the programmer defines the state that must survive failures and the set of allowable transitions on that state, and the system ensures that every replica starts in the same initial state and makes the same state transitions in the same order.
State machine replication is much more elegant than the filesystem API used in ZooKeeper and Chubby. In Replicant, programmers work with objects and the methods that define legal transitions on those objects. This is a lot easier than shoehorning a complex operation onto directory and file operations.
Replicant is designed from the ground up for replicating state machines. You provide Replicant with a state machine in the form of a .so file; it replicates it and performs the hard work of keeping all the replicas in sync at all times. Its clean interface means that applications need not be a mess of tangled filesystem calls; instead, they can be expressed as plain, single-threaded C code that executes as if it were running on a single machine, even though many replicas of the code run simultaneously.
In Replicant, you model your application as a set of named procedures that mutate an arbitrary piece of state. Replicant ensures that these procedures are called in the same order at all replicas, and provides a convenient RPC-like mechanism for making calls in a fault-tolerant manner.
Let's examine how this works.
For the sake of simplicity, let's first imagine that we are building a reliable counter service, possibly for tracking page views or orders.
We will first need some boilerplate code to get started:
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <replicant_state_machine.h>
Next, we'll need a way to create our counter, by allocating enough memory to hold our crucial state, which in this case happens to be just 64 bits, and setting it to its initial value:
void* counter_create(struct replicant_state_machine_context* ctx) {
    uint64_t* x = malloc(sizeof(uint64_t));
    if (x) {
        *x = 0;
    }
    return x;
}
Now, on occasion, our replicas might fail, and they may need to be recreated. Similarly, on occasion, we might want to take a snapshot of our counter, so that we can propagate it to failed replicas to bring them up to date, or store it on permanent storage. For these tasks, we'll need two additional functions:
void* counter_recreate(struct replicant_state_machine_context* ctx,
                       const char* data, size_t data_sz) {
    uint64_t* x = malloc(sizeof(uint64_t));
    if (x) {
        *x = 0; /* avoid uninitialized bytes if the snapshot is short */
        memmove(x, data, data_sz < sizeof(uint64_t) ? data_sz : sizeof(uint64_t));
    }
    return x;
}
void counter_snapshot(struct replicant_state_machine_context* ctx,
                      void* obj, const char** data, size_t* data_sz) {
    char* ptr = malloc(sizeof(uint64_t));
    *data = ptr;
    *data_sz = ptr ? sizeof(uint64_t) : 0; /* report an empty snapshot if allocation fails */
    if (ptr) {
        memmove(ptr, obj, sizeof(uint64_t));
    }
}
And on occasion, we might want to shut down a replica and clean up its state. In this case, there isn't much to do except to free the little bit of space our counter is using:
void counter_destroy(struct replicant_state_machine_context* ctx,
                     void* obj) {
    free(obj);
}
That takes us to the critical increment operation. It's incredibly easy to specify in Replicant:
void counter_increment(struct replicant_state_machine_context* ctx,
                       void* obj,
                       const char* data, size_t data_sz) {
    uint64_t* count = (uint64_t*)obj;
    *count += 1;
    replicant_state_machine_set_response(ctx, obj, sizeof(uint64_t));
}
Replicant represents each object as a void*, which is passed to every method invocation on the object. Typically the user will cast this void* to a type suitable for holding the object's state. In our counter, the state is a uint64_t that is allocated to store the count. The code increments the counter, and returns the new value to the client.
All we need to do now is to package this up, let Replicant know where to find our maintenance functions, and let the world know what the interface to this shared object is:
struct replicant_state_machine rsm = {
    counter_create,
    counter_recreate,
    counter_destroy,
    counter_snapshot,
    {{"increment", counter_increment}, // list of object-specific methods
     {NULL, NULL}}                     // sentinel
};
In just 74 lines of code, we've defined a complete Replicant object that maintains a counter.
This object is instantiated as a Replicant object by compiling the code to a .so file and uploading the shared object to the Replicant cluster. Replicant will call counter_create, counter_recreate, counter_destroy, and counter_snapshot as appropriate to manage the object's lifetime, and the instance they produce is passed to every subsequent method call.
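For example, assuming the counter code above is saved in a file called counter.c, building and registering the object might look roughly like the following (the file names and compiler flags here are illustrative, and the exact invocation may differ on your system):
$ cc -shared -fPIC -o counter.so counter.c
$ replicant new-object mycount counter.so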
On the client side, the application may connect to a cluster and send remote calls to the instantiated object. For example, to invoke the increment method on an instance of our counter named mycount, we can issue code like:
...
replicant_client r("127.0.0.1", 1982);
int64_t sid = r.send("mycount", "increment", "", 0, ...);
...
This call sends the name of the object, "mycount", and the method name, "increment", to the cluster. Replicant orders this call relative to all other method calls and invokes the counter_increment C function with the object's state.
We've tailored the above code samples for the sake of exposition and have omitted several details about how to build and deploy our Replicant object. In the next section, we'll walk through a complete end-to-end example that shows you how to build an object, start a new Replicant cluster, instantiate your object, and make method calls.
Let's take a look at the distributed lock example, described above. The entire implementation is pretty compact, at just over 500 lines, available on GitHub. Replicant enables us to invoke the resulting replicated object easily from the command line.
First, make sure you have the latest version of Replicant installed. The HyperDex downloads page provides installation instructions; follow them, but install the replicant and libreplicant-dev packages instead of hyperdex.
Once you have Replicant installed, clone our repository and build the code with make:
$ git clone https://github.com/rescrv/demo-distributed-lock
$ cd demo-distributed-lock
$ make
This will create the .so that defines the state machine, as well as the client programs (lock, unlock, and holder) used to invoke methods on the replicated state machine.
If all went well, you should see the following files:
$ ls
holder holder.cc lock lock.cc lock-object.c lock-object.o
lock-object.so Makefile unlock unlock.cc util.h
Then, in one terminal, launch the Replicant daemon:
$ replicant daemon --foreground --listen 127.0.0.1 --listen-port 1982 --data=/path/to/tmp
I1217 00:50:52.963048 9199 daemon.cc:211] running in the foreground
I1217 00:50:52.963330 9199 daemon.cc:212] no log will be generated; instead, the log messages will print to the terminal
I1217 00:50:52.963489 9199 daemon.cc:213] provide "--daemon" on the command-line if you want to run in the background
I1217 00:50:52.971564 9199 daemon.cc:246] started new cluster from command-line arguments: configuration(cluster=4047024000452744282, prev_token=0, this_token=7139395057213071932, version=1, command=[13325666613164145467], config=[13325666613164145467], members=[chain_node(bind_to=127.0.0.1:1982, token=13325666613164145467)])
I1217 00:50:52.972247 9199 daemon.cc:511] resuming normal operation
I1217 00:50:52.972265 9199 daemon.cc:1162] deploying configuration configuration(cluster=4047024000452744282, prev_token=0, this_token=7139395057213071932, version=1, command=[13325666613164145467], config=[13325666613164145467], members=[chain_node(bind_to=127.0.0.1:1982, token=13325666613164145467)])
I1217 00:50:52.972565 9199 daemon.cc:1251] the latest stable configuration is configuration(cluster=4047024000452744282, prev_token=0, this_token=7139395057213071932, version=1, command=[13325666613164145467], config=[13325666613164145467], members=[chain_node(bind_to=127.0.0.1:1982, token=13325666613164145467)])
I1217 00:50:52.972784 9199 daemon.cc:1252] the latest proposed configuration is configuration(cluster=4047024000452744282, prev_token=0, this_token=7139395057213071932, version=1, command=[13325666613164145467], config=[13325666613164145467], members=[chain_node(bind_to=127.0.0.1:1982, token=13325666613164145467)])
W1217 00:50:52.973001 9199 daemon.cc:1258] the most recently deployed configuration can tolerate at most 0 failures which is less than the 2 failures the cluster is expected to tolerate; bring 4 more servers online to restore 2-fault tolerance
I1217 00:50:52.973194 9199 daemon.cc:1952] we are chain_node(bind_to=127.0.0.1:1982, token=13325666613164145467) and here's some info: issued <=1 | acked <=1
I1217 00:50:52.973361 9199 daemon.cc:1955] our stable configuration is configuration(cluster=4047024000452744282, prev_token=0, this_token=7139395057213071932, version=1, command=[13325666613164145467], config=[13325666613164145467], members=[chain_node(bind_to=127.0.0.1:1982, token=13325666613164145467)])
I1217 00:50:52.973575 9199 daemon.cc:1956] the suffix of the chain stabilized through 0
I1217 00:50:52.973718 9199 daemon.cc:2199] command tail stabilizes at configuration 1
Let's create an object called "lock" using the lock-object.so you built above:
$ replicant new-object lock lock-object.so
If all goes well, you'll see the following log messages in your Replicant daemon:
I1217 00:53:37.413707 9894 daemon.cc:1910] registering client 5857825500350879133
I1217 00:53:37.414326 9894 daemon.cc:1915] disconnecting client 5857825500350879133
I1217 00:53:37.415477 10025 object_manager.cc:891] spawning worker thread for object 7813573189723750400
Here, 5857825500350879133 is the ID of the client that executed the new-object command, and 7813573189723750400 is the ID of the "lock" object. Your numbers may vary, and that's OK.
Now let's have Alice grab the lock:
$ ./lock 127.0.0.1 1982 Alice
lock acquired @ 0
release with: unlock 127.0.0.1 1982 Alice@0
Here, Alice has acquired the lock, and is currently holding it. We can see that this is the case by looking in the Replicant daemon's log for:
I1217 00:56:35.535212 10025 object_manager.cc:959] lock:lock @ 5: lock acquired by Alice@0
When Alice is done with the lock, she can release it with:
$ ./unlock 127.0.0.1 1982 Alice@0
and we see in the daemon's console:
I1217 00:57:38.499760 10025 object_manager.cc:959] lock:unlock @ 7: lock released by Alice@0
Now what happens if Alice and Bob try to acquire the lock at the same time? One of the two must block and wait for the other to release it:
$ ./lock 127.0.0.1 1982 Alice
lock acquired @ 1
release with: unlock 127.0.0.1 1982 Alice@1
$ ./lock 127.0.0.1 1982 Bob
Here, Bob's command will block until Alice runs (in another terminal):
$ ./unlock 127.0.0.1 1982 Alice@1
For convenience, the implementation enables anyone to see who holds the lock:
$ ./holder 127.0.0.1 1982
lock held by: Bob@2
If you browse the code for the lock object, you'll see that each command-line program translates to a function in the lock object. For example, the unlock command calls the lock_unlock function, passing the command-line argument directly to the object. The linked code is pretty well-documented, and is easy to use as a starting point for building Replicant objects.
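To give a flavor of what such a function looks like, here is a hypothetical sketch of a holder-style query method. It is not the code from the repository: the state struct, field sizes, and function name below are invented for illustration (and it assumes the same includes as the counter boilerplate above); the real object's layout differs. It relies only on the same replicant_state_machine_set_response call we used in the counter example:
struct lock_state {
    char holder[64]; /* hypothetical: e.g. "Bob@2", or "" when the lock is free */
};

void lock_holder_query(struct replicant_state_machine_context* ctx,
                       void* obj,
                       const char* data, size_t data_sz) {
    struct lock_state* s = (struct lock_state*)obj;
    /* Reply with the current holder; every replica returns the same answer
       because every replica applies the same lock/unlock calls in the same order. */
    replicant_state_machine_set_response(ctx, s->holder, strlen(s->holder) + 1);
}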
We developed Replicant because it enables a cleaner, easier way to develop fault-tolerant distributed systems. Replicant handles the complexity of replication and lets us focus on our applications. Rather than having to mash an application onto a filesystem API, it allows us to implement shared state directly as objects. The resulting applications are simple to understand and easy to prove correct, even though the underlying replication protocols typically are subtle and complicated.
We currently use Replicant to implement HyperDex's coordinator. The hyperdex coordinator command you see in the HyperDex Tutorial is just a thin wrapper around Replicant that automatically creates a new Replicant daemon and associated object.
The next time you reach for ZooKeeper, ask yourself whether it provides the primitive you really need. If ZooKeeper's filesystem and znode abstractions truly meet your needs, great. But the odds are, you'll be better off writing your application as a replicated state machine.
In a future blog post we'll describe ChainSaw (an extension to Chain Replication [vRS04]), the new consensus protocol behind Replicant that does the heavy lifting for replicated state machines. In the meantime, you can get started with Replicant today by grabbing the code from GitHub and building your own replicated objects.
[Lam98] Leslie Lamport. The Part-Time Parliament. ACM Transactions on Computer Systems, 16(2):133-169, 1998.
[OL88] Brian M. Oki and Barbara Liskov. Viewstamped Replication: A New Primary Copy Method to Support Highly-Available Distributed Systems. In Proceedings of the ACM Symposium on Principles of Distributed Computing, pages 8-17, Toronto, Canada, August 1988.
[vRS04] Robbert van Renesse and Fred B. Schneider. Chain Replication for Supporting High Throughput and Availability. In Proceedings of the Symposium on Operating Systems Design and Implementation, pages 91-104, San Francisco, California, December 2004.