If you are looking for high-performing distributed storage for a modern database management system, TiKV is a strong candidate. The project’s potential was recognized when the CNCF accepted it in 2018, and it has since reached graduated status in terms of project maturity.
Read this article to know everything about TiKV.
The name of this project is inspired by Titanium (‘Ti’), as the creators had the element’s properties in mind. ‘KV’ stands for ‘key-value,’ the kind of database it mostly serves. If you are wondering how to pronounce it, it’s ‘Tai-K-V.’
TiKV is designed for scalability, very low latency, and simplicity. Unlike some of its counterparts, it is a distributed key-value database that does not depend heavily on an existing distributed file system. Its design aims at functionality similar to Google Spanner, HBase, and F1.
The above image depicts the major components of the solution. Here is a quick description to explain it all better:
Region: The most basic unit of data movement; each region covers a contiguous range of key-value pairs. A region is replicated to multiple nodes, and each replica is called a ‘Peer.’ The peers of one region together form a Raft group.
Node: A physical endpoint in the cluster (e.g., a machine). A node may contain multiple stores, each of which holds multiple regions.
Placement Driver: Abbreviated as PD, it is the core component of the overall TiKV system. You may consider it the brain of the product: it stores the metadata about Nodes, Stores, and Region mappings, and uses this information to decide on load balancing and data placement.
Store: Each store saves its data on local disk and contains a RocksDB instance.
The previous section covered the TiKV components. These components need to communicate with each other to do their work, and that is where Protocol Buffers come in. However, because Rust did not offer gRPC support when this design was created, TiKV defines its own message format.
Each communication happens by exchanging one or more messages, where the message format is Header + Payload.
Here, the Payload contains the Protocol Buffers data, and its length is given in the header. The receiver reads the header first, then reads and decodes the payload accordingly.
The Header is a 16-byte sequence with the following format: | 0xdaf4 (2-byte magic value) | 0x01 (2-byte version) | msg_len (4 bytes) | msg_id (8 bytes) |
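To make the Header + Payload layout concrete, here is a minimal sketch of encoding and decoding one message. The function names are hypothetical, and the big-endian byte order is an assumption, since the article does not say how the fields are serialized. The payload is treated as opaque bytes; in practice it would hold Protocol Buffers data.

```rust
/// Header size in bytes: magic (2) + version (2) + msg_len (4) + msg_id (8).
pub const HEADER_LEN: usize = 16;
pub const MAGIC: u16 = 0xdaf4;
pub const VERSION: u16 = 0x01;

/// Encodes a header followed by the payload.
/// Assumption: all header fields are big-endian.
pub fn encode_msg(msg_id: u64, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(HEADER_LEN + payload.len());
    buf.extend_from_slice(&MAGIC.to_be_bytes());
    buf.extend_from_slice(&VERSION.to_be_bytes());
    buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    buf.extend_from_slice(&msg_id.to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

/// Decodes one message and returns (msg_id, payload).
pub fn decode_msg(buf: &[u8]) -> Result<(u64, &[u8]), String> {
    if buf.len() < HEADER_LEN {
        return Err("buffer shorter than header".to_string());
    }
    let magic = u16::from_be_bytes([buf[0], buf[1]]);
    let version = u16::from_be_bytes([buf[2], buf[3]]);
    if magic != MAGIC || version != VERSION {
        return Err("unexpected magic value or version".to_string());
    }
    // msg_len tells the reader how many payload bytes follow the header.
    let msg_len = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
    let mut id_bytes = [0u8; 8];
    id_bytes.copy_from_slice(&buf[8..16]);
    let msg_id = u64::from_be_bytes(id_bytes);
    let end = HEADER_LEN + msg_len;
    if buf.len() < end {
        return Err("payload truncated".to_string());
    }
    Ok((msg_id, &buf[HEADER_LEN..end]))
}
```

A receiver would first read 16 bytes, validate the magic value and version, and only then read msg_len more bytes for the payload.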
The kvproto project implements the protocol for TiKV’s own interactions. Another project, tipb, contains the protocol that supports push-down.
So, if you want to understand the TiKV architecture better, these files from kvproto can help:
metapb.proto: This file has everything that a TiKV placement driver (PD) might need. All the meta-data related to Peer, Store, Region, Raft, etc. is kept in it.
pdpb.proto: It is the mediator protocol that helps TiKV and PD communicate with each other.
msgpb.proto: All internal TiKV interactions are wrapped in the same message, and MessageType is used to tell them apart.
raftpb.proto: Ported from etcd, it is essential for Raft (explained later in the article). The file must be kept consistent with its etcd counterpart.
coprocessor.proto: Coprocessor (explained later in the article) helps TiKV perform Push-Down operations.
raft_serverpb.proto: It helps the Raft nodes to communicate smoothly.
mvccpb.proto: This file supports the internal MVCC.
raft_cmdpb.proto: It defines the actual commands that are executed when Raft applies a log entry.
kvrpcpb.proto: This file comprises the Key-Value protocol that governs TiKV transactions.
Now, if you want to utilize TiKV API for an external project, you must use the following files:
raft_cmdpb.proto: When you just need the key-value (KV) feature (basic).
kvrpcpb.proto: When you need transactional KV features.
coprocessor.proto: When you need the Push-Down features.
Raft is a popular consensus algorithm that breaks the consensus problem down into relatively independent sub-problems, which makes it easier to understand and implement. TiKV uses it to keep data consistent across the distributed system. The Raft implementation is independent, so you can also use it on its own in your project.
The algorithm was chosen because it is simple to implement, production-ready, highly practical, and easy to port. In fact, the Raft implementation in TiKV is a full port of the one in etcd.
Wondering about how to use Raft in your TiKV project? Check these steps:
Step 1: Define your own storage and implement the Raft Storage trait for it. The trait consists of the following functions:
// 1. It returns the HardState and ConfState information kept in the storage:
fn initial_state(&self) -> Result<RaftState>;
// 2. Another function of the trait returns the log entries in the range between low and high.
// 3. It fetches the term of the log entry at the given log index:
fn term(&self, idx: u64) -> Result<u64>;
// 4. It fetches the index of the first log entry at the current position:
fn first_index(&self) -> Result<u64>;
// 5. It fetches the index of the last log entry at the current position:
fn last_index(&self) -> Result<u64>;
// 6. It creates the current snapshot and returns it:
fn snapshot(&self) -> Result<Snapshot>;
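As a rough illustration of what implementing such a storage looks like, the sketch below backs a cut-down trait with an in-memory log. Everything here is a simplified stand-in: Entry, StorageError, StorageResult, and MemStorage are hypothetical types, initial_state and snapshot are left out, and treating the range as half-open [low, high) is an assumption.

```rust
/// Hypothetical, simplified log entry.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Entry {
    pub index: u64,
    pub term: u64,
    pub data: Vec<u8>,
}

#[derive(Debug, PartialEq)]
pub enum StorageError {
    Compacted,   // the requested index was already discarded
    Unavailable, // the requested index is beyond the end of the log
}

pub type StorageResult<T> = std::result::Result<T, StorageError>;

/// Cut-down storage trait for illustration only.
pub trait Storage {
    fn entries(&self, low: u64, high: u64) -> StorageResult<Vec<Entry>>;
    fn term(&self, idx: u64) -> StorageResult<u64>;
    fn first_index(&self) -> StorageResult<u64>;
    fn last_index(&self) -> StorageResult<u64>;
}

/// In-memory log; entries[0] is a dummy entry that marks the compaction point.
pub struct MemStorage {
    entries: Vec<Entry>,
}

impl MemStorage {
    pub fn new() -> Self {
        MemStorage { entries: vec![Entry::default()] }
    }

    /// Appends an entry and returns its index.
    pub fn append(&mut self, term: u64, data: &[u8]) -> u64 {
        let index = self.entries[self.entries.len() - 1].index + 1;
        self.entries.push(Entry { index, term, data: data.to_vec() });
        index
    }
}

impl Storage for MemStorage {
    fn entries(&self, low: u64, high: u64) -> StorageResult<Vec<Entry>> {
        let offset = self.entries[0].index;
        if low <= offset {
            return Err(StorageError::Compacted);
        }
        if low > high || high > self.last_index()? + 1 {
            return Err(StorageError::Unavailable);
        }
        let (lo, hi) = ((low - offset) as usize, (high - offset) as usize);
        Ok(self.entries[lo..hi].to_vec())
    }

    fn term(&self, idx: u64) -> StorageResult<u64> {
        let offset = self.entries[0].index;
        if idx < offset {
            return Err(StorageError::Compacted);
        }
        self.entries
            .get((idx - offset) as usize)
            .map(|e| e.term)
            .ok_or(StorageError::Unavailable)
    }

    fn first_index(&self) -> StorageResult<u64> {
        Ok(self.entries[0].index + 1)
    }

    fn last_index(&self) -> StorageResult<u64> {
        Ok(self.entries[0].index + self.entries.len() as u64 - 1)
    }
}
```

A production storage would persist entries to disk (TiKV stores use RocksDB) rather than keep them in a Vec.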
Step 2: Create a raw node object and pass it the configuration and your storage instance.
In the configuration, the periodic ticks election_tick and heartbeat_tick matter most, because Raft advances according to them. On the leader, an elapsed heartbeat counter is incremented on every tick; when it reaches the heartbeat_tick threshold, the leader sends heartbeats to the followers and resets the counter.
On every follower, an elapsed election counter is incremented on every tick. When it reaches the election_tick threshold, the follower starts an election.
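The tick-driven counters can be sketched as follows. The Ticker type and its fields are hypothetical names for illustration, and the sketch deliberately leaves out the randomized election timeout that Raft uses to avoid split votes.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Role {
    Leader,
    Follower,
}

#[derive(Debug, PartialEq)]
pub enum TickAction {
    None,
    SendHeartbeat,
    StartElection,
}

/// Hypothetical holder of the two tick thresholds and elapsed counters.
pub struct Ticker {
    pub role: Role,
    heartbeat_tick: u32,
    election_tick: u32,
    heartbeat_elapsed: u32,
    election_elapsed: u32,
}

impl Ticker {
    pub fn new(role: Role, heartbeat_tick: u32, election_tick: u32) -> Self {
        Ticker { role, heartbeat_tick, election_tick, heartbeat_elapsed: 0, election_elapsed: 0 }
    }

    /// Called once per base tick (for example, every 100 ms).
    pub fn tick(&mut self) -> TickAction {
        match self.role {
            Role::Leader => {
                self.heartbeat_elapsed += 1;
                if self.heartbeat_elapsed >= self.heartbeat_tick {
                    self.heartbeat_elapsed = 0;
                    return TickAction::SendHeartbeat;
                }
            }
            Role::Follower => {
                self.election_elapsed += 1;
                if self.election_elapsed >= self.election_tick {
                    self.election_elapsed = 0;
                    return TickAction::StartElection;
                }
            }
        }
        TickAction::None
    }

    /// A follower resets its election counter whenever it hears from the leader.
    pub fn reset_election(&mut self) {
        self.election_elapsed = 0;
    }
}
```

With a 100 ms base tick, heartbeat_tick = 2 and election_tick = 10 would mean a heartbeat every 200 ms and an election after one second of silence from the leader.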
Step 3: Once the raw node is created, call its tick interface repeatedly at a fixed interval (say, every 100 ms).
Step 4: To write data through Raft, call the Propose interface. Its parameter is arbitrary binary data, so Raft does not care what it replicates; interpreting the data is left entirely to the external logic.
Step 5: To change the membership, call the propose_conf_change interface of the raw node with a ConfChange object that describes the node to add or remove.
Step 6: After functions such as Tick and Propose are called, Raft may mark the raw node as Ready. The Ready state can mean any of three things:
The raw node has messages that need to be sent to other nodes.
The raw node has entries, a hard state, or a snapshot that need to be saved to the Raft storage.
The raw node has committed_entries that need to be applied to the state machine.
Once the Ready state has been handled, call the Advance function to tell Raft that it can prepare the next Ready.
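The Ready handling described above can be sketched with hypothetical stand-in types. Ready and Node below are not the library’s real structures; they only model the three kinds of work and the final Advance call. The order used here (persist, send, apply, advance) follows the guidance given for etcd’s Raft library and is an assumption about how a caller would arrange the steps.

```rust
/// Hypothetical, simplified view of a Ready state.
#[derive(Default, Debug)]
pub struct Ready {
    pub messages: Vec<String>,           // to be sent to other peers
    pub entries: Vec<Vec<u8>>,           // to be saved in the Raft storage
    pub committed_entries: Vec<Vec<u8>>, // to be applied to the state machine
}

/// Hypothetical node that records what it did with each Ready.
#[derive(Default)]
pub struct Node {
    pub log: Vec<Vec<u8>>,
    pub sent: Vec<String>,
    pub applied: Vec<Vec<u8>>,
    pub advanced: u64,
}

impl Node {
    pub fn handle_ready(&mut self, ready: Ready) {
        // 1. Persist new entries before anything else.
        self.log.extend(ready.entries);
        // 2. Send outgoing messages to the other peers.
        self.sent.extend(ready.messages);
        // 3. Apply committed entries to the state machine.
        self.applied.extend(ready.committed_entries);
        // 4. Tell Raft this Ready is done (stands in for the Advance call).
        self.advanced += 1;
    }
}
```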
TiKV drives Raft from an event loop built on Mio, a Rust I/O library. Here’s the process:
Set a base tick timer for Raft, say 100 ms. On each timeout, re-register the timer and call Tick on the raw node.
Receive external commands through Mio’s ‘notify’ function, and call Propose or propose_conf_change for each of them.
In the Mio tick callback (which runs at the end of each event-loop iteration), check whether the raw node is Ready and, if so, handle the Ready state.
The same steps are followed for all Raft groups. Each group is independent and corresponds to one region.
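The shape of such a loop can be sketched with the standard library alone. This is not Mio code: a std::sync::mpsc channel with recv_timeout stands in for the timer plus the notify mechanism, and Command, LoopStats, and run_loop are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical external commands delivered to the loop.
pub enum Command {
    Propose(Vec<u8>),
    Stop,
}

#[derive(Default, Debug)]
pub struct LoopStats {
    pub ticks: u32,
    pub proposals: Vec<Vec<u8>>,
}

/// Runs until a Stop command arrives or all senders are dropped.
pub fn run_loop(rx: Receiver<Command>, base_tick: Duration) -> LoopStats {
    let mut stats = LoopStats::default();
    let mut next_tick = Instant::now() + base_tick;
    loop {
        // Wait for a command, but no longer than the time left until the next tick.
        let wait = next_tick.saturating_duration_since(Instant::now());
        match rx.recv_timeout(wait) {
            // An external command arrived: this is where Propose would be called.
            Ok(Command::Propose(data)) => stats.proposals.push(data),
            Ok(Command::Stop) | Err(RecvTimeoutError::Disconnected) => return stats,
            // The timer fired: re-arm it; this is where Tick would be called.
            Err(RecvTimeoutError::Timeout) => {
                stats.ticks += 1;
                next_tick += base_tick;
            }
        }
        // End of one iteration: a real loop would check for Ready here and handle it.
    }
}
```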
Split and Merge
At the start, there is just one TiKV region, covering the range (-inf, +inf). As data arrives and the region reaches the 64 MB threshold, it is split in two at the Split Key. Each resulting region is split again in the same way whenever it reaches the threshold.
The TiKV roadmap includes a merge process too, but it is yet to be implemented. Merging is the opposite of splitting: adjacent regions that hold too little data are merged into one bigger region.
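A minimal sketch of splitting a key range at a split key is shown below. The Region struct is hypothetical; it models (-inf, +inf) with empty start and end keys, treats ranges as [start_key, end_key), and lets the left half keep the original ID. All three are assumptions made for illustration.

```rust
/// Hypothetical region descriptor covering [start_key, end_key).
/// An empty start_key means -inf and an empty end_key means +inf.
#[derive(Debug, Clone, PartialEq)]
pub struct Region {
    pub id: u64,
    pub start_key: Vec<u8>,
    pub end_key: Vec<u8>,
}

impl Region {
    pub fn contains(&self, key: &[u8]) -> bool {
        key >= self.start_key.as_slice()
            && (self.end_key.is_empty() || key < self.end_key.as_slice())
    }

    /// Splits this region at split_key into (left, right).
    /// Returns None when the key cannot produce two non-empty ranges.
    pub fn split(&self, split_key: &[u8], new_id: u64) -> Option<(Region, Region)> {
        if split_key.is_empty()
            || split_key == self.start_key.as_slice()
            || !self.contains(split_key)
        {
            return None;
        }
        let left = Region {
            id: self.id,
            start_key: self.start_key.clone(),
            end_key: split_key.to_vec(),
        };
        let right = Region {
            id: new_id,
            start_key: split_key.to_vec(),
            end_key: self.end_key.clone(),
        };
        Some((left, right))
    }
}
```

In a real cluster the decision to split would be driven by the region’s size reaching the threshold, and the new region ID would come from PD.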
As mentioned earlier, PD is the brain of a TiKV cluster. It is responsible for the high consistency and availability of the cluster.
Achieving High Availability
Because PD is the central point of control, it could also become a single point of failure. To avoid this, multiple PD servers are started and one of them is elected as the group’s leader. The leader is chosen through the election mechanism in etcd.
The elected leader handles external communication and serves outside requests. When it becomes unresponsive due to failure or other reasons, another leader is elected through the same process.
Achieving High Consistency
When one leader is down, another leader takes its place. However, what about consistency? How can one ensure that the new leader has the latest, consistent data?
For this, PD data is kept in etcd. Because etcd is a distributed KV store that guarantees consistency, the newly elected PD leader simply loads the data from etcd.
Earlier versions relied on an external etcd service. Current versions embed etcd in PD, which makes PD simpler to deploy and faster.
At present, the main responsibilities of the Placement Driver include:
To create globally unique, monotonically increasing timestamps for TiDB’s distributed transactions through its TSO (TimeStamp Oracle) service.
To balance TiKV regions automatically, based on the information that stores and regions report through regular heartbeats.
To generate globally unique IDs for new TiKV stores and regions.
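A timestamp oracle of this kind can be sketched as a physical clock reading combined with a logical counter. The Tso type below is hypothetical, and the 18-bit logical part is an assumption used for illustration; the caller passes in the wall-clock time so that the example stays deterministic.

```rust
/// Number of low bits reserved for the logical counter (an assumption).
pub const LOGICAL_BITS: u32 = 18;

/// Hypothetical in-memory timestamp oracle.
#[derive(Default)]
pub struct Tso {
    last_physical: u64,
    logical: u64,
}

impl Tso {
    /// Returns a timestamp strictly greater than every earlier one.
    /// now_ms is the current wall-clock time in milliseconds.
    pub fn next(&mut self, now_ms: u64) -> u64 {
        if now_ms > self.last_physical {
            // The clock moved forward: start a new logical sequence.
            self.last_physical = now_ms;
            self.logical = 0;
        } else {
            // Same millisecond, or the clock went backwards: bump the counter.
            self.logical += 1;
            if self.logical >= (1 << LOGICAL_BITS) {
                // Counter exhausted: borrow the next millisecond.
                self.last_physical += 1;
                self.logical = 0;
            }
        }
        (self.last_physical << LOGICAL_BITS) | self.logical
    }
}
```

Because allocation only touches two integers in memory, it is cheap; a real service would additionally persist an upper bound of the issued timestamps so that a new leader never hands out an older value.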
TiKV’s transaction model is inspired by Google’s Percolator and Xiaomi’s Themis. However, the TiKV model is a bit different and more optimized. Here is where it differs:
As in Percolator, a TSO service allocates monotonically increasing timestamps. In TiKV, the TSO runs inside PD: allocating a timestamp is a pure in-memory operation, and the TSO state is saved to etcd at regular intervals.
Lock information is kept in a dedicated RocksDB column family (CF). Using this extra CF improves lock-processing speed and concurrency.
With the extra CF, leftover Locks are easy to find and easy to clean up.
The ordering of concurrent transactions is determined by the TSO timestamps. Other clients can help resolve conflicts caused by transactions that terminated early or unexpectedly.
Here’s how a typical transaction functions in TiKV:
The client fetches startTS, the current timestamp, from the TSO as soon as the transaction (t1) begins.
While t1 proceeds, TiKV serves its read requests over RPC. Thanks to MVCC, reads only see data written before startTS. Writes use optimistic concurrency control: the client buffers them on its own side until commit, so they do not affect other transactions or the stored data before then.
The client initiates the commit, which completes in 2 phases (Prewrite and Commit). There is no dedicated transaction manager. Instead, the commit state of a selected primary key determines the commit state of the whole transaction.
Prewrite: The client sends the buffered writes to the TiKV servers that hold the keys. If a conflict is found, the transaction is aborted and rolled back. If all prewrites succeed, a new timestamp ‘commitTS’ is obtained from the TSO, and the transaction moves to the next phase.
Commit: The client sends the commit request for the primary key to its TiKV server. TiKV writes the data with the commitTS timestamp and clears the related Lock. To keep the cost low, the remaining keys of t1 are committed asynchronously.
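The two phases can be sketched on a single in-memory store. This is a simplified, Percolator-style illustration rather than TiKV’s implementation: the Store type and its three maps are hypothetical, everything lives in one process, and reads ignore pending locks.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical single-node store with lock, data, and write records.
#[derive(Default)]
pub struct Store {
    /// key -> (primary key, start_ts) of the transaction holding the lock.
    locks: HashMap<String, (String, u64)>,
    /// (key, start_ts) -> value written by that transaction.
    data: HashMap<(String, u64), String>,
    /// key -> (commit_ts -> start_ts) for committed versions.
    writes: HashMap<String, BTreeMap<u64, u64>>,
}

impl Store {
    /// Phase 1: lock the key and stage the value, unless there is a conflict.
    pub fn prewrite(&mut self, key: &str, value: &str, primary: &str, start_ts: u64) -> Result<(), String> {
        if self.locks.contains_key(key) {
            return Err(format!("{} is locked by another transaction", key));
        }
        if let Some(versions) = self.writes.get(key) {
            // Someone committed at or after our start_ts: write conflict.
            if versions.range(start_ts..).next().is_some() {
                return Err(format!("write conflict on {}", key));
            }
        }
        self.locks.insert(key.to_string(), (primary.to_string(), start_ts));
        self.data.insert((key.to_string(), start_ts), value.to_string());
        Ok(())
    }

    /// Phase 2: record the commit and clear the lock.
    pub fn commit(&mut self, key: &str, start_ts: u64, commit_ts: u64) -> Result<(), String> {
        match self.locks.get(key) {
            Some((_, ts)) if *ts == start_ts => {}
            _ => return Err(format!("no matching lock on {}", key)),
        }
        self.locks.remove(key);
        self.writes.entry(key.to_string()).or_default().insert(commit_ts, start_ts);
        Ok(())
    }

    /// Snapshot read: the newest version committed at or before read_ts.
    pub fn get(&self, key: &str, read_ts: u64) -> Option<&String> {
        let (_, start_ts) = self.writes.get(key)?.range(..=read_ts).next_back()?;
        self.data.get(&(key.to_string(), *start_ts))
    }
}
```

A client would prewrite every key with the same start timestamp, fetch a commit timestamp, commit the primary key first, and then commit the remaining keys.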
The coprocessor concept is borrowed from HBase. However, the TiKV coprocessor is not loaded dynamically; it is built in statically. Its main role is to serve TiDB in two scenarios: Split and push-down. Here is how it helps:
Split: Before a split request is proposed, the legality of its split key must be confirmed. A TiDB row can have several versions stored in TiKV, and if the chosen key falls between those versions, the row’s data ends up in two regions and can no longer be handled atomically. The Split coprocessor therefore adjusts the SplitKey so that all the data of the row stays in one region during the split.
Push-down: Here the coprocessor’s task is to boost TiDB’s performance. Simple queries, such as select count(*), can be pushed down to the nodes that hold the data. Each node computes its part locally and returns the result to TiDB, which assembles the final answer for the client.
Here’s how this process will proceed:
TiDB parses the SQL query and, based on the range of the table (say t1), works out which regions hold the data.
Suppose the requested data is in Regions 1 and 2 only, so the push-down command is sent to these two regions only.
Upon receiving the command, both regions will use the Raft procedure to fetch their data’s snapshot.
Both regions will traverse these snapshots to figure out the value for the count() function.
TiDB receives their answers, sums them up, and shares the sum as the output.
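The push-down flow for a count query can be sketched as follows. RegionData, region_count, and pushdown_count are hypothetical names; rows are reduced to integer IDs, and each region’s snapshot is just a slice held in memory.

```rust
/// Hypothetical region holding row IDs in the range [start, end).
pub struct RegionData {
    pub start: i64,
    pub end: i64,
    pub rows: Vec<i64>,
}

/// Runs on the storage side: counts the rows of one snapshot inside [lo, hi).
pub fn region_count(snapshot: &[i64], lo: i64, hi: i64) -> u64 {
    snapshot.iter().filter(|&&id| id >= lo && id < hi).count() as u64
}

/// Runs on the SQL side: pushes the count down to the overlapping regions
/// only, then sums the partial results.
pub fn pushdown_count(regions: &[RegionData], lo: i64, hi: i64) -> u64 {
    regions
        .iter()
        .filter(|r| r.start < hi && lo < r.end)
        .map(|r| region_count(&r.rows, lo, hi))
        .sum()
}
```

Only one small number per region travels back to the SQL layer, instead of every matching row.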
A Quick Study of Essential Procedures
Wondering how TiKV serves a GET or PUT request in general, or how the replicas are updated? Let us explain the answer in this section.
The Key-Value Operation
TiKV supports three types of operations: simple KV, transactional KV, and push-down. Which one is used depends on the nature of the request. However, at present each of them is ultimately executed as a simple KV operation.
See how a simple KV operation for the PUT query proceeds:
The Put command is received from the client’s end (e.g., k1 v1).
The client asks the PD leader for the region ID of the key k1 and for the peers of that region.
With this information, the client sends the put request to the corresponding TiKV node.
The TiKV server receives the request and notifies the internal RaftStore thread through a Mio channel, passing along a callback through which the result will be reported back.
The RaftStore thread checks whether the request is legal. If so, it looks up the peer of the region and checks that this peer is the Leader. The request is then encoded as a binary array and passed to the Propose interface to start the Raft process.
While Ready is being handled, the leader appends the new entry to its Raft log and sends it to the followers in parallel.
When a majority of the nodes in the region have appended the entry, it is committed. The entry then shows up in the committed_entries list, from where it is decoded and the corresponding command is executed against RocksDB.
Once the leader has applied the log entry, the entry’s callback is called and the response is returned to the client.
A GET request is processed in the same way. In other words, a request is served only after a majority of the Raft nodes have replicated it, which lets the distributed system maintain data linearizability.
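The majority rule can be illustrated with a small helper. It is a sketch under the assumption that the leader tracks, for every peer including itself, the highest log index known to be stored on that peer; the name committed_index is hypothetical.

```rust
/// Returns the highest log index that is stored on a majority of peers.
/// match_indexes holds one entry per peer, the leader included.
pub fn committed_index(match_indexes: &[u64]) -> u64 {
    if match_indexes.is_empty() {
        return 0;
    }
    let mut sorted = match_indexes.to_vec();
    // Sort in descending order so that position (quorum - 1) is the largest
    // index that at least `quorum` peers have reached.
    sorted.sort_unstable_by(|a, b| b.cmp(a));
    let quorum = sorted.len() / 2 + 1;
    sorted[quorum - 1]
}
```

With three peers at indexes 5, 4, and 3, two of them hold entry 4, so everything up to index 4 is committed.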
To optimize performance further, it has been proposed that TiKV’s Raft let followers serve reads, or use a leader lease so that the leader can serve reads without going through the replicated Raft log.
To keep data safe, TiKV maintains multiple replicas of each region on different Stores.
These replicas are peers of one another. When a region has too few replicas, more are added; when it has too many, some are removed.
The Raft Membership Change component of TiKV carries out both operations. However, when and how a region is changed is decided by PD.
Put simply, PD is the manager and Membership Change is the actor.
Now, see the steps of an example process to understand what exactly happens:
PD receives heartbeats from all regions on a regular basis. Each heartbeat carries the region’s peer details and other essential data.
PD checks the heartbeat data of each region and compares its replica count with the count configured for that region, to confirm whether there are enough replicas/peers.
Now, consider that a region should have 5 replicas as per its setup but it just has 4 as per the heartbeat. PD will look for a suitable Store and send a command ‘ChangePeer’ to the region in question.
Upon receiving this command, the region verifies whether it really needs to add a peer on the other Store. If so, it proposes a ChangePeer request through Raft. Once the log entry is applied, the region meta contains the updated peer information and the Membership Change is complete.
Note: At this point, only the meta of the region has been updated with the new replica’s information. Later on, if the Leader finds that the new Follower holds no data, it sends a snapshot to it.
Note 2: Membership Change in TiKV and etcd works differently from what the Raft paper describes. In the paper, the new peer is added to the Region meta as soon as the change is proposed. TiKV and etcd, however, add the peer details to the Region meta only after the log entry is applied.
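The manager side of this process can be sketched as a pure function that turns one heartbeat into an optional command. The types below are hypothetical, and picking the first Store without a peer is a deliberate simplification; a real scheduler would weigh load and capacity.

```rust
/// Hypothetical command sent from PD to a region.
#[derive(Debug, PartialEq)]
pub enum ChangePeer {
    AddPeer { region_id: u64, store_id: u64 },
    RemovePeer { region_id: u64, store_id: u64 },
}

/// Hypothetical heartbeat: the region and the Stores that hold its peers.
pub struct RegionHeartbeat {
    pub region_id: u64,
    pub peer_stores: Vec<u64>,
}

/// Compares the reported replica count with the configured one and
/// decides whether a peer has to be added or removed.
pub fn check_replicas(
    hb: &RegionHeartbeat,
    max_replicas: usize,
    all_stores: &[u64],
) -> Option<ChangePeer> {
    if hb.peer_stores.len() < max_replicas {
        // Too few replicas: pick the first Store that has no peer of this region.
        all_stores
            .iter()
            .find(|s| !hb.peer_stores.contains(*s))
            .map(|&store_id| ChangePeer::AddPeer { region_id: hb.region_id, store_id })
    } else if hb.peer_stores.len() > max_replicas {
        // Too many replicas: drop the last reported peer.
        hb.peer_stores
            .last()
            .map(|&store_id| ChangePeer::RemovePeer { region_id: hb.region_id, store_id })
    } else {
        None
    }
}
```

The region itself then carries out the command through Raft, as described in the steps above.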
TiKV is a reliable distributed database solution that, as its roadmap shows, is striving to achieve more in the future. Even at the present stage, the CNCF considers the tool mature, and it can be used in commercial projects.
We hope this article has helped you understand it better. If its architecture and processes look reasonable and performance-oriented to you, this solution can form an efficient part of your next project. For example, running TiKV on Kubernetes could lead to innovative results.