Implementation of the Raft Consensus Algorithm Using C++20 Coroutines



This article describes how to implement a Raft Server consensus module in C++20 without using any additional libraries. The narrative is divided into three main sections:

  1. A comprehensive overview of the Raft algorithm
  2. A detailed account of the Raft Server’s development
  3. A description of a custom coroutine-based network library

The implementation makes use of the robust capabilities of C++20, particularly coroutines, to present an effective and modern methodology for building a critical component of distributed systems. This exposition not only demonstrates the practical application and benefits of C++20 coroutines in sophisticated programming environments, but it also provides an in-depth exploration of the challenges and resolutions encountered while building a consensus module from the ground up, such as Raft Server. The Raft Server and network library repositories, miniraft-cpp and coroio, are available for further exploration and practical applications.

Introduction

Before delving into the complexities of the Raft algorithm, let’s consider a real-world example. Our goal is to develop a network key-value storage (K/V) system. In C++, this can be easily accomplished by using an unordered_map<string, string>. However, in real-world applications, the requirement for a fault-tolerant storage system increases complexity. A seemingly simple approach could entail deploying three (or more) machines, each hosting a replica of this service. The expectation may be for users to manage data replication and consistency. However, this method can result in unpredictable behaviors. For example, it is possible to update data using a specific key and then retrieve an older version later.
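
As a point of reference, here is a minimal sketch of such a single-host state machine; the class name and interface are illustrative, not part of miniraft-cpp:

#include <optional>
#include <string>
#include <unordered_map>

// A trivial in-memory key-value "state machine": correct on one host,
// but offering no fault tolerance on its own.
class TKvStateMachine {
public:
    void Apply(const std::string& key, const std::string& value) {
        Data_[key] = value;
    }
    std::optional<std::string> Get(const std::string& key) const {
        if (auto it = Data_.find(key); it != Data_.end()) {
            return it->second;
        }
        return std::nullopt;
    }
private:
    std::unordered_map<std::string, std::string> Data_;
};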

What users truly want is a distributed system, potentially spread across multiple machines, that runs as smoothly as a single-host system. To meet this requirement, a consensus module is typically placed in front of the K/V storage (or any similar service, hereafter referred to as the “state machine”). This configuration ensures that all user interactions with the state machine are routed exclusively through the consensus module, rather than direct access. With this context in mind, let us now look at how to implement such a consensus module, using the Raft algorithm as an example.

Raft Overview

In the Raft algorithm, there are an odd number of participants known as peers. Each peer keeps its own log of records. There is one peer leader, and the others are followers. Users direct all requests (reads and writes) to the leader. When a write request to change the state machine is received, the leader logs it first before forwarding it to the followers, who also log it. Once the majority of peers have successfully responded, the leader considers this entry to be committed, applies it to the state machine, and notifies the user of its success.

The Term is a key concept in Raft, and it can only grow. The Term changes when there are system changes, such as a change in leadership. The log in Raft has a specific structure, with each entry consisting of a Term and a Payload. The Term identifies the leader under which the entry was originally written. The Payload represents the changes to be made to the state machine. Raft guarantees that two entries with the same index and term are identical. Raft logs are not append-only and may be truncated. For example, in the scenario below, leader S1 replicated two entries before crashing. S2 took the lead and began replicating entries, and S1’s log differed from those of S2 and S3. As a result, the last entry in S1’s log will be removed and replaced with a new one.

Raft RPC API

Let us examine the Raft RPC. It’s worth noting that the Raft API is quite simple, with just two calls. We’ll begin by looking at the leader election API. It is important to note that Raft ensures that there can be only one leader per term. There may also be terms without a leader, for example if an election fails. To ensure that each peer votes at most once per term, it saves its vote in a persistent variable called VotedFor. The election RPC is called RequestVote and has three parameters: Term, LastLogIndex, and LastLogTerm. The response contains Term and VoteGranted. Notably, every request contains Term, and in Raft, peers can only communicate effectively if their Terms are compatible.

When a peer initiates an election, it sends a RequestVote request to the other peers and collects their votes. If the majority of the responses are positive, the peer advances to the leader role.
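
To make the shape of this RPC concrete, here is a sketch of the request and response as plain structs; the field layout follows the parameters listed above, while the actual wire format used in this implementation is shown later in the article:

#include <cstdint>

// Illustrative structs for the RequestVote RPC described above.
// In this article's implementation the sender's ID travels in the message header,
// so it is omitted here.
struct TRequestVoteArgs {
    uint64_t Term = 0;         // candidate's term
    uint64_t LastLogIndex = 0; // index of the candidate's last log entry
    uint64_t LastLogTerm = 0;  // term of the candidate's last log entry
};

struct TRequestVoteReply {
    uint64_t Term = 0;        // responder's current term
    bool VoteGranted = false; // true if the responder voted for the candidate
};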

Now let’s look at the AppendEntries request. It accepts parameters such as Term, PrevLogIndex, PrevLogTerm, and Entries, and the response contains Term and Success. If the Entries field in the request is empty, it acts as a Heartbeat.

When an AppendEntries request is received, a follower checks the term of its log entry at PrevLogIndex. If it matches PrevLogTerm, the follower appends Entries to its log starting at PrevLogIndex + 1 (any existing entries after PrevLogIndex are removed):

Flow of AppendEntries request being received

If the terms do not match, the follower returns Success=false. In this case, the leader retries sending the request, lowering the PrevLogIndex by one.

Leader retries sending the request, lowering the PrevLogIndex by one
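
The follower-side check just described fits in a few lines; the sketch below is simplified, illustrative code (using a plain std::vector log rather than the article’s TLV structures):

#include <cstdint>
#include <string>
#include <vector>

// Simplified log entry for illustration (the article's real TLogEntry is a raw TLV struct).
struct TEntry {
    uint64_t Term;
    std::string Payload;
};

// Follower-side consistency check and append, as described above.
// The log is 1-indexed conceptually; log[i - 1] stores entry i.
bool AppendEntries(std::vector<TEntry>& log,
                   uint64_t prevLogIndex, uint64_t prevLogTerm,
                   const std::vector<TEntry>& entries)
{
    // The entry at PrevLogIndex must exist and carry PrevLogTerm.
    if (prevLogIndex > log.size() ||
        (prevLogIndex > 0 && log[prevLogIndex - 1].Term != prevLogTerm)) {
        return false; // Success=false: the leader retries with PrevLogIndex - 1
    }
    // Remove any conflicting suffix, then append the leader's entries.
    log.resize(prevLogIndex);
    log.insert(log.end(), entries.begin(), entries.end());
    return true; // Success=true
}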

When a peer receives a RequestVote request, it compares the (LastLogTerm, LastLogIndex) pair of its own most recent log entry with the pair in the request. If its own pair is less than or equal to the requester’s, the peer returns VoteGranted=true.
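
In code, this up-to-date check is a lexicographic comparison of (LastLogTerm, LastLogIndex) pairs; a minimal sketch:

#include <cstdint>
#include <tuple>

// Grant the vote only if the candidate's log is at least as up to date as ours
// (full Raft also requires that we have not already voted for someone else this term).
bool IsLogUpToDate(uint64_t myLastLogTerm, uint64_t myLastLogIndex,
                   uint64_t candLastLogTerm, uint64_t candLastLogIndex)
{
    return std::tie(myLastLogTerm, myLastLogIndex)
        <= std::tie(candLastLogTerm, candLastLogIndex);
}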

State Transitions in Raft

Raft’s state transitions look like this. Each peer begins in the Follower state. If a Follower does not receive AppendEntries within a set timeout, it increments its Term and moves to the Candidate state, triggering an election. A Candidate moves to the Leader state if it wins the election, or returns to the Follower state if it receives an AppendEntries request. A Candidate also restarts the election (remaining a Candidate) if it does not transition to either Follower or Leader within the timeout period. Finally, if a peer in any state receives an RPC request with a Term greater than its current one, it moves to the Follower state.
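
These rules fit into a handful of lines; a hedged sketch (the names below are illustrative and are not taken from miniraft-cpp):

enum class EState { Follower, Candidate, Leader };

// Timeout-driven part of the transitions described above.
EState OnTimeout(EState state) {
    switch (state) {
    case EState::Follower:   // no AppendEntries heard in time: start an election
    case EState::Candidate:  // the election did not complete in time: start a new one
        return EState::Candidate; // increment Term, vote for self, send RequestVote
    case EState::Leader:
        return EState::Leader;    // a leader reacts to timeouts by sending heartbeats
    }
    return state;
}

// Any RPC carrying a newer Term demotes the peer to Follower.
EState OnNewerTerm(EState) { return EState::Follower; }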

Commit

Let us now consider an example that demonstrates how Raft is not as simple as it may appear. I took this example from Diego Ongaro’s dissertation. S1 was the leader in Term 2, where it replicated two entries before crashing. Following this, S5 took the lead in Term 3, added an entry, and then crashed. Next, S2 took over leadership in Term 4, replicated the entry from Term 2, added its own entry for Term 4, and then crashed. This leaves two possible outcomes: S5 reclaims leadership and truncates the entries from Term 2, or S1 regains leadership and commits the entries from Term 2. The entries from Term 2 are safely committed only after they are covered by a subsequent entry from the new leader’s own term.

How the Raft algorithm operates in a dynamic and often unpredictable set of circumstances

This example demonstrates how the Raft algorithm operates in a dynamic and often unpredictable set of circumstances. The sequence of events, which includes multiple leaders and crashes, demonstrates the complexity of maintaining a consistent state across a distributed system. This complexity is not immediately apparent, but it becomes important in situations involving leader changes and system failures. The example emphasizes the importance of a robust and well-thought-out approach to dealing with such complexities, which is precisely what Raft seeks to address.

Additional Materials

For further study and a deeper understanding of Raft, I recommend the following materials: the original Raft paper, which is ideal as an implementation guide; Diego Ongaro’s PhD dissertation, which provides more in-depth insights; and Maxim Babenko’s lecture, which goes into even greater detail.

Raft Implementation

Let’s now move on to the Raft server implementation, which, in my opinion, benefits greatly from C++20 coroutines. In my implementation, the Persistent State is stored in memory. However, in real-world scenarios, it should be saved to disk. I’ll talk more about the MessageHolder later. It functions similarly to a shared_ptr, but is specifically designed to handle Raft messages, ensuring efficient management and processing of these communications.

struct TState {
    uint64_t CurrentTerm = 1;
    uint32_t VotedFor = 0;
    std::vector<TMessageHolder<TLogEntry>> Log;
};

In the Volatile State, I labeled entries with L for “leader”, F for “follower”, or C for “candidate” to clarify where each is used. The CommitIndex denotes the last log entry that was committed. In contrast, LastApplied is the most recent log entry applied to the state machine, and it is always less than or equal to the CommitIndex. The NextIndex is important because it identifies, for each peer, the next log entry to be sent to it. Similarly, MatchIndex tracks, for each peer, the highest log entry known to match the leader’s log. The Votes set contains the IDs of peers that voted for this peer in the current election. Timeouts are an important aspect to manage: HeartbeatDue and RpcDue manage the leader’s timeouts, while ElectionDue handles the follower’s election timeout.

using TTime = std::chrono::time_point<std::chrono::steady_clock>;

struct TVolatileState {
    uint64_t CommitIndex = 0; // L,F
    uint64_t LastApplied = 0; // L,F
    std::unordered_map<uint32_t, uint64_t> NextIndex; // L
    std::unordered_map<uint32_t, uint64_t> MatchIndex; // L
    std::unordered_set<uint32_t> Votes; // C
    std::unordered_map<uint32_t, TTime> HeartbeatDue; // L
    std::unordered_map<uint32_t, TTime> RpcDue; // L
    TTime ElectionDue; // F
};
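
These fields come together when the leader decides what is committed. Below is a minimal sketch of that decision, assuming a sorted-median approach and the current-term rule from the Commit section above (illustrative code, not the miniraft-cpp implementation):

#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative only: advance CommitIndex to the highest index replicated on a majority,
// but only if that entry was written in the current term (see the Commit section).
// Assumes a non-empty cluster.
uint64_t AdvanceCommitIndex(std::vector<uint64_t> matchIndexes, // peers' MatchIndex plus the leader's own last index
                            uint64_t commitIndex,
                            uint64_t currentTerm,
                            const std::vector<uint64_t>& logTerms) // term of each log entry, 1-based
{
    std::sort(matchIndexes.begin(), matchIndexes.end());
    // With an odd cluster size, the median is the largest index stored on a majority.
    uint64_t majorityIndex = matchIndexes[(matchIndexes.size() - 1) / 2];
    if (majorityIndex > commitIndex && majorityIndex <= logTerms.size()
        && logTerms[majorityIndex - 1] == currentTerm) {
        return majorityIndex;
    }
    return commitIndex;
}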

Raft API

My implementation of the Raft algorithm has two classes. The first is INode, which represents a peer. This class includes two methods: Send, which stores outgoing messages in an internal buffer, and Drain, which performs the actual message dispatch. TRaft is the second class, and it manages the current peer’s state. It also includes two methods: Process, which handles incoming messages, and ProcessTimeout, which must be called regularly to manage timeouts, such as the leader election timeout. Users of these classes call the Process, ProcessTimeout, and Drain methods as necessary. INode’s Send method is invoked internally within the TRaft class, ensuring that message handling and state management are seamlessly integrated within the Raft framework.

struct INode {
    virtual ~INode() = default;
    virtual void Send(TMessageHolder<TMessage> message) = 0;
    virtual void Drain() = 0;
};

class TRaft {
public:
    TRaft(uint32_t node,
        const std::unordered_map<uint32_t, std::shared_ptr<INode>>& nodes);
    void Process(TTime now,
        TMessageHolder<TMessage> message,
        const std::shared_ptr<INode>& replyTo = {});
    void ProcessTimeout(TTime now);
};

Raft Messages

Now let’s look at how I send and read Raft messages. Instead of using a serialization library, I read and send raw structures in TLV format. This is what the message header looks like:

struct TMessage {
    uint32_t Type;
    uint32_t Len;
    char Value[0];
};

For additional convenience, I’ve introduced a second-level header:

struct TMessageEx: public TMessage {
    uint32_t Src = 0;
    uint32_t Dst = 0;
    uint64_t Term = 0;
};

This includes the sender’s and receiver’s ID in each message. With the exception of LogEntry, all messages inherit from TMessageEx. LogEntry and AppendEntries are implemented as follows:

struct TLogEntry: public TMessage {
    static constexpr EMessageType MessageType = EMessageType::LOG_ENTRY;
    uint64_t Term = 1;
    char Data[0];
};

struct TAppendEntriesRequest: public TMessageEx {
    static constexpr EMessageType MessageType
        = EMessageType::APPEND_ENTRIES_REQUEST;
    uint64_t PrevLogIndex = 0;
    uint64_t PrevLogTerm = 0;
    uint32_t Nentries = 0;
};
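
For symmetry, the response described earlier (carrying Term and Success) might be declared in the same style; this is my sketch, and the MatchIndex field and the APPEND_ENTRIES_RESPONSE enumerator are assumptions rather than code quoted from miniraft-cpp:

struct TAppendEntriesResponse: public TMessageEx {
    static constexpr EMessageType MessageType
        = EMessageType::APPEND_ENTRIES_RESPONSE; // assumed enumerator
    uint64_t MatchIndex = 0; // assumed: lets the leader update MatchIndex directly
    uint32_t Success = 0;    // the Success flag from the overview
};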

To facilitate message handling, I use a class called MessageHolder, reminiscent of a shared_ptr:

template<typename T>
requires std::derived_from<T, TMessage>
struct TMessageHolder {
    T* Mes;
    std::shared_ptr<char[]> RawData;
    uint32_t PayloadSize;
    std::shared_ptr<TMessageHolder<TMessage>[]> Payload;

    template<typename U>
    requires std::derived_from<U, T>
    TMessageHolder<U> Cast() {...}

    template<typename U>
    requires std::derived_from<U, T>
    auto Maybe() { ... }
};

This class holds a char buffer containing the message itself. It may also include a Payload (used only for AppendEntries), as well as methods for safely casting a base-type message to a specific one (the Maybe method) and for unsafe casting (the Cast method). Here is a typical example of using the MessageHolder:

void SomeFunction(TMessageHolder<TMessage> message) {
    auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>();
    if (maybeAppendEntries) {
        auto appendEntries = maybeAppendEntries.Cast();
    }
    // if we are sure
    auto appendEntries = message.Cast<TAppendEntriesRequest>();
    // usage with overloaded operator->
    auto term = appendEntries->Term;
    auto nentries = appendEntries->Nentries;
    // ...
}

And a real-life example in the Candidate state handler:

void TRaft::Candidate(TTime now, TMessageHolder<TMessage> message) {
    if (auto maybeResponseVote = message.Maybe<TRequestVoteResponse>()) {
        OnRequestVote(std::move(maybeResponseVote.Cast()));
    } else
    if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>())
    {
        OnRequestVote(now, std::move(maybeRequestVote.Cast()));
    } else
    if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>())
    {
        OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
    }
}

This design approach improves the efficiency and flexibility of message handling in Raft implementations.

Raft Server

Let’s discuss the Raft server implementation. The Raft server will set up coroutines for network interactions. First, we’ll look at the coroutines that handle message reading and writing. The primitives used for these coroutines are discussed later in the article, along with an analysis of the network library. The writing coroutine is responsible for writing messages to the socket, whereas the reading coroutine is slightly more complex. To read, it must first retrieve the Type and Len variables, then allocate an array of Len bytes, and finally, read the rest of the message. This structure facilitates the efficient and effective management of network communications within the Raft server.

template<typename TSocket>
TValueTask<void>
TMessageWriter<TSocket>::Write(TMessageHolder<TMessage> message) {
    co_await TByteWriter(Socket).Write(message.Mes, message->Len);

    auto payload = std::move(message.Payload);
    for (uint32_t i = 0; i < message.PayloadSize; ++i) {
        co_await Write(std::move(payload[i]));
    }

    co_return;
}

template<typename TSocket>
TValueTask<TMessageHolder<TMessage>> TMessageReader<TSocket>::Read() {
    decltype(TMessage::Type) type; decltype(TMessage::Len) len;
    auto s = co_await Socket.ReadSome(&type, sizeof(type));
    if (s != sizeof(type)) { /* throw */ }
    s = co_await Socket.ReadSome(&len, sizeof(len));
    if (s != sizeof(len)) { /* throw */}
    auto mes = NewHoldedMessage<TMessage>(type, len);
    co_await TByteReader(Socket).Read(mes->Value, len - sizeof(TMessage));
    auto maybeAppendEntries = mes.Maybe<TAppendEntriesRequest>();
    if (maybeAppendEntries) {
        auto appendEntries = maybeAppendEntries.Cast();
        auto nentries = appendEntries->Nentries; mes.InitPayload(nentries);
        for (uint32_t i = 0; i < nentries; i++) mes.Payload[i] = co_await Read();
    }
    co_return mes;
}
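
The helper NewHoldedMessage used in the reader above is not shown in the article. A plausible sketch of what it does, assuming it simply allocates one contiguous buffer and fills in the TLV header (my reconstruction, not the actual coroio/miniraft-cpp code):

template<typename T>
requires std::derived_from<T, TMessage>
TMessageHolder<T> NewHoldedMessage(uint32_t type, uint32_t len) {
    TMessageHolder<T> mes;
    mes.RawData = std::shared_ptr<char[]>(new char[len]()); // zero-initialized buffer of Len bytes
    mes.Mes = reinterpret_cast<T*>(mes.RawData.get());
    mes.Mes->Type = type;
    mes.Mes->Len = len;
    mes.PayloadSize = 0;
    // Fields of T beyond the TMessage header are left for the caller to fill.
    return mes;
}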

To launch a Raft server, create an instance of the TRaftServer class and call the Serve method. The Serve method starts two coroutines. The Idle coroutine is responsible for periodically processing timeouts, whereas InboundServe manages incoming connections.

class TRaftServer {
public:
    void Serve() {
        Idle();
        InboundServe();
    }

private:
    TVoidTask InboundServe();
    TVoidTask InboundConnection(TSocket socket);
    TVoidTask Idle();
};

Incoming connections are received via the accept call. Following this, the InboundConnection coroutine is launched, which reads incoming messages and forwards them to the Raft instance for processing. This configuration ensures that the Raft server can efficiently handle both internal timeouts and external communication.

TVoidTask InboundServe() {
    while (true) {
        auto client = co_await Socket.Accept();
        InboundConnection(std::move(client));
    }
    co_return;
}

TVoidTask InboundConnection(TSocket socket) {
    // `client` wraps the accepted socket in an INode used as the reply channel;
    // its construction from `socket` is omitted in this snippet
    while (true) {
        auto mes = co_await TMessageReader(client->Sock()).Read();
        Raft->Process(std::chrono::steady_clock::now(), std::move(mes),
            client);
        Raft->ProcessTimeout(std::chrono::steady_clock::now());
        DrainNodes();
    }
    co_return;
}

The Idle coroutine works as follows: it calls the ProcessTimeout method roughly once per sleep interval (sleep being the loop’s sleep duration). It’s worth noting that this coroutine uses asynchronous sleep. This design enables the Raft server to efficiently manage time-sensitive operations without blocking other processes, improving the server’s overall responsiveness and performance.

while (true) {
    Raft->ProcessTimeout(std::chrono::steady_clock::now());
    DrainNodes();
    auto t1 = std::chrono::steady_clock::now();
    if (t1 > t0 + dt) {
        DebugPrint();
        t0 = t1;
    }
    co_await Poller.Sleep(t1 + sleep);
}

The coroutine for sending outgoing messages is designed to be simple. It repeatedly writes all accumulated messages to the socket in a loop. If an error occurs, it starts another coroutine responsible for reconnecting (via the connect call). This structure ensures that outgoing messages are handled smoothly and efficiently while remaining robust through error handling and connection management.

try {
    while (!Messages.empty()) {
        auto tosend = std::move(Messages); Messages.clear();
        for (auto&& m : tosend) {
            co_await TMessageWriter(Socket).Write(std::move(m));
        }
    }
} catch (const std::exception& ex) {
    Connect();
}
co_return;

With the Raft Server implemented, these examples show how coroutines greatly simplify development. While I haven’t walked through the implementation of the Raft algorithm itself here (trust me, it is much more complex than the Raft Server), the overall design is not only simple but also compact.

Next, we’ll look at some Raft Server examples. Following that, I’ll describe the network library I created from scratch specifically for the Raft Server. This library is critical to enabling efficient network communication within the Raft framework.

Here’s an example of launching a Raft cluster with three nodes. Each instance receives its own ID as an argument, as well as the other instances’ addresses and IDs. In this case, the client communicates exclusively with the leader. It sends random strings while keeping a set number of in-flight messages and waiting for their commitment. This configuration depicts the interaction between the client and the leader in a multi-node Raft environment, demonstrating the algorithm’s handling of distributed data and consensus.

$ ./server --id 1 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Candidate, Term: 2, Index: 0, CommitIndex: 0,
...
Leader, Term: 3, Index: 1080175, CommitIndex: 1080175, Delay: 2:0 3:0
        MatchIndex: 2:1080175 3:1080175 NextIndex: 2:1080176 3:1080176
....
$ ./server --id 2 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
$ ./server --id 3 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Follower, Term: 3, Index: 1080175, CommitIndex: 1080175,
...
$ dd if=/dev/urandom | base64 | pv -l | ./client --node 127.0.0.1:8001:1 >log1
 198k 0:00:03 [159.2k/s] [        <=>

I measured the commit latency for configurations of both 3-node and 5-node clusters. As expected, the latency is higher for the 5-node setup:

  • 3 Nodes
    • 50th percentile (median): 292,872 ns
    • 80th percentile: 407,561 ns
    • 90th percentile: 569,164 ns
    • 99th percentile: 40,279,001 ns
  • 5 Nodes
    • 50th percentile (median): 425,194 ns
    • 80th percentile: 672,541 ns
    • 90th percentile: 1,027,669 ns
    • 99th percentile: 38,578,749 ns

I/O Library

Let’s now look at the I/O library that I created from scratch and used in the Raft server’s implementation. I began with the example below, taken from cppreference.com, which is an implementation of an echo server:

task<> tcp_echo_server() {
    char data[1024];
    while (true) {
        std::size_t n = co_await socket.async_read_some(buffer(data));
        co_await async_write(socket, buffer(data, n));
    }
}

My library needed an event loop, a socket primitive, methods like read_some/write_some (named ReadSome/WriteSome in my library), and higher-level wrappers such as async_write/async_read (TByteReader/TByteWriter in my library).
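
To make the mapping concrete, the same echo server could be expressed with these names roughly as follows (a sketch with simplified signatures, not code from coroio):

TVoidTask EchoServer(TSocket& socket) {
    char data[1024];
    while (true) {
        // ReadSome/WriteSome play the role of async_read_some/async_write above
        auto n = co_await socket.ReadSome(data, sizeof(data));
        if (n <= 0) {
            co_return; // the peer closed the connection (or an error occurred)
        }
        co_await TByteWriter(socket).Write(data, n);
    }
}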

To implement the ReadSome method of the socket, I had to create an Awaitable as follows:

auto ReadSome(char* buf, size_t size) {
    struct TAwaitable  {
        bool await_ready() { return false; /* always suspend */ }
        void await_suspend(std::coroutine_handle<> h) {
            poller->AddRead(fd, h);
        }
        int await_resume() {
            return read(fd, b, s);
        }
        TSelect* poller; int fd; char* b; size_t s;
    };
    return TAwaitable{Poller_,Fd_,buf,size};
}

When co_await is called, the coroutine suspends because await_ready returns false. In await_suspend, we capture the coroutine_handle and pass it along with the socket handle to the poller. When the socket is ready, the poller calls the coroutine_handle to restart the coroutine. Upon resumption, await_resume is called, which performs a read and returns the number of bytes read to the coroutine. The WriteSome, Accept, and Connect methods are implemented in a similar manner.
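
For completeness, WriteSome looks almost identical; it registers the descriptor for write readiness instead (AddWrite here is the assumed counterpart of AddRead):

auto WriteSome(const char* buf, size_t size) {
    struct TAwaitable {
        bool await_ready() { return false; /* always suspend */ }
        void await_suspend(std::coroutine_handle<> h) {
            poller->AddWrite(fd, h); // assumed counterpart of AddRead
        }
        int await_resume() {
            return write(fd, b, s); // performed once the descriptor is writable
        }
        TSelect* poller; int fd; const char* b; size_t s;
    };
    return TAwaitable{Poller_, Fd_, buf, size};
}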

The Poller is set up as follows:

struct TEvent {
    int Fd; int Type; // READ = 1, WRITE = 2;
    std::coroutine_handle<> Handle;
};
class TSelect {
    void Poll() {
        for (const auto& ch : Events) { /* FD_SET(ReadFds); FD_SET(WriteFds);*/ }
        pselect(Size, ReadFds, WriteFds, nullptr, ts, nullptr);
        for (int k = 0; k < Size; ++k) {
            if (FD_ISSET(k, WriteFds)) {
                Events[k].Handle.resume();
            }
            // ...
        }
    }
    std::vector<TEvent> Events;
    // ...
};

I keep an array of (socket descriptor, coroutine handle) pairs, which is used to initialize the structures for the poller backend (in this case, select). When a socket becomes ready, resume is called on the corresponding coroutine handle, waking that coroutine up.

This is applied in the main function as follows:

TSimpleTask task(TSelect& poller) {
    TSocket socket(0, poller);
    char buffer[1024];
    while (true) {
        auto readSize = co_await socket.ReadSome(buffer, sizeof(buffer));
    }
}
int main() {
    TSelect poller;
    task(poller);
    while (true) { poller.Poll(); }
}

We start a coroutine (or coroutines) that suspends on co_await, and control then passes to an infinite loop that invokes the poller mechanism. If a socket becomes ready within the poller, the corresponding coroutine is resumed and runs until its next co_await.

To read and write Raft messages, I needed to create high-level wrappers over ReadSome/WriteSome, similar to:

template<typename T>
TValueTask<T> Read() {
    T res; size_t size = sizeof(T);
    char* p = reinterpret_cast<char*>(&res);
    while (size != 0) {
        auto readSize = co_await Socket.ReadSome(p, size);
        p += readSize;
        size -= readSize;
    }
    co_return res;
}
// usage
T t = co_await Read<T>();

To implement these, I needed to create a coroutine that also functions as an Awaitable. The coroutine is made up of a pair: coroutine_handle and promise. The coroutine_handle is used to manage the coroutine from the outside, whereas the promise is for internal management. The coroutine_handle can include Awaitable methods, which allow the coroutine’s result to be awaited with co_await. The promise can be used to store the result returned by co_return and to awaken the calling coroutine.

In coroutine_handle, within the await_suspend method, we store the coroutine_handle of the calling coroutine. Its value will be saved in the promise:

template<typename T>
struct TValueTask : std::coroutine_handle<> {
    bool await_ready() { return !!this->promise().Value; }
    void await_suspend(std::coroutine_handle<> caller) {
        this->promise().Caller = caller;
    }
    T await_resume() { return *this->promise().Value; }
    using promise_type = TValuePromise<T>;
};

Within the promise itself, the return_value method stores the returned value. The calling coroutine is woken up with an awaitable, which is returned from final_suspend. This is because, after co_return, the compiler invokes co_await on the result of final_suspend.

template<typename T>
struct TValuePromise {
    void return_value(const T& t) { Value = t; }
    std::suspend_never initial_suspend() { return {}; }
    // resume Caller here
    TFinalSuspendContinuation<T> final_suspend() noexcept;
    std::optional<T> Value;
    std::coroutine_handle<> Caller = std::noop_coroutine();
};

From await_suspend, the coroutine_handle of the calling coroutine can be returned, and the caller is then resumed automatically (symmetric transfer). It is important to note that the called coroutine is now in a suspended state, and its coroutine_handle must be destroyed with destroy to avoid a memory leak. This can be accomplished, for example, in the destructor of TValueTask.

template<typename T>
struct TFinalSuspendContinuation {
    bool await_ready() noexcept { return false; }
    std::coroutine_handle<> await_suspend(
        std::coroutine_handle<TValuePromise<T>> h) noexcept
    {
        return h.promise().Caller;
    }
    void await_resume() noexcept { }
};

With the library description completed, I ported the libevent benchmark to it to ensure its performance. This benchmark generates a chain of N Unix pipes, each one linked to the next. It then initiates 100 write operations into the chain, which continues until there are 1000 total write calls. The image below depicts the benchmark’s runtime as a function of N for various backends of my library (coroio) versus libevent. This test demonstrates that my library performs similarly to libevent, confirming its efficiency and effectiveness in managing I/O operations.

Benchmark's runtime as a function of N for various backends of my library (coroio) versus libevent

Conclusion

In closing, this article has described the implementation of a Raft server using C++20 coroutines, emphasizing the convenience and efficiency provided by this modern C++ feature. The custom I/O library, which was written from scratch, is critical to this implementation because it effectively handles asynchronous I/O operations. The performance of the library was validated against the libevent benchmark, demonstrating its competency.

For those interested in learning more about or using these tools, the I/O library is available at coroio, and the Raft library at miniraft-cpp (linked at the beginning of the article). Both repositories provide a detailed look at how C++20 coroutines can be used to build robust, high-performance distributed systems.

