Chapter 1. Apache HBase Performance Tuning

Table of Contents

1.1. Operating System
1.1.1. Memory
1.1.2. 64-bit
1.1.3. Swapping
1.2. Network
1.2.1. Single Switch
1.2.2. Multiple Switches
1.2.3. Multiple Racks
1.2.4. Network Interfaces
1.3. Java
1.3.1. The Garbage Collector and Apache HBase
1.4. HBase Configurations
1.4.1. Number of Regions
1.4.2. Managing Compactions
1.4.3. hbase.regionserver.handler.count
1.4.4. hfile.block.cache.size
1.4.7. hbase.hstore.blockingStoreFiles
1.4.8. hbase.hregion.memstore.block.multiplier
1.4.9. hbase.regionserver.checksum.verify
1.5. ZooKeeper
1.6. Schema Design
1.6.1. Number of Column Families
1.6.2. Key and Attribute Lengths
1.6.3. Table RegionSize
1.6.4. Bloom Filters
1.6.5. ColumnFamily BlockSize
1.6.6. In-Memory ColumnFamilies
1.6.7. Compression
1.7. HBase General Patterns
1.7.1. Constants
1.8. Writing to HBase
1.8.1. Batch Loading
1.8.2. Table Creation: Pre-Creating Regions
1.8.3. Table Creation: Deferred Log Flush
1.8.4. HBase Client: AutoFlush
1.8.5. HBase Client: Turn off WAL on Puts
1.8.6. HBase Client: Group Puts by RegionServer
1.8.7. MapReduce: Skip The Reducer
1.8.8. Anti-Pattern: One Hot Region
1.9. Reading from HBase
1.9.1. Scan Caching
1.9.2. Scan Attribute Selection
1.9.3. MapReduce - Input Splits
1.9.4. Close ResultScanners
1.9.5. Block Cache
1.9.6. Optimal Loading of Row Keys
1.9.7. Concurrency: Monitor Data Spread
1.9.8. Bloom Filters
1.10. Deleting from HBase
1.10.1. Using HBase Tables as Queues
1.10.2. Delete RPC Behavior
1.11. HDFS
1.11.1. Current Issues With Low-Latency Reads
1.11.2. Leveraging local data
1.11.3. Performance Comparisons of HBase vs. HDFS
1.12. Amazon EC2
1.13. Collocating HBase and MapReduce
1.14. Case Studies

1.1. Operating System

1.1.1. Memory

RAM, RAM, RAM. Don't starve HBase.

1.1.2. 64-bit

Use a 64-bit platform (and 64-bit JVM).

1.1.3. Swapping

Watch out for swapping. Set swappiness to 0.

1.2. Network

Perhaps the most important factor in avoiding network issues degrading Hadoop and HBbase performance is the switching hardware that is used, decisions made early in the scope of the project can cause major problems when you double or triple the size of your cluster (or more).

Important items to consider:

  • Switching capacity of the device
  • Number of systems connected
  • Uplink capacity

1.2.1. Single Switch

The single most important factor in this configuration is that the switching capacity of the hardware is capable of handling the traffic which can be generated by all systems connected to the switch. Some lower priced commodity hardware can have a slower switching capacity than could be utilized by a full switch.

1.2.2. Multiple Switches

Multiple switches are a potential pitfall in the architecture. The most common configuration of lower priced hardware is a simple 1Gbps uplink from one switch to another. This often overlooked pinch point can easily become a bottleneck for cluster communication. Especially with MapReduce jobs that are both reading and writing a lot of data the communication across this uplink could be saturated.

Mitigation of this issue is fairly simple and can be accomplished in multiple ways:

  • Use appropriate hardware for the scale of the cluster which you're attempting to build.
  • Use larger single switch configurations i.e. single 48 port as opposed to 2x 24 port
  • Configure port trunking for uplinks to utilize multiple interfaces to increase cross switch bandwidth.

1.2.3. Multiple Racks

