Chapter 1. Apache HBase Coprocessors

Table of Contents

1.1. Coprocessor Framework
1.2. Examples
1.3. Building A Coprocessor
1.3.1. Load from Configuration
1.3.2. Load from the HBase Shell
1.4. Check the Status of a Coprocessor
1.5. Status of Coprocessors in HBase

HBase coprocessors are modeled after the coprocessors which are part of Google's BigTable[1]. Coprocessors function in a similar way to Linux kernel modules: they provide a way to run server-level code against locally-stored data. The functionality they provide is very powerful, but it also carries great risk, much as a faulty kernel module can destabilize an operating system, a misbehaving coprocessor can adversely affect the entire region server process. The information in this chapter is primarily sourced from Mingjie Lai's blog post at https://blogs.apache.org/hbase/entry/coprocessor_introduction.

Coprocessors are not designed to be used by end users of HBase, but by HBase developers who need to add specialized functionality to HBase. One example of the use of coprocessors is pluggable compaction and scan policies, which are provided as coprocessors in HBASE-6427.

1.1. Coprocessor Framework

The implementation of HBase coprocessors diverges from the BigTable implementation. The HBase framework provides a library and runtime environment for executing user code within the HBase region server and master processes.

The framework API is provided in the coprocessor package.

Two different types of coprocessors are provided by the framework, based on their scope.

Types of Coprocessors

System Coprocessors

System coprocessors are loaded globally on all tables and regions hosted by a region server.

Table Coprocessors

You can specify, on a per-table basis, which coprocessors should be loaded on all regions of that table.

The framework also provides two different types of extension points: observers and endpoints.

Observers

Observers are analogous to triggers in conventional databases. They allow you to insert user code by overriding upcall methods provided by the coprocessor framework. Callback functions are executed from core HBase code when events occur. Callbacks are handled by the framework, and the coprocessor itself only needs to insert the extended or alternate functionality.

Provided Observer Interfaces

RegionObserver

A RegionObserver provides hooks for data manipulation events, such as Get, Put, Delete, Scan. An instance of a RegionObserver coprocessor exists for each table region. The scope of the observations a RegionObserver can make is constrained to that region.
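As an illustration, a minimal RegionObserver might reject writes to a reserved row key. The sketch below uses 0.94-era method signatures (in 0.96 and later, the final boolean argument to prePut is replaced by a Durability enum); the class name and the reserved-row rule are hypothetical, not part of HBase.

```java
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Hypothetical observer: rejects Puts to one reserved row key.
public class RestrictedRowObserver extends BaseRegionObserver {

  private static final byte[] RESERVED_ROW = "reserved".getBytes();

  @Override
  public void prePut(ObserverContext<RegionCoprocessorEnvironment> ctx,
      Put put, WALEdit edit, boolean writeToWAL) throws IOException {
    // Throwing an IOException aborts the operation and preempts any
    // lower-priority observers registered at the same hook.
    if (Arrays.equals(put.getRow(), RESERVED_ROW)) {
      throw new IOException("row 'reserved' is read-only");
    }
  }
}
```

Because BaseRegionObserver provides no-op implementations of every hook, the coprocessor only needs to override the callbacks it cares about.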

RegionServerObserver

A RegionServerObserver provides hooks for operations related to the RegionServer process, such as stopping the RegionServer and performing operations before or after merges, commits, or rollbacks.

WALObserver

A WALObserver provides hooks for operations related to the write-ahead log (WAL). You can observe or intercept WAL writing and reconstruction events. A WALObserver runs in the context of WAL processing. A single WALObserver exists on a single region server.

MasterObserver

A MasterObserver provides hooks for DDL-type operations, such as table creation, deletion, and modification. The MasterObserver runs within the context of the HBase master.

More than one observer of a given type can be loaded at once. Multiple observers are chained to execute sequentially by order of assigned priority. Nothing prevents a coprocessor implementor from communicating internally among its installed observers.

An observer of a higher priority can preempt lower-priority observers by throwing an IOException or a subclass of IOException.

Endpoints (HBase 0.96.x and later)

The implementation for endpoints changed significantly in HBase 0.96.x due to the introduction of protocol buffers (protobufs) (HBASE-5488). If you created endpoints before 0.96.x, you will need to rewrite them. Endpoints are now defined and callable as protobuf services, rather than as endpoint invocations passed through as Writable blobs.

Dynamic RPC endpoints resemble stored procedures. An endpoint can be invoked at any time from the client. When it is invoked, it is executed remotely at the target region or regions, and results of the executions are returned to the client.

The endpoint implementation is installed on the server and is invoked using HBase RPC. The client library provides convenience methods for invoking these dynamic interfaces.

An endpoint, like an observer, can communicate with any installed observers. This allows you to plug new features into HBase without modifying or recompiling HBase itself.

Steps to Implement an Endpoint

  • Define the coprocessor service and related messages in a .proto file

  • Run the protoc command to generate the code.

  • Write code to implement your endpoint: extend the generated protobuf service class and implement the Coprocessor and CoprocessorService interfaces.

  • The client calls the new HTable.coprocessorService() methods to perform the endpoint RPCs.

For more information and examples, refer to the API documentation for the coprocessor package, as well as the included RowCount example in the /hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/ directory of the HBase source.
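The first of the steps above, the .proto file, might look like the following sketch for a row-counting service. The names (RowCountService, CountRequest, CountResponse) and the Java package options are illustrative; the java_generic_services option must be enabled so that protoc generates the service stubs the endpoint extends.

```protobuf
// Hypothetical service definition for a row-count endpoint.
option java_package = "com.example.coprocessor.generated";
option java_outer_classname = "RowCountProtos";
option java_generic_services = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest) returns (CountResponse);
}
```

Running protoc against this file produces the RowCountProtos class, whose inner RowCountService class is what the server-side endpoint implementation extends.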

Endpoints (HBase 0.94.x and earlier)

Dynamic RPC endpoints in 0.94.x work as described in the previous section: they resemble stored procedures, can be invoked at any time from the client, execute remotely at the target region or regions, and return their results to the client. The endpoint implementation is installed on the server and invoked using HBase RPC, and, like an observer, an endpoint can communicate with any installed observers. The difference is the API used to define and invoke endpoints, which predates protocol buffers.

Steps to Implement an Endpoint

  • Server-Side

    • Create new protocol interface which extends CoprocessorProtocol.

    • Implement the Endpoint interface. The implementation will be loaded into and executed from the region context.

    • Extend the abstract class BaseEndpointCoprocessor. This convenience class hides some internal details that the implementer does not need to be concerned about, such as coprocessor framework class loading.

  • Client-Side

    An endpoint can be invoked by two new HBase client APIs:

    • HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row) for executing against a single region

    • HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable) for executing over a range of regions
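Putting the 0.94-era client-side pieces together, the sketch below invokes a hypothetical RowCountProtocol endpoint over every region of a table and sums the per-region results. The protocol interface, its getRowCount() method, and the table name are illustrative; the matching server-side class would extend BaseEndpointCoprocessor and implement the same interface.

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

public class RowCountClient {

  // Hypothetical endpoint protocol; not a class shipped with HBase.
  public interface RowCountProtocol extends CoprocessorProtocol {
    long getRowCount() throws IOException;
  }

  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t1");
    try {
      // null start/end keys select every region of the table
      Map<byte[], Long> results = table.coprocessorExec(
          RowCountProtocol.class, null, null,
          new Batch.Call<RowCountProtocol, Long>() {
            public Long call(RowCountProtocol instance) throws IOException {
              return instance.getRowCount();
            }
          });
      long total = 0;
      for (Long regionCount : results.values()) {
        total += regionCount;
      }
      System.out.println("total rows: " + total);
    } finally {
      table.close();
    }
  }
}
```

The framework invokes the Batch.Call callback once per region in the key range and returns the results keyed by region name, so aggregation across regions happens on the client.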

1.2. Examples

An example of an observer is included in hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java. Several endpoint examples are included in the same directory.

1.3. Building A Coprocessor

Before you can use a coprocessor, it must be developed, compiled, and packaged in a JAR file. The next step is to configure the coprocessor framework to use your coprocessor. You can load the coprocessor from your HBase configuration, so that the coprocessor starts with HBase, or you can configure the coprocessor from the HBase shell, as a table attribute, so that it is loaded dynamically when the table is opened or reopened.

1.3.1. Load from Configuration

To configure a coprocessor to be loaded when HBase starts, modify the RegionServer's hbase-site.xml and configure one of the following properties, based on the type of observer you are configuring:

  • hbase.coprocessor.region.classes for RegionObservers and Endpoints

  • hbase.coprocessor.wal.classes for WALObservers

  • hbase.coprocessor.master.classes for MasterObservers

Example 1.1. Example RegionObserver Configuration

In this example, one RegionObserver is configured for all the HBase tables.

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>

If multiple classes are specified for loading, the class names must be comma-separated. The framework attempts to load all the configured classes using the default class loader. Therefore, the jar file must reside on the server-side HBase classpath.

Coprocessors which are loaded in this way will be active on all regions of all tables. These are the system coprocessors introduced earlier. The first coprocessor listed will be assigned the priority Coprocessor.Priority.SYSTEM. Each subsequent coprocessor in the list has its priority value incremented by one (which reduces its priority, because priorities follow the natural sort order of integers).
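The priority scheme can be sketched in plain Java. The base value shown matches Coprocessor.PRIORITY_SYSTEM (Integer.MAX_VALUE / 4) in recent HBase versions, but treat the constant and the class name here as illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PriorityDemo {
  // Illustrative base value; in recent HBase versions,
  // Coprocessor.PRIORITY_SYSTEM is Integer.MAX_VALUE / 4.
  static final int PRIORITY_SYSTEM = Integer.MAX_VALUE / 4;

  // First listed class gets the base value; each subsequent class gets
  // base + 1, base + 2, ... (a larger number means lower priority).
  public static Map<String, Integer> assign(String... classNames) {
    Map<String, Integer> priorities = new LinkedHashMap<>();
    int next = PRIORITY_SYSTEM;
    for (String name : classNames) {
      priorities.put(name, next++);
    }
    return priorities;
  }
}
```

Because callbacks run in ascending priority order, the first class named in hbase-site.xml always runs before the others at any given hook.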

When calling out to registered observers, the framework executes their callback methods in the sorted order of their priority. Ties are broken arbitrarily.

1.3.2. Load from the HBase Shell

You can load a coprocessor on a specific table via a table attribute. The following example will load the FooRegionObserver observer when the regions of table t1 are opened or re-opened.

Example 1.2. Load a Coprocessor On a Table Using HBase Shell

hbase(main):005:0>  alter 't1', METHOD => 'table_att', 
  'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.0730 seconds

hbase(main):006:0> describe 't1'
DESCRIPTION                                                        ENABLED                             
 {NAME => 't1', coprocessor$1 => 'hdfs:///foo.jar|com.foo.FooRegio false                               
 nObserver|1001|arg1=1,arg2=2', FAMILIES => [{NAME => 'c1', DATA_B                                     
 LOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE                                     
  => '0', VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSIONS =>                                      
 '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZ                                     
 E => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLO                                     
 CKCACHE => 'true'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE',                                     
  BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3'                                     
 , COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647'                                     
 , KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY                                      
 => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}                                         
1 row(s) in 0.0190 seconds
        

The coprocessor framework will try to read the class information from the coprocessor table attribute value. The value contains four pieces of information which are separated by the | character.

  • File path: The jar file containing the coprocessor implementation must be in a location where all region servers can read it. You could copy the file onto the local disk on each region server, but it is recommended to store it in HDFS.

  • Class name: The full class name of the coprocessor.

  • Priority: An integer. The framework will determine the execution sequence of all configured observers registered at the same hook using priorities. This field can be left blank. In that case the framework will assign a default priority value.

  • Arguments: This field is passed to the coprocessor implementation.
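The four fields above can be recovered from the attribute value with a simple split on the separator. A minimal sketch (the class and method names are illustrative):

```java
public class CoprocessorSpec {
  // Splits a coprocessor table-attribute value into its four fields:
  // [0] jar path, [1] class name, [2] priority, [3] arguments.
  public static String[] parse(String spec) {
    // limit = 4 keeps any '|' inside the arguments field intact
    return spec.split("\\|", 4);
  }

  public static void main(String[] args) {
    String[] fields =
        parse("hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2");
    System.out.println(fields[1]); // the coprocessor class name
  }
}
```
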

Example 1.3. Unload a Coprocessor From a Table Using HBase Shell

hbase(main):007:0> alter 't1', METHOD => 'table_att_unset', 
hbase(main):008:0*   NAME => 'coprocessor$1'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.1130 seconds

hbase(main):009:0> describe 't1'
DESCRIPTION                                                        ENABLED                             
 {NAME => 't1', FAMILIES => [{NAME => 'c1', DATA_BLOCK_ENCODING => false                               
  'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSION                                     
 S => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '214                                     
 7483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN                                     
 _MEMORY => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true                                     
 '}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>                                      
 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION =>                                     
  'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_C                                     
 ELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', ENCO                                     
 DE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}                                                         
1 row(s) in 0.0180 seconds          
        

Warning

There is no guarantee that the framework will load a given coprocessor successfully. For example, the shell command neither guarantees a jar file exists at a particular location nor verifies whether the given class is actually contained in the jar file.

1.4. Check the Status of a Coprocessor

To check the status of a coprocessor after it has been configured, use the status HBase Shell command.

hbase(main):020:0> status 'detailed'
version 0.92-tm-6
0 regionsInTransition
master coprocessors: []
1 live servers
    localhost:52761 1328082515520
        requestsPerSecond=3, numberOfOnlineRegions=3, usedHeapMB=32, maxHeapMB=995
        -ROOT-,,0
            numberOfStores=1, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, 
storefileIndexSizeMB=0, readRequestsCount=54, writeRequestsCount=1, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, 
totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
        .META.,,1
            numberOfStores=1, numberOfStorefiles=0, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, 
storefileIndexSizeMB=0, readRequestsCount=97, writeRequestsCount=4, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, 
totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
        t1,,1328082575190.c0491168a27620ffe653ec6c04c9b4d1.
            numberOfStores=2, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0, 
storefileIndexSizeMB=0, readRequestsCount=0, writeRequestsCount=0, rootIndexSizeKB=0, totalStaticIndexSizeKB=0, 
totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, 
coprocessors=[AggregateImplementation]
0 dead servers      
    

1.5. Status of Coprocessors in HBase

Coprocessors and the coprocessor framework are evolving rapidly and work is ongoing on several different JIRAs.


