Abstract

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant, with tunable reliability and many failover and recovery mechanisms. The system is centrally managed and allows for intelligent dynamic management. It uses a simple, extensible data model that allows for online analytic applications.

Introduction

Flume is a distributed, reliable, and available service for efficiently moving large amounts of data soon after the data is produced. This release provides a scalable conduit to move data around a cluster as well as reliable logging.

The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent store such as the Hadoop Distributed File System (HDFS).

The system was designed with these four key goals in mind:

  • Reliability

  • Scalability

  • Manageability

  • Extensibility

This section provides a high-level overview of Flume’s architecture and describes how the four design goals are achieved.

Architecture

Flume’s architecture is simple, robust, and flexible. The main abstraction in Flume is a stream-oriented data flow. A data flow describes the way a single stream of data is transferred and processed from its point of generation to its eventual destination. Data flows are composed of logical nodes that can transform or aggregate the events they receive. Logical nodes are wired together in chains to form a data flow. The way in which they are wired is called the logical node’s configuration.

Controlling all this is the Flume Master, a separate service with knowledge of all the physical and logical nodes in a Flume installation. The Master assigns configurations to logical nodes and is responsible for communicating the user’s configuration updates to all logical nodes. In turn, the logical nodes periodically contact the Master so they can share monitoring information and check for updates to their configuration.

[Figure: architecture.png, a typical three-tier Flume deployment]

The figure above shows a typical deployment of Flume that collects log data from a set of application servers. The deployment consists of a number of logical nodes, arranged into three tiers. The first tier is the agent tier. Agent nodes are typically installed on the machines that generate the logs and are your data’s initial point of contact with Flume. They forward data to the next tier of collector nodes, which aggregate the separate data flows and forward them to the final storage tier.

For example, the agents could be machines listening for syslog data or monitoring the logs of a service such as a web server or the Hadoop JobTracker. The agents produce streams of data that are sent to the collectors; the collectors then aggregate the streams into larger streams which can be written efficiently to a storage tier such as HDFS.

Logical nodes are a very flexible abstraction. Every logical node has just two components: a source and a sink. The source tells a logical node where to collect data, and the sink tells it where to send the data. The only difference between two logical nodes is how their sources and sinks are configured. Both source and sink can additionally be configured with decorators that perform some simple processing on data as it passes through. In the previous example, the collector and the agents run the same node software. The Master assigns a configuration to each logical node at run time. All components of a node’s configuration are instantiated dynamically, so configurations can be changed many times over the lifetime of a Flume service without restarting any Java processes or logging into the machines themselves. In fact, logical nodes themselves can be created and deleted dynamically.

The source, sink, and optional decorators are a powerful set of primitives. Flume uses this architecture to provide per-flow data properties (for example durability guarantees, compression, or batching), to compute event metadata, or even to generate new events that are inserted into the data flow. A logical node can also send data downstream to several logical nodes. This lets a flow branch into multiple subflows, each of which can be configured differently. For example, one branch can be a collection path that delivers data reliably to a persistent store, while another computes lightweight analytics to be delivered to an alerting system.
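To make the source/decorator/sink composition concrete, here is a minimal sketch in Python. It is not Flume’s API (Flume is a Java service configured through its own data flow language). The Event class, run_logical_node, and add_host are hypothetical names chosen for illustration. Decorators are modeled as functions from event to event, and fan-out as a list of sinks that each receive every event.

```python
from dataclasses import dataclass, field, replace
from typing import Callable, Dict, Iterable, List


@dataclass(frozen=True)
class Event:
    body: bytes
    metadata: Dict[str, str] = field(default_factory=dict)


Decorator = Callable[[Event], Event]  # transforms one event into another
Sink = Callable[[Event], None]        # consumes an event


def run_logical_node(source: Iterable[Event],
                     decorators: List[Decorator],
                     sinks: List[Sink]) -> None:
    """Pull events from the source, apply decorators in order, and push
    each decorated event to every sink (more than one sink = fan-out)."""
    for event in source:
        for decorate in decorators:
            event = decorate(event)
        for sink in sinks:
            sink(event)


def add_host(event: Event) -> Event:
    # Example decorator: attach metadata without touching the body.
    return replace(event, metadata={**event.metadata, "host": "web01"})


stored: List[Event] = []   # stands in for a reliable collection path
alerts: List[Event] = []   # stands in for a lightweight alerting branch
run_logical_node(
    [Event(b"GET /index.html"), Event(b"ERROR disk full")],
    [add_host],
    [stored.append, lambda e: alerts.append(e) if b"ERROR" in e.body else None],
)
print(len(stored), len(alerts))  # 2 1
```

Passing two sinks mirrors the branching example above: every event reaches the collection path, while only matching events reach the alerting branch.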

Reliability

Reliability, the ability to continue delivering events in the face of failures without losing data, is a vital feature of Flume. Large distributed systems can and do suffer partial failures in many ways: physical hardware can fail, resources such as network bandwidth or memory can become scarce, or software can crash or run slowly. Flume emphasizes fault tolerance as a core design principle and keeps running and collecting data even when many components have failed.

Flume can guarantee that all data received by an agent node will eventually make it to the collector at the end of its flow as long as the agent node keeps running. That is, data can be reliably delivered to its eventual destination.

However, reliable delivery can be very resource intensive and is often a stronger guarantee than some data sources require. Therefore, Flume allows the user to specify, on a per-flow basis, the level of reliability required. There are three supported reliability levels:

  • End-to-end

  • Store on failure

  • Best effort

The end-to-end reliability level guarantees that once Flume accepts an event, that event will make it to the endpoint - as long as the agent that accepted the event remains live long enough. The first thing the agent does in this setting is write the event to disk in a 'write-ahead log' (WAL) so that, if the agent crashes and restarts, knowledge of the event is not lost. After the event has successfully made its way to the end of its flow, an acknowledgment is sent back to the originating agent so that it knows it no longer needs to store the event on disk. This reliability level can withstand any number of failures downstream of the initial agent.
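The write-ahead-log-and-acknowledge cycle can be sketched as follows. This is a toy model, not Flume’s on-disk WAL format: it assumes one hypothetical JSON file per event, keyed by an event ID, written before the event is sent and deleted when the end-to-end acknowledgement arrives. Whatever remains on disk after a restart is exactly the set of events that must be re-sent.

```python
import json
import os
import tempfile
from typing import Dict


class WriteAheadLog:
    """Toy agent-side WAL: one JSON file per event, written before the event
    is sent and deleted once the end-to-end acknowledgement arrives."""

    def __init__(self, directory: str) -> None:
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, event_id: str) -> str:
        return os.path.join(self.directory, event_id + ".json")

    def append(self, event_id: str, body: str) -> None:
        # Flush and fsync so the record survives an agent crash and restart.
        with open(self._path(event_id), "w") as f:
            json.dump({"id": event_id, "body": body}, f)
            f.flush()
            os.fsync(f.fileno())

    def ack(self, event_id: str) -> None:
        # The event reached the end of its flow; stop storing it.
        os.remove(self._path(event_id))

    def unacknowledged(self) -> Dict[str, str]:
        # On restart, anything still on disk has to be re-sent.
        pending: Dict[str, str] = {}
        for name in sorted(os.listdir(self.directory)):
            with open(os.path.join(self.directory, name)) as f:
                record = json.load(f)
            pending[record["id"]] = record["body"]
        return pending


wal = WriteAheadLog(tempfile.mkdtemp())
wal.append("e1", "first")
wal.append("e2", "second")
wal.ack("e1")  # e1 was acknowledged by the end of the flow
# Simulate a restart by opening the same directory again.
print(WriteAheadLog(wal.directory).unacknowledged())  # {'e2': 'second'}
```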

The store on failure reliability level requires each node to receive an acknowledgement only from the node one hop downstream. If the sending node detects a failure, it stores data on its local disk until the downstream node is repaired or an alternate downstream destination can be selected. While this is effective, data can be lost if a compound or silent failure occurs.
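A minimal sketch of store-on-failure behavior follows. It assumes a next hop is modeled as a Python callable that raises ConnectionError when it is down, and that returning normally counts as the one-hop acknowledgement. The local spool is an in-memory deque standing in for local disk, and StoreOnFailureSender is a hypothetical name, not a Flume class.

```python
from collections import deque
from typing import Callable, Deque, Dict, List

Hop = Callable[[str], None]  # returns normally = acknowledged; raises ConnectionError if down


class StoreOnFailureSender:
    """Sends each event to the first reachable hop. If no hop accepts it,
    the event stays in a local spool (a deque standing in for local disk)
    and is retried, in order, on the next flush."""

    def __init__(self, hops: List[Hop]) -> None:
        self.hops = hops  # primary first, then alternates
        self.spool: Deque[str] = deque()

    def _deliver(self, event: str) -> bool:
        for hop in self.hops:
            try:
                hop(event)
                return True  # one-hop acknowledgement received
            except ConnectionError:
                continue  # try the next alternate destination
        return False

    def send(self, event: str) -> None:
        self.spool.append(event)
        self.flush()

    def flush(self) -> None:
        # Drain the spool oldest-first; stop at the first undeliverable event.
        while self.spool:
            if not self._deliver(self.spool[0]):
                return
            self.spool.popleft()


delivered: List[str] = []
state: Dict[str, bool] = {"up": False}


def collector(event: str) -> None:
    if not state["up"]:
        raise ConnectionError("collector down")
    delivered.append(event)


sender = StoreOnFailureSender([collector])
sender.send("a")     # collector down: "a" is spooled
sender.send("b")     # still down: spool now holds a, b
state["up"] = True   # collector repaired
sender.flush()
print(delivered, list(sender.spool))  # ['a', 'b'] []
```

Because only the next hop acknowledges, an event counts as delivered once the collector accepts it; if the collector then loses it, the agent never finds out. That is the compound-failure gap described above.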

The best-effort reliability level sends data to the next hop with no attempts to confirm or retry delivery. If nodes fail, any data that they were in the process of transmitting or receiving can be lost. This is the weakest reliability level, but also the most lightweight.

Scalability

Scalability is the ability to increase system performance linearly, or better, by adding more resources to the system. Flume’s goal is horizontal scalability: the ability to incrementally add more machines to the system to increase throughput. A key performance measure in Flume is the number or size of events entering the system and being delivered. When load increases, you can handle it by adding more machines.

As seen in the preceding example installation, there are three separate components of Flume that require different approaches to scalability: the collector tier, the master, and the storage tier.

The collector tier needs to be able to scale in order to handle large volumes of data coming from large numbers of agent nodes. This workload is write heavy, partitionable, and thus parallelizable. By adding more machines to the collector tier, you can increase the number of agents and the maximum available throughput of the system.

An individual collector can usually handle many agents (up to hundreds) because each agent often produces only a small amount of log data compared to the full bandwidth available to the collector. When there are several collectors, Flume balances flows from agents across them; all events in a single flow from an agent go to the same collector. Flume uses a randomized algorithm to evenly assign lists of collectors to flows. This automatically spreads the load, and keeps it spread when a collector fails.
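One way to picture randomized assignment of collector lists is the sketch below. It assumes a simple scheme, in which each flow receives its own random permutation of the collectors and sticks to the first live one, that may differ from Flume’s actual algorithm; assign_failover_lists and active_collector are hypothetical helpers. Because each flow’s failover order is independently random, the flows whose primary fails are scattered across the remaining collectors instead of piling onto one.

```python
import random
from collections import Counter
from typing import Dict, List, Set


def assign_failover_lists(flows: List[str], collectors: List[str],
                          seed: int = 0) -> Dict[str, List[str]]:
    """Give each flow its own random ordering of the collectors: the first
    entry is its primary, the rest are failover targets in order."""
    rng = random.Random(seed)
    return {flow: rng.sample(collectors, len(collectors)) for flow in flows}


def active_collector(order: List[str], failed: Set[str]) -> str:
    # A flow keeps talking to the first collector in its list that is still up.
    return next(c for c in order if c not in failed)


lists = assign_failover_lists([f"agent{i}" for i in range(300)], ["c1", "c2", "c3"])
healthy = Counter(active_collector(o, set()) for o in lists.values())
degraded = Counter(active_collector(o, {"c1"}) for o in lists.values())
print(healthy)   # each collector serves about a third of the flows
print(degraded)  # c1's flows are split between c2 and c3
```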

As the number of nodes in the system increases, the volume of traffic on the control path to and from the Flume Master may become a bottleneck. The Flume Master also supports horizontal scaling by adding more machines, although just a small number of commodity servers can serve a large installation of nodes. The state of the Flume Master is kept synchronized and fully replicated across these machines, which makes it fault tolerant as well as scalable.

Finally, Flume can only write data through a flow at the rate that the final destinations can accept. Although Flume is able to buffer data inside a flow to smooth out high-volume bursts, the output rate needs to be equal on average to the input rate to avoid log jams. Thus, writing to a scalable storage tier is advisable. For example, HDFS has been shown to scale to thousands of machines and can handle many petabytes of data.
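The rate argument is simple arithmetic: while input exceeds output, the buffered backlog grows by the difference every second, and it shrinks only while output exceeds input. The sketch below uses made-up rates purely for illustration; backlog_after is a hypothetical helper, not a Flume setting.

```python
def backlog_after(seconds: int, input_rate: float, output_rate: float,
                  start_backlog: float = 0.0) -> float:
    """Events still buffered after `seconds` at constant rates (events/s).
    The backlog cannot go below zero: an empty buffer has nothing to drain."""
    backlog = start_backlog
    for _ in range(seconds):
        backlog = max(0.0, backlog + input_rate - output_rate)
    return backlog


# A 60 s burst of 15,000 events/s into a store that accepts 10,000 events/s:
burst = backlog_after(60, 15_000, 10_000)
print(burst)  # 300000.0
# Input then drops to 5,000 events/s, so the backlog drains at 5,000 events/s:
print(backlog_after(60, 5_000, 10_000, start_backlog=burst))  # 0.0
# If input stays just 1,000 events/s above output, the backlog never drains:
print(backlog_after(3_600, 11_000, 10_000))  # 3600000.0
```

A short burst is absorbed and later drained, but a sustained surplus of even a few percent grows without bound, which is why the storage tier itself must scale.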

Manageability

Manageability is the ability to control data flows, monitor nodes, modify settings, and control outputs of a large system. Manually managing the data flow from the sources to the end point is tedious, error prone, and a major pain point. With the potential to have thousands of log-generating applications and services, it’s important to have a centralized management point to monitor and change data flows, and the ability to dynamically handle different conditions or problems.

The Flume Master is the point where global state such as the data flows can be managed. Via the Flume Master, users can monitor flows and reconfigure them on the fly. The Flume Master has the information required to automatically respond to system changes such as load imbalances, partial failures, or newly provisioned hardware.

You can dynamically reconfigure nodes by using the Flume Master. Although this guide describes examples of traditional three-tier deployments, the flexibility of the nodes allows for arbitrary node topologies. You can reconfigure nodes by using small scripts written in a flexible dataflow specification language, which can be submitted via the Flume Master interface.

You can administer the Flume Master by using either of two interfaces: a web interface or the scriptable Flume command shell. The web interface provides interactive updates of the system’s state. The shell enables administration via manually crafted scripts or machine-generated scripts.

Extensibility

Extensibility is the ability to add new functionality to a system. For example, you can extend Flume by adding connectors to existing storage layers or data platforms. This is made possible by simple interfaces, separation of functional concerns into simple composable pieces, a flow specification language, and a simple but flexible data model.

Flume provides many common input and output connectors. When new input connectors (sources) are added, extra metadata fields specific to that source can be attached to each event it produces. Flume reuses the common components that provide particular reliability and resource usage properties. Some general sources include files from the file system, syslog and syslog-ng emulation, or the standard output of a process. More specific sources such as IRC channels and Twitter streams can also be added. Similarly, there are many output destinations for events. Although HDFS is the primary output destination, events can be sent to local files, or to monitoring and alerting applications such as Ganglia or communication channels such as IRC.

To enable easy integration with HDFS, MapReduce, and Hive, Flume provides simple mechanisms for output file management and output format management. Data gathered by Flume can be processed easily with Hadoop and Hive.

Section summary

The preceding Introduction section describes the high-level goals and features of Flume. The following sections of this guide describe how to set up and use Flume:

  • Step-by-step tutorial that introduces a single Flume node

  • Introduction to the Flume Master and a pseudo-distributed mode that includes multiple nodes coordinated by the Flume Master

  • Description of a fully-distributed setup that also removes single points of failure

  • Flume use cases and a description of how to integrate Flume with existing sources of data

  • How to set up Flume’s output for integration with heavyweight analysis systems such as Hadoop and Hive

  • How to deploy Flume and set up arbitrary flows, plus a specification of Flume’s data flow specification language

  • Catalog of components available via the language

  • A description of experimental features

  • Troubleshooting information

Flume Single Node Quick Start

In this section, you will learn how to get a single Flume node running and transmitting data. You will also learn about some data sources, and how to configure Flume flows on a per-node basis.

Each logical node consists of an event-producing source and an event-consuming sink. Nodes pull data from their sources, and push data out through their sinks.

Note
This section assumes that the Flume node and Flume Master are running in the foreground and not as daemons. You can stop the daemons by using /etc/init.d/flume-master stop and /etc/init.d/flume-node stop.

Sources and the dump command

Start by getting a Flume node running that echoes data written to standard input from the console back out to the console on stdout. You do this by using the dump command.

$ flume dump console
Tip
The Flume program has the general form flume <command> [args ...]. If you installed from the tarball package, the command can be found in $FLUME_HOME/bin/. If you installed from either RPM or DEB, then flume should already be in your path.
Tip
The example above uses the dump command with console as its argument. The command’s syntax is flume dump <source>. It prints data from <source> to the console.
Note
Some Flume configurations write to local disk by default, initially under /tmp/flume. This is fine for initial testing, but in production environments the flume.agent.logdir property should be set to a more durable location.
Note
If the node refuses to run and exits with the message "agent.FlumeNode: Aborting: Unexpected problem with environment.Failure to write in log directory: /tmp/flume. Check permissions?", check that you have write permission on the /tmp/flume directory (change its owner, or have the user join its group). By default, this directory is where various logging information is kept.

You have started a Flume node where console is the source of incoming data. When you run it, you should see some logging messages displayed to the console. For now, you can ignore messages about Masters, back-off, and failed connections (these are explained in later sections). When you type at the console and press Enter, you should see a new log entry line showing the data that you typed. If you enter This is a test, it should look similar to this:

hostname [INFO Thu Nov 19 08:37:13 PST 2009] This is a test

To exit the program, press ^C.

Note
Some sources do not automatically exit and require a manual ^C to exit.

Reading from a text file, text

You can also specify other sources of events. For example, to read a text file in which each line represents a new event, run the following command.

$ flume dump 'text("/etc/services")'

This command reads the file, and then outputs each line as a new event.

Note
The default console output escapes special characters with Java-style escape sequences. Characters such as " and \ are prefaced with an extra \.
Note
You can try this command with other files such as /var/log/messages, /var/log/syslog, or /var/log/hadoop/hadoop.log also. However, Flume must run with appropriate permissions to read the files.
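The Java-style escaping mentioned in the first note can be approximated as follows. This is a sketch of Java string-literal escaping written for illustration; the exact set of characters that Flume’s console output escapes is an assumption here, and java_escape is a hypothetical helper.

```python
def java_escape(text: str) -> str:
    """Approximate Java string-literal escaping: backslash and double quote
    gain a leading backslash, common control characters use short escapes,
    and any other control character becomes a \\uXXXX escape."""
    short = {"\\": "\\\\", '"': '\\"', "\n": "\\n", "\t": "\\t",
             "\r": "\\r", "\b": "\\b", "\f": "\\f"}
    out = []
    for ch in text:
        if ch in short:
            out.append(short[ch])
        elif ord(ch) < 0x20:
            out.append("\\u%04X" % ord(ch))
        else:
            out.append(ch)
    return "".join(out)


print(java_escape('say "hi" from C:\\tmp'))  # say \"hi\" from C:\\tmp
```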

Tailing a file name, tail and multitail

If you want to tail a file instead of just reading it, specify another source by using tail instead of text.

$ flume dump 'tail("testfile")'

This command pipes data from the file into Flume and then out to the console.

Because testfile does not exist yet, this message appears: "File testfile does not currently exist, waiting for file to appear".

In another terminal, you can create and write data to the file:

$ echo Hello world! >> testfile

New data should appear.

When you delete the file:

$ rm testfile

The tail source detects this. If you then recreate the file, the tail source detects the new file and follows it:

$ echo Hello world again! >> testfile

You should see your new message appear in the Flume node console.

You can also use the multitail source to follow multiple files by file name:

$ flume dump 'multitail("test1", "test2")'

Then send it data by appending to the two files:

$ echo Hello world test1! >> test1
$ echo Hello world test2! >> test2

Synthetic sources, synth

Here’s one more example, in which you use the asciisynth source to generate synthetic events:

$ flume dump 'asciisynth(20,30)'

You should get 20 events, each with 30 random ASCII bytes.
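As a rough model of what asciisynth(msg_count, msg_size) produces (the source summary at the end of this section says it converts all characters into printable ASCII), the sketch below generates random bytes and folds each one into the printable range 0x20 to 0x7E. The folding rule and the ascii_synth name are assumptions for illustration, not Flume’s implementation.

```python
import random
from typing import Iterator


def ascii_synth(msg_count: int, msg_size: int, seed: int = 0) -> Iterator[bytes]:
    """Hypothetical stand-in for asciisynth(msg_count, msg_size): yields
    msg_count messages of msg_size random bytes, each byte folded into the
    printable ASCII range 0x20..0x7E (95 possible values)."""
    rng = random.Random(seed)
    for _ in range(msg_count):
        raw = bytes(rng.randrange(256) for _ in range(msg_size))
        yield bytes(0x20 + b % 95 for b in raw)


messages = list(ascii_synth(20, 30))
print(len(messages), {len(m) for m in messages})  # 20 {30}
```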

Syslog as a source, syslogUdp and syslogTcp

As with files, you can also accept data from well-known wire formats such as syslog. For example, you can start a traditional syslog-like UDP server listening on port 5140 (the normal syslog UDP port is the privileged port 514) by running this command:

$ flume dump 'syslogUdp(5140)'

You can feed the source data by using netcat to send syslog formatted data as shown in the example below:

$ echo "<37>hello via syslog"  | nc -u localhost 5140
Tip
You may need to press ^C to exit this command.
Note
The extra <37> is a syslog wireformat encoding of a message category and priority level.
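The <37> prefix is the syslog PRI field. Under RFC 3164, PRI = facility * 8 + severity, so 37 decodes to facility 4 (security/authorization messages) and severity 5 (notice). The sketch below decodes it; parse_pri is a hypothetical helper and does not parse the rest of a syslog header (timestamp, hostname).

```python
import re
from typing import Tuple

SEVERITIES = ["emerg", "alert", "crit", "err", "warning", "notice", "info", "debug"]


def parse_pri(line: str) -> Tuple[int, str, str]:
    """Split a syslog line of the form "<PRI>message" into
    (facility number, severity name, message), using the RFC 3164 rule
    PRI = facility * 8 + severity."""
    match = re.match(r"<(\d{1,3})>(.*)", line, re.DOTALL)
    if match is None:
        raise ValueError("missing <PRI> prefix")
    pri = int(match.group(1))
    if pri > 191:  # highest valid value: facility 23, severity 7
        raise ValueError("PRI out of range")
    facility, severity = divmod(pri, 8)
    return facility, SEVERITIES[severity], match.group(2)


print(parse_pri("<37>hello via syslog"))  # (4, 'notice', 'hello via syslog')
```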

Similarly, you can set up a syslog-ng compatible source that listens on TCP port 5140 (the normal syslog-ng TCP port is the privileged port 514):

$ flume dump 'syslogTcp(5140)'

And send it data:

$ echo "<37>hello via syslog" | nc localhost 5140
Tip
You may need to press ^C to exit this command.

Syslog compatibility allows data produced by syslog, rsyslog, or syslog-ng to be sent to and processed by Flume.

Anatomy of an Event

This section describes a number of sources of data that Flume can interoperate with. Before going any further, it will be helpful for you to understand what Flume is actually sending and processing internally.

Flume internally converts every external source of data into a stream of events. Events are Flume’s unit of data and are a simple and flexible representation. An event is composed of a body and metadata. The event body is a string of bytes representing the content of an event. For example, a line in a log file is represented as an event whose body is the actual byte representation of that line. The event metadata is a table of key/value pairs that capture some detail about the event, such as the time it was created or the name of the machine on which it originated. This table can be appended to as an event travels along a Flume flow, and the table can be read to control the operation of individual components of that flow. For example, the machine name attached to an event can be used to control the output path where the event is written at the end of the flow.

An event’s body can be up to 32KB long. Although this limit can be changed via a system property, changing it is not recommended because doing so can hurt performance.
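A minimal model of this event structure follows. It assumes 32KB means 32 * 1024 bytes and chooses to reject oversized bodies; this sketch does not say what Flume itself does with an oversized body. The metadata keys timestamp and host and the /collected output root are hypothetical examples of how metadata can drive the output path.

```python
import socket
import time
from dataclasses import dataclass, field
from typing import Dict

MAX_BODY_BYTES = 32 * 1024  # assumed meaning of "32KB"


@dataclass
class Event:
    body: bytes
    metadata: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if len(self.body) > MAX_BODY_BYTES:
            raise ValueError(f"body is {len(self.body)} bytes; limit is {MAX_BODY_BYTES}")


def make_event(line: str) -> Event:
    # One log line becomes one event; metadata records when and where it was created.
    return Event(line.encode("utf-8"),
                 {"timestamp": str(int(time.time() * 1000)),
                  "host": socket.gethostname()})


def output_path(event: Event, root: str = "/collected") -> str:
    # Metadata can steer downstream components, e.g. choose a per-host directory.
    return f"{root}/{event.metadata['host']}/"


event = make_event("GET /index.html 200")
event.metadata["collector"] = "c1"  # metadata can be appended to along the flow
print(sorted(event.metadata))  # ['collector', 'host', 'timestamp']
print(output_path(Event(b"x", {"host": "web01"})))  # /collected/web01/
```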

Section Summary

In this section, you learned how to use Flume’s dump command to print data from a variety of different input sources to the console. You also learned about the event, the fundamental unit of data transfer in Flume.

The following table summarizes the sources described in this section.

Flume Event Sources
console

Stdin console.

text("filename")

One-shot text file source. One line is one event.

tail("filename")

Similar to Unix’s tail -F. One line is one event. Stays open for more data and follows filename if the file is rotated.

multitail("file1"[, "file2"[, …]])

Similar to tail source but follows multiple files.

asciisynth(msg_count,msg_size)

A source that synthetically generates msg_count random messages of size msg_size. This converts all characters into printable ASCII characters.

syslogUdp(port)

Syslog over UDP port, port. This is syslog compatible.

syslogTcp(port)

Syslog over TCP port, port. This is syslog-ng compatible.

Pseudo-distributed Mode

Flume is intended to be run as a distributed system with processes spread out across many machines. It can also be run as several processes on a single machine, which is called “pseudo-distributed” mode. This mode is useful for debugging Flume data flows and getting a better idea of how Flume components interact.

The previous section described a Flume node and introduced the concept of Flume sources. This section introduces some new concepts required for a distributed setup: the Flume master server, the specification of sources and sinks, and connecting multiple Flume nodes.

Starting Pseudo-distributed Flume Daemons

There are two kinds of processes in the system: the Flume Master and the Flume node. The Flume Master is the central management point: it is the single logical entity that holds global state, controls the data flows of the Flume nodes, and monitors them. Flume nodes serve as the data path for streams of events. They can be the sources, conduits, and consumers of event data. The nodes periodically contact the Master to transmit a heartbeat and to get their data flow configuration.

In order to get a distributed Flume system working, you must start a single Flume Master and some Flume nodes that interact with the Master. You’ll start with a Master and one Flume node and then expand.

The Master

The Master can be manually started by executing the following command:

$ flume master

After the Master is started, you can access it by pointing a web browser to http://localhost:35871/. This web page displays the status of all Flume nodes that have contacted the Master, and shows each node’s currently assigned configuration. When you start this up without Flume nodes running, the status and configuration tables will be empty.

[Figure: master-empty.png, the Master status page with no nodes running]

The web page contains four tables: the Node status table, the Node configuration table, the Physical/Logical Node mapping table, and the Command history table. The information in these tables represents the current global state of the Flume system.

The Master’s Node status table contains the names of all of the Flume nodes talking to the Master, their current configuration version (initially "none"), their status (such as IDLE), and when each last reported to the Master. By default, the name of each Flume node is the same as the output of running hostname at a Unix prompt.

The Master’s Node configuration table contains the logical name of a node, the configuration version assigned to it, and a specification of its sources and its sinks. Initially, this table is empty, but after you change values you can view this web page to see the updates. There are two sets of columns: the user-entered version/source/sink and the translated version/source/sink. A later section of this guide describes translated configs.

The Master’s Physical/Logical Node mapping table contains the mapping of logical nodes to their physical nodes.

The Master’s Command history table contains the state of commands. In general, commands modify the Master’s global state. Commands are processed serially on a Master and are assigned a unique command ID number. Each command has a state (for example, SUCCEEDED, FAILED, or PENDING), a command line, and a message which often contains information about its execution attempt.
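The serial, ID-stamped processing described above can be modeled in a few lines. This toy CommandHistory is not the Master’s implementation, and the setconfig command name in the example is hypothetical; the sketch only shows how unique increasing IDs, one-at-a-time execution, and the PENDING, SUCCEEDED, and FAILED states fit together.

```python
import itertools
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass
class Command:
    cmd_id: int
    line: str
    state: str = "PENDING"
    message: str = ""


class CommandHistory:
    """Toy model of a command history: each command gets a unique,
    increasing ID and commands are executed serially, in submission order."""

    def __init__(self, handlers: Dict[str, Callable[[List[str]], str]]) -> None:
        self.handlers = handlers
        self.ids = itertools.count(1)
        self.history: List[Command] = []

    def submit(self, line: str) -> Command:
        cmd = Command(next(self.ids), line)
        self.history.append(cmd)
        return cmd

    def run_pending(self) -> None:
        for cmd in self.history:
            if cmd.state != "PENDING":
                continue
            try:
                name, *args = cmd.line.split()
                cmd.message = self.handlers[name](args)
                cmd.state = "SUCCEEDED"
            except Exception as exc:  # empty line, unknown command, or handler error
                cmd.state = "FAILED"
                cmd.message = repr(exc)


configs: Dict[str, Tuple[str, str]] = {}


def set_config(args: List[str]) -> str:
    node, source, sink = args
    configs[node] = (source, sink)
    return "configured " + node


history = CommandHistory({"setconfig": set_config})
ok = history.submit("setconfig host console console")
bad = history.submit("bogus")
history.run_pending()
for cmd in history.history:
    print(cmd.cmd_id, cmd.state, cmd.line)
# 1 SUCCEEDED setconfig host console console
# 2 FAILED bogus
```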

The Flume Node

To start a Flume node, invoke the following command in another terminal.

$ flume node_nowatch
Note
Normally, you start a node using flume node or run the node as a daemon. For these examples, you disable the watchdog feature by starting with the node_nowatch option. This option enables you to interact with a node via the console. The other options disable stdin.

To check whether a Flume node is up, point your browser to the Flume Node status page at http://localhost:35862/. Each node displays its own data on a single table that includes diagnostics and metrics data about the node, its data flows, and the system metrics about the machine it is running on.

If the node is up, you should also refresh the Master’s status page (http://localhost:35871) to make sure that the node has contacted the Master. You brought up one node (assume the node is named host), so you should have one node listed in the Master’s node status table, and an entry in the logical node mapping table that links the host logical node to the host physical node.

Configuring a Node via the Master

Requiring nodes to contact the Master to get their configuration enables you to dynamically change the configuration of nodes without having to log in to the remote machine to restart the daemon. You can quickly change the node’s previous data flow configuration to a new one.

The following describes how to "wire" nodes using the Master’s web interface.

On the Master’s web page, click the config link. You are presented with two forms, which are web interfaces for setting the nodes’ data flows. When Flume nodes contact the Master, they notice that the data flow version has changed, then instantiate and activate the new configuration.

For this example, you will repeat the steps from the Quick Start section. Enter the following values into the "Configure a node" form, and then click Submit.

Node name: host
Source: console
Sink: console

Refresh the Master page and notice that the version stamp changed to a current time, and that the src and sink fields of the configs updated. After the status changes to "ACTIVE", it is ready to receive console traffic.

On the terminal where your Flume node is running, you should be able to type a few lines and get output back showing your new log messages. To read a text file instead, use these values:

Node name: host
Source: text("/etc/services")
Sink: console
Note
You may need to press Enter in the Flume node console.

Or use these new values to tail a file:

Node name: host
Source: tail("/etc/services")
Sink: console

You can now change the configuration of different nodes in the system to gather data from a variety of sources by going through the Master.

Introducing Sinks

Thus far, you have seen that Flume has a variety of sources that generate or accept new events that are fed into the system. You have limited the output of these messages to the console sink. As you would expect, Flume also provides a wide variety of event sinks — destinations for all of the events.

There are many possible destinations for events — to local disk, to HDFS, to the console, or forwarding across the network. You use the sink abstractions as an interface for forwarding events to any of these destinations.

You can connect different sources to different sinks by specifying the new configuration and submitting it to the Master. For example, with the data flow below, you can make a copy of /etc/services.

Node name: host
Source: text("/etc/services")
Sink: text("services.copy")
Warning
The text sink overwrites the output file if it already exists.

Notice that the file is copied as is. Sinks have optional arguments for output format, which you can use to write data in other serialization formats. For example, instead of copying a file using the default "raw" formatter, you can format the output using other formatters such as "avrojson" (the Avro serialization json format), "avrodata" (the Avro serialization binary data format), or a "debug" mode (this is the formatter used by the console sink).

If you enter:

Node name: host
Source: text("/etc/services")
Sink: console("avrojson")

You get the file with each record in JSON format displayed to the console.

If you enter:

Node name: host
Source: text("/etc/services")
Sink: text("services.json", "avrojson")

The newly written local services.json file is in Avro’s JSON format.

There are several sinks you can use. The following list is a subset; see the Appendix for more sinks.

Flume Event Sinks
null

Null sink. Events are dropped.

console[("format")]

Console sink. Display to console’s stdout. The "format" argument is optional and defaults to the "debug" output format.

text("txtfile"[,"format"])

Textfile sink. Write the events to text file txtfile using output format "format". The default format is "raw" event bodies with no metadata.

dfs("dfsfile")

DFS seqfile sink. Write serialized Flume events to a dfs path such as hdfs://namenode/file or file:///file in Hadoop’s seqfile format. Note that because of HDFS write semantics, no data written by this sink becomes visible until the sink is closed.

syslogTcp("host",port)

Syslog TCP sink. Forward events to host on TCP port port in syslog wire format (syslog-ng compatible), or to other Flume nodes set up to listen for syslogTcp.

Warning
Using dfs has some restrictions and requires some extra setup. File contents will not become available until after the sink has been closed. See the Troubleshooting section for details.

Aggregated Configurations

Using the form for single node configuration of a small number of machines is manageable, but for larger numbers it is more efficient to maintain or auto-generate the configuration for all of the machines in a single file. Flume allows you to set the configurations of many machines in a single aggregated configuration submission.

Instead of the method you used in the "Configuring a Node via the Master" section, put the following configuration line into the "Configure Nodes" form and then submit:

host : text("/etc/services") | console ;

Or:

host: text("/etc/services") | text("services.copy");

The general format is:

<node1> : <source> | <sink> ;
<node2> : <source> | <sink> ;
<node3> : <source> | <sink> ;
...
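To show how the general format breaks down, here is a small parser sketch. It handles only the plain <node> : <source> | <sink> ; shape shown above and treats double-quoted arguments as opaque, so a path such as "hdfs://namenode/file" does not confuse the split on ':'. It is not Flume’s parser and ignores anything else the data flow language supports; parse_aggregated_config and split_unquoted are hypothetical names.

```python
from typing import Dict, List, Tuple


def split_unquoted(text: str, sep: str, maxsplit: int = -1) -> List[str]:
    """Split on the single character `sep` only where it is outside
    double-quoted strings; a negative maxsplit means no limit."""
    parts: List[str] = []
    current: List[str] = []
    in_quotes = False
    for ch in text:
        if ch == '"':
            in_quotes = not in_quotes
        if ch == sep and not in_quotes and maxsplit != 0:
            parts.append("".join(current))
            current = []
            maxsplit -= 1
        else:
            current.append(ch)
    parts.append("".join(current))
    return parts


def parse_aggregated_config(spec: str) -> Dict[str, Tuple[str, str]]:
    """Parse `<node> : <source> | <sink> ;` statements into {node: (source, sink)}."""
    configs: Dict[str, Tuple[str, str]] = {}
    for stmt in split_unquoted(spec, ";"):
        if not stmt.strip():
            continue  # blank text after the final ';'
        node, flow = split_unquoted(stmt, ":", maxsplit=1)
        source, sink = split_unquoted(flow, "|", maxsplit=1)
        configs[node.strip()] = (source.strip(), sink.strip())
    return configs


spec = '''
host : console | agentSink("localhost",35853) ;
collector : collectorSource(35853) | console ;
'''
for node, (source, sink) in parse_aggregated_config(spec).items():
    print(node, "<-", source, "->", sink)
```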

The remainder of this guide uses this format to configure nodes.

Tiering Flume Nodes: Agents and Collectors

A simple network connection is abstractly just another sink. It would be great if sending events over the network was easy, efficient, and reliable. In reality, collecting data from a distributed set of machines and relying on networking connectivity greatly increases the likelihood and kinds of failures that can occur. The bottom line is that providing reliability guarantees introduces complexity and many tradeoffs.

Flume simplifies these problems by providing a predefined topology and tunable reliability that only requires you to give each Flume node a role. One simple Flume node topology classifies Flume nodes into two roles — a Flume agent tier and the Flume collector tier. The agent tier Flume nodes are co-located on machines with the service that is producing logs. For example, you could specify a Flume agent configured to have syslogTcp as a source, and configure the syslog generating server to send its logs to the specified local port. This Flume agent would have an agentSink as its sink which is configured to forward data to a node in the collector tier.

Nodes in the collector tier listen for data from multiple agents, aggregate logs, and then eventually write the data to HDFS.

Warning
In the next few sections, all host arguments currently use the physical IP or DNS name of the node as opposed to the Flume node name (set by -n name option). The default Flume node name is the host name unless you override it on the command line.

To demonstrate the new sinks in pseudo-distributed mode, you will instantiate another Flume node (a physical node) on the local box. To do this, you need to start a Flume node with some extra options. The command line below starts a physical node named collector (-n collector):

$ flume node_nowatch -n collector

On the Master’s web page, you should eventually see two nodes: host and collector. The Flume node status web pages should be available at http://localhost:35862 and http://localhost:35863. Port bindings depend on instantiation order: the first physical node instantiated binds to 35862 and the second binds to 35863.

Next, configure collector to take on the role of a collector and set up host to send data from the console to the collector, using the aggregated multiple-configuration form. The agent uses agentSink, a high-reliability network sink. The collector node’s source is configured to be a collectorSource, and its sink is configured to be the console.

host : console | agentSink("localhost",35853) ;
collector : collectorSource(35853) | console ;
tiers-console.png

When you type lines in host's console, events are forwarded to the collector. Currently, there is some latency (15s or so) before a forwarded message shows up on collector. This delay is a configurable setting whose default is chosen to suit high event throughput. Later sections describe how to tune Flume.

You have successfully made an event flow from an agent to the collector.

Tip
You can check whether agentSink is working by looking in the write-ahead logging directory for the sink. The default location is /tmp/flume/<nodename>. Because the OS periodically cleans this directory, this configuration property (flume.agent.logdir) should be changed in a production environment.

A more interesting setup is to have the agent tail a local file (using the tail source) or listen for local syslog data (using the syslogTcp or syslogUdp sources and modifying the syslog daemon’s configuration). Instead of writing to a console, the collector would write to a collectorSink, a smarter sink that writes to disk or HDFS, periodically rotates files, and manages acknowledgements.

tiers-hdfs.png

The following configuration is for an agent that listens for syslog messages and forwards them to a collector, which writes files to the local directory /tmp/flume/collected/.

host : syslogTcp(5140) | agentSink("localhost",35853) ;
collector : collectorSource(35853) | collectorSink("file:///tmp/flume/collected", "syslog");

In the following slightly modified configuration, the collector writes to an HDFS cluster (assuming the HDFS NameNode is called namenode):

host : syslogTcp(5140) | agentSink("localhost",35853) ;
collector : collectorSource(35853) | collectorSink("hdfs://namenode/user/flume/","syslog");

Section Summary

This section described how to start a Master and a node, how to configure a node via the Master, how to specify configurations for several nodes concisely, and how to build a tiered agent-collector pipeline on a single machine.

Fully-distributed Mode

The main goal for Flume is to collect logs and data from many different hosts and to scale and intelligently handle different cluster and network topologies.

To deploy Flume on your cluster, perform the following steps.

Steps to Deploy Flume On a Cluster
  • Install Flume on each machine.

  • Select one or more nodes to be the Master.

  • Modify a static configuration file to use site-specific properties.

  • Start the Flume Master node on at least one machine.

  • Start a Flume node on each machine.

The following section describes how to manually configure the properties file to specify the Master for each node, and how to set default values for parameters. Sections afterwards describe a data flow configuration for a larger system, how to add more capacity by adding collectors, and how to improve the reliability of the Master by adding multiple Masters.

Static Configuration Files

In the previous sections, you used Flume on a single machine with the default configuration settings. With the default settings, nodes automatically search for a Master on localhost on a standard port. In order for the Flume nodes to find the Master in a fully distributed setup, you must specify site-specific static configuration settings.

Site-specific settings for Flume nodes and Masters are configured by properties in the conf/flume-site.xml file found on each machine. If this file is not present, the commands default to the settings found in conf/flume-conf.xml. In the following example, you set up the property that points a Flume node to search for its Master at a machine called master.

conf/flume-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl"  href="configuration.xsl"?>

<configuration>
<property>
<name>flume.master.servers</name>
<value>master</value>
</property>
</configuration>

Using Default Values

When you are using agent/collector roles, you can add the following configuration properties to your flume-site.xml file to set the default collector host and port.

...
<property>
<name>flume.collector.event.host</name>
<value>collector</value>
<description>This is the host name of the default "remote" collector.
</description>
</property>
<property>
<name>flume.collector.port</name>
<value>35853</value>
<description>This is the default TCP port that the collector listens on in order to receive the events it is collecting.
</description>
</property>

...

With these properties set, an agentSink with no arguments uses flume.collector.event.host and flume.collector.port as its default target host and port.
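Here is a rough Python model of this fallback. It assumes the property values shown above. resolve_agent_sink is a hypothetical name, since Flume performs this resolution internally.

```python
# Hypothetical site properties mirroring the flume-site.xml fragment above.
SITE_PROPS = {
    "flume.collector.event.host": "collector",
    "flume.collector.port": "35853",
}


def resolve_agent_sink(args: tuple, props: dict) -> tuple:
    """Return the (host, port) an agentSink would target, filling omitted
    arguments from flume.collector.event.host and flume.collector.port."""
    host = args[0] if len(args) > 0 else props["flume.collector.event.host"]
    port = int(args[1]) if len(args) > 1 else int(props["flume.collector.port"])
    return host, port


# agentSink("collector",35853), agentSink("collector") and a bare agentSink
# all resolve to the same target under these properties.
for args in [("collector", 35853), ("collector",), ()]:
    print(resolve_agent_sink(args, SITE_PROPS))
```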

In the following example, a larger setup with several agents pushes data to a collector. There are seven Flume nodes: six in the agent tier and one in the collector tier.

singleCollector.png

An explicit configuration fills in all of the parameters:

agentA : src | agentSink("collector",35853);
agentB : src | agentSink("collector",35853);
agentC : src | agentSink("collector",35853);
agentD : src | agentSink("collector",35853);
agentE : src | agentSink("collector",35853);
agentF : src | agentSink("collector",35853);
collector : collectorSource(35853) | collectorSink("hdfs://namenode/flume/","srcdata");
Note
When specifying destinations for agentSinks, use the hostname and port of the target machine. The default name for a node is its hostname. However, if there are multiple logical nodes, you must use the machine’s host name, not the name of the logical node. In the preceding examples, agent[A-F] and collector are the physical host names of the machines where these configurations are running.

You can rely on the default ports set in the configuration files:

agentA : src | agentSink("collector");
agentB : src | agentSink("collector");
agentC : src | agentSink("collector");
agentD : src | agentSink("collector");
agentE : src | agentSink("collector");
agentF : src | agentSink("collector");
collector : collectorSource | collectorSink("hdfs://namenode/flume/","srcdata");

You can rely on the default ports and default collector host:

agentA : src | agentSink ;
agentB : src | agentSink ;
agentC : src | agentSink ;
agentD : src | agentSink ;
agentE : src | agentSink ;
agentF : src | agentSink ;
collector : collectorSource | collectorSink("hdfs://namenode/flume/","srcdata");
Warning
Using defaults can make writing data flow configurations more concise, but may obscure the details about how different nodes are connected to each other.

Multiple Collectors

Having multiple collectors can increase log collection throughput and can improve the timeliness of event delivery by increasing collector availability. Data collection is parallelizable; thus, the load from many agents can be shared across several collectors.

Partitioning Agents across Multiple Collectors

The preceding graph and dataflow spec show a typical topology for Flume nodes. For reliable delivery, if the collector stops operating or disconnects from the agents, the agents need to store their events locally on their respective disks. The agents then periodically attempt to recontact a collector. While the collector is down, any analysis or processing downstream is blocked.

multiCollector.png

When you have multiple collectors, as in the preceding graph and the dataflow spec below, downstream progress is still made even when a collector fails. If collector B goes down, agents A and B continue to deliver events via collector A, and agents E and F via collector C. Agents C and D may have to queue their logs until collector B (or its replacement) comes back online.

The following configuration partitions the work from the set of agents across several collectors. In this example, each collector specifies the same HDFS output directory and file prefix, aggregating all of the logs into the same directory.

agentA : src | agentE2ESink("collectorA",35853);
agentB : src | agentE2ESink("collectorA",35853);
agentC : src | agentE2ESink("collectorB",35853);
agentD : src | agentE2ESink("collectorB",35853);
agentE : src | agentE2ESink("collectorC",35853);
agentF : src | agentE2ESink("collectorC",35853);
collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
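Writing such a partition by hand gets tedious as the number of agents grows. The following Python sketch generates the spec lines above. It divides the agents into contiguous, nearly equal groups, one per collector. partition_agents is a hypothetical helper, and "hdfs://..." is kept as the same placeholder used in the example.

```python
def partition_agents(agents, collectors, port=35853, prefix="src"):
    """Split agents into contiguous, nearly equal groups, one group per
    collector, and emit a dataflow spec line for every node."""
    lines = []
    for i, agent in enumerate(agents):
        # i * len(collectors) // len(agents) maps agent i to its group index.
        target = collectors[i * len(collectors) // len(agents)]
        lines.append(f'{agent} : src | agentE2ESink("{target}",{port});')
    for collector in collectors:
        lines.append(f'{collector} : collectorSource({port}) | '
                     f'collectorSink("hdfs://...","{prefix}");')
    return lines


agents = ["agent" + c for c in "ABCDEF"]
for line in partition_agents(agents, ["collectorA", "collectorB", "collectorC"]):
    print(line)
```

With six agents and three collectors, this reproduces the pairing used above. With uneven counts, group sizes differ by at most one.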

Manually Specifying Failover Chains

failoverCollector.png

When you have multiple collectors writing to the same storage location, instead of having agent C and agent D queue indefinitely, you can instead have them fail over to other collectors. In this scenario, you could have agent C and agent D fail over to collector A and collector C respectively, while periodically checking to see if collector B has returned.

To specify these setups, use agents with failover chains. As with single-collector agents, there are three levels of reliability for failover chain agents: agentE2EChain, agentDFOChain, and agentBEChain.

In the following example, you manually specify the failover chain using agentE2EChain, an agent with end-to-end reliability and multiple failover collectors. In this situation, agentA initially attempts to send to collectorA on port 35853. The second argument in agentA's sink specifies the collector to fall back on if the first fails. You can specify an arbitrary number of collectors, but you must specify at least one.

agentA : src | agentE2EChain("collectorA:35853","collectorB:35853");
agentB : src | agentE2EChain("collectorA:35853","collectorC:35853");
agentC : src | agentE2EChain("collectorB:35853","collectorA:35853");
agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");
agentE : src | agentE2EChain("collectorC:35853","collectorA:35853");
agentF : src | agentE2EChain("collectorC:35853","collectorB:35853");
collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
Note
In this section, agent[A-F] and collector[A-C] are physical host names.

As in the single collector case, if no port number is specified, the agent defaults to using the flume.collector.port.

agentA : src | agentE2EChain("collectorA","collectorB");
agentB : src | agentE2EChain("collectorA","collectorC");
agentC : src | agentE2EChain("collectorB","collectorA");
agentD : src | agentE2EChain("collectorB","collectorC");
agentE : src | agentE2EChain("collectorC","collectorA");
agentF : src | agentE2EChain("collectorC","collectorB");
collectorA : collectorSource | collectorSink("hdfs://...","src");
collectorB : collectorSource | collectorSink("hdfs://...","src");
collectorC : collectorSource | collectorSink("hdfs://...","src");
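The following Python sketch models only the selection rule described above. An agent sends to the first collector in its chain that is reachable. When an entry has no port, it uses 35853, which is assumed here to be the value of flume.collector.port. The sketch ignores the buffering and the periodic re-checking of the primary collector that a real agent performs. pick_collector is a hypothetical name.

```python
from typing import Iterable, Optional, Set, Tuple

DEFAULT_PORT = 35853  # assumed value of flume.collector.port


def pick_collector(chain: Iterable[str], down: Set[str]) -> Optional[Tuple[str, int]]:
    """Return (host, port) of the first collector in the chain that is not down,
    or None if every collector in the chain is unavailable."""
    for entry in chain:
        host, _, port = entry.partition(":")
        if host not in down:
            return host, (int(port) if port else DEFAULT_PORT)
    return None  # nothing reachable: the agent must hold its events and retry


# agentC's chain from the example, with collectorB down:
print(pick_collector(["collectorB:35853", "collectorA:35853"], down={"collectorB"}))
```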

Automatic Failover Chains

Flume also provides a mechanism that automatically assigns failover chains based on how nodes are configured. As collector nodes are assigned in the Flume Master, the Master attempts to distribute the agents evenly among the collectors. Each agent is also assigned a different failover chain, so that when a collector fails, its agents do not all fall back onto the same remaining collector and overload it.

To configure a node to use automatic failover chains, use the autoE2EChain, autoDFOChain, or autoBEChain agent sink. Because the Master calculates the failover chains, these sinks take no explicit arguments.

agentA : src | autoE2EChain ;
agentB : src | autoE2EChain ;
agentC : src | autoE2EChain ;
agentD : src | autoE2EChain ;
agentE : src | autoE2EChain ;
agentF : src | autoE2EChain ;
collectorA : collectorSource | collectorSink("hdfs://...", "src");
collectorB : collectorSource | collectorSink("hdfs://...", "src");
collectorC : collectorSource | collectorSink("hdfs://...", "src");

The Master updates the configuration of the agents based on the current collectors in the system. When new collectors are added to the system, the Master updates the failover chains of agents to rebalance.

Note
If the Master has no nodes with collectorSource as their source, the agents’ automatic chains will report a fail("…") chain and wait for collectorSources to be specified. If the nodes are not mapped, they will report a different fail sink notifying you that the node is unmapped (that is, not associated with a host/port).
Tip
You can see the translation of the auto*Chain configuration in the node configuration table under the translated configuration column. This is a small declarative specification of the failure recovery behavior of the sink. More details are in the Advanced section of this guide, and future revisions will also present the translations for the agents and other chains.
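This section does not describe the Master’s actual assignment algorithm. The Python sketch below is a hypothetical scheme that meets the two stated goals. First, primaries are spread round-robin. Second, agents that share a primary get different backup orders, so when one collector fails, its agents spread over the survivors. auto_chains is an illustrative name, not a Flume API.

```python
from typing import Dict, List


def auto_chains(agents: List[str], collectors: List[str]) -> Dict[str, List[str]]:
    """Hypothetical balancing scheme for auto*Chain agents."""
    if not collectors:
        # No collectorSource nodes yet: the real Master reports a fail(...) sink.
        return {agent: [] for agent in agents}
    n = len(collectors)
    chains = {}
    for i, agent in enumerate(agents):
        primary = collectors[i % n]              # round-robin primaries
        backups = [c for c in collectors if c != primary]
        shift = (i // n) % len(backups) if backups else 0
        # Rotate the backups so agents sharing a primary fail over differently.
        chains[agent] = [primary] + backups[shift:] + backups[:shift]
    return chains


for agent, chain in auto_chains(["agent" + c for c in "ABCDEF"],
                                ["collectorA", "collectorB", "collectorC"]).items():
    print(agent, chain)
```

Here agentA and agentD share collectorA as their primary. If collectorA fails, agentA moves to collectorB and agentD moves to collectorC.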

Logical Configurations

Manually configuring nodes in Flume is manageable for a small number of nodes, but can become burdensome for an operator as demands inevitably grow. Ideally, the operator only has to assign a role to a particular machine. Because configuration management is centralized via the Master, the Master potentially has all the information necessary to intelligently create a node topology and isolate flows of data from each other.

To explain how this can be done, the concept of a logical node is introduced. To manage communications between logical nodes, the concepts of logical sources and logical sinks are introduced. To isolate different groups of nodes, the concept of a flow is introduced that allows you to group agents and collectors into separate and isolated groups.

Logical Nodes

The logical node abstraction allows each JVM instance (a physical node) to contain multiple logical nodes. This allows many source-sink combinations to be processed on separate threads of execution within a single JVM instance.

Each logical node has a name that may be completely different from its physical name or hostname. You now need new operations that enable you to spawn a new node, map logical nodes to physical nodes, and decommission existing logical nodes.

Note
The following commands are entered via the web interface using the "raw command" web page on the Master. You might prefer using the Flume command shell (described in a later section) for these operations. The same commands described in this section can be entered in the web interface, or at the command shell by prefixing the command with exec.

Suppose that initially you know you want an agent-collector topology, but you don’t know the particular names of the exact machines. For now, you can specify the configuration of the logical nodes without specifying any physical machine names.

agent1 : _source_ | autoBEChain ;
collector1 : collectorSource | collectorSink("hdfs://....") ;

Later you learn that host1 is the name of the agent1 machine and host2 is the name of the collector machine. You can spawn logical nodes onto the physical Flume instances on host1 and host2 by issuing the following spawn commands:

spawn host1 agent1
spawn host2 collector1

Afterwards, the node status table should display a new row of information for each logical node. Each logical node reports its own execution state, configuration, and heartbeat. There is also a new entry in the logical node mapping table showing that the logical node has been placed on the specified physical node. To configure the node’s sources and sinks, use exactly the same mechanisms described in the previous sections.

You can also remove a logical node by using the decommission command. Suppose you no longer needed agent1 and wanted to "turn it off". You can do so by entering the following command:

decommission agent1

This terminates the thread, removes the configuration associated with the logical node, and removes the mapping between the logical node and the physical node.

You can also move a logical node from one physical node to another by first unmapping the logical node and then spawning it on another physical node. In this scenario, you move collector1 from host2 to host3.

unmap host2 collector1

At this point, the logical node mapping is removed, and collector1 is not active anywhere. You can then map collector1 onto host3 by using the spawn command:

spawn host3 collector1
Note
There are some limitations that need to be further described in this section.
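The following toy Python model of the Master’s mapping table shows the effect of spawn, unmap, and decommission. LogicalNodeMap is hypothetical. It tracks only mappings and configurations, not the threads that a real node starts and stops. Two behaviors are assumptions of this sketch: it rejects a spawn of an already-mapped node, and it keeps the configuration across unmap.

```python
class LogicalNodeMap:
    """Toy model of logical-to-physical node mappings (hypothetical)."""

    def __init__(self):
        self.mapping = {}  # logical node name -> physical node name
        self.configs = {}  # logical node name -> "source | sink" text

    def spawn(self, physical, logical):
        if logical in self.mapping:
            raise ValueError(f"{logical} is already on {self.mapping[logical]}")
        self.mapping[logical] = physical

    def unmap(self, physical, logical):
        if self.mapping.get(logical) != physical:
            raise ValueError(f"{logical} is not mapped to {physical}")
        del self.mapping[logical]  # configuration is kept for a later spawn

    def decommission(self, logical):
        # Drop both the mapping and the configuration of the logical node.
        self.mapping.pop(logical, None)
        self.configs.pop(logical, None)


m = LogicalNodeMap()
m.configs["collector1"] = 'collectorSource | collectorSink("hdfs://....")'
m.spawn("host2", "collector1")
m.unmap("host2", "collector1")   # collector1 is now not active anywhere
m.spawn("host3", "collector1")   # collector1 is now placed on host3
print(m.mapping)
```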

Logical Sources and Logical Sinks

In the previous example, we used two abstractions under the covers that allow a graph topology for communications to be specified without using physical hostnames and ports. These abstractions, the logical source and the logical sink, allow you to create a different graph topology without having to know the physical machines until they are mapped.

Suppose you have two nodes producing data and sending it to a consumer:

dataProducer1 : console | logicalSink("dataConsumer") ;
dataProducer2 : console | logicalSink("dataConsumer") ;
dataConsumer : logicalSource | console ;

Note that in this example, the destination argument is the logical name of the node and not a specific host/port combination.

To implement these features, there is a generalized mechanism where users enter logical configurations that are translated by the Master to a physical configuration.

When the logical nodes get mapped to physical nodes:

spawn host1 dataProducer1
spawn host2 dataProducer2
spawn host3 dataConsumer

and after the Master learns the host names (from the heartbeats that the host1, host2, and host3 machines send to the Master), the Master has enough information to translate the configurations to use physical hostnames and ports. A possible translation would replace the logicalSource with an rpcSource and each logicalSink with an rpcSink:

dataProducer1 : console | rpcSink("host3",56789) ;
dataProducer2 : console | rpcSink("host3",56789) ;
dataConsumer : rpcSource(56789) | console ;

In fact, auto agents and collectors are another example of translated sources and sinks. The auto*Chain sinks and collectorSource are translated into a configuration that uses logicalSinks and logicalSources, which in turn are translated into physical rpcSource and rpcSink instances.

Tip
Translations are powerful and can be fairly smart; if new collectors are added, they will become new failover options. If collectors are removed, then the removed collectors will be automatically replaced by other failover nodes.
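The Python sketch below mimics the dataProducer/dataConsumer translation shown earlier in this subsection. It is a simplification under stated assumptions. The caller supplies the port for each logicalSource, and the example uses 56789, as in the example translation. The placement table stands in for what the Master learns from spawn commands and heartbeats. translate is a hypothetical helper, not the Master’s implementation.

```python
import re
from typing import Dict, Tuple


def translate(specs: Dict[str, Tuple[str, str]],
              placement: Dict[str, str],
              ports: Dict[str, int]) -> Dict[str, Tuple[str, str]]:
    """Rewrite logicalSource / logicalSink("X") into rpcSource / rpcSink.

    specs:     logical node -> (source, sink)
    placement: logical node -> physical host it was spawned on
    ports:     logical node -> port picked for its rpcSource (hypothetical)
    """
    out = {}
    for node, (source, sink) in specs.items():
        if source == "logicalSource":
            source = f"rpcSource({ports[node]})"
        m = re.fullmatch(r'logicalSink\("([^"]+)"\)', sink)
        if m:
            target = m.group(1)
            sink = f'rpcSink("{placement[target]}",{ports[target]})'
        out[node] = (source, sink)
    return out


logical = {
    "dataProducer1": ("console", 'logicalSink("dataConsumer")'),
    "dataProducer2": ("console", 'logicalSink("dataConsumer")'),
    "dataConsumer": ("logicalSource", "console"),
}
placement = {"dataProducer1": "host1", "dataProducer2": "host2", "dataConsumer": "host3"}
for node, (src, snk) in translate(logical, placement, {"dataConsumer": 56789}).items():
    print(f"{node} : {src} | {snk} ;")
```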

Flow Isolation

What happens if you want to collect different kinds of data from the same physical node? For example, suppose you want to collect httpd logs as well as syslog logs from the same physical machine. Suppose also that you want to write all of the syslog data from the cluster into one directory tree, and all of the httpd logs from the cluster into another.

One approach is to tag all the data with source information and then push all the data down the same pipe. This could then be followed by some post-processing that demultiplexes (demuxes) the data into different buckets. Another approach is to keep the two sets of data logically isolated from each other the entire time and avoid post-processing.

Flume supports both approaches, and enables the latter, lower-latency approach by introducing the concept of grouping nodes into flows. Because logical nodes allow multiple nodes on a single JVM, you can have a separate node for each kind of data produced.

The following example shows how flows can be used. Start by having six logical nodes in the system.

fooNode1 : fooSrc | autoBEChain ;
barNode1 : barSrc | autoBEChain ;
fooNode2 : fooSrc | autoBEChain ;
barNode2 : barSrc | autoBEChain ;
fooConsumer : collectorSource | collectorSink("hdfs://nn/foodir") ;
barConsumer : collectorSource | collectorSink("hdfs://nn/bardir") ;

In this scenario, there are two physical machines that produce both kinds of data: foo data and bar data. You want to send data to a single collector machine that collects both foo data and bar data and writes them to different HDFS directories. You could then spawn the nodes onto physical nodes:

spawn host1 fooNode1
spawn host1 barNode1
spawn host2 fooNode2
spawn host2 barNode2
spawn host3 fooConsumer
spawn host3 barConsumer
single-flow.png
Figure 1: Flume Flows: Single Flow

This setup essentially instantiates the first approach. It mixes foo and bar data together since the translation of autoBEChain would see two collectorSources that the Master considers to be equivalent. Foo data will likely be sent to the barConsumer and bar data will likely be sent to fooConsumer.

What you really want is to separate sources of information into logically isolated streams of data. Flume provides a grouping abstraction called a flow. A flow groups particular logical nodes together so that the different logical data types remain isolated.

More concretely, flows allow for a different failover chain for each kind of data in the Flume cluster. The auto*Chain-based agents only send data to collectors in the same flow group, so data flows only to nodes within its own group.

You specify flow groups by adding an extra parameter to the spawn command:

spawn host1 fooNode1 flowfoo
spawn host1 barNode1 flowbar
spawn host2 fooNode2 flowfoo
spawn host2 barNode2 flowbar
spawn host3 fooConsumer flowfoo
spawn host3 barConsumer flowbar
multi-flow.png
Figure 2: Flume Flows: Multiple Flows

By using this command, the data from fooNode1 and fooNode2 will only be sent to fooConsumer, and barNode1 and barNode2’s data will only be sent to barConsumer. Data from one node is not mixed with other data from other nodes unless explicitly connected.
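A minimal Python sketch of the isolation rule follows. It assumes that an auto*Chain agent only considers collectors whose flow id matches its own. flow_candidates is hypothetical, and so is the "default" flow id used for the unflowed case.

```python
from typing import Dict, List


def flow_candidates(agents: List[str], collectors: List[str],
                    flow_of: Dict[str, str]) -> Dict[str, List[str]]:
    """Collectors each agent may use: only those in the agent's own flow."""
    return {agent: [c for c in collectors if flow_of[c] == flow_of[agent]]
            for agent in agents}


agents = ["fooNode1", "barNode1", "fooNode2", "barNode2"]
collectors = ["fooConsumer", "barConsumer"]
flows = {"fooNode1": "flowfoo", "fooNode2": "flowfoo", "fooConsumer": "flowfoo",
         "barNode1": "flowbar", "barNode2": "flowbar", "barConsumer": "flowbar"}
print(flow_candidates(agents, collectors, flows))

# Without flow ids (everything in one hypothetical "default" flow), both
# collectors are candidates for every agent, which is how foo and bar data mix.
single = {name: "default" for name in agents + collectors}
print(flow_candidates(agents, collectors, single))
```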

Tip
In practice it is a good idea to use different node names and different flow ids for different kinds of data. When node names are reused, the default behavior is to attempt to recover from failures assuming that leftover data from a crashed execution or previous source/sink configuration version are still producing the same kind of data.

Section Summary

This section introduced logical nodes, logical sources, logical sinks, and flows, and showed how these abstractions let you automatically deal with manageability problems such as:

  • Being limited to one input source per physical node.

  • Keeping multiple sets of flows isolated from each other.

  • Being machine-specific, which requires knowing all physical host names and ports.

The translation mechanism can be quite powerful. Coupled with metrics information, it could be used to perform automated dynamic configuration changes. For example, the Master could automatically commission new collectors or decommission existing ones to match diurnal traffic and load patterns.

Multiple Masters

The Master has two main jobs to perform. The first is to keep track of all the nodes in a Flume deployment and to keep them informed of any changes to their configuration. The second is to track acknowledgements from the end of a Flume flow that is operating in reliable mode so that the source at the top of that flow knows when to stop transmitting an event.

Both these jobs are critical to the operation of a Flume deployment. Therefore, it is ill-advised to have the Master live on a single machine, as this represents a single point of failure for the whole Flume service (see failure modes for more detail).

Flume therefore supports the notion of multiple Masters which run on physically separate nodes and co-ordinate amongst themselves to stay synchronized. If a single Master should fail, the other Masters can take over its duties and keep all live flows functioning. This all happens transparently with a little effort at configuration time. Nodes will automatically fail over to a working Master when they lose contact with their current Master.

Standalone Mode Compared to Distributed Mode

The Flume Master can be run in one of two ways.

  • Standalone mode - this is where the Master runs on a single machine. This is easy to administer and simple to set up, but has disadvantages when it comes to scalability and fault tolerance.

  • Distributed mode - this is where the Master is configured to run on several machines - usually three or five. This option scales to serve many Flows, and also has good fault-tolerance properties.

Large production deployments of Flume should run a distributed Master so that inevitable machine failures do not impact the availability of Flume itself. For small deployments the issue is less clear-cut - a distributed Master means reserving more computing resources that could be used instead for nodes or other services, and it is possible to recover from many failure modes in a timely manner with human intervention. The choice between distributed and standalone Masters is ultimately dependent both on your use case and your operating requirements.

Running in Standalone Mode

Whether the Flume Master starts in distributed or standalone mode is indirectly controlled by how many machines are configured to run as Master servers. To run in standalone mode, a single configuration property flume.master.servers must be set:

<property>
<name>flume.master.servers</name>
<value>hostA</value>
</property>

The value of flume.master.servers is a comma-separated list of all the machine names (or IP addresses) that will be Master servers. If this list contains only one machine name, the Flume Master will start in standalone mode. If there is more than one machine name in the list, the Flume Master will start in distributed mode.
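The rule above amounts to counting the entries in the list. The following Python sketch illustrates it. master_mode is a hypothetical helper, not something Flume provides.

```python
def master_mode(servers_value: str) -> str:
    """Infer the Master's mode from the flume.master.servers value."""
    hosts = [h.strip() for h in servers_value.split(",") if h.strip()]
    if not hosts:
        raise ValueError("flume.master.servers must name at least one host")
    return "standalone" if len(hosts) == 1 else "distributed"


print(master_mode("hostA"))                     # a single host
print(master_mode("masterA,masterB,masterC"))   # three hosts
```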

There’s no other configuration required for standalone mode. Flume will use reasonable default values for any other master-related variables. To start the Master, type the following at the command prompt from $FLUME_HOME:

$ flume master

A number of log messages should print to the screen. After the server is running, you can check that everything is working properly by visiting the web interface at http://master-node-ip:35871/, where master-node-ip is the IP address (or hostname) of the Master node. If you see a web page, the Master is running.

Running in Distributed Mode

How many machines do I need?

The distributed Flume Master will continue to work correctly as long as more than half the physical machines running it are still working and haven’t crashed. Therefore if you want to survive one fault, you need three machines (because 3-1 = 2 > 3/2). For every extra fault you want to tolerate, add another two machines, so for two faults you need five machines. Note that having an even number of machines doesn’t make the Flume Master any more fault-tolerant - four machines only tolerate one failure, because if two were to fail only two would be left functioning, which is not more than half of four. Common deployments should be well served by three or five machines.
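The majority arithmetic above can be checked with a few lines of Python. The function names are illustrative only. An ensemble of n machines keeps a strict majority (more than n/2) after f failures exactly when f ≤ (n - 1) // 2. Therefore, tolerating f faults takes 2f + 1 machines.

```python
def tolerated_failures(n: int) -> int:
    """Failures n Masters survive while a strict majority (> n/2) stays up."""
    return (n - 1) // 2


def machines_needed(faults: int) -> int:
    """Smallest ensemble that survives `faults` failures."""
    return 2 * faults + 1


for n in range(1, 7):
    print(n, "machines tolerate", tolerated_failures(n), "failure(s)")
```

The printed table shows why an even-sized ensemble buys nothing: four machines tolerate the same single failure as three.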

Note

flume.master.serverid is the only Flume Master property that must be different on every machine in the ensemble.

masterA
<property>
<name>flume.master.serverid</name>
<value>0</value>
</property>
masterB
<property>
<name>flume.master.serverid</name>
<value>1</value>
</property>
masterC
<property>
<name>flume.master.serverid</name>
<value>2</value>
</property>

The value of flume.master.serverid for each node is the index of that node’s hostname in the flume.master.servers list (here masterA,masterB,masterC), starting at 0. For example, masterB has index 1 in that list. The purpose of this property is to allow each node to uniquely identify itself to the other nodes in the Flume Master.
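Deriving each machine’s value mechanically avoids mismatched ids. The following Python sketch builds the per-host fragment shown above. It assumes the list of Master hosts is masterA,masterB,masterC, and serverid_property is a hypothetical helper.

```python
def serverid_property(servers_value: str, hostname: str) -> str:
    """flume.master.serverid fragment for one Master: its index in the list."""
    hosts = [h.strip() for h in servers_value.split(",")]
    sid = hosts.index(hostname)  # ValueError if hostname is not in the list
    return ("<property>\n"
            "<name>flume.master.serverid</name>\n"
            f"<value>{sid}</value>\n"
            "</property>")


for host in ["masterA", "masterB", "masterC"]:
    print(host)
    print(serverid_property("masterA,masterB,masterC", host))
```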

This is all the configuration required to start a three-node distributed Flume Master. To test this out, we can start the Master process on all three machines:

[flume@masterA] flume master

[flume@masterB] flume master

[flume@masterC] flume master

Each Master process will initially try to contact all other nodes in the ensemble. Until more than half of the nodes (in this case, at least two) are alive and reachable, the configuration store will be unable to start, and the Flume Master will not be able to read or write configuration data.

You can check the current state of the ensemble by inspecting the web page for any of the Flume Master machines which by default will be found at, for example, http://masterA:35871.

Configuration Stores

The Flume Master stores all its data in a configuration store. Flume has a pluggable configuration store architecture, and supports two implementations.

  • The Memory-Backed Config Store (MBCS) stores configurations temporarily in memory. If the master node fails and reboots, all the configuration data will be lost. The MBCS is incompatible with distributed masters. However, it is very easy to administer, computationally lightweight, and good for testing and experimentation.

  • The ZooKeeper-Backed Config Store (ZBCS) stores configurations persistently and takes care of synchronizing them between multiple masters.

Flume and Apache ZooKeeper

Flume relies on the Apache ZooKeeper coordination platform to provide reliable, consistent, and persistent storage for node configuration data. A ZooKeeper ensemble is made up of two or more nodes which communicate regularly with each other to make sure each is up to date. Flume embeds a ZooKeeper server inside the Master process, so starting and maintaining the service is taken care of. However, if you have an existing ZooKeeper service running, Flume supports using that external cluster as well.

Which Configuration Store Should I Use?

In almost all cases, you should use the ZBCS. It is more reliable and fault-tolerant, and will recover configurations after a restart. It is compatible with both standalone and distributed deployments of the Flume Master.

The MBCS is appropriate if you are experimenting with Flume and can stand to lose configuration if the machine fails.

ZBCS is the default configuration store. The choice of which configuration store to use is controlled by the flume.master.store system property.

<property>
<name>flume.master.store</name>
<value>zookeeper</value>
</property>

If set to memory, the Flume Master will use MBCS instead. This is only supported in standalone mode.

Configuring the ZBCS

Most deployments using the ZBCS can use Flume’s default configuration. However, where more control over the precise configuration of the Flume Master is needed, there are several properties that you can set.

Log Directory - flume.master.zk.logdir

To ensure reliability and the ability to restore its state in the event of a failure, ZBCS continually logs all updates it sees to the directory in flume.master.zk.logdir. This directory must be writable by the user that Flume runs as, and will be created if it doesn’t exist at start-up time. WARNING: Do not delete this directory, or any files inside it. If deleted, all your configuration information will be lost.

ZBCS Server Ports

Each machine in the distributed Flume Master communicates with every other machine on the TCP ports set by flume.master.zk.server.quorum.port and flume.master.zk.server.election.port. The defaults are 3182 and 3183, respectively. Note that these settings control both the port on which the ZBCS listens and the port on which it looks for other machines in the ensemble.

ZBCS Client Port - flume.master.zk.client.port

The Flume Master process communicates with ZooKeeper (either on the same machine, or remotely on another Master server) via a client TCP port, which is set by flume.master.zk.client.port. The default is 3181.

Gossip in Distributed Mode

Flume Master servers also use a gossip protocol to exchange information among themselves. Each server periodically wakes up and picks another machine to send new data to. By default this protocol uses TCP port 57890, which is controlled by the flume.master.gossip.port property:

<property>
<name>flume.master.gossip.port</name>
<value>57890</value>
</property>

In standalone mode, there is no need to use gossip, so this port is unused.

Diagrams: How the Masters and Nodes talk to each other

Figure 3: Flume Master: Standalone Mode
Figure 4: Flume Master: Distributed Mode

Configuring Flume Nodes to Connect to Multiple Master Servers

One property needs to be set to configure a Flume Node to connect to multiple Masters: flume.master.servers.

<property>
<name>flume.master.servers</name>
<value>masterA,masterB,masterC</value>
</property>

The nodes connect over the port flume.master.heartbeat.port on each machine in the Flume Master - this is the port that the Master servers listen on for node heartbeats.

If a Master server fails, nodes will automatically fail over to the next randomly selected Master server that they can establish a connection to.
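To make the failover rule concrete, here is a minimal Java sketch that assumes the comma-separated value of flume.master.servers is already in hand. The class name MasterFailover and the canConnect predicate, which stands in for an actual heartbeat connection attempt, are hypothetical and not part of Flume's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Predicate;

// Hypothetical sketch of node-side failover between Master servers.
final class MasterFailover {
    // Split the flume.master.servers value, e.g. "masterA,masterB,masterC".
    static List<String> parseServers(String property) {
        List<String> servers = new ArrayList<>();
        for (String s : property.split(",")) {
            if (!s.trim().isEmpty()) servers.add(s.trim());
        }
        return servers;
    }

    // Try the remaining masters in random order and return the first one
    // that accepts a connection, or empty if none does.
    static Optional<String> failOver(List<String> servers, String failed,
                                     Predicate<String> canConnect, Random rng) {
        List<String> candidates = new ArrayList<>(servers);
        candidates.remove(failed);
        Collections.shuffle(candidates, rng);
        for (String master : candidates) {
            if (canConnect.test(master)) return Optional.of(master);
        }
        return Optional.empty();
    }
}
```

Randomizing the order spreads reconnecting nodes across the surviving masters instead of having every node pile onto the same one.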

External ZooKeeper Cluster

In some cases you may want a ZBCS that relies on an externally managed ZooKeeper service. The most common example is when multiple services that rely on ZooKeeper are in use (Flume and HBase, for example). In the following example, replace zkServer{A,B,C}:2181 with the hostname and port of the ZooKeeper servers that make up your ensemble.

conf/flume-site.xml
<property>
  <name>flume.master.zk.use.external</name>
  <value>true</value>
</property>

<property>
  <name>flume.master.zk.servers</name>
  <value>zkServerA:2181,zkServerB:2181,zkServerC:2181</value>
</property>

Section Summary

This section described installing, deploying, and configuring a set of Flume nodes in a fully distributed setting. You should now be able to collect streams of logs with Flume.

You also used some roles for sources and sinks to connect nodes together. You now have an understanding of the basics of setting up a set of Flume nodes. Here are the new sources and sinks introduced in this subsection.

Flume’s Tiered Event Sources
collectorSource[(port)]

Collector source. Listens for data from agentSinks forwarding to port port. If port is not specified, the node’s default collector TCP port, 35853, is used.

Integrating Flume with your Data Sources

Flume’s source interface is designed to be simple yet powerful and to enable logging of all kinds of data, from unstructured blobs of bytes and semi-structured blobs with structured metadata to completely structured data.

In this section we describe some of the basic mechanisms that can be used to pull in data. Generally, these mechanisms come in three flavors: pushing data to Flume, having Flume poll for data, or embedding Flume or Flume components into an application.

These mechanisms have different trade-offs based on the semantics of the operation.

Also, some sources are one-shot, while others are continuous.

Push Sources

syslogTcp, syslogUdp

Wire-compatible with the syslog and syslog-ng logging protocols.

scribe

Wire-compatible with the scribe log collection system.

Polling Sources

tail, multitail

Watches one or more files for appends.

exec

Useful for extracting custom data by running existing programs.

poller

Gathers information from the Flume nodes themselves.

Embedding Sources

Warning
These features are incomplete.

log4j

simple client library


Logging via log4j Directly

Example of Logging Hadoop Jobs

Logging Hadoop Daemons

Using Data Collected by Flume

The first goal of Flume is to collect data and reliably write it to HDFS. Once data arrives, one wants the ability to control where and in what format data is stored. Flume provides basic output control mechanisms via the properties configuration and in the dataflow language. This gives the user the ability to control the output format and output bucketing of incoming data, and simplifies integration with other HDFS data consumers such as Hive and HBase.

Here are some example use cases:

  • When monitoring a web server, you want to bucket logs based on time, the page hit, and the browser being used.

  • When tracking particular data nodes, you want to bucket logs based on time and the data node name.

  • When tracking a feed of JIRA tickets from the Apache feed, you want to group based on the project identifier or a particular person.

  • When collecting data from scribe sources, you want to bucket data based on the event’s category information.

To support these kinds of features, Flume uses a simple data model, provides a mechanism for bucketing events, and also provides basic extraction operations for specifying custom bucketing discriminators.

The Data Model of a Flume Event

A Flume event has these six main fields:

  • Unix timestamp

  • Nanosecond timestamp

  • Priority

  • Source host

  • Body

  • Metadata table with an arbitrary number of attribute value pairs.

All events are guaranteed to have all of these elements. However, the body may have zero length, and the metadata table can be empty.

The Unix timestamp is measured in milliseconds and is the Unix timestamp from the source machine. The nanosecond timestamp is a machine-specific nanosecond counter, also from the source machine. It is safe to assume that the nanotime from a machine is monotonically increasing, i.e. if event A has a larger nanotime than event B from the same machine, event A was initially received after event B.

Currently the priority of a message can have one of 6 values: TRACE, DEBUG, INFO, WARN, ERROR, or FATAL. These values are often provided by logging systems such as syslog or log4j.

The source host is the name of the machine or its IP address (whatever the hostname call returns).

The body is the raw log entry body. The default is to truncate the body to a maximum of 32KB per event. This is a configurable value and can be changed by modifying the flume.event.max.size.bytes property.

Finally, there is the metadata table, which is a map from a string attribute name to an arbitrary array of bytes. This allows for custom bucketing attributes and will be described in more depth in the Advanced Usage section of this guide.
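The six fields can be sketched as a small Java class. This is a hypothetical model, not Flume’s own event class. It keeps metadata as a map from String to byte[], treats 32KB as 32 * 1024 bytes, and truncates oversized bodies as described. The ordering helper relies only on the monotonic per-machine nanosecond counter: a larger nanos value from the same host means a later event.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a Flume event's six fields (not Flume's actual class).
final class SketchEvent {
    enum Priority { TRACE, DEBUG, INFO, WARN, ERROR, FATAL }

    // Assumed default for flume.event.max.size.bytes: 32KB = 32 * 1024 bytes.
    static final int MAX_BODY_BYTES = 32 * 1024;

    final long timestampMillis;          // Unix time in ms on the source machine
    final long nanos;                    // machine-specific nanosecond counter
    final Priority priority;
    final String host;                   // source host name or IP
    final byte[] body;                   // may be empty; truncated to MAX_BODY_BYTES
    final Map<String, byte[]> metadata;  // attribute name -> bytes; may be empty

    SketchEvent(long timestampMillis, long nanos, Priority priority, String host,
                byte[] body, Map<String, byte[]> metadata) {
        this.timestampMillis = timestampMillis;
        this.nanos = nanos;
        this.priority = priority;
        this.host = host;
        this.body = Arrays.copyOf(body, Math.min(body.length, MAX_BODY_BYTES));
        this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    // Only meaningful for two events from the same machine: the counter is
    // monotonically increasing, so a smaller value means an earlier event.
    static boolean receivedBefore(SketchEvent a, SketchEvent b) {
        if (!a.host.equals(b.host)) {
            throw new IllegalArgumentException("nanos are only comparable within one host");
        }
        return a.nanos < b.nanos;
    }
}
```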

Output Bucketing

You can control the output of events to particular directories or files based on the values of an event’s fields. To enable this, Flume provides an escaping mechanism: escape sequences in an output path are replaced with values taken from each event.

For example, here is an output spec:

collectorSink("hdfs://namenode/flume/webdata/%H00/", "%{host}-")

The first argument is the directory where data is to be written. The second is a filename prefix for the files events are written to. Suppose you get an event from a machine called server1 generated at time 18:58. The event would be written to the HDFS instance whose namenode is namenode, in a directory called /flume/webdata/1800/, in files named server1-xxx, where xxx is some extra data that keeps file names unique.

What happened here? Flume replaced %H with a string representing the hour of the timestamp found in the event’s data. Likewise, %{host} was replaced with the hostname field from the event.

What happens if the server1’s message had been delayed and the message wasn’t sent downstream until 19:05? Since the value of the timestamp on the event was during the 18:00 hour, the event would be written into that directory.
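Here is a minimal Java sketch of how such a path could be expanded. It handles only %{host}, %H, and %%, and it formats the hour in UTC so the result does not depend on the machine’s time zone. Both are simplifying assumptions, and BucketPath is a hypothetical name. The hour comes from the event’s own timestamp rather than its arrival time, which is why a delayed event still lands in the 1800 directory.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical expansion of a bucketing spec; supports only %{host}, %H and %%.
final class BucketPath {
    private static final Pattern ESCAPE = Pattern.compile("%\\{host\\}|%H|%%");

    static String expand(String spec, String host, long eventTimestampMillis) {
        // The hour is taken from the event's timestamp, not the arrival time (UTC assumed).
        ZonedDateTime t = Instant.ofEpochMilli(eventTimestampMillis).atZone(ZoneOffset.UTC);
        Matcher m = ESCAPE.matcher(spec);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement;
            switch (m.group()) {
                case "%{host}": replacement = host; break;
                case "%H":      replacement = String.format("%02d", t.getHour()); break;
                default:        replacement = "%"; break; // "%%" becomes a literal %
            }
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For an event stamped 18:58 UTC, expanding "hdfs://namenode/flume/webdata/%H00/" with host server1 gives "hdfs://namenode/flume/webdata/1800/", and "%{host}-" gives "server1-".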

Event data escape sequences
%{host}

host

%{nanos}

nanos

%{priority}

priority string

%{body}

body

%%

a % character.

%t

Unix time in millis

Because bucketing by date is a frequently-requested feature, there are escape sequences for finer control of date values that allow you to bucket data based on date.

Here is another output spec:

collectorSink("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/", "web-")

This would create directories for each day, each with a subdirectory for each hour with filenames prefixed "web-".

Fine-grained escape sequences for dates and times
%a

locale’s short weekday name (Mon, Tue, …)

%A

locale’s full weekday name (Monday, Tuesday, …)

%b

locale’s short month name (Jan, Feb,…)

%B

locale’s long month name (January, February,…)

%c

locale’s date and time (Thu Mar 3 23:05:25 2005)

%d

day of month (01..31)

%D

date; same as %m/%d/%y

%H

hour (00..23)

%I

hour (01..12)

%j

day of year (001..366)

%k

hour ( 0..23)

%l

hour ( 1..12)

%m

month (01..12)

%M

minute (00..59)

%P

locale’s equivalent of am or pm

%s

seconds since 1970-01-01 00:00:00 UTC

%S

second (00..60)

%y

last two digits of year (00..99)

%Y

year (2010)

%z

+hhmm numeric timezone (for example, -0400)

Output Format

Now that you have control of where files go, this section describes how you can control the output format of data. Currently, this is set via the flume.collector.output.format property set in the flume-site.xml file. The output formats are:

Output formats
syslog

a syslog-like text output format

log4j

a log4j pattern similar to the output pattern used by CDH.

avrojson

JSON-encoded data generated by Avro.

avrodata

Binary encoded data written in the avro binary format.

default

a debugging format.

<property>
  <name>flume.collector.output.format</name>
  <value>avrojson</value>
  <description>This is the output format for the data written to the
  collector.  There are several formats available:
    syslog - outputs events in a syslog-like format
    log4j - outputs events in a pattern similar to Hadoop's log4j pattern
    avrojson - this outputs data as JSON encoded by Avro
    avrodata - this outputs data as Avro binary-encoded data
    default - this is a format for debugging
  </description>
</property>

Small Files Compared to High Latency

For all versions of Hadoop’s file system up to and including 0.20.x, HDFS has write-once, read-many semantics. Thus, the only way to reliably flush an HDFS file is to close it. Moreover, once a file is closed, no new data can be appended to it. This creates a tension between getting data written to HDFS quickly and potentially having many small files (a potential scalability bottleneck for HDFS).

On one side, to minimize latency and the amount of unflushed data held throughout the system, one would ideally flush data to HDFS as soon as it arrives. Flushing frequently conflicts with storing data in HDFS efficiently, because it can result in many small files, which will eventually stress the HDFS namenode. A compromise is to pick a reasonable trigger that has a collector close "reasonably-sized" files (ideally larger than a single HDFS block, 64MB by default).

When Flume is deployed at a scale where data collection volumes are small, it may take a long time to reach the ideal minimum file size (a block size, typically 64MB). For example, if a single web server produces 10KB of logs a second (approx. 100 hit logs/s at 100B per log), it will take about 6400 seconds (nearly 2 hours) before an ideal file size is reached.

In these situations, lean towards having more small files. Small files cause a few problems downstream. These include potential scaling limitations of Hadoop’s HDFS, and performance penalties when using MapReduce’s default input processing mechanisms within Hadoop.

The following sections describe two mechanisms to mitigate these potential problems:

  • Rolling up many small data files into larger batches

  • Using a CombineFileInputFormat

This particular problem becomes less of an issue as the scale of logging goes up. If a hundred machines were generating the same amount of logs, you would reach reasonable file sizes every 64 seconds.

Future versions of Hadoop will mitigate this problem by providing a flush/sync operation for currently open HDFS files (a patch is already slated for Hadoop HDFS 0.21.x).
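The arithmetic behind these estimates can be checked with a few lines of Java. The sketch uses decimal units (64MB as 64,000,000 bytes and 10KB/s as 10,000 bytes per second), which is what reproduces the 6400-second figure. With binary units (64 * 1024 * 1024 bytes at 10 * 1024 bytes per second) the result is about 6554 seconds. BlockFillTime is an illustrative name.

```java
// Back-of-the-envelope time to fill one HDFS block, using decimal units.
final class BlockFillTime {
    static double secondsToFill(long blockBytes, long bytesPerSecondPerMachine, int machines) {
        return (double) blockBytes / (bytesPerSecondPerMachine * machines);
    }

    public static void main(String[] args) {
        long block = 64_000_000L;   // one HDFS block, 64MB
        long perServer = 100 * 100; // 100 hits/s at 100B per log = 10KB/s
        System.out.println(secondsToFill(block, perServer, 1));   // 6400.0 (about 1.8 hours)
        System.out.println(secondsToFill(block, perServer, 100)); // 64.0
    }
}
```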

Gzip Compression for Files Written to HDFS

Flume now supports basic gzip compression for all log files that are written to HDFS. Compressed files are automatically suffixed with a .gz extension and follow the same naming format and directory structure as regular log files.

  <property>
    <name>flume.collector.dfs.compress.gzip</name>
    <value>true</value>
    <description>Writes compressed output in gzip format to dfs. Setting value to false or empty writes uncompressed data to disk
    </description>
  </property>

Advanced Flume Usage

This section describes in further detail how to automate the control of Flume nodes via the FlumeShell. It also takes a deep dive into Flume’s dataflow specification language and the internals of the reliability mechanisms, and shows how to do metadata manipulations and how to install source and sink plugins.

The Flume Command Shell

So far, you have been modifying the state of Flume using a simple (but primitive) web interface to a Master server.

Flume also provides a shell, which allows the user to type commands into a terminal and have them executed on a Flume deployment.

All of the commands available in the web form are available in the Flume Shell. The Flume Shell, however, has extra controls for command submission and state checking that aid scriptability.

Using the Flume Command Shell

You can start the FlumeShell by running flume shell in a terminal window. The connect command can be used to establish a connection to any Master server.

hostname:~/flume$ flume shell
[flume (disconnected)] connect localhost:35873
Connecting to Flume master localhost:35873...
[flume localhost:35873]
hostname:~/flume$ flume shell -c localhost:35873
Connecting to Flume master localhost:35873...
[flume localhost:35873]

The command line parameters for the Flume Shell are as follows:

usage: FlumeShell [-c <arg>] [-e <arg>] [-q] [-s <arg>]
 -?         Command line usage help
 -c <arg>   Connect to master:port
 -e <arg>   Run a single command
 -q         Run in quiet mode - only print command results
 -s <arg>   Run a FlumeShell script

The FlumeShell makes scripting Flume possible - either by using a single invocation with -e or by running a script of commands with -s. It is also possible to pipe stdin to the FlumeShell as in the following example:

printf "connect localhost:35873\ngetconfigs\nquit\n" | flume shell -q
Flume Commands

You can press Tab at any time for hints on available commands. If you start typing a command, you can press Tab to complete it.

help

List the commands available in the shell.

connect master:port

connect to a master at machine master on port port.

config logicalnode source sink

configure a single logical node logicalnode with source source and sink sink. source and sink will likely need quotes to support some of the Flume configuration syntax.

getnodestatus

Output the status of the nodes the master knows about. Nodes are in either HELLO, CONFIGURING, ACTIVE, IDLE, ERROR, DECOMMISSIONED, or LOST states. When a node shows up initially it is HELLO state. When a node is being configured, it is in CONFIGURING state. Once events are being pumped from source to sink, the node is in ACTIVE state. If a node has drained its source (and the source is not "endless") it will enter IDLE state. If a node encountered an unrecoverable error or exited without flushing, it will be in ERROR state. A node is DECOMMISSIONED if it is removed on the master, and LOST if it has not been seen by the master for a "long time".

getconfigs

This gets and dumps the configuration specifications of all the logical nodes the master knows about.

exec

Synchronously execute a command on the master. This command will block until it is completed.

source file

Reads the specified file and attempts to execute all of the commands in it.

submit

Asynchronously execute a command on the master. This command will return immediately and allows the submission of other commands. The command ID of the last command submitted is recorded.

wait ms [cmdid]

This command blocks for up to ms milliseconds until cmdid has entered the SUCCEEDED or FAILED state. If ms is 0, the command may block forever. If the command times out, the shell will disconnect. This is useful in conjunction with submitted commands.

waitForNodesActive ms node1 [node2 […]]

This command blocks for up to ms milliseconds until the specified list of nodes have entered the ACTIVE or CONFIGURING state. If ms==0 then the command may block forever.

waitForNodesDone ms node1 [node2 […]]

This command blocks for up to ms milliseconds until the specified list of nodes have entered the IDLE, ERROR, or LOST state.
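The state sets that waitForNodesActive and waitForNodesDone wait on can be written down as a small Java check. The enum simply lists the states named under getnodestatus. NodeWaits.allIn is a hypothetical helper that a script could poll until it returns true or its ms budget runs out. It is not how the shell itself is implemented.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// States reported by getnodestatus.
enum NodeState { HELLO, CONFIGURING, ACTIVE, IDLE, ERROR, DECOMMISSIONED, LOST }

// Hypothetical helper mirroring the wait conditions of the two commands.
final class NodeWaits {
    static final Set<NodeState> ACTIVE_TARGETS =
        EnumSet.of(NodeState.ACTIVE, NodeState.CONFIGURING);          // waitForNodesActive
    static final Set<NodeState> DONE_TARGETS =
        EnumSet.of(NodeState.IDLE, NodeState.ERROR, NodeState.LOST);  // waitForNodesDone

    // True once every named node is in one of the target states.
    static boolean allIn(Map<String, NodeState> status, Set<NodeState> targets, String... nodes) {
        for (String n : nodes) {
            NodeState s = status.get(n);
            if (s == null || !targets.contains(s)) return false;
        }
        return true;
    }
}
```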

quit

Exit the shell.

Exec and Submit commands

Both the web form and the FlumeShell are interfaces to the same command processing infrastructure inside Flume. This section shows how you can use the exec and submit commands in the FlumeShell to make administering Flume simpler.

These commands are issued and run as if run from the master. In the command shell they have the form:

exec command [arg1 [arg2 [ … ] ] ]

submit command [arg1 [arg2 [ … ] ] ]

Complex arguments, such as those with spaces or non-alphanumeric characters, can be expressed using double quotes ("...") or single quotes ('...'). If enclosed in double quotes, the bodies of the strings are unescaped as Java strings. If enclosed in single quotes, arbitrary characters can be included except for the ' character.

exec commands block until they are completed. submit commands are sent asynchronously to the master to be executed. wait commands are essentially joins for recently submitted commands.

noop

This command contacts the master and issues a noop (no operation) command.

config logicalnode source sink

This command configures a node. This is nearly identical to the config command.

multiconfig flumespec

This command configures a set of nodes on the master using the aggregated format.

unconfig logicalnode

This command changes the configuration of a particular node to have a null source and a null sink.

refresh logicalnode

This command refreshes the current configuration of a logical node. This forces the logicalnode to stop and then restart. This also causes a master re-evaluation that may change the failover lists.

refreshAll logicalnode

This atomically issues a refresh command to all of the logical nodes.

save filename

This saves the current configuration to the master’s disk.

load filename

This augments the current configuration with the logical node specifications found in filename.

spawn physicalnode logicalnode

This creates a new logical node logicalnode on physical node physicalnode. The node starts with a null source and a null sink, and picks up the configuration specified at the master once it begins heartbeating. Thus, if a logical node configuration already exists and a new node is spawned, the new node will pick up that configuration.

decommission logicalnode

This removes a logical node from the logical node configuration table, and unmaps it from any physical nodes it may be installed on.

unmap physicalnode logicalnode

This command breaks the assignment of a logicalnode from machine physicalnode. A logical node can be reassigned to another physical node using the spawn command.

unmapAll

This command breaks the assignment of all logical nodes from physical nodes. A logical node can then be reassigned to a physical node using the spawn command.

Flume’s Dataflow Specification Language

Using the Flume node roles (collector, agent) is the simplest way to get up and running with Flume. Under the covers, these sources and sinks (collectorSink, collectorSource, and agentSink) are composed of primitive sinks augmented with role defaults, special sinks, and sink decorators. These components make configuration more flexible, but also more complicated. The combination of special sinks and decorators exposes many details of the underlying mechanisms, but it is a powerful and expressive way to encode rich behavior.

Flume enables users to enter their own composition of sinks, sources, and decorators by using a domain-specific data flow language. The following sections describe this in more detail.

nodeName      ::=  NodeId
simpleSource  ::=  SourceId args?
simpleSink    ::=  SinkId args?
decoratorSink ::=  DecoId args?

source ::= simpleSource

sink ::=   simpleSink            // single sink
     |     [ sink (, sink)* ]    // fanout sink
     |     { decoratorSink => sink }  // decorator sink
     |     < sink ? sink >           // failover / choice sink
     |     roll(...) { sink }           // roll sink
     |     let SinkId := sink in sink  // let expression

logicalNode ::= NodeId : source | sink ;

spec   ::=  (logicalNode)*

Special Sinks: Fan out, Fail over, and Roll

Three special sinks are FanOutSinks, FailoverSinks, and RollSinks.

Fanout sinks send any incoming events to all of the sinks specified to be its children. These can be used for data replication or for processing data off of the main reliable data flow path. Fanout is similar to the Unix tee command or logically acts like an AND operator where the event is sent to each subsink.

The syntax for a FanoutSink is:

[ console, collectorSink ]

FailoverSinks are used to handle failures when appending new events. FailoverSinks can be used to specify alternate collectors to contact in the event the primary collector fails, or a local disk sink to store data until the primary collector recovers. Failover is similar to exception handling and logically acts like an OR operator. If a failover is successful, one of the subsinks has received the event.

The syntax for a FailoverSink is:

< logicalSink("collector1") ? logicalSink("collector2") >

So, you could configure node "agent1" to have a failover to collector2 if collector1 fails (for example, if the connection to collector1 goes down or if collector1's HDFS becomes full):

agent1 : source | < logicalSink("collector1") ? logicalSink("collector2") > ;

A roll sink opens and closes a new instance of its sub-sink every millis milliseconds. A roll is an atomic transition that closes the current instance of the sub-sink and then opens a new instance. Each roll period has a rolltag, which can be used as an escape sequence (%{rolltag}) to differentiate data written in different roll periods. This can be used to make every roll produce a new file with a unique name.

The syntax for a roll sink is:

roll(millis) sink

These can be composed to have even richer behavior. For example, this sink outputs to the console and has a failover collector node.

[ console, < logicalSink("collector1") ? logicalSink("collector2") > ]
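The AND/OR behavior of fanout and failover, and the composition just shown, can be modeled in a few lines of Java. SketchSink, FanOut, Failover, ListSink, and SinkDownException are hypothetical names. Flume’s real sinks also have open/close lifecycles, and this fanout simply stops at the first child that fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical unchecked failure signal for a sink that cannot accept an event.
final class SinkDownException extends RuntimeException {
    SinkDownException(String msg) { super(msg); }
}

// Hypothetical minimal sink interface (real Flume sinks also open and close).
interface SketchSink {
    void append(String event);
}

// Fanout: every child receives the event (logical AND).
final class FanOut implements SketchSink {
    private final List<SketchSink> children;
    FanOut(List<SketchSink> children) { this.children = children; }

    @Override public void append(String event) {
        for (SketchSink child : children) child.append(event);
    }
}

// Failover: try the primary, fall back to the backup on failure (logical OR).
final class Failover implements SketchSink {
    private final SketchSink primary;
    private final SketchSink backup;
    Failover(SketchSink primary, SketchSink backup) { this.primary = primary; this.backup = backup; }

    @Override public void append(String event) {
        try {
            primary.append(event);
        } catch (SinkDownException e) {
            backup.append(event); // if this throws too, no subsink received the event
        }
    }
}

// In-memory sink for observing delivery; set down = true to simulate an outage.
final class ListSink implements SketchSink {
    final List<String> received = new ArrayList<>();
    boolean down = false;

    @Override public void append(String event) {
        if (down) throw new SinkDownException("sink unavailable");
        received.add(event);
    }
}
```

Wiring new FanOut(Arrays.asList(console, new Failover(collector1, collector2))) mirrors the spec above: the console always sees each event, and collector2 receives only the events that collector1 rejected.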

This one rolls the collector every 1000 milliseconds writing to a different HDFS file after each roll.

roll(1000) [ console, escapedCustomDfs("hdfs://namenode/flume/file-%{rolltag}") ]
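The roll semantics can be sketched in Java with an injectable clock, which makes the behavior deterministic. Each roll period gets a new rolltag (the "roll-N" format here is hypothetical), and each period’s events are collected separately, standing in for one HDFS file per roll. Unlike a real roll sink, which rolls on a timer, this sketch rolls lazily on the first append after the period has elapsed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical model of roll(millis): one sub-sink instance per roll period.
final class RollSketch {
    private final long millis;
    private final LongSupplier clock;
    // rolltag -> events written during that roll period (one "file" each)
    final Map<String, List<String>> files = new LinkedHashMap<>();
    private String rolltag;
    private long openedAt;
    private int rolls = 0;

    RollSketch(long millis, LongSupplier clock) {
        this.millis = millis;
        this.clock = clock;
        open();
    }

    // Open a new sub-sink instance under a fresh rolltag.
    private void open() {
        rolltag = "roll-" + rolls++;
        openedAt = clock.getAsLong();
        files.put(rolltag, new ArrayList<>());
    }

    void append(String event) {
        if (clock.getAsLong() - openedAt >= millis) {
            open(); // the previous instance is never written again, i.e. closed
        }
        files.get(rolltag).add(event);
    }
}
```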

Introducing Sink Decorators

Fan out and failover affect where messages go in the system but do not modify the messages themselves. To augment or filter events as they pass through the dataflow, you can use sink decorators.

Sink decorators can add properties to the sink and can modify the data streams that pass through them. For example, you can use them to increase reliability via write ahead logging, increase network throughput via batching/compression, sampling, benchmarking, and even lightweight analytics.

The following simple sampling example uses an intervalSampler which is configured to send every 10th element from source "source" to the sink "sink":

flumenode: source | { intervalSampler(10) => sink };

Here’s an example that batches every 100 events together.

flumenode: source | { batch(100) => sink };

Like fanout and failover, decorators are also composable. Here is an example that creates batches of 100 events and then compresses them before moving them off to the sink:

flumenode: source | { batch(100) =>  { gzip => sink } };
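The sampling and batching decorators can be modeled in Java as wrappers around a downstream consumer. IntervalSampler and Batcher are hypothetical names, and two details are assumptions. First, the sampler forwards the n-th, 2n-th, and later multiples rather than the first event of each group. Second, the batcher exposes an explicit flush for a trailing partial batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// intervalSampler(n) sketch: forward every n-th event to the wrapped sink.
final class IntervalSampler<T> implements Consumer<T> {
    private final int n;
    private final Consumer<T> sink;
    private long seen = 0;

    IntervalSampler(int n, Consumer<T> sink) { this.n = n; this.sink = sink; }

    @Override public void accept(T event) {
        seen++;
        if (seen % n == 0) sink.accept(event); // 10th, 20th, ... when n = 10
    }
}

// batch(n) sketch: group n events into one list before passing it on.
final class Batcher<T> implements Consumer<T> {
    private final int size;
    private final Consumer<List<T>> sink;
    private List<T> pending = new ArrayList<>();

    Batcher(int size, Consumer<List<T>> sink) { this.size = size; this.sink = sink; }

    @Override public void accept(T event) {
        pending.add(event);
        if (pending.size() == size) flush();
    }

    // Emit whatever is pending (a real decorator would also do this on close).
    void flush() {
        if (!pending.isEmpty()) {
            sink.accept(pending);
            pending = new ArrayList<>();
        }
    }
}
```

Because each decorator is itself a consumer, they nest the same way the { batch(100) => { gzip => sink } } spec does.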

Translations of High-level Sources and Sinks

Internally, Flume translates sinks into compositions of other, simpler decorators, failovers, and rollers to add properties. The proper compositions create pipelines that provide different levels of reliability in Flume.

Note
This section describes how agents work, but this version of Flume does not expose the translations the same way the auto*Chains are exposed. A future version of Flume will expose these details. The exact translations are still under development.

Suppose you have nodes using the different agent sinks:

node1 : tail("foo") | agentE2ESink("bar");
node2 : tail("foo") | agentDFOSink("bar");
node3 : tail("foo") | agentBESink("bar");

In the translation phase, these agent sinks are converted into the following Flume sinks:

node1 : tail("foo") | { ackedWriteAhead => { lazyOpen => { stubbornAppend => logicalSink("bar") } } } ;
node2 : tail("foo") | let primary := { lazyOpen => { stubbornAppend => logicalSink("bar") } } in < primary ? { diskFailover => { insistentOpen => primary } } > ;
node3 : tail("foo") | { lazyOpen => { stubbornAppend => logicalSink("bar") } } ;

ackedWriteAhead is actually a complicated decorator that internally uses rolls and some other special decorators. This decorator interface allows us to manually specify wire batching and compression options. For example, you could compress every 100 messages using gzip compression:

node1 : tail("foo") | { ackedWriteAhead => { batch(100) => { gzip => { lazyOpen => { stubbornAppend => logicalSink("bar") } } } } };

collectorSink("xxx","yyy",15000) is also a bit complicated, with some custom decorators to handle acks. Under the covers, however, it depends on a roller with an escapedCustomDfsSink inside of it.

roll(15000) { collectorMagic => escapedCustomDfs("xxx", "yyy-%{rolltag}") }

Another place translations happen is with logical nodes. Let’s start off with a few nodes:

node1 : tail("foo") | { .... => logicalSink("node2") };
node2 : logicalSource | collectorSink("...");

The translation mechanism converts logicalSources and logicalSinks into lower-level physical rpcSources and rpcSinks. Let’s assume that node1 is on machine host1 and node2 is on machine host2. After the translation you end up with the following configurations:

node1 : tail("foo") | { .... => rpcSink("host2",12345) };
node2 : rpcSource(12345) | collectorSink("...");

Suppose that you swap the mapping so that node2 is now on host1 and node1 is on host2.

# flume shell commands
exec unmapAll
exec spawn host1 node2
exec spawn host2 node1

The original configuration will now be translated to:

node1 : tail("foo") | { .... => rpcSink("host1",12345) };
node2 : rpcSource(12345) | collectorSink("...");
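The logical-to-physical step can be sketched as a lookup in the current node-to-machine mapping. LogicalTranslation is a hypothetical name, and the fixed port 12345 is taken from the example above. How Flume actually chooses ports is not modeled.

```java
import java.util.Map;

// Hypothetical resolution of logicalSink("node") into an rpcSink string.
final class LogicalTranslation {
    static String translateSink(String logicalTarget, Map<String, String> nodeToHost, int port) {
        String host = nodeToHost.get(logicalTarget);
        if (host == null) {
            throw new IllegalStateException(logicalTarget + " is not mapped to a physical node");
        }
        return "rpcSink(\"" + host + "\"," + port + ")";
    }
}
```

Re-running the lookup after the unmapAll/spawn commands is all it takes for the same logical spec to produce rpcSink("host1",12345) instead of rpcSink("host2",12345).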

Custom Metadata Extraction

While Flume can take in raw data, you can also add structure to data based on the nodes it flows through, and filter out portions of the data when only part of it is needed.

The simplest is the value decorator. This takes two arguments: an attribute name, and a value to add to the event’s metadata. This can be used to annotate constant information about the source, or arbitrary data to a particular node. It could also be used to naively track the provenance of data through Flume.

Extractors

Extraction operations are also available to extract values from logs that have known structure.

One example is the regex decorator. This takes three arguments: a regular expression, an index, and an attribute name. It allows the user to extract a particular regex group out of the body of the event and write it as the value of the specified attribute.

Similarly the split decorator also takes three arguments: a regular expression, an index, and an attribute name. This splits the body based on the regular expression, extracts the text group after the instance of the separator, and writes the value to the specified attribute. One can think of this as a much simplified version of the awk Unix utility.
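Here is an approximation of the regex and split extractors in Java. For readability it uses String values for metadata, whereas Flume stores byte arrays. The names are hypothetical. Leaving the attribute unset when there is no match, and counting split fields from 0, are assumptions rather than documented behavior.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical extractors that read the body and write one metadata attribute.
final class Extractors {
    // regex: write capture group `group` of the first match to `attr`.
    static void regex(String body, String regex, int group, String attr, Map<String, String> meta) {
        Matcher m = Pattern.compile(regex).matcher(body);
        if (m.find()) {
            String value = m.group(group);
            if (value != null) meta.put(attr, value);
        }
    }

    // split: split the body on the separator regex and keep field `index` (awk-like).
    static void split(String body, String separatorRegex, int index, String attr, Map<String, String> meta) {
        String[] fields = body.split(separatorRegex);
        if (index >= 0 && index < fields.length) meta.put(attr, fields[index]);
    }
}
```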

Meta Data Filtering and Transformations

Flume enforces an invariant that prevents the modification of an attribute that has already been written upstream. This simplifies debugging of dataflows, and improves the visibility of data when debugging.

If many stages are used, however, frequently a lot of extra metadata ends up being moved around. To deal with this, a few extra operations are available.

A select operation is available. This operation is like SQL’s select, a relational projection: it modifies an event so that only the specified metadata fields are forwarded.

A mask operation is also available that forwards all metadata attributes except for the attributes specified.
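The value, select, and mask operations can be sketched over a plain metadata map. MetaOps is a hypothetical name. The sketch enforces the no-overwrite invariant by throwing, which is one possible way to enforce it and not necessarily Flume’s.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical metadata operations over a String -> String map.
final class MetaOps {
    // value: add a constant attribute, refusing to overwrite one set upstream.
    static void value(Map<String, String> meta, String attr, String v) {
        if (meta.containsKey(attr)) {
            throw new IllegalStateException("attribute already set upstream: " + attr);
        }
        meta.put(attr, v);
    }

    // select: forward only the named attributes (a projection).
    static Map<String, String> select(Map<String, String> meta, Set<String> keep) {
        Map<String, String> out = new HashMap<>(meta);
        out.keySet().retainAll(keep);
        return out;
    }

    // mask: forward every attribute except the named ones.
    static Map<String, String> mask(Map<String, String> meta, Set<String> drop) {
        Map<String, String> out = new HashMap<>(meta);
        out.keySet().removeAll(drop);
        return out;
    }
}
```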

A format decorator is also available that uses the escape mechanisms to rewrite the body of an event to a user customizable message. This is useful for outputting summary data to low volume sources. Example: writing summary information out to an IRC channel periodically.

Role Defaults

To simplify things for users, you can assign a particular role to a logical node; think of a role as an "automatic" specification with default settings. Two roles are currently provided: the agent role and the collector role. Since many nodes can share the same role, each role defines a tier. For example, the agent tier consists of all the nodes in the agent role, and nodes with the collector role are in the collector tier.

The agent and collector roles have defaults specified in the conf/flume-site.xml file. Look at the conf/flume-conf.xml file for properties prefixed with flume.agent.* and flume.collector.* for descriptions of the configuration options.

Each node maintains its own node state and has its own configuration. If the master does not have a data flow configuration for the logical node, the logical node will remain in IDLE state. If a configuration is present, the logical node will attempt to instantiate the data flow and run it concurrently with other data flows.

In the simplest setup, each machine runs a single logical node and so lives in only one tier. In a more complicated setup, a machine can contain many logical nodes, and because each of these nodes can take on a different role, the machine lives in multiple tiers.

Arbitrary Data Flows and Custom Architectures

With tsinks and tsource, data can be sent through multiple nodes. If ack injection and ack checking decorators are properly inserted, you can achieve reliability.

Extending via Sink/Source/Decorator Plugins

An experimental plugin mechanism is provided that allows you to add new custom sources, sinks, and decorators to the system.

Two steps are required to use this feature.
  1. First, add the jar with the new plugin classes to Flume’s classpath. If the plugin requires native libraries (.so or .dll files), make sure they are on the LD_LIBRARY_PATH (Unix) or PATH (Windows).

  2. Second, in flume-site.xml, add the class names of the new sources, sinks, and/or decorators to the flume.plugin.classes property. Multiple classes can be specified by comma separating the list. Java reflection is used to find some special static methods that add new components to the system and data flow language’s library.

An example component has been "pluginified" — the "HelloWorld" source, sink, and decorator. This plugin does something very simple; the source generates the text "hello world!" every three seconds, the sink writes events to a "helloworld.txt" text file, and the decorator prepends "hello world!" to any event it encounters.

  1. cd into the plugins/helloworld directory and run ant; a helloworld_plugin.jar file will be generated.

  2. Add "helloworld.HelloWorldSink,helloworld.HelloWorldSource,helloworld.HelloWorldDecorator" to the flume.plugin.classes property in flume-site.xml (create the file if it doesn’t already exist).

    Important
    if you use the provided flume-site.xml.template file to create your flume-site.xml be sure to comment out or remove any example properties contained in the sample template.
    Example flume-site.xml contents
    <configuration>
      <property>
        <name>flume.plugin.classes</name>
        <value>helloworld.HelloWorldSink,helloworld.HelloWorldSource,helloworld.HelloWorldDecorator</value>
        <description>Comma separated list of plugins</description>
      </property>
    </configuration>
  3. Start the Flume master and at least one logical node in separate terminals.

    1. In each terminal, cd into the top-level flume directory (the one just above plugins).

    2. Add helloworld_plugin.jar to the FLUME_CLASSPATH in both terminals:

      export FLUME_CLASSPATH=`pwd`/plugins/helloworld/helloworld_plugin.jar
    3. In terminal 1, run bin/flume master

    4. In terminal 2, run bin/flume node -n hello1

  4. At this point, the master and the hello1 node should be running and should have loaded the plugin.

    You should see log output similar to the following in both master and hello1:
    10/07/29 17:35:28 INFO conf.SourceFactoryImpl: Found source builder helloWorldSource in helloworld.HelloWorldSource
    10/07/29 17:35:28 INFO conf.SinkFactoryImpl: Found sink builder helloWorldSink in helloworld.HelloWorldSink
    10/07/29 17:35:28 INFO conf.SinkFactoryImpl: Found sink decorator helloWorldDecorator in helloworld.HelloWorldDecorator
    Tip
    Another way to verify that your plugin is loaded is to check whether it is listed on this page: http://localhost:35871/masterext.jsp
  5. Configure hello1.

    Tip
    The easiest way to do this is to open the master’s configuration page in a browser, typically at http://localhost:35871/flumeconfig.jsp
    1. Load the helloworld source and sink into the hello1 node (if you are using the master’s web interface, enter this in the bottom text box and then click the submit button):

      hello1: helloWorldSource() | helloWorldSink();
    2. You can also try the hello world decorator:

      hello1: helloWorldSource() | { helloWorldDecorator() => helloWorldSink() };

      In either case, hello1 will output a helloworld.txt file into its current working directory. Every 3 seconds a new "hello world!" line will be written to the file.
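To make the difference between the two configurations concrete, here is a small model of the decorator chain. SimpleSink, CollectingSink, PrependDecorator, and HelloFlowSketch are hypothetical stand-ins written for this illustration, not the HelloWorld plugin's source code or Flume's sink API. The collecting sink keeps bodies in memory instead of writing helloworld.txt, and prepending without a separator is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not Flume's API: a sink accepts event bodies.
interface SimpleSink {
    void append(String body);
}

// Plays the role of helloWorldSink, but keeps bodies in memory
// instead of writing them to helloworld.txt.
class CollectingSink implements SimpleSink {
    final List<String> bodies = new ArrayList<>();

    @Override
    public void append(String body) {
        bodies.add(body);
    }
}

// Plays the role of helloWorldDecorator: prepend text, then forward
// the modified body to the child sink.
class PrependDecorator implements SimpleSink {
    private final SimpleSink child;

    PrependDecorator(SimpleSink child) {
        this.child = child;
    }

    @Override
    public void append(String body) {
        child.append("hello world!" + body);
    }
}

class HelloFlowSketch {
    // One tick of helloWorldSource; the real source does this every
    // three seconds.
    static void emitOnce(SimpleSink sink) {
        sink.append("hello world!");
    }
}
```

Here emitOnce(sink) models helloWorldSource() | helloWorldSink(), and emitOnce(new PrependDecorator(sink)) models helloWorldSource() | { helloWorldDecorator() => helloWorldSink() }.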

Appendix

Flume Source Catalog

Flume’s Tiered Event Sources

These sources and sinks are actually translated from their original specification into compositions of more specific lower level configurations. They generally have reasonable default arguments assigned by the properties xml file or by the Master, which the user can override.

collectorSource[(port)]

Collector source. Listens for data from agentSinks forwarding to port port. If port is not specified, the node’s default collector TCP port, 35863, is used. This source registers itself with the Master so that its failover chains can be determined automatically.

logicalSource

Logical Source. This source has a port assigned to it by the Master and listens for rpcSink formatted data.

Flume’s Basic Sources

These sources are untranslated and generally need all of their arguments.

null

Null source. Opens, closes, and returns null (last record) on next().

console

Stdin console. The Flume node must be started with flume node_nowatch, because the watchdog does not allow console input.

rpcSource(port)

A remote procedure call (RPC) server that is configured to listen on TCP port port. Currently, the Apache Thrift RPC framework is used.

text("filename")

One-time text file source. One event per \n delimited line.

tail("filename"[, startFromEnd=false])

Similar to Unix’s tail utility. One line is one event. Generates events for the entire file, then stays open for more data and follows filename. (For example, if tailing file "foo", and "foo" is then moved to "bar" and a new file named "foo" appears, tail finishes reading the renamed "bar" file and then starts from the beginning of the new "foo".) If the startFromEnd parameter is false, tail re-reads from the beginning of the file. If it is true, tail only starts reading from the current end of the file.

multitail("filename"[, file2 [,file3 … ] ])

Like tail but can follow multiple files concurrently.

tailDir("dirname"[, fileregex=".*"])

Tails all files in a directory dirname that match the specified fileregex. Be careful: the regex argument requires Java-style escaping of \ and \". For example, \w+ has to be written as "\\w+". If a new file appears, it is added to the list of files to tail. If pointed at a new directory, it will attempt to read all files that match!
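The escaping rule is the same one Java source code uses for string literals, so a Java sketch shows it directly. The class TailDirRegexSketch is hypothetical, and it assumes the regex must match the whole file name; the description above does not say whether tailDir uses whole-name or partial matching.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: pick the file names a tailDir fileregex would
// select, assuming the regex must match the whole name.
class TailDirRegexSketch {

    // The regex \w+\.log written with Java-style escaping, the same way
    // it would have to appear inside quotes in a tailDir argument.
    static final String LOG_FILES = "\\w+\\.log";

    static List<String> select(List<String> fileNames, String fileregex) {
        Pattern pattern = Pattern.compile(fileregex);
        List<String> selected = new ArrayList<>();
        for (String name : fileNames) {
            if (pattern.matcher(name).matches()) {
                selected.add(name);
            }
        }
        return selected;
    }
}
```

With the names "app.log", "app.log.1", and "my-app.log", only "app.log" is selected: "app.log.1" has an extra suffix, and \w does not match the hyphen in "my-app.log".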

seqfile("filename")

Read from a Hadoop sequence file formatted file, with com.cloudera.flume.handlers.hdfs.WriteableEventKey keys and com.cloudera.flume.handlers.hdfs.WriteableEvent values. Conveniently, this source can read files generated by the seqfile sink.

syslogUdp(port)

Syslog over UDP port. This is syslog compatible.

syslogTcp(port)

Syslog over TCP port. This is syslog-ng compatible. This is a server that can listen and receive on many concurrent connections.

syslogTcp1(port)

Syslog over TCP port. This is syslog-ng compatible. It accepts only a single connection and shuts down afterwards.

execPeriodic("cmdline", ms)

Execute an arbitrary program specified by cmdline. The entire output of the execution becomes the body of generated messages. ms specifies the number of milliseconds to wait before the next execution (and next event). Ideally the program is short lived. This does not process shell pipes or redirection operations; for these, write a script and use the script as the cmdline argument.

execStream("cmdline")

Execute an arbitrary program specified by cmdline. Each line of output becomes a new event. Ideally the program is long lived. This does not process shell pipes or redirection operations; for these, write a script and use the script as the cmdline argument.

exec("cmdline"[, aggregate=false[, restart=false[,period=0]]])

Execute an arbitrary program specified by cmdline. If the aggregate argument is true, the entire program output is considered one event; otherwise, each line is considered a new event. If the restart argument is true, the program is restarted period milliseconds after it exits. execStream("foo") is equivalent to exec("foo", false, false, 0). execPeriodic("foo", 1000) is equivalent to exec("foo", true, true, 1000).
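The effect of the aggregate flag on event boundaries can be sketched without launching a process. ExecEventSketch is a hypothetical helper, not part of Flume; it splits captured output the way the description above implies, and how Flume itself treats empty lines is not specified here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how exec's aggregate flag turns program output
// into event bodies. It works on already-captured output.
class ExecEventSketch {

    static List<String> toEvents(String output, boolean aggregate) {
        List<String> events = new ArrayList<>();
        if (aggregate) {
            // aggregate == true (execPeriodic style): one event per run.
            events.add(output);
            return events;
        }
        // aggregate == false (execStream style): one event per line.
        // String.split drops trailing empty strings, so a final newline
        // does not produce an empty event.
        for (String line : output.split("\n")) {
            events.add(line);
        }
        return events;
    }
}
```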

synth(msgCount,msgSize)

A source that synthetically generates msgCount random messages of size msgSize. This will generate non-printable characters.

nonlsynth(msgCount,msgSize)

A source that synthetically generates msgCount random messages of size msgSize. This converts all '\n' chars into ' ' chars. This will generate non-printable characters but since all randomly generated \n are converted, sources dependent on \n as a record separator can get uniformly sized data.

asciisynth(msgCount,msgSize)

A source that synthetically generates msgCount random messages of size msgSize. This converts all '\n' chars into ' ' chars, and all non-ASCII characters into printable ASCII characters.
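The three synthetic sources differ only in how they post-process random bytes. The sketch below, using the hypothetical class SynthSketch, generates a single message body of msgSize bytes; a source would repeat this msgCount times. Folding bytes into the printable range 32..126 with a modulo is an assumption, since the description only says non-ASCII characters become printable ASCII.

```java
import java.util.Random;

// Hypothetical sketch of the body generation behind synth, nonlsynth,
// and asciisynth. The printable-ASCII mapping is an assumption.
class SynthSketch {

    // synth: raw random bytes, which may include '\n' and non-printables.
    static byte[] synth(Random rnd, int msgSize) {
        byte[] body = new byte[msgSize];
        rnd.nextBytes(body);
        return body;
    }

    // nonlsynth: like synth, but every '\n' becomes ' ' so newline-based
    // record readers see exactly one record per message.
    static byte[] nonlsynth(Random rnd, int msgSize) {
        byte[] body = synth(rnd, msgSize);
        for (int i = 0; i < body.length; i++) {
            if (body[i] == '\n') {
                body[i] = ' ';
            }
        }
        return body;
    }

    // asciisynth: newlines removed as above, then every byte outside the
    // printable range 32..126 is folded into it.
    static byte[] asciisynth(Random rnd, int msgSize) {
        byte[] body = nonlsynth(rnd, msgSize);
        for (int i = 0; i < body.length; i++) {
            int b = body[i] & 0xff;
            if (b < 32 || b > 126) {
                body[i] = (byte) (32 + (b % 95));
            }
        }
        return body;
    }
}
```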

twitter("username","pw"[,"url"])

(Unsupported) A source that collects data from a twitter "spritzer" stream. username is a twitter username, pw is the password for the user, and url is the url for the feed. If url is not specified, http://stream.twitter.com/1/statuses/sample.json is used by default. See http://apiwiki.twitter.com/Streaming-API-Documentation for more details.

irc("server",port, "nick","chan")

(Unsupported) An IRC channel source. Each line sent to the channel is a new event. It attempts to connect to server on TCP port port (standard is 6667). When it connects, it attempts to take the nickname nick and enter channel chan (like #hadoop).

scribe[(+port)]

A scribe source. This provides a network socket that is compatible with data generated by Facebook’s Scribe collection system.

report[(periodMillis)]

This source polls the local physical node for its report every periodMillis milliseconds and turns it into a new event. The attribute names seen from the node report page are present, and the values are uninterpreted arrays of bytes.

Flume Sinks Catalog

Flume’s Collector Tier Event Sinks
collectorSink("fsdir","fsfileprefix", rollmillis)

Collector sink. fsdir is a fs directory URI such as hdfs://namenode/path or file:///path. fsfileprefix is a file name prefix for output files. Both of these can use escape sequences to bucket data, as documented in the Output Bucketing section. rollmillis is the number of milliseconds between rolls of the output file (closing the current file and opening a new one). The format for data written by collectors is specified by the flume.collector.output.format property.

Flume’s Agent Tier Event Sinks
agentSink[("machine"[, port])]

Defaults to agentE2ESink.

agentE2ESink[("machine"[, port])]

Agent sink with write ahead log and end-to-end ack. Optional arguments specify a machine, and the TCP port pointing to a collectorSource. If none is specified, the values specified by the flume.collector.event.host and the flume.collector.port properties will be used.

agentDFOSink[("machine"[, port])]

DiskFailover Agent sink that stores to local disk on detected failure. This sink periodically checks with machine:port and resends events if it becomes alive again. Optional arguments specify a machine, and the TCP port pointing to a collectorSource. If none is specified, the values specified by the flume.collector.event.host and the flume.collector.port properties will be used.

agentBESink[("machine"[, port])]

BestEffort Agent sink. This drops messages on failures and continues sending. Optional arguments specify a collector machine and the TCP port pointing to a collectorSource. If none is specified, the values specified by the flume.collector.event.host and the flume.collector.port properties will be used.

agentE2EChain("m1[:p1]"[, "m2[:p2]"[,…]])

Agent sink with write-ahead log, end-to-end ack, and collector failover chains. m1:p1 specifies a machine and optional port of the primary default collector. Optional arguments specify a list of failover machine:port pairs in a ranked order. If a primary collector is not responding, the backups are used. The primary collectors are checked periodically to see if they have come back up. If all failovers are exhausted due to failures, the sink backs off its attempts to send downstream, since the data is already durable locally.

agentDFOChain("m1[:p1]"[, "m2[:p2]"[,…]])

DiskFailover Agent sink that first attempts to fail over to other collectors. m1:p1 specifies a machine and optional port of the primary default collector. If all failovers are exhausted due to failures, it will store to local disk. Optional arguments specify a list of failover machine:port pairs in a ranked order. If a primary collector is not responding, the backups are used. The primary collectors are checked periodically to see if they have come back up.

agentBEChain("m1[:p1]"[, "m2[:p2]"[,…]])

BestEffort Agent sink with collector failover chains. m1:p1 specifies a machine and optional port of the primary default collector. If all failovers are exhausted due to failures, this drops messages. Optional arguments specify a collector, and the TCP port of the collector. If none is specified, the values specified by the flume.collector.event.host and the flume.collector.port properties will be used.
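All three chain sinks share the same ranked-failover idea. FailoverChainSketch is a hypothetical sketch of a single send attempt: it walks the ranked list and stops at the first collector that accepts the event. It leaves out the periodic re-check of the primary and the chain-specific behavior once the list is exhausted (back off, store to local disk, or drop).

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of ranked collector failover as used by the
// agent*Chain sinks; not Flume's implementation.
class FailoverChainSketch {

    // rankedCollectors: "machine:port" entries, primary first.
    // trySend: returns true if that collector accepted the event.
    // Returns the collector that took the event, or null if every
    // collector in the chain failed.
    static String send(List<String> rankedCollectors, Predicate<String> trySend) {
        for (String collector : rankedCollectors) {
            if (trySend.test(collector)) {
                return collector;
            }
        }
        // Chain exhausted: E2E backs off, DFO writes to disk, BE drops.
        return null;
    }
}
```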

autoE2EChain

This sink is an agentE2EChain that has failover nodes populated automatically by the master.

autoDFOChain

This sink is an agentDFOChain that has failover nodes populated automatically by the master.

autoBEChain

This sink is an agentBEChain that has failover nodes populated automatically by the master.

Flume’s Logical Sinks
logicalSink("logicalnode")

This sink creates an rpcSink that is assigned a host and port based on the name of a logical node. This information is maintained and automatically selected by the master.

Flume’s Basic Sinks
null

Null sink. Events are dropped.

console[("formatter")]

Console sink. Displays events on the process’s stdout using the optionally specified output formatter.

text("txtfile"[,"formatter"])

Textfile sink. Write to text file txtfile, using an optionally specified formatter. If a file already exists, this sink will attempt to overwrite it.

seqfile("filename")

Seqfile sink. Write to a Hadoop sequence file formatted file, with com.cloudera.flume.handlers.hdfs.WriteableEventKey keys and com.cloudera.flume.handlers.hdfs.WriteableEvent values. If a file already exists, this sink will attempt to overwrite it.

dfs("hdfspath")

Hadoop dfs seqfile sink. Writes to a dfs path in the Flume-specific Hadoop seqfile record format. The hdfspath can use escape sequences to bucket data, as documented in the Output Bucketing section.

customdfs("hdfspath"[, "format"])

Hadoop dfs formatted file sink. The hdfspath string is not escaped. Writes to a dfs path using the specified output format.

+escapedCustomDfs("hdfspath", "file", "format")

Hadoop dfs formatted file sink. The hdfspath string is escaped, and events are written to particular directories and filenames based on this string. Writes to a dfs path using the specified output format. The hdfspath can use escape sequences to bucket data, as documented in the Output Bucketing section.

rpcSink("host"[, port])

A remote procedure call (RPC) sink that is configured to send to machine host on TCP port port. The default port is 35861 and can be overridden by setting the flume.collector.event.port property. Currently, the Apache Thrift RPC framework is used.

syslogTcp("host"[,port])

Syslog TCP sink. Write to host "host" on port "port" in syslog over TCP format (syslog-ng compatible). Default port is TCP 514.

irc("host",port, "nick", "chan")

(Unsupported) An IRC channel sink. Each event is sent to the channel as a line. It attempts to connect to host on TCP port port. When it connects, it attempts to take the nickname nick and enter channel chan (like #hadoop).

Flume Sink Decorator Catalog

Flume’s Sink Decorators
nullDeco

This is a decorator that just passes data through to its child sink.

writeAhead(…)

Write-ahead decorator. Provides durability by writing events to disk before sending them. This can be used as a buffering mechanism: receive and send are decoupled in different threads.

ackedWriteAhead[(maxmillis)]

Write-ahead decorator that adds acknowledgement tags and checksums to events. Provides durability by writing events to disk before sending them. This can be used as a buffering mechanism: receive and send are decoupled in different threads. This generates and tracks groups of events, and also notifies other components to check for acknowledgements. Retries use an exponential backoff that tops out at maxmillis milliseconds. The default value of maxmillis is the value of the flume.agent.logdir.maxage property.

diskFailover[(maxmillis)]

Disk failover decorator. Events that enter this decorator are sent to its sub sink. In the event of a downstream error, data is written to disk, and these disk-buffered events are periodically retried. Retries use an exponential backoff that tops out at maxmillis milliseconds. The default value of maxmillis is the value of the flume.agent.logdir.maxage property.

ackInjector

This decorator injects an extra ack group start message on open, tags appended events with an ack tag, and injects an extra ack group end message. These tags contain a checksum for all the bodies of the events that pass through the ackInjector.

ackChecker

This decorator tracks ack group start, end, and checksum values inserted by ackInjector. If a group has arrived and its checksum is correct, it sends notifications to other components.
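The idea behind ackInjector and ackChecker is that both ends compute a checksum over the same event bodies and compare. The sketch assumes CRC32 over the UTF-8 bytes of each body and uses a hypothetical class AckGroupSketch; Flume's actual checksum and tag format are not described here.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical sketch of an ack group check. CRC32 and this layout are
// assumptions, not Flume's wire format.
class AckGroupSketch {

    // Injector side: checksum over the bodies of every event in the
    // group, sent along with the group end message.
    static long groupChecksum(List<String> bodies) {
        CRC32 crc = new CRC32();
        for (String body : bodies) {
            crc.update(body.getBytes(StandardCharsets.UTF_8));
        }
        return crc.getValue();
    }

    // Checker side: the group counts as delivered only if the bodies
    // that actually arrived reproduce the injector's checksum.
    static boolean groupComplete(List<String> received, long expected) {
        return groupChecksum(received) == expected;
    }
}
```

A lost or altered event changes the recomputed checksum, so the checker sends no acknowledgement for that group and the write-ahead side eventually retries it.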

lazyOpen

This decorator tracks the open/closed state of the sub sink but does not actually open the sink until an append is called. Thus, if this decorator is opened and closed without any appends, the sub sink is never opened.
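A minimal model of lazyOpen, using a hypothetical LifecycleSink interface rather than Flume's sink API, shows why an open/close pair without appends never touches the sub sink. CountingSink is included only to make the number of real opens observable.

```java
// Hypothetical lifecycle interface, not Flume's sink API.
interface LifecycleSink {
    void open();
    void append(String body);
    void close();
}

// Counts real opens and appends so the lazy behavior is visible.
class CountingSink implements LifecycleSink {
    int opens = 0;
    int appends = 0;

    @Override public void open() { opens++; }
    @Override public void append(String body) { appends++; }
    @Override public void close() { }
}

// Records that open() was requested, but opens the child only on the
// first append.
class LazyOpenSketch implements LifecycleSink {
    private final LifecycleSink child;
    private boolean openRequested = false;
    private boolean childOpen = false;

    LazyOpenSketch(LifecycleSink child) {
        this.child = child;
    }

    @Override
    public void open() {
        openRequested = true; // defer the real open
    }

    @Override
    public void append(String body) {
        if (!openRequested) {
            throw new IllegalStateException("append before open");
        }
        if (!childOpen) {
            child.open(); // first append triggers the real open
            childOpen = true;
        }
        child.append(body);
    }

    @Override
    public void close() {
        if (childOpen) {
            child.close(); // only close what was actually opened
            childOpen = false;
        }
        openRequested = false;
    }
}
```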

insistentOpen[(max[,init[,cumulativeMax]])]

An insistent open attempts to open its subsink multiple times until it succeeds with the specified backoff properties. This is useful for starting a network client up when the network server may not yet be up. When an attempt to open the subsink fails, this exponentially backs off and then retries the open. max is the max number of millis per backoff (default is Integer.MAX_VALUE). init is the initial number of millis to back off on the first encountered failure (default 1000). cumulativeMax is the maximum backoff allowed from a single failure before an exception is forwarded (default is Integer.MAX_VALUE). Note that this synchronously blocks the open call until the open succeeds or fails after cumulativeMax millis.
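The retry loop of insistentOpen can be sketched as follows. InsistentOpenSketch is hypothetical; doubling the delay is an assumed backoff factor, the sleeper is injected so the schedule can be observed without really waiting, and an unchecked exception stands in for whatever exception Flume forwards.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

// Hypothetical sketch of insistentOpen's exponential backoff.
class InsistentOpenSketch {

    // tryOpen: returns true when the sub sink opened successfully.
    // max: cap on a single backoff; init: first backoff;
    // cumulativeMax: total backoff allowed before giving up.
    static void open(BooleanSupplier tryOpen, long max, long init,
                     long cumulativeMax, LongConsumer sleeper) {
        long delay = init;
        long waited = 0;
        while (!tryOpen.getAsBoolean()) {
            if (waited + delay > cumulativeMax) {
                // Flume forwards an exception here; an unchecked one
                // keeps the sketch short.
                throw new IllegalStateException(
                        "open still failing after " + waited + " ms of backoff");
            }
            sleeper.accept(delay);
            waited += delay;
            delay = Math.min(delay * 2, max); // exponential, capped at max
        }
    }
}
```

In real use the sleeper would wrap Thread.sleep, and the documented defaults would correspond to init = 1000 with max and cumulativeMax at Integer.MAX_VALUE. With init = 1000 and max = 3000, three failed attempts produce waits of 1000, 2000, and 3000 ms.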

stubbornAppend

A stubborn append normally passes through append operations to its subsink. It catches the first exception that a subsink’s append method triggers, and then closes, opens, and reappends to the subsink. If this second attempt fails, it throws an exception. This is useful in conjunction with network sinks where connections can be broken. The open/close retry attempt is often sufficient to re-establish the connection.
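The single-retry behavior of stubbornAppend fits in a few lines. The RetrySink interface and StubbornAppendSketch class are hypothetical, chosen for illustration rather than taken from Flume.

```java
import java.io.IOException;

// Hypothetical sink interface for illustration, not Flume's API.
interface RetrySink {
    void open() throws IOException;
    void append(String body) throws IOException;
    void close() throws IOException;
}

// Passes appends through; on the first failure it closes, reopens, and
// reappends once. A second failure is thrown to the caller.
class StubbornAppendSketch {
    private final RetrySink child;

    StubbornAppendSketch(RetrySink child) {
        this.child = child;
    }

    void append(String body) throws IOException {
        try {
            child.append(body);
        } catch (IOException first) {
            // Recycling the connection is often enough for network sinks.
            child.close();
            child.open();
            child.append(body); // a failure here propagates
        }
    }
}
```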

value("attr","value")

The value decorator adds a new metadata attribute attr with the value value. Agents can mark their data with specific tags for later demultiplexing.

mask("attr1"[,"attr2", …])

The mask decorator outputs its input events with the specified metadata attributes removed; all other metadata passes through.

select("attr1"[,"attr2", …])

The select decorator outputs its input events with only the specified metadata attributes retained.
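mask and select are complementary filters over an event's metadata. This hypothetical sketch models the metadata as a map from attribute names to byte arrays and returns a filtered copy, leaving the input unchanged; it is not Flume's event API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the mask and select decorators' effect on
// event metadata.
class MetadataFilterSketch {

    // mask: drop the named attributes, keep everything else.
    static Map<String, byte[]> mask(Map<String, byte[]> attrs, Set<String> names) {
        Map<String, byte[]> out = new HashMap<>(attrs); // copy, input untouched
        out.keySet().removeAll(names);
        return out;
    }

    // select: keep only the named attributes.
    static Map<String, byte[]> select(Map<String, byte[]> attrs, Set<String> names) {
        Map<String, byte[]> out = new HashMap<>(attrs);
        out.keySet().retainAll(names);
        return out;
    }
}
```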

format("pattern")

The format decorator replaces the body of each input event with an escaped version of the pattern argument. Since checksums rely on body data, this should only be used on unreliable flows or reporting flows. Inappropriate use may result in message loss.

regex("regex",idx,"attr")

The regex decorator applies the regular expression regex, extracts the idx-th capture group, and writes this value to the attr attribute. The regex must use Java-style escaping. Thus, a regex that uses the \d macro must specify it as "\\d".

split("regex",idx,"attr")

The split decorator uses the regular expression regex to split the body into tokens (not including the separator). The idx-th token is then written as the value of the attr attribute. The regex must use Java-style escaping. Thus, a regex that uses the \d macro must specify it as "\\d".
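What the regex and split decorators compute from a body can be sketched with java.util.regex. ExtractSketch is hypothetical; using the first match for regex and returning null when a group or token does not exist are assumptions, and writing the result into attr is left out. The doubled backslashes in the Java literals mirror the escaping rule above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the values the regex and split decorators
// would store in attr.
class ExtractSketch {

    // regex decorator: capture group idx of the first match, or null.
    static String regexValue(String body, String regex, int idx) {
        Matcher m = Pattern.compile(regex).matcher(body);
        if (m.find() && idx >= 0 && idx <= m.groupCount()) {
            return m.group(idx);
        }
        return null;
    }

    // split decorator: token idx after splitting on regex (separators
    // are not part of any token), or null if there is no such token.
    static String splitValue(String body, String regex, int idx) {
        String[] tokens = body.split(regex);
        return idx >= 0 && idx < tokens.length ? tokens[idx] : null;
    }
}
```

For example, regexValue("GET /index.html 200", "(\\w+) (\\S+) (\\d+)", 3) yields "200", and splitValue("x1y2z", "\\d", 1) yields "y".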

batch(n,maxlatency)

Buffers n events and then sends them as one aggregate event. If maxlatency milliseconds pass first, all currently buffered events are sent out as an aggregate event.
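A minimal sketch of batch(n, maxlatency) follows. BatchSketch is hypothetical and models an aggregate event as a List<String>. Measuring latency from the first buffered event, and checking it only when an event arrives, are simplifying assumptions; a real decorator would also need a timer to flush an idle buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of batching by count or by latency.
class BatchSketch {
    private final int n;
    private final long maxLatencyMillis;
    private final List<List<String>> downstream;
    private List<String> buffer = new ArrayList<>();
    private long firstBufferedAt;

    BatchSketch(int n, long maxLatencyMillis, List<List<String>> downstream) {
        this.n = n;
        this.maxLatencyMillis = maxLatencyMillis;
        this.downstream = downstream;
    }

    // nowMillis is supplied by the caller so the sketch is deterministic.
    void append(String body, long nowMillis) {
        if (buffer.isEmpty()) {
            firstBufferedAt = nowMillis;
        }
        buffer.add(body);
        boolean full = buffer.size() >= n;
        boolean stale = nowMillis - firstBufferedAt >= maxLatencyMillis;
        if (full || stale) {
            flush();
        }
    }

    // Send whatever is buffered downstream as one aggregate event.
    void flush() {
        if (!buffer.isEmpty()) {
            downstream.add(buffer);
            buffer = new ArrayList<>();
        }
    }
}
```

unbatch reverses this step by forwarding each element of an aggregate as its own event.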

unbatch

Unbatch takes an aggregate event generated by batch, splits it, and then forwards its original events. If an event is not an aggregate it is just forwarded.

gzip

Gzips a serialized event. This is useful in conjunction with aggregate events.

gunzip

Gunzips a gzipped event. If the event is not a gzip event, it is just forwarded.
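gzip and gunzip can be sketched with java.util.zip. GzipSketch is hypothetical and works on a serialized body as a byte array; recognizing a gzip event by its two magic bytes (0x1f 0x8b) is an assumption about how non-gzip events are detected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of the gzip and gunzip decorators' transformation.
class GzipSketch {

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read the buffer only after the stream is closed and finished.
        return bos.toByteArray();
    }

    static byte[] gunzip(byte[] data) {
        boolean isGzip = data.length >= 2
                && (data[0] & 0xff) == 0x1f && (data[1] & 0xff) == 0x8b;
        if (!isGzip) {
            return data; // not a gzip event: forward unchanged
        }
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```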

intervalSampler(n)

Interval sampler. Every n-th event is forwarded.

probSampler(p)

Probability sampler. Every event has a probability p (where 0.0 ≤ p ≤ 1.0) chance of being forwarded.

reservoirSampler(k)

Reservoir sampler. When flushed, at most k events are forwarded. If more than k elements have entered this decorator, exactly k events are forwarded. All events that pass through have the same probability of being selected. NOTE: This will reorder the events being sent.
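reservoirSampler(k) matches the contract of classic reservoir sampling (Algorithm R), sketched below as one way to meet it; Flume's implementation may differ. ReservoirSketch is hypothetical, and flush returns the sample in reservoir-slot order, which illustrates why the events can be reordered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Algorithm R: after m >= k events, each event is in the reservoir
// with probability k/m.
class ReservoirSketch<T> {
    private final int k;
    private final Random rnd;
    private final List<T> reservoir = new ArrayList<>();
    private long seen = 0;

    ReservoirSketch(int k, Random rnd) {
        this.k = k;
        this.rnd = rnd;
    }

    void append(T event) {
        seen++;
        if (reservoir.size() < k) {
            reservoir.add(event); // fill phase: keep the first k events
        } else {
            // Pick a slot in [0, seen); replace only if it lands in the
            // reservoir, which happens with probability k/seen.
            long j = (long) (rnd.nextDouble() * seen);
            if (j < k) {
                reservoir.set((int) j, event);
            }
        }
    }

    // Forward at most k events and start a new sample. The order follows
    // reservoir slots, not arrival order.
    List<T> flush() {
        List<T> sample = new ArrayList<>(reservoir);
        reservoir.clear();
        seen = 0;
        return sample;
    }
}
```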

delay(ms)

Adds a delay of ms milliseconds before forwarding events down the pipeline. This blocks and prevents other events from entering the pipeline. This is useful for workload simulation in conjunction with asciisynth sources.

flume-site.xml configuration settings

Troubleshooting

What are the default ports?

TCP ports are used in all situations.

Service                    Property                               Default
node collector port        flume.collector.port                   35853+
node status web server     flume.node.http.port                   35862+
master status web server   flume.master.http.port                 35871
master heartbeat port      flume.master.heartbeat.port            35872
master admin/shell port    flume.master.admin.port                35873
master gossip port         flume.master.gossip.port               35890
master → zk port           flume.master.zk.client.port            3181
zk → zk quorum port        flume.master.zk.server.quorum.port     3182
zk → zk election port      flume.master.zk.server.election.port   3183

What versions of Hadoop HDFS can I use? How do I change this?

Currently, there are constraints on writing to HDFS. A Flume node can only write to one version of Hadoop. Although Hadoop’s HDFS API has been fairly stable, HDFS clients are only guaranteed to be wire compatible with the same major version of HDFS. Cloudera’s testing used Hadoop HDFS 0.20.x and HDFS 0.18.x. They are API compatible, so all that is needed to switch versions is to swap out the Hadoop jar and restart the node that will write to the other Hadoop version.

You still need to configure this instance of Hadoop so that it talks to the correct HDFS nameNode. You configure the Hadoop client settings (such as pointers to the name node) the same way as with Hadoop dataNodes or worker nodes — modify and use conf/core-site.xml.

By default, Flume checks for a Hadoop jar in /usr/lib/hadoop. If it is not present, it defaults to a jar found in its own lib directory, /usr/lib/flume/lib.

Why doesn’t a Flume node appear on Flume Master?

If a node does not show up on the Master, you should first check to see if the node is running.

# jps | grep FlumeNode

You can also check logs found in /usr/lib/flume/logs/flume-flume-node*.log.

Common errors are error messages or warnings about being unable to contact the masters. These could be due to host misconfiguration, port misconfiguration (35872 is the default heartbeat port), or firewall problems.
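To rule out host, port, or firewall problems, it can help to check from the node's machine whether the master's heartbeat port accepts TCP connections at all. The following troubleshooting sketch uses java.net.Socket; PortCheckSketch is hypothetical and not part of Flume.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical troubleshooting helper, not part of Flume: does a TCP
// port on a host accept connections from this machine?
class PortCheckSketch {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host name unresolvable
        }
    }
}
```

For example, isReachable("master.example.com", 35872, 2000), where the host name is a placeholder, returning false points at the host, port, or firewall causes listed above rather than at the node's configuration.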

Another possible error is a permissions problem with the local machine’s write-ahead log directory. On an out-of-the-box setup, this is the /tmp/flume/agent directory. If a Flume node is ever run as a user other than flume (especially if it was run as root), the directory needs to be either deleted or have the permissions of its contents modified to allow Flume to use it.

Why is the state of a Flume node changing rapidly?

By default, nodes start and choose their hostname as their physical node name. Physical node names are supposed to be unique. Unexpected results, such as a rapidly changing node state, may occur if multiple physical nodes are assigned the same name.

Where are the logs for troubleshooting Flume itself?

On Ubuntu-based installations, logs are written to /usr/lib/logs/.

Master logs

/usr/lib/logs/flume-flume-master-host.log

Master stdout

/usr/lib/logs/flume-flume-master-host.out.*

Node logs

/usr/lib/logs/flume-flume-node-host.log

Node stdout

/usr/lib/logs/flume-flume-node-host.out.*

What can I do if I get node failure due to out of file handles?

There are two limits in Linux: the maximum number of open files allowed system-wide (for example, 328838), and the maximum number of open files allowed per user (default 1024). Sockets are file handles, so these limits also cap the number of open TCP connections.

On Ubuntu, add a line like the following to /etc/security/limits.conf:

<user> hard nofile 10000

The user should also add the following line to ~/.bash_profile to raise the soft limit up to the hard limit:

ulimit -n 10000

Failures when using Disk Failover or Write Ahead Log

Flume currently relies on the file system for logging mechanisms. You must make sure that the user running Flume has permissions to write to the specified logging directory.

Currently the default is to write to /tmp/flume. In a production system you should write to a directory that does not automatically get deleted on reboot.

Can I write data to Amazon S3?

Yes. In the collector sink or dfs sinks you can use the s3n:// or s3:// prefixes to write to S3 buckets.

First you must add some jars to your CLASSPATH in order to support s3 writing. Here is a set that Cloudera has tested (other versions will likely work as well):

  • commons-codec-1.3.jar

  • commons-httpclient-3.0.1.jar

  • jets3t-0.6.1.jar

s3n uses the native s3 file system and has some limitations on individual file sizes. Files written with this method are compatible with other programs like s3cmd.

s3 writes using an overlay file system, so all interaction with these files must go through Hadoop.

Glossary

Agent

A Flume node located at the start of a flow that captures data from external sources, ready for feeding downstream.

Collector

A Flume node located at the end of a flow that delivers data to its eventual destination.

Flow

A set of nodes wired together in sequence which together process data from a single source into its eventual destination.

Master

A service that controls the configuration of all nodes, and to which all nodes report.

Sink

The place where a node sends its data after all processing is done.

Source

The place where a node gets its data stream.

Versions

history

v0.9.1 8/9/10

Improved error messages and visibility of property configuration values. First external contributions. Fixed reconfiguration hangs. Improved implementing plugins documentation. Updated scribe and syslog support. Compression on output files.

v0.9 6/29/10

metrics and reporting framework, logical nodes+logical names abstraction, wal/dfo isolation by flow, transformation-based high level sinks. Open source and initial push to github.

v0.3 3/31/10

ZK based master/multi-master, automatic failovers for data and control planes. flume shell. deb/rpm packaging.

v0.2 1/21/10

Different reliability modes: WAL 2.0, DFO, Best effort. Output file escaping/bucketing. Proliferation of many sink and decorators.

v0.1 11/23/09

First installation deployment, user tests.

v0.0 9/21/09

First cut with current architecture (centralized master, configuration language, web interface). First version of WAL. Simple visualizations, samplers, Thrift-based RPC.

pre history 7/21/09

First commit. Design, experimental implementations. Initial implementation had individual agent and collector programs, watchdog.

     ______
    / ___//_  ______  ____
   / /_/ / / / /    \/ __/
  / __/ / /_/ / / / / __/
 / / /_/\____/_/_/_/\__/
/_/ Distributed Log Collection.