Multiple rack configurations carry the same potential issues as multiple switches, and can suffer performance degradation from two main areas:

  • Poor switch capacity performance
  • Insufficient uplink to another rack

If the the switches in your rack have appropriate switching capacity to handle all the hosts at full speed, the next most likely issue will be caused by homing more of your cluster across racks. The easiest way to avoid issues when spanning multiple racks is to use port trunking to create a bonded uplink to other racks. The downside of this method however, is in the overhead of ports that could potentially be used. An example of this is, creating an 8Gbps port channel from rack A to rack B, using 8 of your 24 ports to communicate between racks gives you a poor ROI, using too few however can mean you're not getting the most out of your cluster.

Using 10Gbe links between racks will greatly increase performance, and assuming your switches support a 10Gbe uplink or allow for an expansion card will allow you to save your ports for machines as opposed to uplinks.

1.2.4. Network Interfaces

Are all the network interfaces functioning correctly? Are you sure? See the Troubleshooting Case Study in ???.

1.3. Java

1.3.1. The Garbage Collector and Apache HBase Long GC pauses

In his presentation, Avoiding Full GCs with MemStore-Local Allocation Buffers, Todd Lipcon describes two cases of stop-the-world garbage collections common in HBase, especially during loading; CMS failure modes and old generation heap fragmentation brought. To address the first, start the CMS earlier than default by adding -XX:CMSInitiatingOccupancyFraction and setting it down from defaults. Start at 60 or 70 percent (The lower you bring down the threshold, the more GCing is done, the more CPU used). To address the second fragmentation issue, Todd added an experimental facility, , that must be explicitly enabled in Apache HBase 0.90.x (Its defaulted to be on in Apache 0.92.x HBase). See hbase.hregion.memstore.mslab.enabled to true in your Configuration. See the cited slides for background and detail[1]. Be aware that when enabled, each MemStore instance will occupy at least an MSLAB instance of memory. If you have thousands of regions or lots of regions each with many column families, this allocation of MSLAB may be responsible for a good portion of your heap allocation and in an extreme case cause you to OOME. Disable MSLAB in this case, or lower the amount of memory it uses or float less regions per server.

For more information about GC logs, see ???.

1.4. HBase Configurations

See ???.

1.4.1. Number of Regions

The number of regions for an HBase table is driven by the ???. Also, see the architecture section on ???

1.4.2. Managing Compactions

For larger systems, managing compactions and splits may be something you want to consider.

1.4.3. hbase.regionserver.handler.count

See ???.

1.4.4. hfile.block.cache.size

See ???. A memory setting for the RegionServer process.


See ???. This memory setting is often adjusted for the RegionServer process depending on needs.


See ???. This memory setting is often adjusted for the RegionServer process depending on needs.

1.4.7. hbase.hstore.blockingStoreFiles

See ???. If there is blocking in the RegionServer logs, increasing this can help.

1.4.8. hbase.hregion.memstore.block.multiplier

See ???. If there is enough RAM, increasing this can help.

1.4.9. hbase.regionserver.checksum.verify

Have HBase write the checksum into the datablock and save having to do the checksum seek whenever you read.

See Section 1.4.9, “hbase.regionserver.checksum.verify, ??? and ??? For more information see the release note on HBASE-5074 support checksums in HBase block cache.

1.5. ZooKeeper

See ??? for information on configuring ZooKeeper, and see the part about having a dedicated disk.

1.6. Schema Design

1.6.1. Number of Column Families

See ???.

1.6.2. Key and Attribute Lengths

See ???. See also Section, “However...” for compression caveats.

1.6.3. Table RegionSize

The regionsize can be set on a per-table basis via setFileSize on HTableDescriptor in the event where certain tables require different regionsizes than the configured default regionsize.

See Section 1.4.1, “Number of Regions” for more information.

1.6.4. Bloom Filters

Bloom Filters can be enabled per-ColumnFamily. Use HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) to enable blooms per Column Family. Default = NONE for no bloom filters. If ROW, the hash of the row will be added to the bloom on each insert. If ROWCOL, the hash of the row + column family + column family qualifier will be added to the bloom on each key insert.

