Modern systems: Replication, Recovery and Fault tolerance

Containerised microservices are in vogue. Developers and architects strive for linear scalability, and services are often made stateless or serverless.

However, stateless or serverless applications are not necessarily the best solution to every problem, and not every application needs to be infinitely scalable. To improve overall system performance and thereby make a business impact, some applications store state in memory. With the state held in the application’s process space, replication, recovery and fault tolerance become important building blocks of the system.

Design: spoilt for choice

Modern systems often use multiple frameworks and libraries whose functionality overlaps. For example, fault tolerance can be achieved using ZooKeeper, Kafka or Akka, and some systems make use of all of these libraries / frameworks. With different choices comes the risk of non-uniform design and patterns across microservices.

As far as possible, design techniques and patterns should be uniform. This eases maintainability and supportability, and gives developers a common language that precisely states the intent of the speaker.

Sidenote: Microservice boundaries must be decided based on functional boundaries / contexts. The integration between different microservices must be through versioned APIs or Events only.

Some interesting reads: Microservice boundaries five characteristics, Identifying microservice boundaries

To address the concern of fault tolerance, consider the following:

  1. Is the application stateless / serverless or stateful?
    • For stateful applications: mastership based or load-balanced deployments?
  2. What are the transaction / request volumes?
  3. What transport mechanisms are used?
    • HTTP (REST)
    • TCP/IP or UDP
    • MessageBus (like EMS)
    • Event log, Streams processing and Store system (like Kafka)
  4. Is the overall solution supporting vertical partitioning / sharding?
  5. Are the service endpoints discovered at runtime or configured?
Sidenote: Some details about service discovery, load-balancing patterns, fault tolerance etc. are discussed here: Managing services using Apache ZooKeeper. The link also describes the “thundering herd” problem for services that are in huge demand.

Additional resources on fault tolerance can be found here: Akka cluster specification (intro), Achieving fault tolerance with Kafka: a detailed explanation

Understanding the principles of fault tolerance helps us evaluate frameworks / libraries against the specific needs of the application.

The choice should be made based on:

  1. Robustness of the framework and design.
  2. How well the framework supports other related concerns.
    • In some cases, there might be a need to look at “service-discovery” and “fault-tolerance” together.
    • In other cases, service discovery may not be needed and merely streaming data would suffice.
  3. Ease of support in production environments.

Making such design decisions requires a good understanding of the domain, production environment and tools used by support.

This post tries to cover the basics and principles used for designing stateful, fault-tolerant and recoverable systems.

Stateful systems need replication and recovery.

Replication and Recovery

The most important aspects of replication and recovery are:

  • State (Application state / “actor” state / object state)
  • Transactions / Events
  • Transaction / Event handling code (the state transition function)

A state transition function is invoked to serve a request (transaction / event). It evaluates the transaction / event and its applicability in a given state. The state eventually gets modified or an error is reported.

Typically, a state transition function can be represented as:

f(S, E) -> S' or Error
where:
S  : current_application_state
E  : transaction_or_event
S' : final_application_state
f  : state_transition_function
Error == S  : the transaction is ignored on error (no state change)
Error == S" : the transaction cannot be ignored (the state moves to an error state)

Any practical application has its state stored in multiple objects and various containers, like sequences, collections and associative containers.

A replicating or recovering application instance must arrive at the final application state: S’.
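
As a concrete illustration, the state transition function could look like the minimal Python sketch below. This is only a sketch under assumed names: State, Event, TransitionError and the order-count domain are illustrative and not taken from any framework.

from dataclasses import dataclass, field

@dataclass(frozen=True)
class State:
    # S: the current_application_state (here, just an order count per client)
    orders: dict = field(default_factory=dict)

@dataclass(frozen=True)
class Event:
    # E: a transaction / event received by the application
    client: str
    action: str  # e.g. "create_order"

class TransitionError(Exception):
    """The Error case: the event is not applicable in the current state."""

def transition(state: State, event: Event) -> State:
    """f(S, E) -> S' or Error"""
    if event.action != "create_order":
        raise TransitionError(f"unsupported action: {event.action}")
    orders = dict(state.orders)
    orders[event.client] = orders.get(event.client, 0) + 1
    return State(orders=orders)  # S'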

Error situations must be taken seriously in systems that can be exposed to users / programmers with malicious intent (like hackers). It is better to validate events / transactions upfront as far as possible. Cryptographers prefer not to even respond to such erroneous requests.

Hackers could disrupt or try to find bugs / loopholes in the implementation by firing bad requests. They may even delay / halt processing of genuine requests using a Denial-Of-Service attack.

In another case, there might be multiple login attempts trying to crack a password. These attempts would result in an Error and therefore no state change in the session management system. However, the number of attempts must be tracked: all the load-balanced instances of a user login system must be aware of the number of attempts made so far.

This can be achieved by having a sub-system within the application that listens for Errors or heavy-load events; whenever a potential attack is identified, it raises an alerting Event which changes the state to S".
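
A minimal sketch of such an errors-listener, assuming a simple threshold rule and a callback that injects the alerting Event; the threshold value, names and event shape are illustrative assumptions.

from collections import defaultdict

ATTACK_THRESHOLD = 5  # illustrative value

class ErrorListener:
    """Hypothetical sub-system: counts login Errors per user and raises an
    alerting Event (the change to state S") once a potential attack is seen."""

    def __init__(self, raise_alert):
        self.failed_attempts = defaultdict(int)
        self.raise_alert = raise_alert  # callback that injects the alerting Event

    def on_login_error(self, user: str) -> None:
        self.failed_attempts[user] += 1
        if self.failed_attempts[user] >= ATTACK_THRESHOLD:
            self.raise_alert({"type": "potential_attack", "user": user})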

What could be done in such a scenario is a separate topic of its own. This post mainly highlights the need to identify “potential attack” as a state.

State change as incremental update

A replicating instance (often referred to as a secondary) acts as a backup for the request-processing (primary) instance and must therefore sync up with the primary; it must arrive at the final_application_state. Similarly, if all the application instances are down, a recovering instance must come up with the last valid application state (final_application_state).

A replicating or recovering instance need not perform the same validations and computations that the primary performed; only the final state is important. If the primary performs the computation and simply declares what the state change is, then both the replicating and the recovering instances can apply the state change and arrive at the final_application_state.

This is easily achieved by breaking the request processing (the state transition function) into two steps:

  1. Validate the request and compute the state change (the incremental update) that needs to be applied to the current_application_state.
  2. Persist the incremental update and then apply it to the internal state.
    • The primary instance must not apply the incremental update to its internal state until persistence has succeeded. If persistence fails, it must fail (Error out) the transaction being processed. This ensures that recovery is always consistent with the final_application_state.

The state transition function can be represented as follows:

f = h . g   (apply h after g)
therefore:
h( I = g(S, E) ) -> S' or Error
where:
g : evaluates the event and returns an incremental update
    that can be applied on S, or Error
I : the incremental update, represented as an object,
    OR Error
h : persists / relays the incremental update and then applies it on S,
    OR raises an Error event for an errors-listener
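
A minimal Python sketch of this split, assuming an order-status domain purely for illustration. Here log_store stands in for whichever store is chosen (a plain Python list with append would do for a test); none of the names come from a specific library.

from dataclasses import dataclass

@dataclass(frozen=True)
class IncrementalUpdate:
    # I: only the changed values plus an ID that locates the object to change
    order_id: str
    new_status: str

def g(state: dict, event: dict) -> IncrementalUpdate:
    """Validates the event against S and computes the incremental update I (or raises)."""
    if event["order_id"] not in state:
        raise ValueError("unknown order")          # the Error case
    return IncrementalUpdate(event["order_id"], event["new_status"])

def h(state: dict, update: IncrementalUpdate, log_store) -> dict:
    """Persists I first, then applies it to S. If persistence fails, the whole
    transaction fails and the in-memory state is left untouched."""
    log_store.append(update)                       # must succeed before any mutation
    state[update.order_id]["status"] = update.new_status
    return state                                   # S'

def f(state: dict, event: dict, log_store) -> dict:
    """f = h . g (apply h after g)."""
    return h(state, g(state, event), log_store)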

The incremental update “I” may be persisted to a NoSQL database, a persistent queue or a Kafka topic (the log store). The log here refers to the series of incremental updates, not to the application log that gets written to log files.

The replicating instance must subscribe to the incremental updates using the mechanisms of the chosen store. As it receives the incremental updates, it applies each one to its internal state. A replicating or recovering instance need not evaluate the incremental update; it simply applies the state change.

An incremental update must ideally be small and must include only the changed values along with the pointers / IDs to uniquely identify the object that needs to change.
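
The replica’s side then reduces to a loop along these lines, reusing the illustrative IncrementalUpdate shape from the sketch above; subscribe is a placeholder for the chosen store’s subscription mechanism and is assumed to yield updates in the order they were persisted.

def replicate(state: dict, subscribe) -> None:
    """Replica loop: no validation, no recomputation. Each incremental update
    received from the log store is simply applied to the local state."""
    for update in subscribe():
        state.setdefault(update.order_id, {})["status"] = update.new_status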

Sidenote: If the store is an “ancient” one, like a flat file / binary data file or an RDBMS, then the processing instance could replay the incremental updates to the replicating instances over TCP/IP, HTTP / REST, EMS etc. To achieve such replay mechanisms, we need a service registry that lets the processing instance know of the existence of replicating instances and their endpoints. Some dated systems use configuration to identify replicating instances and their endpoints.

When the application is first started

When the application is made live in the production environment, there is an “initial_application_state” (let’s call it S0). This is also the “current_application_state”.

Replication

In the picture above, there are two application instances: one processes requests while the other simply acts as a hot backup (the secondary instance). The horizontal axis (the timeline), pointing to the right, represents time. The vertical arrows represent events such as receiving a transaction or arriving at an incremental update after validations and computations. Incoming and outgoing arrows indicate the nature of the event; for example, persisting an incremental update to the log store is an outgoing arrow from the primary instance. The small circles on the timeline represent the application state, so how the state evolves over time can be seen clearly.

A request R1 is received by the processing application instance, which evaluates it, finds that it is a good request and creates an incremental update I1. I1 is then sent to the log store and, if that write is successful, applied on top of S0 to arrive at S1. This continues for a few more requests until the instance finally arrives at state S3, the “final application state”.

Meanwhile, the replicating instance has subscribed for updates. As soon as it sees the incremental update I1, it applies it to the state S0 and arrives at state S1. This continues for as many incremental updates as it receives, and the replicating instance also arrives at state S3. It does not need to know anything about the requests themselves, such as R1.

Sidenote: The log store is expected to maintain the sequence of incremental updates. Any change in sequence may result in a different state in many cases.

Since replication relies on a subscription mechanism, we are free to create as many replicating instances as we want.

Modern systems that support subscription on the log store enable replication and recovery to use the same replay mechanism.

State-changing and Non-state-changing requests

Some requests change the application state, for example the creation or alteration of an order. These can be classified as “state-changing” requests.

Other requests simply ask for the current state of an order, or for an aggregation such as the total number of orders from a particular client. These can be classified as “non-state-changing” requests.

The non-state-changing requests can be processed by any application instance, since all the application instances are replicating state. The replication may lag slightly, which is acceptable in most cases.

It is important to identify state-changing and non-state-changing requests during application design: routing queries away from the primary reduces its load and increases the overall responsiveness of the system.
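
A hypothetical routing sketch follows; the set of state-changing actions and the callbacks are assumptions, but the idea is simply that only state-changing requests must reach the primary.

STATE_CHANGING_ACTIONS = {"create_order", "amend_order", "cancel_order"}  # illustrative

def route(request: dict, is_primary: bool, forward_to_primary, handle_locally):
    """Non-state-changing requests (queries) can be answered by any replicating
    instance, accepting a small replication lag; everything else goes to the primary."""
    if request["action"] in STATE_CHANGING_ACTIONS and not is_primary:
        return forward_to_primary(request)
    return handle_locally(request)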

Sidenote: Many modern applications use the Command Query Responsibility Segregation (CQRS) pattern. The “query” processes handle the non-state-changing requests: they typically receive the state changes and reshape them so that queries are very responsive, often persisting the data in a read-optimised format. Even in this pattern, the classification of “state-changing” and “non-state-changing” requests remains important.

Fault tolerance and Flushqueue

In the mechanism described above, replicating instances merely catch up with the state changes that happen in the primary instance. Which instance acts as the primary is determined by a process called leader election; the rest of the instances act as secondaries.

Mastership / Leader election

A protocol is needed to decide which instance is the request processing node. It could be as simple as asking a service registry to make this decision or as complex as running a consensus mechanism between instances. The process of arriving at this decision is called leader election.
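
The sketch below only illustrates the shape of that decision, assuming a toy rule (the smallest instance ID among the live instances wins); a real deployment would delegate this to a service registry or a consensus protocol such as ZooKeeper rather than hand-rolling it.

import uuid

def elect_leader(live_instance_ids: set) -> str:
    """Toy election: the smallest instance ID among the currently registered
    (live) instances becomes the leader."""
    return min(live_instance_ids)

my_id = str(uuid.uuid4())              # instance IDs could simply be UUIDs
live = {my_id, str(uuid.uuid4())}      # normally obtained from the service registry
i_am_leader = elect_leader(live) == my_id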

Node failure

Whenever the primary instance dies / crashes, leader election needs to be performed again. The secondary instance that gets elected as the new leader may be trailing behind in replication and therefore behind the desired “final_application_state”. It must replay all the remaining incremental state changes until it reaches the final application state, after which it can start processing requests.

Note the subtle difference between gaining mastership and becoming the primary: an instance gains mastership at election time, but it becomes the primary only once it has fully caught up (see the flushqueue process below).

Client groups and leader election

To reduce the load on a single primary instance, client groups may be formed, and the leader election process then declares a leader for each client group. The result of leader election is a map of client group to instance. Each instance is identified by an InstanceID; instance IDs could simply be UUIDs created by each instance at startup.

result of leader election => 
{ 
  ClientGroup1 -> f850dee3-74b6-480d-b8b7-3774b9b7cca9,
  ClientGroup2 -> 6a6f08dd-106a-4b24-9a4e-4b6fc9935304,
  ClientGroup3 -> f850dee3-74b6-480d-b8b7-3774b9b7cca9,
  ClientGroup4 -> 6a6f08dd-106a-4b24-9a4e-4b6fc9935304,
  ...
}

To support vertical partitioning based on client groups, both incoming requests and the incremental updates need to be partitioned. A separate subscription with the log store is needed for each client group.
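
One illustrative way to produce a map like the one above is a simple round-robin assignment; the strategy and names are assumptions, and real deployments may balance on load or data volume instead.

import itertools

def assign_client_groups(client_groups, instance_ids):
    """Round-robin assignment of client groups to instances."""
    cycle = itertools.cycle(sorted(instance_ids))
    return {group: next(cycle) for group in sorted(client_groups)}

# assign_client_groups(["ClientGroup1", "ClientGroup2", "ClientGroup3", "ClientGroup4"],
#                      ["instance-a", "instance-b"])
# => {"ClientGroup1": "instance-a", "ClientGroup2": "instance-b",
#     "ClientGroup3": "instance-a", "ClientGroup4": "instance-b"}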

Sidenote: There are many interesting recipes for ZooKeeper: ZooKeeper Recipes. Curator, another Apache project built on ZooKeeper, has some more interesting recipes: Curator Recipes.

Kafka supports partitioning and consumer groups, and these can be used for leader election. A number of partitions can be created on a topic; if all the instances subscribe with the same group ID, Kafka distributes the partitions uniformly across them. The Kafka documentation has more details on partitioning and group IDs.
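
If Kafka is the log store, a consumer-group subscription might look roughly like the sketch below (written against the kafka-python client; the topic, group ID, server address and handler are assumptions).

from kafka import KafkaConsumer

def apply_partition_update(partition: int, payload: bytes) -> None:
    # placeholder: apply the incremental update for the client group mapped to this partition
    print(partition, payload)

consumer = KafkaConsumer(
    "incremental-updates",               # topic with one partition per client group
    bootstrap_servers="localhost:9092",
    group_id="order-service",            # every instance joins the same group
)

# Kafka distributes the topic's partitions across the instances in the group,
# so each instance effectively leads the client groups mapped to its partitions.
for record in consumer:
    apply_partition_update(record.partition, record.value)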

Flushqueue process

To achieve fault tolerance, the replicating / recovering nodes must know when they have finally caught up and are ready to start processing requests. The newly elected leader must apply all the remaining incremental updates; this is called flushqueue. If the log store can tell us how many pending incremental updates exist, we can use that. Otherwise, the newly elected leader can simply inject a UUID into the log store; when that UUID comes back via the replay mechanism, the instance knows it has caught up and becomes the primary (a sketch follows the list below).

To become primary, the application instance must do the following:

  1. Perform flushqueue,
  2. Stop subscription for incremental updates,
  3. Start processing state-changing client requests, and inject incremental updates to the log store.
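
A sketch of this hand-over, assuming a log_store placeholder that offers append(record) and an ordered replay() iterator; the actual calls depend on the chosen store.

import uuid

def flushqueue_and_take_over(log_store, apply_update, start_processing):
    """Inject a marker, keep replicating until the marker comes back, then take over."""
    marker = str(uuid.uuid4())
    log_store.append({"type": "flushqueue-marker", "id": marker})   # 1. inject the UUID
    for record in log_store.replay():
        if record.get("type") == "flushqueue-marker":
            if record.get("id") == marker:
                break                    # caught up: every earlier incremental update applied
            continue                     # another instance's marker: ignore
        apply_update(record)             # still replicating
    # 2. the subscription stops here (we leave the replay loop)
    start_processing()                   # 3. process state-changing requests as the primary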

Booting up the system

When the system boots up, all instances start as secondaries. Each instance diligently starts replicating state by subscribing to the log store. If a pair of instances is starting, both perform the following tasks in parallel:

  1. Start replication
  2. Participate in leader election

Snapshots, replication and recovery

If the request volume is very high, applying all the incremental updates from the beginning becomes slow, and the application’s startup time grows significantly over time.

If the application is processing about a million events a day and needs to store the state for at least a week, then it needs to apply 7 million incremental updates during recovery.

S0 ->->-> S1000 ->->-> S1000000 ->->-> S7000000

Snapshotting can help significantly. The application could create a snapshot of its state periodically, say every hour. The worst-case scenario for boot-up then becomes roughly 42000 incremental updates applied over the last snapshot, about 167 times fewer.

S6958000 -> S7000000
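
A quick back-of-the-envelope check of these figures (the text above rounds to 42000 updates and roughly 167 times fewer):

events_per_day = 1_000_000
days_retained = 7

without_snapshots = events_per_day * days_retained   # 7,000,000 updates to replay
with_hourly_snapshots = events_per_day // 24         # 41,666 updates in the worst case
print(without_snapshots, with_hourly_snapshots,
      round(without_snapshots / with_hourly_snapshots))   # 7000000 41666 168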

Snapshots can be created either by the application or by an auxiliary process. The snapshot captures all state changes up to the index of a particular incremental update, called the last index, which needs to be stored along with the snapshot. The snapshot could be stored in the same log store: if Kafka is used, a separate snapshot topic can be created; if a NoSQL database is used, separate buckets or indexes can hold the snapshots.

At the time of startup, the application reads the snapshot and starts replaying incremental updates beyond the stored last index.
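
A recovery sketch under assumed interfaces: snapshot_store.latest() and log_store.replay(from_index=...) are placeholders for whatever the chosen stores actually provide.

def recover(snapshot_store, log_store, apply_update):
    """Load the latest snapshot, then replay only the incremental updates
    beyond the snapshot's last index."""
    snapshot = snapshot_store.latest()   # e.g. {"last_index": 6958000, "state": {...}}
    state = snapshot["state"]
    for update in log_store.replay(from_index=snapshot["last_index"] + 1):
        state = apply_update(state, update)
    return state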

Auxiliary process

The role of the auxiliary process is to create new snapshots (say, every hour) and to purge very old snapshots and incremental updates that are no longer needed.

To ensure that the snapshot is not corrupted, the auxiliary process can store checksums and validate them. If it detects corruption (a checksum mismatch), it can raise a “corrupt snapshot” alert and roll back to a previous snapshot.
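
A minimal sketch of checksum creation and validation, assuming a JSON-serialisable state and illustrative field names:

import hashlib
import json

def snapshot_with_checksum(state: dict, last_index: int) -> dict:
    """Stores a SHA-256 checksum alongside the snapshot so corruption can be detected."""
    payload = json.dumps({"state": state, "last_index": last_index}, sort_keys=True)
    return {"payload": payload, "checksum": hashlib.sha256(payload.encode()).hexdigest()}

def is_corrupt(snapshot: dict) -> bool:
    expected = hashlib.sha256(snapshot["payload"].encode()).hexdigest()
    return expected != snapshot["checksum"]   # mismatch => alert and roll back to an older snapshot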

Increasing the effectiveness of snapshots

The startup time of the application is governed by the size of the snapshot and the number of incremental updates. Some other factors that affect the startup time are:

  1. The number of “records” sent over the wire.
  2. The size of the records.
  3. The number of interactions between the log store and the application.
    • A higher number of interactions means more TCP messages back and forth between the log store and the application, which adds delay due to protocol-level acks / retransmissions of packets etc.

Batching records while reading from the log store reduces the number of back and forth interactions between the two systems. This speeds up the startup.

Storing the snapshot in a compressed format reduces the amount of data over the wire. The application can simply uncompress the snapshot and create the in-memory model.
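
A sketch of compressed snapshot handling, assuming a JSON-serialisable state; a local file is used here purely for illustration, whereas in practice the compressed blob would live in the snapshot topic or bucket.

import gzip
import json

def write_compressed_snapshot(state: dict, last_index: int, path: str) -> None:
    """Compresses the snapshot before storing it; fewer bytes over the wire means a faster startup."""
    blob = json.dumps({"state": state, "last_index": last_index}).encode()
    with gzip.open(path, "wb") as fh:
        fh.write(blob)

def read_compressed_snapshot(path: str) -> dict:
    with gzip.open(path, "rb") as fh:
        return json.loads(fh.read())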

Versioning

As features get added, the data structures for capturing state evolve and so does the data for the incremental update.

Although the application has evolved, the historical data (past transactions) is still needed; transactions from the past hour, day or months do not become irrelevant just because a new version of the application is released. The application must therefore interpret past incremental updates using the old schema and apply them to the new data structures.

To support this behaviour, the application stores a version number / string along with the incremental update. The version can be stored in a header if the store allows it; Kafka, for example, supports record headers. If the store does not support headers, the version may be stored as part of the record itself. Two examples, using JSON and binary serialization formats, are shown below:

json serialized incremental update:
{
"ClassName" : "IncrementalUpdateForOrders",
"Version" : "2.3.1",
"UpdateData" : {
  ...
  ...
  }
}

binary serialized incremental update:
ClassID => int32
Version => int16
UpdateData_v231 =>
  Field_1_Length => int32
  Field_1 => Byte[]
  Field_2 => Char
  Field_3 => int64
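
At replay time, the version decides how the record is interpreted. The sketch below assumes the JSON form shown above, with illustrative version strings and field names.

import json

def apply_versioned_update(state: dict, record: bytes) -> dict:
    """Interprets a JSON-serialised incremental update according to the version
    it was written with, then applies it to the current data structures."""
    update = json.loads(record)
    data = update["UpdateData"]
    if update["Version"].startswith("1."):
        # old schema: only a status field existed
        state[data["order_id"]] = {"status": data["status"]}
    else:
        # 2.x schema: the full order object is carried in the update
        state[data["order_id"]] = data
    return state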

Conclusion

In the past, recovery and replication were two separate procedures. This is no longer the case with modern systems: complexity is significantly reduced by using the same procedure for both replication and recovery. Although open-source libraries and frameworks shield developers from these details, the application developer must understand the basics to improve the effectiveness of recovery and replication.
