Chapter 1. Apache HBase Operational Management

Table of Contents

1.1. HBase Tools and Utilities
1.1.1. Canary
1.1.2. Health Checker
1.1.3. Driver
1.1.4. HBase hbck
1.1.5. HFile Tool
1.1.6. WAL Tools
1.1.7. Compression Tool
1.1.8. CopyTable
1.1.9. Export
1.1.10. Import
1.1.11. ImportTsv
1.1.12. CompleteBulkLoad
1.1.13. WALPlayer
1.1.14. RowCounter and CellCounter
1.1.15. mlockall
1.1.16. Offline Compaction Tool
1.1.17. hbase clean
1.1.18. hbase pe
1.1.19. hbase ltt
1.2. Region Management
1.2.1. Major Compaction
1.2.2. Merge
1.3. Node Management
1.3.1. Node Decommission
1.3.2. Rolling Restart
1.3.3. Adding a New Node
1.4. HBase Metrics
1.4.1. Metric Setup
1.4.2. Disabling Metrics
1.4.3. Discovering Available Metrics
1.4.4. Most Important RegionServer Metrics
1.5. HBase Monitoring
1.5.1. Overview
1.5.2. Slow Query Log
1.5.3. Block Cache Monitoring
1.6. Cluster Replication
1.6.1. Life of a WAL Edit
1.6.2. Replication Internals
1.6.3. Replication Configuration Options
1.6.4. Replication Implementation Details
1.7. HBase Backup
1.7.1. Full Shutdown Backup
1.7.2. Live Cluster Backup - Replication
1.7.3. Live Cluster Backup - CopyTable
1.7.4. Live Cluster Backup - Export
1.8. HBase Snapshots
1.8.1. Configuration
1.8.2. Take a Snapshot
1.8.3. Listing Snapshots
1.8.4. Deleting Snapshots
1.8.5. Clone a table from snapshot
1.8.6. Restore a snapshot
1.8.7. Snapshots operations and ACLs
1.8.8. Export to another cluster
1.9. Capacity Planning and Region Sizing
1.9.1. Node count and hardware/VM configuration
1.9.2. Determining region count and size
1.9.3. Initial configuration and tuning
1.10. Table Rename

This chapter will cover operational tools and practices required of a running Apache HBase cluster. The subject of operations is related to the topics of ???, ???, and ??? but is a distinct topic in itself.

1.1. HBase Tools and Utilities

HBase provides several tools for administration, analysis, and debugging of your cluster. The entry-point to most of these tools is the bin/hbase command, though some tools are available in the dev-support/ directory.

To see usage instructions for the bin/hbase command, run it with no arguments, or with the -h argument. These are the usage instructions for HBase 0.98.x. Some commands, such as version, pe, ltt, and clean, are not available in previous versions.

$ bin/hbase
Usage: hbase [<options>] <command> [<args>]
Options:
  --config DIR    Configuration direction to use. Default: ./conf
  --hosts HOSTS   Override the list in 'regionservers' file

Commands:
Some commands take arguments. Pass no args or -h for usage.
  shell           Run the HBase shell
  hbck            Run the hbase 'fsck' tool
  hlog            Write-ahead-log analyzer
  hfile           Store file analyzer
  zkcli           Run the ZooKeeper shell
  upgrade         Upgrade hbase
  master          Run an HBase HMaster node
  regionserver    Run an HBase HRegionServer node
  zookeeper       Run a Zookeeper server
  rest            Run an HBase REST server
  thrift          Run the HBase Thrift server
  thrift2         Run the HBase Thrift2 server
  clean           Run the HBase clean up script
  classpath       Dump hbase CLASSPATH
  mapredcp        Dump CLASSPATH entries required by mapreduce
  pe              Run PerformanceEvaluation
  ltt             Run LoadTestTool
  version         Print the version
  CLASSNAME       Run the class named CLASSNAME      
    

Some of the tools and utilities below are Java classes which are passed directly to the bin/hbase command, as referred to in the last line of the usage instructions. Others, such as hbase shell (???), hbase upgrade (???), and hbase thrift (???), are documented elsewhere in this guide.

1.1.1. Canary

The Canary class can help users canary-test the status of an HBase cluster, at the granularity of every column family of every region, or of every RegionServer. To see the usage, use the -help parameter.

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -help

Usage: bin/hbase org.apache.hadoop.hbase.tool.Canary [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]
 where [opts] are:
   -help          Show this help and exit.
   -regionserver  replace the table argument to regionserver,
      which means to enable regionserver mode
   -daemon        Continuous check at defined intervals.
   -interval <N>  Interval between checks (sec)
   -e             Use region/regionserver as regular expression
      which means the region/regionserver is regular expression pattern
   -f <B>         stop whole program if first error occurs, default is true
   -t <N>         timeout for a check, default is 600000 (milliseconds)

This tool returns non-zero error codes so that it can be combined with other monitoring tools, such as Nagios. The error code definitions are:

private static final int USAGE_EXIT_CODE = 1;
private static final int INIT_ERROR_EXIT_CODE = 2;
private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
private static final int ERROR_EXIT_CODE = 4;
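
As a rough sketch of how a monitoring wrapper might consume these exit codes, the shell function below maps a Canary exit status to a Nagios-style state name. The function name and the mapping itself are assumptions made for illustration and are not part of HBase; only the numeric Canary codes come from the definitions above. The state names follow the usual Nagios plugin convention (OK, WARNING, CRITICAL, UNKNOWN).

```shell
# Hypothetical helper (not part of HBase): translate a Canary exit code
# into a Nagios-style state name. The mapping is an illustrative assumption.
canary_to_nagios() {
  case "$1" in
    0)   echo "OK" ;;        # the canary run completed without error
    1|2) echo "UNKNOWN" ;;   # USAGE_EXIT_CODE / INIT_ERROR_EXIT_CODE: the check itself could not run
    3|4) echo "CRITICAL" ;;  # TIMEOUT_ERROR_EXIT_CODE / ERROR_EXIT_CODE: the cluster did not answer properly
    *)   echo "UNKNOWN" ;;   # any other value is unexpected
  esac
}
```

A wrapper script could run the Canary, capture $? immediately afterwards, and pass that value to canary_to_nagios.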

The examples in this section are based on the following scenario. There are two tables, test-01 and test-02, each with two column families, cf1 and cf2, deployed on three RegionServers as shown in the following table.

RegionServer  test-01  test-02
rs1           r1       r2
rs2           r2
rs3           r2       r1

1.1.1.1. Canary test for every column family (store) of every region of every table

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary
            
13/12/09 03:26:32 INFO tool.Canary: read from region test-01,,1386230156732.0e3c7d77ffb6361ea1b996ac1042ca9a. column family cf1 in 2ms
13/12/09 03:26:32 INFO tool.Canary: read from region test-01,,1386230156732.0e3c7d77ffb6361ea1b996ac1042ca9a. column family cf2 in 2ms
13/12/09 03:26:32 INFO tool.Canary: read from region test-01,0004883,1386230156732.87b55e03dfeade00f441125159f8ca87. column family cf1 in 4ms
13/12/09 03:26:32 INFO tool.Canary: read from region test-01,0004883,1386230156732.87b55e03dfeade00f441125159f8ca87. column family cf2 in 1ms
...
13/12/09 03:26:32 INFO tool.Canary: read from region test-02,,1386559511167.aa2951a86289281beee480f107bb36ee. column family cf1 in 5ms
13/12/09 03:26:32 INFO tool.Canary: read from region test-02,,1386559511167.aa2951a86289281beee480f107bb36ee. column family cf2 in 3ms
13/12/09 03:26:32 INFO tool.Canary: read from region test-02,0004883,1386559511167.cbda32d5e2e276520712d84eaaa29d84. column family cf1 in 31ms
13/12/09 03:26:32 INFO tool.Canary: read from region test-02,0004883,1386559511167.cbda32d5e2e276520712d84eaaa29d84. column family cf2 in 8ms

As you can see, table test-01 has two regions and two column families, so the Canary tool picks 4 small pieces of data from 4 (2 regions * 2 stores) different stores. This is the default behavior of the tool.

1.1.1.2. Canary test for every column family (store) of every region of specific table(s)

You can also test one or more specific tables.

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary test-01 test-02

1.1.1.3. Canary test with regionserver granularity

This picks one small piece of data from each RegionServer. You can also pass RegionServer names as arguments to canary-test specific RegionServers.

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -regionserver
            
13/12/09 06:05:17 INFO tool.Canary: Read from table:test-01 on region server:rs2 in 72ms
13/12/09 06:05:17 INFO tool.Canary: Read from table:test-02 on region server:rs3 in 34ms
13/12/09 06:05:17 INFO tool.Canary: Read from table:test-01 on region server:rs1 in 56ms

1.1.1.4. Canary test with regular expression pattern

This will test both table test-01 and test-02.

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -e test-0[1-2]

1.1.1.5. Run canary test as daemon mode

Run repeatedly at the interval defined by the -interval option, whose default value is 6 seconds. The daemon stops itself and returns a non-zero error code if any error occurs, because the default value of the -f option is true.

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -daemon

Run repeatedly at an interval of 5 seconds, and do not stop even if an error occurs in the test.

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -daemon -interval 5 -f false

1.1.1.6. Force timeout if canary test stuck

In some cases a request can get stuck on a RegionServer and never return a response to the client. The Master may not notice that the affected RegionServer is in trouble and so does not mark it dead, which leaves clients hanging. The timeout option kills the Canary test forcefully in that situation and returns a non-zero error code. This run sets the timeout value to 60 seconds; the default value is 600 seconds.

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -t 60000

1.1.2. Health Checker

You can configure HBase to run a script periodically and, if it fails N times (configurable), have the server exit. See HBASE-7351 Periodic health check script for configuration and details.

1.1.3. Driver

Several frequently-accessed utilities are provided as Driver classes, and executed by the bin/hbase command. These utilities represent MapReduce jobs which run on your cluster. They are run in the following way, replacing UtilityName with the utility you want to run. This command assumes you have set the environment variable HBASE_HOME to the directory where HBase is unpacked on your server.

${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.mapreduce.UtilityName        
      

The following utilities are available:

LoadIncrementalHFiles

Complete a bulk data load.

CopyTable

Export a table from the local cluster to a peer cluster.

Export

Write table data to HDFS.

Import

Import data written by a previous Export operation.

ImportTsv

Import data in TSV format.

RowCounter

Count rows in an HBase table.

replication.VerifyReplication

Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed. Note that this command is in a different package than the others.

Each command except RowCounter accepts a single --help argument to print usage instructions.

1.1.4. HBase hbck

An fsck for your HBase install

To run hbck against your HBase cluster, run:

$ ./bin/hbase hbck

At the end of the command's output it prints OK or INCONSISTENCY. If your cluster reports inconsistencies, pass -details to see more detail emitted. If inconsistencies are reported, run hbck a few times, because the inconsistency may be transient (e.g. the cluster is starting up or a region is splitting). Passing -fix may correct the inconsistency (this is an experimental feature).

For more information, see ???.
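
The advice above to rerun hbck before trusting an inconsistency report can be sketched as a small retry loop. This is only an illustration: the function name, the HBCK_CMD override, and the default attempt count and pause are assumptions, and the check simply looks for the string OK on the last non-empty line of output, as described above.

```shell
# Hypothetical helper: rerun hbck a few times and succeed as soon as one run
# ends with OK. HBCK_CMD is an assumed override (default: ./bin/hbase hbck).
hbck_retry() {
  local attempts="${1:-3}" pause="${2:-30}" i=1 last
  while [ "$i" -le "$attempts" ]; do
    # Keep only the last non-empty line of output, where the verdict is printed.
    last="$(${HBCK_CMD:-./bin/hbase hbck} 2>/dev/null | awk 'NF { line = $0 } END { print line }')"
    case "$last" in
      *OK*) return 0 ;;
    esac
    # Pause between attempts, but not after the final one.
    [ "$i" -lt "$attempts" ] && sleep "$pause"
    i=$((i + 1))
  done
  return 1
}
```

For example, hbck_retry 5 60 would try up to five times, one minute apart, and return non-zero only if no run ended with OK.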

1.1.5. HFile Tool

See ???.

1.1.6. WAL Tools

1.1.6.1. FSHLog tool

The main method on FSHLog offers manual split and dump facilities. Pass it WALs or the product of a split, that is, the content of the recovered.edits directory.

You can get a textual dump of a WAL file content by doing the following:

 $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal.FSHLog --dump hdfs://example.org:8020/hbase/.logs/example.org,60020,1283516293161/10.10.21.10%3A60020.1283973724012 

The return code will be non-zero if there are issues with the file, so you can test the wholesomeness of a file by redirecting STDOUT to /dev/null and testing the program's return code.
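
A minimal sketch of that return-code check, wrapped in a shell function. The function name and the HBASE_BIN override are assumptions for illustration; the class name and the --dump option are the ones shown above.

```shell
# Hypothetical helper: succeed only if the WAL file can be dumped cleanly.
# HBASE_BIN is an assumed override so the function can be tried without a cluster.
wal_is_readable() {
  # Discard the textual dump; only the return code matters here.
  "${HBASE_BIN:-./bin/hbase}" org.apache.hadoop.hbase.regionserver.wal.FSHLog --dump "$1" > /dev/null
}
```

It could be used in a condition, for example: if wal_is_readable "$wal_path"; then echo "WAL looks fine"; fi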

Similarly you can force a split of a log file directory by doing:

 $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal.FSHLog --split hdfs://example.org:8020/hbase/.logs/example.org,60020,1283516293161/

1.1.6.1.1. HLogPrettyPrinter

HLogPrettyPrinter is a tool with configurable options to print the contents of an HLog.

1.1.7. Compression Tool

See ???.

1.1.8. CopyTable

CopyTable is a utility that can copy part or all of a table, either to the same cluster or to another cluster. The target table must already exist. The usage is as follows:

$ ./bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --help        
Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] [--new.name=NEW] [--peer.adr=ADR] <tablename>

Options:
 rs.class     hbase.regionserver.class of the peer cluster, 
              specify if different from current cluster
 rs.impl      hbase.regionserver.impl of the peer cluster,
 startrow     the start row
 stoprow      the stop row
 starttime    beginning of the time range (unixtime in millis)
              without endtime means from starttime to forever
 endtime      end of the time range.  Ignored if no starttime specified.
 versions     number of cell versions to copy
 new.name     new table's name
 peer.adr     Address of the peer cluster given in the format
              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent
 families     comma-separated list of families to copy
              To copy from cf1 to cf2, give sourceCfName:destCfName.
              To keep the same name, just give "cfName"
 all.cells    also copy delete markers and deleted cells

Args:
 tablename    Name of the table to copy

Examples:
 To copy 'TestTable' to a cluster that uses replication for a 1 hour window:
 $ bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 --peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable

For performance consider the following general options:
  It is recommended that you set the following to >=100. A higher value uses more memory but
  decreases the round trip time to the server and may increase performance.
    -Dhbase.client.scanner.caching=100
  The following should always be set to false, to prevent writing data twice, which may produce
  inaccurate results.
    -Dmapred.map.tasks.speculative.execution=false       
      

Scanner Caching

Caching for the input Scan is configured via hbase.client.scanner.caching in the job configuration.

Versions

By default, the CopyTable utility only copies the latest version of each cell unless --versions=n is explicitly specified in the command.

See Jonathan Hsieh's Online HBase Backups with CopyTable blog post for more on CopyTable.
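
The --starttime and --endtime options take Unix timestamps in milliseconds, so a time window has to be converted before it is passed to CopyTable. The sketch below shows one way to compute the two options for a window of whole hours ending at a given timestamp. The function name is hypothetical; the arithmetic matches the one-hour example in the usage text above.

```shell
# Hypothetical helper: print --starttime/--endtime options covering the
# given number of hours up to END_MS (a Unix timestamp in milliseconds).
copytable_window() {
  local end_ms="$1" hours="$2"
  # One hour is 3600 seconds, i.e. 3,600,000 milliseconds.
  local start_ms=$(( end_ms - hours * 3600 * 1000 ))
  echo "--starttime=${start_ms} --endtime=${end_ms}"
}
```

To cover the most recent hour, the end timestamp could be derived from the current time, for example with copytable_window "$(( $(date +%s) * 1000 ))" 1, and the printed options appended to the CopyTable command line.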

1.1.9. Export

Export is a utility that will dump the contents of a table to HDFS in a sequence file. Invoke via:

$ bin/hbase org.apache.hadoop.hbase.mapreduce.Export <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]]

Note: caching for the input Scan is configured via hbase.client.scanner.caching in the job configuration.

1.1.10. Import

Import is a utility that will load data that has been exported back into HBase. Invoke via:

$ bin/hbase org.apache.hadoop.hbase.mapreduce.Import <tablename> <inputdir>

To import 0.94 exported files in a 0.96 cluster or onwards, you need to set system property "hbase.import.version" when running the import command as below:

$ bin/hbase -Dhbase.import.version=0.94 org.apache.hadoop.hbase.mapreduce.Import <tablename> <inputdir>

1.1.11. ImportTsv

ImportTsv is a utility that will load data in TSV format into HBase. It has two distinct usages: loading data from TSV format in HDFS into HBase via Puts, and preparing StoreFiles to be loaded via completebulkload.

To load data via Puts (i.e., non-bulk loading):

$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <hdfs-inputdir>

To generate StoreFiles for bulk-loading:

$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>

These generated StoreFiles can be loaded into HBase via Section 1.1.12, “CompleteBulkLoad”.

1.1.11.1. ImportTsv Options

Running ImportTsv with no arguments prints brief usage information:

Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

Imports the given input directory of TSV data into the specified table.

The column names of the TSV data must be specified using the -Dimporttsv.columns
option. This option takes the form of comma-separated column names, where each
column name is either a simple column family, or a columnfamily:qualifier. The special
column name HBASE_ROW_KEY is used to designate that this column should be used
as the row key for each imported record. You must specify exactly one column
to be the row key, and you must specify a column name for every column that exists in the
input data.

By default importtsv will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimporttsv.bulk.output=/path/for/output
  Note: the target table will be created with default column family descriptors if it does not already exist.

Other options that may be specified with -D include:
  -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
  '-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
  -Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for the import
  -Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead of org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
        

1.1.11.2. ImportTsv Example

For example, assume that we are loading data into a table called 'datatsv' with a ColumnFamily called 'd' with two columns "c1" and "c2".

Assume that an input file exists as follows:

row1	c1	c2
row2	c1	c2
row3	c1	c2
row4	c1	c2
row5	c1	c2
row6	c1	c2
row7	c1	c2
row8	c1	c2
row9	c1	c2
row10	c1	c2
          

For ImportTsv to use this input file, the command line needs to look like this:

 HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,d:c1,d:c2 -Dimporttsv.bulk.output=hdfs://storefileoutput datatsv hdfs://inputfile
 

In this example the first column is the rowkey, which is why HBASE_ROW_KEY is used. The second and third columns in the file will be imported as "d:c1" and "d:c2", respectively.
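
Because a column name must be given for every column in the input data, it can help to check a local sample of the TSV file before uploading it to HDFS. The following is a minimal sketch of such a pre-flight check. The function name is hypothetical, and it works on a local file only; it is not something ImportTsv provides.

```shell
# Hypothetical pre-flight check: verify that every line of a local TSV file has
# exactly as many tab-separated fields as there are importtsv.columns entries.
tsv_matches_columns() {
  local file="$1" columns="$2" expected
  # Count the comma-separated column names, e.g. HBASE_ROW_KEY,d:c1,d:c2 -> 3.
  expected="$(printf '%s\n' "$columns" | awk -F',' '{ print NF }')"
  # awk stops at the first line whose field count differs and exits non-zero.
  awk -F'\t' -v n="$expected" 'NF != n { bad = 1; exit } END { exit bad + 0 }' "$file"
}
```

For the example above, tsv_matches_columns sample.tsv HBASE_ROW_KEY,d:c1,d:c2 would succeed only if every line of sample.tsv has three fields.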

1.1.11.3. ImportTsv Warning

If you are preparing a lot of data for bulk loading, make sure the target HBase table is pre-split appropriately.

1.1.11.4. See Also

For more information about bulk-loading HFiles into HBase, see ???

1.1.12. CompleteBulkLoad

The completebulkload utility will move generated StoreFiles into an HBase table. This utility is often used in conjunction with output from Section 1.1.11, “ImportTsv”.

There are two ways to invoke this utility, with explicit classname and via the driver:

$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>

... and via the Driver:

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar completebulkload <hdfs://storefileoutput> <tablename>

1.1.12.1. CompleteBulkLoad Warning

Data generated via MapReduce is often created with file permissions that are not compatible with the running HBase process. Assuming you're running HDFS with permissions enabled, those permissions will need to be updated before you run CompleteBulkLoad.
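
One possible way to update ownership before the load is sketched below, using the Hadoop file system shell. The function name, the HADOOP_BIN override, and the assumption that HBase runs as user and group hbase are all illustrative; check which user your HBase processes actually run as. Changing ownership in HDFS normally requires superuser rights.

```shell
# Hypothetical helper: hand generated StoreFiles over to the HBase service user
# before running completebulkload. The default owner "hbase:hbase" is an assumption;
# HADOOP_BIN is an assumed override (default: the hadoop command on PATH).
chown_storefiles_for_hbase() {
  local output_dir="$1" owner="${2:-hbase:hbase}"
  # Recursively change owner and group of the generated StoreFile directory.
  "${HADOOP_BIN:-hadoop}" fs -chown -R "$owner" "$output_dir"
}
```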

For more information about bulk-loading HFiles into HBase, see ???.

1.1.13. WALPlayer

WALPlayer is a utility to replay WAL files into HBase.

The WAL can be replayed for a set of tables or all tables, and a timerange can be provided (in milliseconds). The WAL is filtered to this set of tables. The output can optionally be mapped to another set of tables.

WALPlayer can also generate HFiles for later bulk importing, in that case only a single table and no mapping can be specified.

Invoke via:

$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer [options] <wal inputdir> <tables> [<tableMappings>]

For example:

$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer /backuplogdir oldTable1,oldTable2 newTable1,newTable2

WALPlayer, by default, runs as a mapreduce job. To NOT run WALPlayer as a mapreduce job on your cluster, force it to run entirely in the local process by adding the flag -Dmapreduce.jobtracker.address=local on the command line.

1.1.14. RowCounter and CellCounter

RowCounter is a mapreduce job to count all the rows of a table. This is a good utility to use as a sanity check to ensure that HBase can read all the blocks of a table if there are any concerns of metadata inconsistency. It will run the mapreduce job in a single process, but it will run faster if you have a MapReduce cluster in place for it to exploit.

$ bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter <tablename> [<column1> <column2>...]

Note: caching for the input Scan is configured via hbase.client.scanner.caching in the job configuration.

HBase ships another diagnostic mapreduce job called CellCounter. Like RowCounter, it gathers statistics about your table, but the statistics gathered by CellCounter are more fine-grained and include:

  • Total number of rows in the table.

  • Total number of CFs across all rows.

  • Total qualifiers across all rows.

  • Total occurrence of each CF.

  • Total occurrence of each qualifier.

  • Total number of versions of each qualifier.

The program allows you to limit the scope of the run. Provide a row regex or prefix to limit the rows to analyze. Use hbase.mapreduce.scan.column.family to specify scanning a single column family.

$ bin/hbase org.apache.hadoop.hbase.mapreduce.CellCounter <tablename> <outputDir> [regex or prefix]

Note: just like RowCounter, caching for the input Scan is configured via hbase.client.scanner.caching in the job configuration.

1.1.15. mlockall

It is possible to optionally pin your servers in physical memory making them less likely to be swapped out in oversubscribed environments by having the servers call mlockall on startup. See HBASE-4391 Add ability to start RS as root and call mlockall for how to build the optional library and have it run on startup.

1.1.16. Offline Compaction Tool

See the usage for the Compaction Tool. Run it like this:

$ ./bin/hbase org.apache.hadoop.hbase.regionserver.CompactionTool

1.1.17. hbase clean

The hbase clean command cleans HBase data from ZooKeeper, HDFS, or both. It is appropriate to use for testing. Run it with no options for usage instructions. The hbase clean command was introduced in HBase 0.98.

$ bin/hbase clean
Usage: hbase clean (--cleanZk|--cleanHdfs|--cleanAll)
Options:
        --cleanZk   cleans hbase related data from zookeeper.
        --cleanHdfs cleans hbase related data from hdfs.
        --cleanAll  cleans hbase related data from both zookeeper and hdfs.        
      

1.1.18. hbase pe

The hbase pe command is a shortcut provided to run the org.apache.hadoop.hbase.PerformanceEvaluation tool, which is used for testing. The hbase pe command was introduced in HBase 0.98.

1.1.19. hbase ltt

The hbase ltt command is a shortcut provided to run the org.apache.hadoop.hbase.util.LoadTestTool utility, which is used for testing. The hbase ltt command was introduced in HBase 0.98.

1.2. Region Management

1.2.1. Major Compaction

Major compactions can be requested via the HBase shell or HBaseAdmin.majorCompact.

Note: major compactions do NOT do region merges. See ??? for more information about compactions.

1.2.2. Merge

Merge is a utility that can merge adjoining regions in the same table (see org.apache.hadoop.hbase.util.Merge).

$ bin/hbase org.apache.hadoop.hbase.util.Merge <tablename> <region1> <region2>

If you feel you have too many regions and want to consolidate them, Merge is the utility you need. Merge must be run when the cluster is down. See the O'Reilly HBase Book for an example of usage.

You will need to pass 3 parameters to this application. The first one is the table name. The second one is the fully qualified name of the first region to merge, like "table_name,\x0A,1342956111995.7cef47f192318ba7ccc75b1bbf27a82b.". The third one is the fully qualified name for the second region to merge.

Additionally, there is a Ruby script attached to HBASE-1621 for region merging.

1.3. Node Management

1.3.1. Node Decommission

You can stop an individual RegionServer by running the following script in the HBase directory on the particular node:

$ ./bin/hbase-daemon.sh stop regionserver

The RegionServer will first close all regions and then shut itself down. On shutdown, the RegionServer's ephemeral node in ZooKeeper will expire. The master will notice that the RegionServer is gone and will treat it as a 'crashed' server; it will reassign the regions the RegionServer was carrying.

Disable the Load Balancer before Decommissioning a node

If the load balancer runs while a node is shutting down, then there could be contention between the Load Balancer and the Master's recovery of the just decommissioned RegionServer. Avoid any problems by disabling the balancer first. See Load Balancer below.

A downside to stopping a RegionServer this way is that regions could be offline for a good period of time. Regions are closed in order. If there are many regions on the server, the first region to close may not be back online until all regions have closed and the master has noticed that the RegionServer's znode is gone. Apache HBase 0.90.2 added a facility for having a node gradually shed its load and then shut itself down: the graceful_stop.sh script. Here is its usage:

$ ./bin/graceful_stop.sh
Usage: graceful_stop.sh [--config <conf-dir>] [--restart] [--reload] [--thrift] [--rest] <hostname>
 thrift      If we should stop/start thrift before/after the hbase stop/start
 rest        If we should stop/start rest before/after the hbase stop/start
 restart     If we should restart after graceful stop
 reload      Move offloaded regions back on to the stopped server
 debug       Move offloaded regions back on to the stopped server
 hostname    Hostname of server we are to stop

To decommission a loaded RegionServer, run the following, where HOSTNAME is the host carrying the RegionServer you want to decommission:

$ ./bin/graceful_stop.sh HOSTNAME

On HOSTNAME

The HOSTNAME passed to graceful_stop.sh must match the hostname that HBase is using to identify RegionServers. Check the list of RegionServers in the master UI to see how HBase is referring to servers. It is usually the hostname but can also be the FQDN. Whatever HBase is using, this is what you should pass to the graceful_stop.sh decommission script. If you pass IPs, the script is not yet smart enough to make a hostname (or FQDN) of them, so it will fail when it checks whether the server is currently running; the graceful unloading of regions will not run.

The graceful_stop.sh script will move the regions off the decommissioned RegionServer one at a time to minimize region churn. It will verify that the region is deployed in the new location before it moves the next region, and so on until the decommissioned server is carrying zero regions. At this point, graceful_stop.sh tells the RegionServer to stop. The master will then notice that the RegionServer is gone, but all regions will have already been redeployed, and because the RegionServer went down cleanly, there will be no WAL logs to split.

Load Balancer

It is assumed that the Region Load Balancer is disabled while the graceful_stop script runs (otherwise the balancer and the decommission script will end up fighting over region deployments). Use the shell to disable the balancer:

hbase(main):001:0> balance_switch false
true
0 row(s) in 0.3590 seconds

This turns the balancer OFF. To reenable, do:

hbase(main):001:0> balance_switch true
false
0 row(s) in 0.3590 seconds

The graceful_stop script will check the balancer and, if it is enabled, will turn it off before it goes to work. If it exits prematurely because of an error, it will not have reset the balancer. Hence, it is better to manage the balancer apart from graceful_stop, re-enabling it after you are done with graceful_stop.
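
Managing the balancer outside graceful_stop might look like the following sketch, which pipes the balance_switch commands shown above into the HBase shell. The function name and the HBASE_BIN_DIR override are assumptions for illustration, and the sketch assumes the balancer was enabled beforehand and should always be switched back on.

```shell
# Hypothetical wrapper: manage the balancer outside graceful_stop.sh so that it
# is re-enabled even when the script exits with an error.
# HBASE_BIN_DIR is an assumed override (default: ./bin).
decommission_with_balancer() {
  local host="$1" bin="${HBASE_BIN_DIR:-./bin}" rc=0
  # Turn the balancer off through the HBase shell.
  echo "balance_switch false" | "$bin/hbase" shell
  # Drain and stop the RegionServer; remember the script's exit status.
  "$bin/graceful_stop.sh" "$host" || rc=$?
  # Turn the balancer back on regardless of how graceful_stop.sh ended.
  echo "balance_switch true" | "$bin/hbase" shell
  return "$rc"
}
```

The function returns the exit status of graceful_stop.sh, so a calling script can still detect a failed decommission after the balancer has been restored.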

1.3.1.1. Decommissioning several RegionServers concurrently

If you have a large cluster, you may want to decommission more than one machine at a time by gracefully stopping multiple RegionServers concurrently. To gracefully drain multiple RegionServers at the same time, RegionServers can be put into a "draining" state. This is done by marking a RegionServer as a draining node by creating an entry in ZooKeeper under the hbase_root/draining znode. This znode has the format name,port,startcode, just like the RegionServer entries under the hbase_root/rs znode.

Without this facility, decommissioning multiple nodes may be non-optimal because regions that are being drained from one RegionServer may be moved to other RegionServers that are also draining. Marking RegionServers to be in the draining state prevents this from happening[1].
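
To make the znode naming concrete, the sketch below builds the draining znode path for one RegionServer from its name, port, and startcode. The function name is hypothetical, and the default parent znode /hbase is an assumption; substitute the hbase_root (zookeeper.znode.parent) used by your cluster.

```shell
# Hypothetical helper: build the draining znode path for one RegionServer.
# The parent znode defaults to /hbase here, which is an assumption; use the
# value of zookeeper.znode.parent from your own configuration.
draining_znode() {
  local host="$1" port="$2" startcode="$3" root="${4:-/hbase}"
  echo "${root}/draining/${host},${port},${startcode}"
}
```

The resulting path is what would be created in ZooKeeper, for example from the ZooKeeper shell started with bin/hbase zkcli, to mark that RegionServer as draining.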

1.3.1.2. Bad or Failing Disk

It is good to have ??? set if you have a decent number of disks per machine, for the case where a disk plain dies. But usually disks do the "John Wayne", i.e. take a while to go down while spewing errors in dmesg, or for some reason run much slower than their companions. In this case you want to decommission the disk. You have two options. You can decommission the datanode or, less disruptively in that only the bad disk's data will be re-replicated, you can stop the datanode, unmount the bad volume (you can't umount a volume while the datanode is using it), and then restart the datanode (presuming you have set dfs.datanode.failed.volumes.tolerated > 0). The regionserver will throw some errors in its logs as it recalibrates where to get its data from, and it will likely roll its WAL too, but in general, apart from some latency spikes, it should keep on chugging.

Short Circuit Reads

If you are doing short-circuit reads, you will have to move the regions off the regionserver before you stop the datanode. With short-circuit reads, the regionserver already has the files open, so even though the files are chmod'd so that the regionserver cannot access them, it will be able to keep reading file blocks from the bad disk while the datanode is down. Move the regions back after you restart the datanode.

1.3.2. Rolling Restart

Some cluster configuration changes require either the entire cluster, or the RegionServers, to be restarted in order to pick up the changes. In addition, rolling restarts are supported for upgrading to a minor or maintenance release, and to a major release if at all possible. See the release notes for the release you want to upgrade to, to find out about limitations on the ability to perform a rolling upgrade.

There are multiple ways to restart your cluster nodes, depending on your situation. These methods are detailed below.

1.3.2.1. Using the rolling-restart.sh Script

HBase ships with a script, bin/rolling-restart.sh, that allows you to perform rolling restarts on the entire cluster, the master only, or the RegionServers only. The script is provided as a template for your own script, and is not explicitly tested. It requires password-less SSH login to be configured and assumes that you have deployed using a tarball. The script requires you to set some environment variables before running it. Examine the script and modify it to suit your needs.

Example 1.1. rolling-restart.sh General Usage

$ ./bin/rolling-restart.sh --help
Usage: rolling-restart.sh [--config <hbase-confdir>] [--rs-only] [--master-only] [--graceful] [--maxthreads xx]          
        

Rolling Restart on RegionServers Only

To perform a rolling restart on the RegionServers only, use the --rs-only option. This might be necessary if you need to reboot the individual RegionServer or if you make a configuration change that only affects RegionServers and not the other HBase processes.

If you need to restart only a single RegionServer, or if you need to do extra actions during the restart, use the bin/graceful_stop.sh command instead. See Section 1.3.2.2, “Manual Rolling Restart”.

Rolling Restart on Masters Only

To perform a rolling restart on the active and backup Masters, use the --master-only option. You might use this if you know that your configuration change only affects the Master and not the RegionServers, or if you need to restart the server where the active Master is running.

If you are not running backup Masters, the Master is simply restarted. If you are running backup Masters, they are all stopped before any are restarted, to avoid a race condition in ZooKeeper to determine which is the new Master. First the main Master is restarted, then the backup Masters are restarted. Directly after restart, the Master checks for and cleans out any regions in transition before taking on its normal workload.

Graceful Restart

If you specify the --graceful option, RegionServers are restarted using the bin/graceful_stop.sh script, which moves regions off a RegionServer before restarting it. This is safer, but can delay the restart.

Limiting the Number of Threads

To limit the rolling restart to using only a specific number of threads, use the --maxthreads option.
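
As a sketch of how the options above combine, the following shell function only prints the command line it would run, so it can be tried without a cluster. The function name rolling_restart_cmd, its scope argument, and the HBASE_HOME default of the current directory are assumptions made for illustration; only the option names come from the usage text above.

```shell
# Dry-run helper: prints a rolling-restart.sh command line instead of running it.
# rolling_restart_cmd and HBASE_HOME are illustrative names, not part of HBase.
rolling_restart_cmd() {
  scope="$1"     # "rs", "master", or "all"
  threads="$2"   # value for --maxthreads, or empty to omit the option
  cmd="${HBASE_HOME:-.}/bin/rolling-restart.sh"
  case "$scope" in
    rs)     cmd="$cmd --rs-only --graceful" ;;
    master) cmd="$cmd --master-only" ;;
    all)    ;;
    *)      echo "unknown scope: $scope" >&2; return 1 ;;
  esac
  if [ -n "$threads" ]; then
    cmd="$cmd --maxthreads $threads"
  fi
  printf '%s\n' "$cmd"
}

# With HBASE_HOME unset, prints:
#   ./bin/rolling-restart.sh --rs-only --graceful --maxthreads 4
rolling_restart_cmd rs 4
```

Review the printed command before running it by hand, since the script itself is provided only as a template.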

1.3.2.2. Manual Rolling Restart

To retain more control over the process, you may wish to manually do a rolling restart across your cluster. This uses the graceful_stop.sh command described in Section 1.3.1, “Node Decommission”. In this method, you can restart each RegionServer individually and then move its old regions back into place, retaining locality. If you also need to restart the Master, do so separately, before restarting the RegionServers using this method. The following is an example of such a command. You may need to tailor it to your environment. This script does a rolling restart of RegionServers only. It disables the load balancer before moving the regions.

$ for i in `cat conf/regionservers|sort`; do ./bin/graceful_stop.sh --restart --reload --debug $i; done &> /tmp/log.txt &
        

Monitor the output of the /tmp/log.txt file to follow the progress of the script.

1.3.2.3. Logic for Crafting Your Own Rolling Restart Script

Use the following guidelines if you want to create your own rolling restart script.

  1. Extract the new release, verify its configuration, and synchronize it to all nodes of your cluster using rsync, scp, or another secure synchronization mechanism.

  2. Use the hbck utility to ensure that the cluster is consistent.

    $ ./bin/hbase hbck
              

    Perform repairs if required. See Section 1.1.4, “HBase hbck” for details.

  3. Restart the master first. You may need to modify these commands if your new HBase directory is different from the old one, such as for an upgrade.

    $ ./bin/hbase-daemon.sh stop master; ./bin/hbase-daemon.sh start master            
              
  4. Gracefully restart each RegionServer, using a script such as the following, from the Master.

    $ for i in `cat conf/regionservers|sort`; do ./bin/graceful_stop.sh --restart --reload --debug $i; done &> /tmp/log.txt &            
              

    If you are running Thrift or REST servers, pass the --thrift or --rest options. For other available options, run the bin/graceful_stop.sh --help command.

    It is important to drain HBase regions slowly when restarting multiple RegionServers. Otherwise, multiple regions go offline simultaneously and must be reassigned to other nodes, which may also go offline soon. This can negatively affect performance. You can inject delays into the script above, for instance, by adding a Shell command such as sleep. To wait for 5 minutes between each RegionServer restart, modify the above script to the following:

    $ for i in `cat conf/regionservers|sort`; do ./bin/graceful_stop.sh --restart --reload --debug $i & sleep 5m; done &> /tmp/log.txt &            
              
  5. Restart the Master again, to clear out the dead servers list and re-enable the load balancer.

  6. Run the hbck utility again, to be sure the cluster is consistent.
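
Steps 2 through 6 above can be sketched as a single shell function. This is a minimal outline rather than a tested script: the names run, rolling_restart, DRY_RUN, and PAUSE are hypothetical, the sketch assumes hbck exits with a non-zero status when it finds problems, and it waits for each graceful_stop.sh call to finish before pausing. With DRY_RUN=1 (the default here) every command is printed instead of executed.

```shell
# Sketch of steps 2-6. DRY_RUN=1 prints commands instead of running them.
DRY_RUN="${DRY_RUN:-1}"
PAUSE="${PAUSE:-5m}"   # delay between RegionServer restarts

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '+ %s\n' "$*"
  else
    "$@"
  fi
}

rolling_restart() {
  servers_file="$1"                          # e.g. conf/regionservers
  run ./bin/hbase hbck || return 1           # step 2: check consistency first
  run ./bin/hbase-daemon.sh stop master      # step 3: restart the Master
  run ./bin/hbase-daemon.sh start master
  for host in $(sort "$servers_file"); do    # step 4: one RegionServer at a time
    run ./bin/graceful_stop.sh --restart --reload --debug "$host"
    run sleep "$PAUSE"
  done
  run ./bin/hbase-daemon.sh stop master      # step 5: restart the Master again
  run ./bin/hbase-daemon.sh start master
  run ./bin/hbase hbck                       # step 6: check consistency again
}
```

Calling rolling_restart conf/regionservers in dry-run mode lists the commands in order, which is a convenient way to review the plan before setting DRY_RUN=0.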

1.3.3. Adding a New Node

Adding a new RegionServer in HBase is essentially free. Simply start it with $ ./bin/hbase-daemon.sh start regionserver and it will register itself with the master. Ideally, you have also started a DataNode on the same machine so that the RS can eventually start to have local files. If you rely on ssh to start your daemons, don't forget to add the new hostname in conf/regionservers on the master.

At this point the region server isn't serving data because no regions have moved to it yet. If the balancer is enabled, it will start moving regions to the new RS. On a small/medium cluster this can have a very adverse effect on latency as a lot of regions will be offline at the same time. It is thus recommended to disable the balancer the same way it's done when decommissioning a node and move the regions manually (or even better, using a script that moves them one by one).

The moved regions will all have 0% locality and won't have any blocks in cache, so the region server will have to use the network to serve requests. Apart from resulting in higher latency, this may also saturate your network card. For practical purposes, consider that a standard 1GigE NIC won't be able to read much more than 100MB/s. In this case, or if you are in an OLAP environment and require locality, it is recommended to major compact the moved regions.

1.4. HBase Metrics

HBase emits metrics which adhere to the Hadoop metrics API. Starting with HBase 0.95, HBase is configured to emit a default set of metrics with a default sampling period of every 10 seconds. You can use HBase metrics in conjunction with Ganglia. You can also filter which metrics are emitted and extend the metrics framework to capture custom metrics appropriate for your environment.

1.4.1. Metric Setup

For HBase 0.95 and newer, HBase ships with a default metrics configuration, or sink. This includes a wide variety of individual metrics, and emits them every 10 seconds by default. To configure metrics for a given region server, edit the conf/hadoop-metrics2-hbase.properties file. Restart the region server for the changes to take effect.

To change the sampling rate for the default sink, edit the line beginning with *.period. To filter which metrics are emitted or to extend the metrics framework, see http://hadoop.apache.org/docs/current/api/org/apache/hadoop/metrics2/package-summary.html

HBase Metrics and Ganglia

By default, HBase emits a large number of metrics per region server. Ganglia may have difficulty processing all these metrics. Consider increasing the capacity of the Ganglia server or reducing the number of metrics emitted by HBase. See Metrics Filtering.

1.4.2. Disabling Metrics

To disable metrics for a region server, edit the conf/hadoop-metrics2-hbase.properties file and comment out any uncommented lines. Restart the region server for the changes to take effect.

1.4.3. Discovering Available Metrics

Rather than listing each metric which HBase emits by default, you can browse through the available metrics, either as a JSON output or via JMX. At this time, the JSON output does not include the description field which is included in the JMX view. Different metrics are exposed for the Master process and each region server process.

Procedure 1.1. Access a JSON Output of Available Metrics

  1. After starting HBase, access the region server's web UI, at http://localhost:60030 by default.

  2. Click the Metrics Dump link near the top. The metrics for the region server are presented as a dump of the JMX bean in JSON format.

  3. To view metrics for the Master, connect to the Master's web UI instead (defaults to http://localhost:60010) and click its Metrics Dump link.
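
The same dump can be fetched from the command line. The sketch below assumes that the Metrics Dump link is served at the /jmx path of the web UI and that each bean in the JSON carries a "name" field; the function names metrics_url and bean_names are hypothetical. The name extraction is a rough text match, not a JSON parser.

```shell
# Build the URL of the metrics dump for a given host and web UI port.
# Assumption: the Metrics Dump link points at the /jmx path.
metrics_url() {
  host="$1"; port="$2"
  printf 'http://%s:%s/jmx\n' "$host" "$port"
}

# Print the value of every "name" field found in a dump read from stdin.
bean_names() {
  grep -o '"name" *: *"[^"]*"' | sed 's/^"name" *: *"//; s/"$//'
}

# Example against a running RegionServer (requires curl):
#   curl -s "$(metrics_url localhost 60030)" | bean_names
```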

Procedure 1.2. Browse the JMX Output of Available Metrics

You can use many different tools to view JMX content by browsing MBeans. This procedure uses jvisualvm, which is an application usually available in the JDK.

  1. Start HBase, if it is not already running.

  2. Run the jvisualvm command on a host with a GUI display. You can launch it from the command line or another method appropriate for your operating system.

  3. Be sure the VisualVM-MBeans plugin is installed. Browse to Tools → Plugins. Click Installed and check whether the plugin is listed. If not, click Available Plugins, select it, and click Install. When finished, click Close.

  4. To view details for a given HBase process, double-click the process in the Local sub-tree in the left-hand panel. A detailed view opens in the right-hand panel. Click the MBeans tab at the top of the right-hand panel.

  5. To access the HBase metrics, navigate to the appropriate sub-bean:

    • Master: Hadoop → HBase → Master → Server

    • RegionServer: Hadoop → HBase → RegionServer → Server

  6. The name of each metric and its current value are displayed in the Attributes tab. For a view which includes more details, including the description of each attribute, click the Metadata tab.

1.4.4. Most Important RegionServer Metrics

Previously, this section contained a list of the most important RegionServer metrics. However, the list was extremely out of date. In some cases, the name of a given metric has changed. In other cases, the metric seems to no longer be exposed. An effort is underway to create automatic documentation for each metric based upon information pulled from its implementation.

1.5. HBase Monitoring

1.5.1. Overview

The following metrics are arguably the most important to monitor for each RegionServer for "macro monitoring", preferably with a system like OpenTSDB. If your cluster is having performance issues it's likely that you'll see something unusual with this group.

OS:

  • IO Wait

  • User CPU

Java:

  • GC

For more information on HBase metrics, see Section 1.4, “HBase Metrics”.

1.5.2. Slow Query Log

The HBase slow query log consists of parseable JSON structures describing the properties of those client operations (Gets, Puts, Deletes, etc.) that either took too long to run, or produced too much output. The thresholds for "too long to run" and "too much output" are configurable, as described below. The output is produced inline in the main region server logs so that it is easy to discover further details from context with other logged events. It is also prepended with identifying tags (responseTooSlow), (responseTooLarge), (operationTooSlow), and (operationTooLarge) in order to enable easy filtering with grep, in case the user desires to see only slow queries.

1.5.2.1. Configuration

There are two configuration knobs that can be used to adjust the thresholds for when queries are logged.

  • hbase.ipc.warn.response.time: Maximum number of milliseconds that a query can run without being logged. Defaults to 10000, or 10 seconds. Can be set to -1 to disable logging by time.

  • hbase.ipc.warn.response.size: Maximum byte size of response that a query can return without being logged. Defaults to 100 megabytes. Can be set to -1 to disable logging by size.
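
As an illustration, both thresholds could be set in hbase-site.xml as in the fragment below. The values 5000 and 52428800 are arbitrary examples chosen for this sketch, not recommendations.

```xml
<!-- hbase-site.xml fragment; the values are illustrative only -->
<property>
  <name>hbase.ipc.warn.response.time</name>
  <value>5000</value> <!-- log operations that take longer than 5 seconds -->
</property>
<property>
  <name>hbase.ipc.warn.response.size</name>
  <value>52428800</value> <!-- log responses larger than 50 MB, given in bytes -->
</property>
```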

1.5.2.2. Metrics

The slow query log exposes two metrics to JMX.

  • hadoop.regionserver_rpc_slowResponse: A global metric reflecting the durations of all responses that triggered logging.

  • hadoop.regionserver_rpc_methodName.aboveOneSec: A metric reflecting the durations of all responses that lasted for more than one second.

1.5.2.3. Output

The output is tagged with operation, e.g. (operationTooSlow), if the call was a client operation such as a Put, Get, or Delete, for which detailed fingerprint information is exposed. If not, it is tagged (responseTooSlow) and still produces parseable JSON output, but with less verbose information covering only the duration and size of the RPC itself. TooLarge is substituted for TooSlow if the response size triggered the logging, with TooLarge appearing even in the case that both size and duration triggered logging.

1.5.2.4. Example

2011-09-08 10:01:25,824 WARN org.apache.hadoop.ipc.HBaseServer: (operationTooSlow): {"tables":{"riley2":{"puts":[{"totalColumns":11,"families":{"actions":[{"timestamp":1315501284459,"qualifier":"0","vlen":9667580},{"timestamp":1315501284459,"qualifier":"1","vlen":10122412},{"timestamp":1315501284459,"qualifier":"2","vlen":11104617},{"timestamp":1315501284459,"qualifier":"3","vlen":13430635}]},"row":"cfcd208495d565ef66e7dff9f98764da:0"}],"families":["actions"]}},"processingtimems":956,"client":"10.47.34.63:33623","starttimems":1315501284456,"queuetimems":0,"totalPuts":1,"class":"HRegionServer","responsesize":0,"method":"multiPut"}

Note that everything inside the "tables" structure is output produced by MultiPut's fingerprint, while the rest of the information is RPC-specific, such as processing time and client IP/port. Other client operations follow the same pattern and the same general structure, with necessary differences due to the nature of the individual operations. In the case that the call is not a client operation, that detailed fingerprint information will be completely absent.

This particular example would indicate that the likely cause of slowness is simply a very large multiput (the four values shown total roughly 44 MB), as we can tell by the "vlen", or value length, fields of each put in the multiPut.
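
Because each entry is a single tagged line, standard text tools are enough for a first look. The following is a rough sketch with hypothetical function names (slow_queries, total_vlen); it matches text rather than parsing JSON, and it assumes the region server log is supplied on standard input.

```shell
# Keep only log lines carrying the given tag, e.g. operationTooSlow.
slow_queries() {
  grep -F "($1)"
}

# Add up every "vlen" (value length, in bytes) found in the lines on stdin.
total_vlen() {
  grep -o '"vlen":[0-9]*' | cut -d: -f2 |
    awk '{ sum += $1 } END { printf "%d\n", sum }'
}

# Example (the log file name is a placeholder):
#   slow_queries operationTooSlow < regionserver.log | total_vlen
```

Applied to the example entry above, the four vlen values add up to 44325244 bytes.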

1.5.3. Block Cache Monitoring

Starting with HBase 0.98, the HBase Web UI includes the ability to monitor and report on the performance of the block cache. To view the block cache reports, click Tasks → Show Non-RPC Tasks → Block Cache. Following are a few examples of the reporting capabilities.

Figure 1.1. Basic Info

Figure 1.2. Config

Figure 1.3. Stats

Figure 1.4. L1 and L2

This is not an exhaustive list of all the screens and reports available. Have a look in the Web UI.

1.6. Cluster Replication

Note

This information was previously available at Cluster Replication.

HBase provides a replication mechanism to copy data between HBase clusters. Replication can be used as a disaster recovery solution and as a mechanism for high availability. You can also use replication to separate web-facing operations from back-end jobs such as MapReduce.

In terms of architecture, HBase replication is master-push. This takes advantage of the fact that each region server has its own write-ahead log (WAL). One master cluster can replicate to any number of slave clusters, and each region server replicates its own stream of edits. For more information on the different properties of master/slave replication and other types of replication, see the article How Google Serves Data From Multiple Datacenters.

Replication is asynchronous, allowing clusters to be geographically distant or to have some gaps in availability. This also means that data between master and slave clusters will not be instantly consistent: rows inserted on the master cluster are not available at the same time on the slave clusters. The goal is eventual consistency.

The replication format used in this design is conceptually the same as the statement-based replication design used by MySQL. Instead of SQL statements, entire WALEdits (consisting of multiple cell inserts coming from Put and Delete operations on the clients) are replicated in order to maintain atomicity.

The WALs for each region server must be kept in HDFS as long as they are needed to replicate data to any slave cluster. Each region server reads from the oldest log it needs to replicate and keeps track of the current position inside ZooKeeper to simplify failure recovery. That position, as well as the queue of WALs to process, may be different for every slave cluster.

The clusters participating in replication can be of different sizes. The master cluster relies on randomization to attempt to balance the stream of replication on the slave clusters.

HBase supports master/master and cyclic replication as well as replication to multiple slaves.

Figure 1.5. Replication Architecture Overview


Enabling and Configuring Replication. See the API documentation for replication for information on enabling and configuring replication.

1.6.1. Life of a WAL Edit

A single WAL edit goes through several steps in order to be replicated to a slave cluster.

When the slave responds correctly:

  1. An HBase client uses a Put or Delete operation to manipulate data in HBase.

  2. The region server writes the request to the WAL in a way that would allow it to be replayed if it were not written successfully.

  3. If the changed cell corresponds to a column family that is scoped for replication, the edit is added to the queue for replication.

  4. In a separate thread, the edit is read from the log, as part of a batch process. Only the KeyValues that are eligible for replication are kept. Replicable KeyValues are part of a column family whose schema is scoped GLOBAL, are not part of a catalog such as hbase:meta, and did not originate from the target slave cluster, in the case of cyclic replication.

  5. The edit is tagged with the master's UUID and added to a buffer. When the buffer is filled, or the reader reaches the end of the file, the buffer is sent to a random region server on the slave cluster.

  6. The region server reads the edits sequentially and separates them into buffers, one buffer per table. After all edits are read, each buffer is flushed using HTable, HBase's normal client. The master's UUID is preserved in the edits as they are applied, in order to allow for cyclic replication.

  7. In the master, the offset for the WAL that is currently being replicated is registered in ZooKeeper.

When the slave does not respond:

  1. The first three steps, where the edit is inserted, are identical.

  2. Again in a separate thread, the region server reads, filters, and buffers the log edits in the same way as above. The slave region server does not answer the RPC call.

  3. The master sleeps and tries again a configurable number of times.

  4. If the slave region server is still not available, the master selects a new subset of region servers to replicate to, and tries again to send the buffer of edits.

  5. Meanwhile, the WALs are rolled and stored in a queue in ZooKeeper. Logs that are archived by their region server, by moving them from the region server's log directory to a central log directory, will update their paths in the in-memory queue of the replicating thread.

  6. When the slave cluster is finally available, the buffer is applied in the same way as during normal processing. The master region server will then replicate the backlog of logs that accumulated during the outage.

Preserving Tags During Replication

By default, the codec used for replication between clusters strips tags, such as cell-level ACLs, from cells. To prevent the tags from being stripped, you can use a different codec which does not strip them. Configure hbase.replication.rpc.codec to use org.apache.hadoop.hbase.codec.KeyValueCodecWithTags, on both the source and sink RegionServers involved in the replication. This option was introduced in HBASE-10322.
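
A minimal hbase-site.xml fragment for this setting might look like the following. The property name and class come from the paragraph above; placing it in hbase-site.xml on each RegionServer is the usual convention for HBase properties and is assumed here.

```xml
<!-- hbase-site.xml fragment, applied on both the source and sink RegionServers -->
<property>
  <name>hbase.replication.rpc.codec</name>
  <value>org.apache.hadoop.hbase.codec.KeyValueCodecWithTags</value>
</property>
```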

1.6.2. Replication Internals

Replication State in ZooKeeper

HBase replication maintains its state in ZooKeeper. By default, the state is contained in the base node /hbase/replication. This node contains two child nodes, the Peers znode and the RS znode.

Warning

Replication may be disrupted and data loss may occur if you delete the replication tree (/hbase/replication/) from ZooKeeper. This is despite the information about invariants at ???. Follow progress on this issue at HBASE-10295.

The Peers Znode

The peers znode is stored in /hbase/replication/peers by default. It consists of a list of all peer replication clusters, along with the status of each of them. The value of each peer is its cluster key, which is provided in the HBase Shell. The cluster key contains a list of ZooKeeper nodes in the cluster's quorum, the client port for the ZooKeeper quorum, and the base znode for HBase on that cluster.

/hbase/replication/peers
  /1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
  /2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]            
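
The three parts of a cluster key can be separated with plain shell parameter expansion. This is a small sketch; parse_cluster_key is a hypothetical helper name, and it assumes the key always has the quorum:port:znode shape shown in the listing above.

```shell
# Split a cluster key of the form "host1,host2,...:port:/znode" into its parts.
parse_cluster_key() {
  key="$1"
  znode="${key##*:}"    # text after the last colon
  rest="${key%:*}"      # everything before the last colon
  port="${rest##*:}"    # text after the remaining last colon
  quorum="${rest%:*}"   # comma-separated ZooKeeper hosts
  printf 'quorum=%s\nport=%s\nznode=%s\n' "$quorum" "$port" "$znode"
}

# Prints:
#   quorum=zk1.host.com,zk2.host.com,zk3.host.com
#   port=2181
#   znode=/hbase
parse_cluster_key "zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase"
```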
          

Each peer has a child znode which indicates whether or not replication is enabled on that cluster. These peer-state znodes do not contain any child znodes, but only contain a Boolean value. This value is read and maintained by the ReplicationPeer.PeerStateTracker class.

/hbase/replication/peers
  /1/peer-state [Value: ENABLED]
  /2/peer-state [Value: DISABLED]
          
The RS Znode

The rs znode contains a list of WAL logs which need to be replicated. This list is divided into a set of queues organized by region server and the peer cluster the region server is shipping the logs to. The rs znode has one child znode for each region server in the cluster. The child znode name is the region server's hostname, client port, and start code. This list includes both live and dead region servers.

/hbase/replication/rs
  /hostname.example.org,6020,1234
  /hostname2.example.org,6020,2856            
          

Each rs znode contains a list of WAL replication queues, one queue for each peer cluster it replicates to. These queues are represented by child znodes named by the cluster ID of the peer cluster they represent.

/hbase/replication/rs
  /hostname.example.org,6020,1234
    /1
    /2            
          

Each queue has one child znode for each WAL log that still needs to be replicated. The value of each of these child znodes is the last position that was replicated. This position is updated each time a WAL log is replicated.

/hbase/replication/rs
  /hostname.example.org,6020,1234
    /1
      23522342.23422 [VALUE: 254]
      12340993.22342 [VALUE: 0]            
          

1.6.3. Replication Configuration Options

Option                                     Default      Description

zookeeper.znode.parent                     /hbase       The name of the base ZooKeeper znode used for HBase

zookeeper.znode.replication                replication  The name of the base znode used for replication

zookeeper.znode.replication.peers          peers        The name of the peer znode

zookeeper.znode.replication.peers.state    peer-state   The name of peer-state znode

zookeeper.znode.replication.rs             rs           The name of the rs znode

hbase.replication                          false        Whether replication is enabled or disabled on a given cluster

replication.sleep.before.failover                       How many milliseconds a worker should sleep before attempting to replicate a dead region server's WAL queues.

replication.executor.workers               1            The number of region servers a given region server should attempt to failover simultaneously.

1.6.4. Replication Implementation Details

Choosing Region Servers to Replicate To. When a master cluster region server initiates a replication source to a slave cluster, it first connects to the slave's ZooKeeper ensemble using the provided cluster key. It then scans the rs/ directory to discover all the available sinks (region servers that are accepting incoming streams of edits to replicate) and randomly chooses a subset of them using a configured ratio which has a default value of 10%. For example, if a slave cluster has 150 machines, 15 will be chosen as potential recipients for edits that this master cluster region server sends. Because this selection is performed by each master region server, the probability that all slave region servers are used is very high, and this method works for clusters of any size. For example, a master cluster of 10 machines replicating to a slave cluster of 5 machines with a ratio of 10% causes the master cluster region servers to choose one machine each at random.

A ZooKeeper watcher is placed on the ${zookeeper.znode.parent}/rs node of the slave cluster by each of the master cluster's region servers. This watch is used to monitor changes in the composition of the slave cluster. When nodes are removed from the slave cluster, or if nodes go down or come back up, the master cluster's region servers will respond by selecting a new pool of slave region servers to replicate to.
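
The arithmetic in the two examples above can be checked with a short sketch. It assumes the ratio is applied with rounding up, which is consistent with 5 machines at 10% yielding one sink but is not stated explicitly in the text; sink_count and pick_sinks are hypothetical names, and the random selection is only an illustration of choosing a subset.

```shell
# Number of sinks chosen from a slave cluster of a given size.
# Assumption: servers * ratio is rounded up to the next whole server.
sink_count() {
  servers="$1"; ratio_percent="$2"
  echo $(( ($servers * $ratio_percent + 99) / 100 ))
}

# Choose N lines at random from the candidate sinks listed on stdin.
pick_sinks() {
  awk 'BEGIN { srand() } { printf "%.9f\t%s\n", rand(), $0 }' |
    sort -n | cut -f2- | head -n "$1"
}

sink_count 150 10   # prints 15
sink_count 5 10     # prints 1
```

For instance, piping a list of slave region server names into pick_sinks "$(sink_count 150 10)" would print 15 of them in random order.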

Keeping Track of Logs. Each master cluster region server has its own znode in the replication znodes hierarchy. It contains one znode per peer cluster (with 5 slave clusters, 5 znodes are created), and each of these contains a queue of WALs to process. Each of these queues will track the WALs created by that region server, but they can differ in size. For example, if one slave cluster becomes unavailable for some time, the WALs should not be deleted, so they need to stay in the queue while the others are processed. See Region Server Failover for an example.

When a source is instantiated, it contains the current WAL that the region server is writing to. During log rolling, the new file is added to the queue of each slave cluster's znode just before it is made available. This ensures that all the sources are aware that a new log exists before the region server is able to append edits into it, but this operation is now more expensive. The queue items are discarded when the replication thread cannot read more entries from a file (because it reached the end of the last block) and there are other files in the queue. This means that if a source is up to date and replicates from the log that the region server writes to, reading up to the "end" of the current file will not delete the item in the queue.

A log can be archived if it is no longer used or if the number of logs exceeds hbase.regionserver.maxlogs because the insertion rate is faster than regions are flushed. When a log is archived, the source threads are notified that the path for that log changed. If a particular source has already finished with an archived log, it will just ignore the message. If the log is in the queue, the path will be updated in memory. If the log is currently being replicated, the change will be done atomically so that the reader doesn't attempt to open the file when it has already been moved. Because moving a file is a NameNode operation, if the reader is currently reading the log, it won't generate any exception.

Reading, Filtering and Sending Edits. By default, a source attempts to read from a WAL and ship log entries to a sink as quickly as possible. Speed is limited by the filtering of log entries: only KeyValues that are scoped GLOBAL and that do not belong to catalog tables will be retained. Speed is also limited by total size of the list of edits to replicate per slave, which is limited to 64 MB by default. With this configuration, a master cluster region server with three slaves would use at most 192 MB to store data to replicate. This does not account for the data which was filtered but not garbage collected.

Once the maximum size of edits has been buffered or the reader reaches the end of the WAL, the source thread stops reading and chooses at random a sink to replicate to (from the list that was generated by keeping only a subset of slave region servers). It directly issues an RPC to the chosen region server and waits for the method to return. If the RPC was successful, the source determines whether the current file has been emptied or it contains more data which needs to be read. If the file has been emptied, the source deletes the znode in the queue. Otherwise, it registers the new offset in the log's znode. If the RPC threw an exception, the source will retry 10 times before trying to find a different sink.

Cleaning Logs. If replication is not enabled, the master's log-cleaning thread deletes old logs using a configured TTL. This TTL-based method does not work well with replication, because archived logs which have exceeded their TTL may still be in a queue. The default behavior is augmented so that if a log is past its TTL, the cleaning thread looks up every queue until it finds the log, while caching queues it has found. If the log is not found in any queues, the log will be deleted. The next time the cleaning process needs to look for a log, it starts by using its cached list.
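
The decision described above reduces to two checks, sketched below. This is an illustration of the logic only, not the cleaner's actual implementation: can_delete_log is a hypothetical name, the queued WAL names are assumed to arrive one per line on standard input, the ages and TTL are made-up numbers, and the caching of queues is omitted.

```shell
# Succeeds (exit status 0) when an archived log may be deleted:
# it must be past its TTL and absent from every replication queue.
can_delete_log() {
  log="$1"; age_ms="$2"; ttl_ms="$3"
  if [ "$age_ms" -le "$ttl_ms" ]; then return 1; fi   # still within its TTL
  if grep -Fxq -- "$log"; then return 1; fi           # still queued for a peer
  return 0
}

# The log is past its TTL but still queued, so this prints "keep".
printf '%s\n' "1.1.1.2,60020.1312" |
  can_delete_log "1.1.1.2,60020.1312" 700000 600000 || echo "keep"

# The log is past its TTL and no queue lists it, so this prints "delete".
printf '' | can_delete_log "1.1.1.2,60020.1214" 700000 600000 && echo "delete"
```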

Region Server Failover. When no region servers are failing, keeping track of the logs in ZooKeeper adds no value. Unfortunately, region servers do fail, and since ZooKeeper is highly available, it is useful for managing the transfer of the queues in the event of a failure.

Each of the master cluster region servers keeps a watcher on every other region server, in order to be notified when one dies (just as the master does). When a failure happens, they all race to create a znode called lock inside the dead region server's znode that contains its queues. The region server that creates it successfully then transfers all the queues to its own znode, one at a time since ZooKeeper does not support renaming queues. After queues are all transferred, they are deleted from the old location. The znodes that were recovered are renamed with the ID of the slave cluster appended with the name of the dead server.

Next, the master cluster region server creates one new source thread per copied queue, and each of the source threads follows the read/filter/ship pattern. The main difference is that those queues will never receive new data, since they do not belong to their new region server. When the reader hits the end of the last log, the queue's znode is deleted and the master cluster region server closes that replication source.

Given a master cluster with 3 region servers replicating to a single slave with id 2, the following hierarchy represents what the znodes layout could be at some point in time. The region servers' znodes all contain a peers znode which contains a single queue. The znode names in the queues represent the actual file names on HDFS in the form address,port.timestamp.

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    2/
      1.1.1.1,60020.1234  (Contains a position)
      1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    2/
      1.1.1.2,60020.1214  (Contains a position)
      1.1.1.2,60020.1248
      1.1.1.2,60020.1312
  1.1.1.3,60020,123456630/
    2/
      1.1.1.3,60020.1280  (Contains a position)            
          

Assume that 1.1.1.2 loses its ZooKeeper session. The survivors will race to create a lock, and, arbitrarily, 1.1.1.3 wins. It will then start transferring all the queues to its local peers znode by appending the name of the dead server. Right before 1.1.1.3 is able to clean up the old znodes, the layout will look like the following:

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    2/
      1.1.1.1,60020.1234  (Contains a position)
      1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    lock
    2/
      1.1.1.2,60020.1214  (Contains a position)
      1.1.1.2,60020.1248
      1.1.1.2,60020.1312
  1.1.1.3,60020,123456630/
    2/
      1.1.1.3,60020.1280  (Contains a position)

    2-1.1.1.2,60020,123456790/
      1.1.1.2,60020.1214  (Contains a position)
      1.1.1.2,60020.1248
      1.1.1.2,60020.1312            
          

Some time later, but before 1.1.1.3 is able to finish replicating the last WAL from 1.1.1.2, it dies too. Some new logs were also created in the normal queues. The last region server will then try to lock 1.1.1.3's znode and will begin transferring all the queues. The new layout will be:

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    2/
      1.1.1.1,60020.1378  (Contains a position)

    2-1.1.1.3,60020,123456630/
      1.1.1.3,60020.1325  (Contains a position)
      1.1.1.3,60020.1401

    2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
      1.1.1.2,60020.1312  (Contains a position)
  1.1.1.3,60020,123456630/
    lock
    2/
      1.1.1.3,60020.1325  (Contains a position)
      1.1.1.3,60020.1401

    2-1.1.1.2,60020,123456790/
      1.1.1.2,60020.1312  (Contains a position)            
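
The queue names in the listings above follow a simple pattern, which the sketch below reproduces. The helper names recovered_queue_name and queue_peer_id are hypothetical, and the peer ID extraction assumes that peer IDs contain no hyphen.

```shell
# Name a queue takes when a surviving server claims it from a dead one:
# the existing queue name with "-<dead server name>" appended.
recovered_queue_name() {
  printf '%s-%s\n' "$1" "$2"
}

# Peer (slave cluster) ID of a queue, i.e. the text before the first hyphen.
queue_peer_id() {
  printf '%s\n' "${1%%-*}"
}

recovered_queue_name 2 "1.1.1.2,60020,123456790"
# prints 2-1.1.1.2,60020,123456790

recovered_queue_name "2-1.1.1.2,60020,123456790" "1.1.1.3,60020,123456630"
# prints 2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630

queue_peer_id "2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630"
# prints 2
```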
          

Replication Metrics. The following metrics are exposed at the global region server level and (since HBase 0.95) at the peer level:

source.sizeOfLogQueue

number of WALs to process (excludes the one which is being processed) at the Replication source

source.shippedOps

number of mutations shipped

source.logEditsRead

number of mutations read from HLogs at the replication source

source.ageOfLastShippedOp

age of last batch that was shipped by the replication source

1.7. HBase Backup

There are two broad strategies for performing HBase backups: backing up with a full cluster shutdown, and backing up on a live cluster. Each approach has pros and cons.

For additional information, see HBase Backup Options over on the Sematext Blog.

1.7.1. Full Shutdown Backup

Some environments can tolerate a periodic full shutdown of their HBase cluster, for example if it is being used in a back-end analytic capacity and is not serving front-end web pages. The benefit is that the NameNode/Master and RegionServers are down, so there is no chance of missing any in-flight changes to either StoreFiles or metadata. The obvious con is that the cluster is down. The steps include:

1.7.1.1. Stop HBase

1.7.1.2. Distcp

Distcp can be used to copy the contents of the HBase directory in HDFS either to another directory on the same cluster or to a different cluster.

Note: Distcp works in this situation because the cluster is down and there are no in-flight edits to files. Distcp-ing of files in the HBase directory is not generally recommended on a live cluster.

1.7.1.3. Restore (if needed)

The backup of the hbase directory from HDFS is copied onto the 'real' hbase directory via distcp. Copying these files creates new HDFS metadata, so a restore of the NameNode edits from the time of the HBase backup isn't required for this kind of restore: it is a restore (via distcp) of a specific HDFS directory (i.e., the HBase part), not of the entire HDFS file-system.

1.7.2. Live Cluster Backup - Replication

This approach assumes that there is a second cluster. See the HBase page on replication for more information.

1.7.3. Live Cluster Backup - CopyTable

The Section 1.1.8, “CopyTable” utility can be used either to copy data from one table to another on the same cluster, or to copy data to a table on another cluster.

Since the cluster is up, there is a risk that edits could be missed in the copy process.

1.7.4. Live Cluster Backup - Export

The Section 1.1.9, “Export” approach dumps the content of a table to HDFS on the same cluster. To restore the data, the Section 1.1.10, “Import” utility would be used.

Since the cluster is up, there is a risk that edits could be missed in the export process.

1.8. HBase Snapshots

HBase Snapshots allow you to take a snapshot of a table without too much impact on Region Servers. Snapshot, clone, and restore operations don't involve data copying. Also, exporting the snapshot to another cluster has no impact on the Region Servers.

Prior to version 0.94.6, the only way to back up or clone a table was to use CopyTable/ExportTable, or to copy all the hfiles in HDFS after disabling the table. The disadvantages of these methods are that you can degrade region server performance (Copy/Export Table), or that you need to disable the table, which means no reads or writes; this is usually unacceptable.

1.8.1. Configuration

To turn on the snapshot support just set the hbase.snapshot.enabled property to true. (Snapshots are enabled by default in 0.95+ and off by default in 0.94.6+)

  <property>
    <name>hbase.snapshot.enabled</name>
    <value>true</value>
  </property>
        

1.8.2. Take a Snapshot

You can take a snapshot of a table regardless of whether it is enabled or disabled. The snapshot operation doesn't involve any data copying.

$ ./bin/hbase shell
hbase> snapshot 'myTable', 'myTableSnapshot-122112'
        

Take a Snapshot Without Flushing. The default behavior is to perform a flush of data in memory before the snapshot is taken. This means that data in memory is included in the snapshot. In most cases, this is the desired behavior. However, if your set-up can tolerate data in memory being excluded from the snapshot, you can use the SKIP_FLUSH option of the snapshot command to disable flushing while taking the snapshot.

hbase> snapshot 'mytable', 'snapshot123', {SKIP_FLUSH => true}

Warning

There is no way to determine or predict whether an insert or update that runs concurrently with the snapshot will be included in it, whether flushing is enabled or disabled. A snapshot is only a representation of a table during a window of time. The amount of time the snapshot operation takes to reach each Region Server may vary from a few seconds to a minute, depending on the resource load and the speed of the hardware or network, among other factors. There is also no way to know whether a given insert or update is in memory or has been flushed.

1.8.3. Listing Snapshots

List all snapshots taken (by printing the names and related information).

$ ./bin/hbase shell
hbase> list_snapshots
        

1.8.4. Deleting Snapshots

You can remove a snapshot, and the files retained for that snapshot will be removed if no longer needed.

$ ./bin/hbase shell
hbase> delete_snapshot 'myTableSnapshot-122112'
        

1.8.5. Clone a table from snapshot

From a snapshot you can create a new table (clone operation) with the same data that you had when the snapshot was taken. The clone operation doesn't involve data copies, and a change to the cloned table doesn't impact the snapshot or the original table.

$ ./bin/hbase shell
hbase> clone_snapshot 'myTableSnapshot-122112', 'myNewTestTable'
        

1.8.6. Restore a snapshot

The restore operation requires the table to be disabled, and the table will be restored to the state at the time when the snapshot was taken, changing both data and schema if required.

$ ./bin/hbase shell
hbase> disable 'myTable'
hbase> restore_snapshot 'myTableSnapshot-122112'
        

Note

Since Replication works at log level and snapshots at file-system level, after a restore, the replicas will be in a different state from the master. If you want to use restore, you need to stop replication and redo the bootstrap.

In case of partial data loss due to a misbehaving client, instead of a full restore, which requires the table to be disabled, you can clone the table from the snapshot and use a Map-Reduce job to copy the data that you need from the clone to the main table.

1.8.7. Snapshots operations and ACLs

If you are using security with the AccessController Coprocessor (See ???), only a global administrator can take, clone, or restore a snapshot, and these actions do not capture the ACL rights. This means that restoring a table preserves the ACL rights of the existing table, while cloning a table creates a new table that has no ACL rights until the administrator adds them.

1.8.8. Export to another cluster

The ExportSnapshot tool copies all the data related to a snapshot (hfiles, logs, snapshot metadata) to another cluster. The tool executes a Map-Reduce job, similar to distcp, to copy files between the two clusters, and since it works at file-system level the hbase cluster does not have to be online.

To copy a snapshot called MySnapshot to an HBase cluster srv2 (hdfs://srv2:8082/hbase) using 16 mappers:

$ bin/hbase class org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase -mappers 16

Limiting Bandwidth Consumption. You can limit the bandwidth consumption when exporting a snapshot, by specifying the -bandwidth parameter, which expects an integer representing megabytes per second. The following example limits the above example to 200 MB/sec.

$ bin/hbase class org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase -mappers 16 -bandwidth 200

1.9. Capacity Planning and Region Sizing

There are several considerations when planning the capacity for an HBase cluster and performing the initial configuration. Start with a solid understanding of how HBase handles data internally.

1.9.1. Node count and hardware/VM configuration

1.9.1.1. Physical data size

Physical data size on disk is distinct from logical size of your data and is affected by the following:

  • Increased by HBase overhead

    • See ??? and ???. At least 24 bytes per key-value (cell), can be more. Small keys/values means more relative overhead.

    • KeyValue instances are aggregated into blocks, which are indexed. Indexes also have to be stored. Blocksize is configurable on a per-ColumnFamily basis. See ???.

  • Decreased by ??? and data block encoding, depending on data. See also this thread. You might want to test what compression and encoding (if any) make sense for your data.

  • Increased by size of region server ??? (usually fixed and negligible - less than half of RS memory size, per RS).

  • Increased by HDFS replication - usually x3.

Aside from the disk space necessary to store the data, one RS may not be able to serve arbitrarily large amounts of data due to some practical limits on region count and size (see below).

1.9.1.2. Read/Write throughput

The number of nodes can also be driven by the required throughput for reads and/or writes. The throughput one can get per node depends a lot on the data (esp. key/value sizes) and request patterns, as well as on node and system configuration. Plan for peak load if load is likely to be the main driver of an increase in node count. PerformanceEvaluation and ??? tools can be used to test a single node or a test cluster.

For write, usually 5-15Mb/s per RS can be expected, since every region server has only one active WAL. There's no good estimate for reads, as it depends vastly on data, requests, and cache hit rate. ??? might be helpful.

1.9.1.3. JVM GC limitations

An RS cannot currently utilize a very large heap due to the cost of GC. There is also no good way of running multiple RSes per server (other than running several VMs per machine). Thus, dedicating ~20-24Gb or less memory to one RS is recommended. GC tuning is required for large heap sizes. See ??? and ???.

1.9.2. Determining region count and size

Generally, fewer regions make for a smoother-running cluster. You can always manually split the big regions later, if necessary, to spread the data or the request load over the cluster. A range of 20-200 regions per RS is reasonable. The number of regions cannot be configured directly (unless you go for fully ???); instead, adjust the region size so that, given the table size, you arrive at the target region count.

When configuring regions for multiple tables, note that most region settings can be set on a per-table basis via HTableDescriptor, as well as shell commands. These settings will override the ones in hbase-site.xml. That is useful if your tables have different workloads/use cases.

Also note that in the discussion of region sizes here, HDFS replication factor is not (and should not be) taken into account, whereas other factors above should be. So, if your data is compressed and replicated 3 ways by HDFS, "9 Gb region" means 9 Gb of compressed data. HDFS replication factor only affects your disk usage and is invisible to most HBase code.

1.9.2.1. Number of regions per RS - upper bound

In production scenarios, where you have a lot of data, you are normally concerned with the maximum number of regions you can have per server. ??? has technical discussion on the subject; in short, maximum number of regions is mostly determined by memstore memory usage. Each region has its own memstores; these grow up to a configurable size; usually in 128-256Mb range, see ???. There's one memstore per column family (so there's only one per region if there's one CF in the table). RS dedicates some fraction of total memory (see ???) to region memstores. If this memory is exceeded (too much memstore usage), undesirable consequences such as unresponsive server, or later compaction storms, can result. Thus, a good starting point for the number of regions per RS (assuming one table) is:

(RS memory)*(total memstore fraction)/((memstore size)*(# column families))

For example, if an RS has 16Gb of RAM, with default settings the formula gives 16384*0.4/128 ~ 51 regions per RS as a starting point. The formula can be extended to multiple tables; if they all have the same configuration, just use the total number of families.
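
The starting-point formula can be written as a small calculation. This is a sketch only: the class and method names are hypothetical, the inputs are the ones from the example in the text (16384 MB of RS memory, a total memstore fraction of 0.4, a 128 MB memstore size, one column family), and rounding down is a choice made here to stay on the conservative side.

```java
public class RegionCountEstimate {
    // (RS memory) * (total memstore fraction)
    //     / ((memstore size) * (# column families)), rounded down.
    // Memory and memstore size must use the same unit (MB here).
    static long startingRegionsPerServer(double rsMemoryMb,
                                         double memstoreFraction,
                                         double memstoreSizeMb,
                                         int columnFamilies) {
        double regions = rsMemoryMb * memstoreFraction
                / (memstoreSizeMb * columnFamilies);
        return (long) Math.floor(regions);
    }

    public static void main(String[] args) {
        // The example from the text: 16384 * 0.4 / 128 = 51.2
        System.out.println(startingRegionsPerServer(16384, 0.4, 128, 1));
        // Same server, but a table with two column families.
        System.out.println(startingRegionsPerServer(16384, 0.4, 128, 2));
    }
}
```

With two column families the estimate halves, which reflects the point made earlier that there is one memstore per column family.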

This number can be adjusted; the formula above assumes all your regions are filled at approximately the same rate. If only a fraction of your regions are going to be actively written to, you can divide the result by that fraction to get a larger region count. Then, even if all regions are written to, all region memstores are not filled evenly, and eventually jitter appears even if they are (due to limited number of concurrent flushes). Thus, one can have as many as 2-3 times more regions than the starting point; however, increased numbers carry increased risk.

For write-heavy workload, memstore fraction can be increased in configuration at the expense of block cache; this will also allow one to have more regions.

1.9.2.2. Number of regions per RS - lower bound

HBase scales by having regions across many servers. Thus, if you have 2 regions for 16GB of data on a 20-node cluster, your data will be concentrated on just a few machines, and nearly the entire cluster will be idle. This really can't be stressed enough, since a common problem is loading 200MB of data into HBase and then wondering why your awesome 10-node cluster isn't doing anything.

On the other hand, if you have a very large amount of data, you may also want to go for a larger number of regions to avoid having regions that are too large.

1.9.2.3. Maximum region size

For large tables in production scenarios, maximum region size is mostly limited by compactions - very large compactions, esp. major, can degrade cluster performance. Currently, the recommended maximum region size is 10-20Gb, and 5-10Gb is optimal. For older 0.90.x codebase, the upper-bound of regionsize is about 4Gb, with a default of 256Mb.

The size at which the region is split into two is generally configured via ???; for details, see ???.

If you cannot estimate the size of your tables well, when starting off, it's probably best to stick to the default region size, perhaps going smaller for hot tables (or manually split hot regions to spread the load over the cluster), or go with larger region sizes if your cell sizes tend to be largish (100k and up).

In HBase 0.98, an experimental stripe compactions feature was added that allows for larger regions, especially for log data. See ???.

1.9.2.4. Total data size per region server

According to above numbers for region size and number of regions per region server, in an optimistic estimate 10 GB x 100 regions per RS will give up to 1TB served per region server, which is in line with some of the reported multi-PB use cases. However, it is important to think about the data vs cache size ratio at the RS level. With 1TB of data per server and 10 GB block cache, only 1% of the data will be cached, which may barely cover all block indices.
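
The arithmetic behind this estimate is shown below as a short sketch. The class and method names are hypothetical; the inputs are the figures quoted in the paragraph above (10 GB regions, 100 regions per RS, 10 GB of block cache), with 1TB treated as 1000 GB as the text does.

```java
public class DataPerServerEstimate {
    // Total data served by one region server, in GB.
    static double dataPerServerGb(double regionSizeGb, int regionsPerServer) {
        return regionSizeGb * regionsPerServer;
    }

    // Share of that data that fits in the block cache, as a percentage.
    static double cachedPercent(double blockCacheGb, double dataGb) {
        return 100.0 * blockCacheGb / dataGb;
    }

    public static void main(String[] args) {
        double dataGb = dataPerServerGb(10, 100);   // 1000 GB, about 1TB
        System.out.println(dataGb + " GB per region server");
        System.out.println(cachedPercent(10, dataGb) + "% of data cached");
    }
}
```

Running the same calculation with your own region size, region count, and block cache size gives a quick check of the data-to-cache ratio before settling on a node count.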

1.9.3. Initial configuration and tuning

First, see ???. Note that some configurations, more than others, depend on specific scenarios. Pay special attention to:

  • ??? - request handler thread count, vital for high-throughput workloads.

  • ??? - the blocking number of WAL files depends on your memstore configuration and should be set accordingly to prevent potential blocking when doing high volume of writes.

Then, there are some considerations when setting up your cluster and tables.

1.9.3.1. Compactions

Depending on read/write volume and latency requirements, optimal compaction settings may be different. See ??? for some details.

When provisioning for large data sizes, however, it's good to keep in mind that compactions can affect write throughput. Thus, for write-intensive workloads, you may opt for less frequent compactions and more store files per region. The minimum number of files for compactions (hbase.hstore.compaction.min) can be set to a higher value; ??? should also be increased, as more files might accumulate in that case. You may also consider manually managing compactions: ???

1.9.3.2. Pre-splitting the table

Based on the target number of the regions per RS (see above) and number of RSes, one can pre-split the table at creation time. This would both avoid some costly splitting as the table starts to fill up, and ensure that the table starts out already distributed across many servers.

If the table is expected to grow large enough to justify that, at least one region per RS should be created. It is not recommended to split immediately into the full target number of regions (e.g. 50 * number of RSes), but a low intermediate value can be chosen. For multiple tables, it is recommended to be conservative with presplitting (e.g. pre-split 1 region per RS at most), especially if you don't know how much each table will grow. If you split too much, you may end up with too many regions, with some tables having too many small regions.

For pre-splitting howto, see ???.
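
As an illustration of choosing split points, the sketch below computes evenly spaced one-byte boundaries for a given number of regions. It rests on a strong assumption that is not stated in this chapter: row keys are uniformly distributed over their first byte (0x00 to 0xFF). If your keys are not, the boundaries must come from your actual key distribution. The class and method names are hypothetical. In a real cluster the resulting boundaries would be supplied as split keys when the table is created.

```java
import java.util.ArrayList;
import java.util.List;

public class PreSplitSketch {
    // Returns regionCount - 1 boundaries that divide the one-byte key
    // space 0x00..0xFF into regionCount roughly equal ranges.
    // Assumes row keys are uniformly distributed over the first byte.
    static List<Integer> splitPoints(int regionCount) {
        if (regionCount < 1 || regionCount > 256) {
            throw new IllegalArgumentException(
                "regionCount must be between 1 and 256");
        }
        List<Integer> points = new ArrayList<>();
        for (int i = 1; i < regionCount; i++) {
            points.add(i * 256 / regionCount);
        }
        return points;
    }

    public static void main(String[] args) {
        // Four regions need three boundaries.
        for (int point : splitPoints(4)) {
            System.out.println(String.format("0x%02x", point));
        }
    }
}
```

A table pre-split this way starts with one region per range, so following the advice above you would pass a low intermediate region count rather than the full target.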

1.10. Table Rename

In versions 0.90.x of hbase and earlier, we had a simple script that would rename the hdfs table directory and then do an edit of the hbase:meta table replacing all mentions of the old table name with the new. The script was called ./bin/rename_table.rb. The script was deprecated and removed mostly because it was unmaintained and the operation performed by the script was brutal.

As of hbase 0.94.x, you can use the snapshot facility to rename a table. Here is how you would do it using the hbase shell:

hbase shell> disable 'tableName'
hbase shell> snapshot 'tableName', 'tableSnapshot'
hbase shell> clone_snapshot 'tableSnapshot', 'newTableName'
hbase shell> delete_snapshot 'tableSnapshot'
hbase shell> drop 'tableName'

or in code it would be as follows:

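// Requires java.io.IOException and org.apache.hadoop.hbase.client.HBaseAdmin.
// randomName() stands for any helper that returns a unique, valid snapshot name.
void rename(HBaseAdmin admin, String oldTableName, String newTableName)
    throws IOException, InterruptedException {
  String snapshotName = randomName();
  admin.disableTable(oldTableName);                 // stop reads and writes to the old table
  admin.snapshot(snapshotName, oldTableName);       // capture its current state
  admin.cloneSnapshot(snapshotName, newTableName);  // create the table under the new name
  admin.deleteSnapshot(snapshotName);               // the temporary snapshot is no longer needed
  admin.deleteTable(oldTableName);                  // drop the old (disabled) table
}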


[1] See this blog post for more details.