See HColumnDescriptor and Section 1.9.8, “Bloom Filters” for more information or this answer up in quora, How are bloom filters used in HBase?.

1.6.5. ColumnFamily BlockSize

The blocksize can be configured for each ColumnFamily in a table, and this defaults to 64k. Larger cell values require larger blocksizes. There is an inverse relationship between blocksize and the resulting StoreFile indexes (i.e., if the blocksize is doubled then the resulting indexes should be roughly halved).

See HColumnDescriptor and ???for more information.

1.6.6. In-Memory ColumnFamilies

ColumnFamilies can optionally be defined as in-memory. Data is still persisted to disk, just like any other ColumnFamily. In-memory blocks have the highest priority in the ???, but it is not a guarantee that the entire table will be in memory.

See HColumnDescriptor for more information.

1.6.7. Compression

Production systems should use compression with their ColumnFamily definitions. See ??? for more information. However...

Compression deflates data on disk. When it's in-memory (e.g., in the MemStore) or on the wire (e.g., transferring between RegionServer and Client) it's inflated. So while using ColumnFamily compression is a best practice, but it's not going to completely eliminate the impact of over-sized Keys, over-sized ColumnFamily names, or over-sized Column names.

See ??? on for schema design tips, and ??? for more information on HBase stores data internally.

1.7. HBase General Patterns

1.7.1. Constants

When people get started with HBase they have a tendency to write code that looks like this:

Get get = new Get(rowkey);
Result r = htable.get(get);
byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr"));  // returns current version of value

But especially when inside loops (and MapReduce jobs), converting the columnFamily and column-names to byte-arrays repeatedly is surprisingly expensive. It's better to use constants for the byte-arrays, like this:

public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR = "attr".getBytes();
Get get = new Get(rowkey);
Result r = htable.get(get);
byte[] b = r.getValue(CF, ATTR);  // returns current version of value

1.8. Writing to HBase

1.8.1. Batch Loading

Use the bulk load tool if you can. See ???. Otherwise, pay attention to the below.

1.8.2.  Table Creation: Pre-Creating Regions

Tables in HBase are initially created with one region by default. For bulk imports, this means that all clients will write to the same region until it is large enough to split and become distributed across the cluster. A useful pattern to speed up the bulk import process is to pre-create empty regions. Be somewhat conservative in this, because too-many regions can actually degrade performance.

There are two different approaches to pre-creating splits. The first approach is to rely on the default HBaseAdmin strategy (which is implemented in Bytes.split)...

byte[] startKey = ...;   	// your lowest keuy
byte[] endKey = ...;   		// your highest key
int numberOfRegions = ...;	// # of regions to create
admin.createTable(table, startKey, endKey, numberOfRegions);

And the other approach is to define the splits yourself...

byte[][] splits = ...;   // create your own splits
admin.createTable(table, splits);

See ??? for issues related to understanding your keyspace and pre-creating regions.

1.8.3.  Table Creation: Deferred Log Flush

The default behavior for Puts using the Write Ahead Log (WAL) is that HLog edits will be written immediately. If deferred log flush is used, WAL edits are kept in memory until the flush period. The benefit is aggregated and asynchronous HLog- writes, but the potential downside is that if the RegionServer goes down the yet-to-be-flushed edits are lost. This is safer, however, than not using WAL at all with Puts.

Deferred log flush can be configured on tables via HTableDescriptor. The default value of hbase.regionserver.optionallogflushinterval is 1000ms.

1.8.4. HBase Client: AutoFlush

When performing a lot of Puts, make sure that setAutoFlush is set to false on your HTable instance. Otherwise, the Puts will be sent one at a time to the RegionServer. Puts added via htable.add(Put) and htable.add( <List> Put) wind up in the same write buffer. If autoFlush = false, these messages are not sent until the write-buffer is filled. To explicitly flush the messages, call flushCommits. Calling close on the HTable instance will invoke flushCommits.

