HBase replication is a way to copy data between HBase deployments. It can serve as a disaster recovery solution and can contribute to higher availability at the HBase layer. It also has more practical uses, for example as a way to easily copy edits from a web-facing cluster to a "MapReduce" cluster that processes old and new data and ships back the results automatically.
The basic architecture pattern used for HBase replication is (HBase cluster) master-push. This makes it much easier to keep track of what’s currently being replicated, since each region server has its own write-ahead log (aka WAL or HLog), just as in other well-known solutions such as MySQL master/slave replication, where there’s only one bin log to keep track of. One master cluster can replicate to any number of slave clusters, and each region server participates by replicating its own stream of edits. For more information on the different properties of master/slave replication and other types of replication, please consult How Google Serves Data From Multiple Datacenters.
The replication is done asynchronously, meaning that the clusters can be geographically distant, the links between them can be offline for some time, and rows inserted on the master cluster won’t be available at the same time on the slave clusters (eventual consistency).
The replication format used in this design is conceptually the same as MySQL’s statement-based replication. Instead of SQL statements, whole WALEdits (consisting of multiple cell inserts coming from the clients' Put and Delete) are replicated in order to maintain atomicity.
The HLogs from each region server are the basis of HBase replication, and must be kept in HDFS as long as they are needed to replicate data to any slave cluster. Each RS reads from the oldest log it needs to replicate and keeps the current position inside ZooKeeper to simplify failure recovery. That position, like the queue of HLogs to process, can be different for every slave cluster.
The clusters participating in replication can be of asymmetric sizes and the master cluster will do its “best effort” to balance the stream of replication on the slave clusters by relying on randomization.
The guide on enabling and using cluster replication is contained in the API documentation shipped with your HBase distribution.
The most up-to-date documentation is available at this address.
The following sections describe the life of a single edit going from a client that communicates with a master cluster all the way to a single slave cluster.
The client uses an HBase API that sends a Put, Delete or ICV (increment column value) to a region server. The region server transforms the key values into a WALEdit, which is then inspected by the replication code; for each family that is scoped for replication, that code adds the scope to the edit. The edit is appended to the current WAL and is then applied to the MemStore.
In a separate thread, the edit is read from the log (as part of a batch) and only the KVs that are replicable are kept (that is, KVs that are part of a family scoped GLOBAL in the family's schema and that do not belong to a catalog table, so not .META. or -ROOT-). When the buffer is filled, or the reader hits the end of the file, the buffer is sent to a random region server on the slave cluster.
Synchronously, the region server that receives the edits reads them sequentially and separates them into buffers, one per table. Once all edits are read, each buffer is flushed using the normal HBase client (HTables managed by an HTablePool). This is done in order to leverage parallel insertion (MultiPut).
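The filtering step described above can be sketched as follows. This is a minimal illustration, not the HBase implementation: the `KeyValue` dataclass, the `replicable_kvs` function and the scope constants are hypothetical names chosen for the example.

```python
from dataclasses import dataclass

# Illustrative constants; the names and values are assumptions for this sketch.
SCOPE_LOCAL = 0
SCOPE_GLOBAL = 1
CATALOG_TABLES = {".META.", "-ROOT-"}


@dataclass(frozen=True)
class KeyValue:
    """Hypothetical stand-in for a single cell read from the log."""
    row: str
    family: str
    qualifier: str
    value: bytes


def replicable_kvs(table, kvs, family_scopes):
    """Keep only KVs whose family is scoped GLOBAL; drop catalog tables entirely.

    family_scopes maps a family name to its scope; a missing family is
    treated as LOCAL (not replicated).
    """
    if table in CATALOG_TABLES:
        return []
    return [
        kv for kv in kvs
        if family_scopes.get(kv.family, SCOPE_LOCAL) == SCOPE_GLOBAL
    ]
```

For a table with one GLOBAL family and one LOCAL family, only the cells of the GLOBAL family would survive the filter and be placed in the buffer.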
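As a rough sketch of the sink side, the per-table buffering could look like the code below. The edit tuple layout and the `flush` callback are assumptions made for the example; in HBase the flush goes through the regular client.

```python
from collections import defaultdict


def group_by_table(edits):
    """Split a batch of edits into one buffer per table, preserving order.

    edits is an iterable of (table, row, payload) tuples (hypothetical shape).
    """
    buffers = defaultdict(list)
    for table, row, payload in edits:
        buffers[table].append((row, payload))
    return dict(buffers)


def apply_batch(edits, flush):
    """Read every edit first, then flush each table's buffer in one call.

    flush(table, rows) stands in for the client call that writes the rows.
    """
    for table, rows in group_by_table(edits).items():
        flush(table, rows)
```

Grouping before flushing means each table receives one multi-row write instead of one write per edit.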
Back in the master cluster's region server, the offset for the current WAL that's being replicated is registered in ZooKeeper.
When a slave cluster is not responding, the edit is still inserted on the master cluster in the same way.
In the separate thread, the region server reads, filters and buffers the log edits the same way as during normal processing. The slave region server that's contacted doesn't answer the RPC, so the master region server will sleep and retry up to a configured number of times. If the slave RS still isn't available, the master cluster RS will select a new subset of RSs to replicate to and will retry sending the buffer of edits.
In the meantime, the WALs will be rolled and stored in a queue in ZooKeeper. When a region server archives a log (archiving is basically moving a log from the region server's logs directory to a central logs archive directory), the log's path is updated in the in-memory queue of the replicating thread.
When the slave cluster is finally available, the buffer will be applied the same way as during normal processing. The master cluster RS will then replicate the backlog of logs.
This section describes in depth how each of replication's internal features operates.
When a master cluster RS initiates a replication source to a slave cluster, it first connects to the slave's ZooKeeper ensemble using the provided cluster key (that key is composed of the value of hbase.zookeeper.quorum, zookeeper.znode.parent and hbase.zookeeper.property.clientPort). It then scans the "rs" directory to discover all the available sinks (region servers that are accepting incoming streams of edits to replicate) and randomly chooses a subset of them using a configured ratio (which has a default value of 10%). For example, if a slave cluster has 150 machines, 15 will be chosen as potential recipients for the edits that this master cluster RS will be sending. Since this is done by all master cluster RSs, the probability that all slave RSs are used is very high, and this method works for clusters of any size. For example, a master cluster of 10 machines replicating to a slave cluster of 5 machines with a ratio of 10% means that each master cluster RS will choose one machine at random, so the chance of overlapping and of full usage of the slave cluster is higher.
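The sink selection can be illustrated with a short sketch. The function name and parameters are hypothetical, and rounding the subset size up to at least one sink is an assumption that matches the 5-machine example above (10% of 5 machines yields one sink).

```python
import random


def choose_sinks(slave_region_servers, ratio_percent=10, rng=random):
    """Pick a random subset of slave region servers to use as sinks.

    The subset size is ratio_percent of the cluster, rounded up, so that a
    non-empty cluster always yields at least one sink (assumed behavior).
    """
    servers = list(slave_region_servers)
    if not servers:
        return []
    # Integer ceiling division avoids floating-point rounding surprises.
    count = (len(servers) * ratio_percent + 99) // 100
    return rng.sample(servers, count)
```

With 150 slave machines this returns 15 distinct sinks, and with 5 machines it returns a single sink. Because every master cluster RS draws its own random subset, the load tends to spread over the whole slave cluster.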
Every master cluster RS has its own znode in the replication znodes hierarchy. It contains one znode per peer cluster (with 5 slave clusters, 5 znodes are created), and each of these contains a queue of HLogs to process. Each of these queues will track the HLogs created by that RS, but they can differ in size. For example, if one slave cluster becomes unavailable for some time then the HLogs should not be deleted, thus they need to stay in the queue (while the others are processed). See the section named "Region server failover" for an example.
When a source is instantiated, it contains the current HLog that the region server is writing to. During log rolling, the new file is added to the queue of each slave cluster's znode just before it's made available. This ensures that all the sources are aware that a new log exists before HLog is able to append edits into it, but it makes this operation more expensive. The queue items are discarded when the replication thread cannot read more entries from a file (because it reached the end of the last block) and there are other files in the queue. This means that if a source is up-to-date and replicates from the log that the region server writes to, reading up to the "end" of the current file won't delete the item in the queue.
When a log is archived (because it's not used anymore or because there are too many of them per hbase.regionserver.maxlogs, typically because the insertion rate is faster than region flushing), the source threads are notified that the path for that log changed. If a particular source was already done with the log, it will just ignore the message. If the log is in the queue, the path will be updated in memory. If the log is currently being replicated, the change will be done atomically so that the reader doesn't try to open the file when it's already moved. Also, moving a file is a NameNode operation so, if the reader is currently reading the log, it won't generate any exception.
By default, a source will try to read from a log file and ship log entries as fast as possible to a sink. This is first limited by the filtering of log entries; only KeyValues that are scoped GLOBAL and that don't belong to catalog tables will be retained. A second limit is imposed on the total size of the list of edits to replicate per slave, which by default is 64MB. This means that a master cluster RS with 3 slaves will use at most 192MB to store data to replicate. This figure doesn't account for filtered data that hasn't been garbage collected yet.
Once the maximum size of edits has been buffered or the reader hits the end of the log file, the source thread will stop reading and will choose at random a sink to replicate to (from the list that was generated by keeping only a subset of slave RSs). It will directly issue an RPC to the chosen machine and will wait for the method to return. If it's successful, the source will determine if the current file is emptied or if it should continue to read from it. If the former, it will delete the znode in the queue. If the latter, it will register the new offset in the log's znode. If the RPC threw an exception, the source will retry up to 10 times before trying to find a different sink.
If replication isn't enabled, the master's logs cleaning thread will delete old logs using a configured TTL. This doesn't work well with replication since archived logs past their TTL may still be in a queue. Thus, the default behavior is augmented so that if a log is past its TTL, the cleaning thread will look up every queue until it finds the log (while caching the ones it finds). If it's not found, the log will be deleted. The next time it has to look for a log, it will first use its cache.
As long as region servers don't fail, keeping track of the logs in ZK doesn't add any value. Unfortunately, they do fail, so since ZooKeeper is highly available we can count on it and its semantics to help us manage the transfer of the queues.
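The queue rule described above (a finished log is only dropped when a newer one is already queued) can be sketched with a small in-memory model. `LogQueue` and its methods are hypothetical names; the real queue lives in ZooKeeper.

```python
from collections import deque


class LogQueue:
    """In-memory model of one slave cluster's queue of logs (illustrative)."""

    def __init__(self, current_log):
        # A new source starts with the log currently being written to.
        self.logs = deque([current_log])

    def on_log_roll(self, new_log):
        """Enqueue the new log before the writer may append edits to it."""
        self.logs.append(new_log)

    def on_end_of_file(self):
        """Called when the reader cannot read more entries from the head log.

        The head is discarded only if another log is queued behind it;
        otherwise it is still the live log and must stay. Returns True if
        the head was discarded.
        """
        if len(self.logs) > 1:
            self.logs.popleft()
            return True
        return False
```

An up-to-date source that reaches the end of the live log therefore keeps its single queue item until the next log roll adds a successor.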
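The size limit and the memory bound quoted above can be made concrete with a small sketch. The function names are hypothetical, log entries are modeled as byte strings, and `None` stands for an entry that was removed by filtering.

```python
MAX_BATCH_BYTES = 64 * 1024 * 1024  # the 64MB default per slave quoted above


def read_batch(entries, start, max_bytes=MAX_BATCH_BYTES):
    """Buffer entries from position start until the size cap or end of log.

    Returns (batch, next_position). The cap is checked before each read, so
    the batch may exceed max_bytes by at most one entry in this sketch.
    """
    batch, size, pos = [], 0, start
    while pos < len(entries) and size < max_bytes:
        entry = entries[pos]
        pos += 1
        if entry is not None:  # filtered entries advance the position only
            batch.append(entry)
            size += len(entry)
    return batch, pos


def worst_case_buffer_bytes(num_slaves, max_bytes=MAX_BATCH_BYTES):
    """Upper bound on buffered edits: one full batch per slave cluster."""
    return num_slaves * max_bytes
```

For 3 slaves, `worst_case_buffer_bytes(3)` gives 192MB, matching the figure in the text.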
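The shipping loop with retries might be sketched as below. The `send` callback stands in for the RPC, `ConnectionError` stands in for whatever exception the RPC raises, and `max_sink_changes` is an assumption added only so that the example terminates.

```python
import random


def ship_batch(batch, sinks, send, retries_per_sink=10, max_sink_changes=3,
               rng=random):
    """Ship a batch to a randomly chosen sink, retrying on failure.

    After retries_per_sink failed attempts a different random sink is drawn.
    Returns the sink that accepted the batch, or raises ConnectionError once
    max_sink_changes sinks have been exhausted.
    """
    for _ in range(max_sink_changes):
        sink = rng.choice(sinks)
        for _attempt in range(retries_per_sink):
            try:
                send(sink, batch)  # blocks until the sink has applied the edits
                return sink
            except ConnectionError:
                continue  # a real source would sleep before retrying
    raise ConnectionError("no sink accepted the batch")
```

Only after `ship_batch` returns would the source update ZooKeeper, either deleting the finished log's znode or registering the new offset.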
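A simplified model of the augmented cleaner is sketched below. The class name and the `list_queues` callback are hypothetical, and unlike the description above this sketch rebuilds its cache from all queues on a miss rather than stopping at the first match.

```python
class QueueAwareLogCleaner:
    """Decides whether an archived log may be deleted (illustrative model)."""

    def __init__(self, list_queues):
        # list_queues() returns an iterable of queues, each an iterable of
        # log names; it stands in for reading the replication znodes.
        self._list_queues = list_queues
        self._cache = set()

    def is_deletable(self, log_name, age_seconds, ttl_seconds):
        """A log is deletable once past its TTL and absent from every queue."""
        if age_seconds <= ttl_seconds:
            return False
        if log_name in self._cache:
            return False  # known to be queued; no ZooKeeper lookup needed
        # Cache miss: rescan the queues and remember everything found.
        self._cache = {name for queue in self._list_queues() for name in queue}
        return log_name not in self._cache
```

The cache keeps the common case cheap: most expired logs that are still queued are answered without touching ZooKeeper again.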
All the master cluster RSs keep a watcher on every other one of them to be notified when one dies (just like the master does). When it happens, they all race to create a znode called "lock" inside the dead RS' znode that contains its queues. The one that creates it successfully will proceed by transferring all the queues to its own znode (one by one since ZK doesn't support the rename operation) and will delete all the old ones when it's done. The recovered queues' znodes will be named with the id of the slave cluster appended with the name of the dead server.
Once that is done, the master cluster RS will create one new source thread per copied queue, and each of them will follow the read/filter/ship pattern. The main difference is that those queues will never have new data since they don't belong to their new region server, which means that when the reader hits the end of the last log, the queue's znode will be deleted and the master cluster RS will close that replication source.
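The lock race and the naming of recovered queues can be modeled with plain dictionaries. This is only a sketch under stated assumptions: the `znodes` dictionary stands in for the ZooKeeper hierarchy, the function name is hypothetical, and the server names in the usage note are made up.

```python
def claim_queues(znodes, dead_server, survivor):
    """Try to take over the replication queues of a dead region server.

    znodes maps a server name to {"peers": {queue_id: [log, ...]}}.
    Returns the ids of the recovered queues, or [] if the race was lost.
    """
    node = znodes.get(dead_server)
    if node is None or "lock" in node:
        return []  # another survivor already holds the lock or finished
    node["lock"] = survivor  # models creating the "lock" znode

    recovered = []
    for queue_id, logs in node["peers"].items():
        # Copy queues one by one; the new id appends the dead server's name.
        new_id = f"{queue_id}-{dead_server}"
        znodes[survivor]["peers"][new_id] = list(logs)
        recovered.append(new_id)

    del znodes[dead_server]  # clean up the old znodes once the copy is done
    return recovered
```

If a server named `rs3,60020,300` first recovers queue `2` from `rs2,60020,200` and then dies itself, the next survivor ends up with queues named `2-rs3,60020,300` and `2-rs2,60020,200-rs3,60020,300`, so the full chain of previous owners stays visible in the queue id.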
For example, consider a master cluster with 3 region servers that's replicating to a single slave with id '2'. The following hierarchy represents what the znodes layout could be at some point in time. We can see that the RSs' znodes all contain a "peers" znode that contains a single queue. The znode names in the queues represent the actual file names on HDFS in the form "address,port.timestamp".

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    peers/
      2/
        1.1.1.1,60020.1234 (Contains a position)
        1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    peers/
      2/
        1.1.1.2,60020.1214 (Contains a position)
        1.1.1.2,60020.1248
        1.1.1.2,60020.1312
  1.1.1.3,60020,123456630/
    peers/
      2/
        1.1.1.3,60020.1280 (Contains a position)

Now let's say that 1.1.1.2 loses its ZK session. The survivors will race to create a lock, and for some reason 1.1.1.3 wins. It will then start transferring all the queues to its local peers znode by appending the name of the dead server. Right before 1.1.1.3 is able to clean up the old znodes, the layout will look like the following:

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    peers/
      2/
        1.1.1.1,60020.1234 (Contains a position)
        1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    lock
    peers/
      2/
        1.1.1.2,60020.1214 (Contains a position)
        1.1.1.2,60020.1248
        1.1.1.2,60020.1312
  1.1.1.3,60020,123456630/
    peers/
      2/
        1.1.1.3,60020.1280 (Contains a position)
      2-1.1.1.2,60020,123456790/
        1.1.1.2,60020.1214 (Contains a position)
        1.1.1.2,60020.1248
        1.1.1.2,60020.1312

Some time later, but before 1.1.1.3 is able to finish replicating the last HLog from 1.1.1.2, let's say that it dies too (also, some new logs were created in the normal queues). The last RS will then try to lock 1.1.1.3's znode and will begin transferring all the queues. The new layout will be:

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    peers/
      2/
        1.1.1.1,60020.1378 (Contains a position)
      2-1.1.1.3,60020,123456630/
        1.1.1.3,60020.1325 (Contains a position)
        1.1.1.3,60020.1401
      2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
        1.1.1.2,60020.1312 (Contains a position)
  1.1.1.3,60020,123456630/
    lock
    peers/
      2/
        1.1.1.3,60020.1325 (Contains a position)
        1.1.1.3,60020.1401
      2-1.1.1.2,60020,123456790/
        1.1.1.2,60020.1312 (Contains a position)
Yes, this is for much later.
You can use the HBase-provided utility called CopyTable from the package org.apache.hadoop.hbase.mapreduce in order to have a distcp-like tool to bulk copy data.
Here's a list of all the JIRA issues that relate to major issues or missing features in the replication implementation.