1.8.5. HBase Client: Turn off WAL on Puts

A frequently discussed option for increasing throughput on Puts is to call writeToWAL(false). Turning this off means that the RegionServer will not write the Put to the Write Ahead Log, only into the memstore, HOWEVER the consequence is that if there is a RegionServer failure there will be data loss. If writeToWAL(false) is used, do so with extreme caution. You may find in actuality that it makes little difference if your load is well distributed across the cluster.

In general, it is best to use WAL for Puts, and where loading throughput is a concern to use bulk loading techniques instead.

1.8.6. HBase Client: Group Puts by RegionServer

In addition to using the writeBuffer, grouping Puts by RegionServer can reduce the number of client RPC calls per writeBuffer flush. There is a utility HTableUtil currently on TRUNK that does this, but you can either copy that or implement your own verison for those still on 0.90.x or earlier.

1.8.7. MapReduce: Skip The Reducer

When writing a lot of data to an HBase table from a MR job (e.g., with TableOutputFormat), and specifically where Puts are being emitted from the Mapper, skip the Reducer step. When a Reducer step is used, all of the output (Puts) from the Mapper will get spooled to disk, then sorted/shuffled to other Reducers that will most likely be off-node. It's far more efficient to just write directly to HBase.

For summary jobs where HBase is used as a source and a sink, then writes will be coming from the Reducer step (e.g., summarize values then write out result). This is a different processing problem than from the the above case.

1.8.8. Anti-Pattern: One Hot Region

If all your data is being written to one region at a time, then re-read the section on processing timeseries data.

Also, if you are pre-splitting regions and all your data is still winding up in a single region even though your keys aren't monotonically increasing, confirm that your keyspace actually works with the split strategy. There are a variety of reasons that regions may appear "well split" but won't work with your data. As the HBase client communicates directly with the RegionServers, this can be obtained via HTable.getRegionLocation.

See Section 1.8.2, “ Table Creation: Pre-Creating Regions ”, as well as Section 1.4, “HBase Configurations”

1.9. Reading from HBase

1.9.1. Scan Caching

If HBase is used as an input source for a MapReduce job, for example, make sure that the input Scan instance to the MapReduce job has setCaching set to something greater than the default (which is 1). Using the default value means that the map-task will make call back to the region-server for every record processed. Setting this value to 500, for example, will transfer 500 rows at a time to the client to be processed. There is a cost/benefit to have the cache value be large because it costs more in memory for both client and RegionServer, so bigger isn't always better. Scan Caching in MapReduce Jobs

Scan settings in MapReduce jobs deserve special attention. Timeouts can result (e.g., UnknownScannerException) in Map tasks if it takes longer to process a batch of records before the client goes back to the RegionServer for the next set of data. This problem can occur because there is non-trivial processing occuring per row. If you process rows quickly, set caching higher. If you process rows more slowly (e.g., lots of transformations per row, writes), then set caching lower.

Timeouts can also happen in a non-MapReduce use case (i.e., single threaded HBase client doing a Scan), but the processing that is often performed in MapReduce jobs tends to exacerbate this issue.

1.9.2. Scan Attribute Selection

Whenever a Scan is used to process large numbers of rows (and especially when used as a MapReduce source), be aware of which attributes are selected. If scan.addFamily is called then all of the attributes in the specified ColumnFamily will be returned to the client. If only a small number of the available attributes are to be processed, then only those attributes should be specified in the input scan because attribute over-selection is a non-trivial performance penalty over large datasets.

1.9.3. MapReduce - Input Splits

For MapReduce jobs that use HBase tables as a source, if there a pattern where the "slow" map tasks seem to have the same Input Split (i.e., the RegionServer serving the data), see the Troubleshooting Case Study in ???.

1.9.4. Close ResultScanners

This isn't so much about improving performance but rather avoiding performance problems. If you forget to close ResultScanners you can cause problems on the RegionServers. Always have ResultScanner processing enclosed in try/catch blocks...

Scan scan = new Scan();
// set attrs...
ResultScanner rs = htable.getScanner(scan);
try {
  for (Result r =; r != null; r = {
  // process result...
} finally {
  rs.close();  // always close the ResultScanner!

1.9.5. Block Cache

Scan instances can be set to use the block cache in the RegionServer via the setCacheBlocks method. For input Scans to MapReduce jobs, this should be false. For frequently accessed rows, it is advisable to use the block cache.

1.9.6. Optimal Loading of Row Keys

When performing a table scan where only the row keys are needed (no families, qualifiers, values or timestamps), add a FilterList with a MUST_PASS_ALL operator to the scanner using setFilter. The filter list should include both a FirstKeyOnlyFilter and a KeyOnlyFilter. Using this filter combination will result in a worst case scenario of a RegionServer reading a single value from disk and minimal network traffic to the client for a single row.

1.9.7. Concurrency: Monitor Data Spread

When performing a high number of concurrent reads, monitor the data spread of the target tables. If the target table(s) have too few regions then the reads could likely be served from too few nodes.

See Section 1.8.2, “ Table Creation: Pre-Creating Regions ”, as well as Section 1.4, “HBase Configurations”

1.9.8. Bloom Filters

Enabling Bloom Filters can save your having to go to disk and can help improve read latencys.

Bloom filters were developed over in HBase-1200 Add bloomfilters.[2][3]

See also Section 1.6.4, “Bloom Filters”. Bloom StoreFile footprint

Bloom filters add an entry to the StoreFile general FileInfo data structure and then two extra entries to the StoreFile metadata section. BloomFilter in the StoreFile FileInfo data structure

FileInfo has a BLOOM_FILTER_TYPE entry which is set to NONE, ROW or ROWCOL. BloomFilter entries in StoreFile metadata

BLOOM_FILTER_META holds Bloom Size, Hash Function used, etc. Its small in size and is cached on StoreFile.Reader load

BLOOM_FILTER_DATA is the actual bloomfilter data. Obtained on-demand. Stored in the LRU cache, if it is enabled (Its enabled by default). Bloom Filter Configuration io.hfile.bloom.enabled global kill switch

io.hfile.bloom.enabled in Configuration serves as the kill switch in case something goes wrong. Default = true. io.hfile.bloom.error.rate

io.hfile.bloom.error.rate = average false positive rate. Default = 1%. Decrease rate by ½ (e.g. to .5%) == +1 bit per bloom entry. io.hfile.bloom.max.fold

io.hfile.bloom.max.fold = guaranteed minimum fold rate. Most people should leave this alone. Default = 7, or can collapse to at least 1/128th of original size. See the Development Process section of the document BloomFilters in HBase for more on what this option means.

1.10. Deleting from HBase

1.10.1. Using HBase Tables as Queues

HBase tables are sometimes used as queues. In this case, special care must be taken to regularly perform major compactions on tables used in this manner. As is documented in ???, marking rows as deleted creates additional StoreFiles which then need to be processed on reads. Tombstones only get cleaned up with major compactions.

See also ??? and HBaseAdmin.majorCompact.

1.10.2. Delete RPC Behavior

Be aware that htable.delete(Delete) doesn't use the writeBuffer. It will execute an RegionServer RPC with each invocation. For a large number of deletes, consider htable.delete(List).


1.11. HDFS

Because HBase runs on ??? it is important to understand how it works and how it affects HBase.

1.11.1. Current Issues With Low-Latency Reads

The original use-case for HDFS was batch processing. As such, there low-latency reads were historically not a priority. With the increased adoption of Apache HBase this is changing, and several improvements are already in development. See the Umbrella Jira Ticket for HDFS Improvements for HBase.

1.11.2. Leveraging local data

Since Hadoop 1.0.0 (also 0.22.1, 0.23.1, CDH3u3 and HDP 1.0) via HDFS-2246, it is possible for the DFSClient to take a "short circuit" and read directly from disk instead of going through the DataNode when the data is local. What this means for HBase is that the RegionServers can read directly off their machine's disks instead of having to open a socket to talk to the DataNode, the former being generally much faster[4]. Also see HBase, mail # dev - read short circuit thread for more discussion around short circuit reads.

To enable "short circuit" reads, you must set two configurations. First, the hdfs-site.xml needs to be amended. Set the property dfs.block.local-path-access.user to be the only user that can use the shortcut. This has to be the user that started HBase. Then in hbase-site.xml, set to be true

For optimal performance when short-circuit reads are enabled, it is recommended that HDFS checksums are disabled. To maintain data integrity with HDFS checksums disabled, HBase can be configured to write its own checksums into its datablocks and verify against these. See Section 1.4.9, “hbase.regionserver.checksum.verify. When both local short-circuit reads and hbase level checksums are enabled, you SHOULD NOT disable configuration parameter "", which will cause skipping checksum on non-hfile reads. HBase already manages that setting under the covers.

The DataNodes need to be restarted in order to pick up the new configuration. Be aware that if a process started under another username than the one configured here also has the shortcircuit enabled, it will get an Exception regarding an unauthorized access but the data will still be read.

1.11.3. Performance Comparisons of HBase vs. HDFS

A fairly common question on the dist-list is why HBase isn't as performant as HDFS files in a batch context (e.g., as a MapReduce source or sink). The short answer is that HBase is doing a lot more than HDFS (e.g., reading the KeyValues, returning the most current row or specified timestamps, etc.), and as such HBase is 4-5 times slower than HDFS in this processing context. There is room for improvement and this gap will, over time, be reduced, but HDFS will always be faster in this use-case.

1.12. Amazon EC2

Performance questions are common on Amazon EC2 environments because it is a shared environment. You will not see the same throughput as a dedicated server. In terms of running tests on EC2, run them several times for the same reason (i.e., it's a shared environment and you don't know what else is happening on the server).

If you are running on EC2 and post performance questions on the dist-list, please state this fact up-front that because EC2 issues are practically a separate class of performance issues.

1.13. Collocating HBase and MapReduce

It is often recommended to have different clusters for HBase and MapReduce. A better qualification of this is: don't collocate a HBase that serves live requests with a heavy MR workload. OLTP and OLAP-optimized systems have conflicting requirements and one will lose to the other, usually the former. For example, short latency-sensitive disk reads will have to wait in line behind longer reads that are trying to squeeze out as much throughput as possible. MR jobs that write to HBase will also generate flushes and compactions, which will in turn invalidate blocks in the ???.

If you need to process the data from your live HBase cluster in MR, you can ship the deltas with ??? or use replication to get the new data in real time on the OLAP cluster. In the worst case, if you really need to collocate both, set MR to use less Map and Reduce slots than you'd normally configure, possibly just one.

When HBase is used for OLAP operations, it's preferable to set it up in a hardened way like configuring the ZooKeeper session timeout higher and giving more memory to the MemStores (the argument being that the Block Cache won't be used much since the workloads are usually long scans).

1.14. Case Studies

For Performance and Troubleshooting Case Studies, see ???.

[1] The latest jvms do better regards fragmentation so make sure you are running a recent release. Read down in the message, Identifying concurrent mode failures caused by fragmentation.

[2] For description of the development process -- why static blooms rather than dynamic -- and for an overview of the unique properties that pertain to blooms in HBase, as well as possible future directions, see the Development Process section of the document BloomFilters in HBase attached to HBase-1200.

[3] The bloom filters described here are actually version two of blooms in HBase. In versions up to 0.19.x, HBase had a dynamic bloom option based on work done by the European Commission One-Lab Project 034819. The core of the HBase bloom work was later pulled up into Hadoop to implement Version 1 of HBase blooms never worked that well. Version 2 is a rewrite from scratch though again it starts with the one-lab work.

[4] See JD's Performance Talk

comments powered by Disqus