View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.EOFException;
27  import java.io.FileNotFoundException;
28  import java.io.IOException;
29  import java.io.OutputStream;
30  import java.io.UnsupportedEncodingException;
31  import java.lang.reflect.Method;
32  import java.net.URLEncoder;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.SortedMap;
40  import java.util.TreeMap;
41  import java.util.TreeSet;
42  import java.util.concurrent.Callable;
43  import java.util.concurrent.ConcurrentSkipListMap;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.Executors;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicInteger;
50  import java.util.concurrent.atomic.AtomicLong;
51  import java.util.concurrent.locks.Condition;
52  import java.util.concurrent.locks.Lock;
53  import java.util.concurrent.locks.ReentrantLock;
54  import java.util.regex.Pattern;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.FileStatus;
60  import org.apache.hadoop.fs.FileSystem;
61  import org.apache.hadoop.fs.Path;
62  import org.apache.hadoop.fs.Syncable;
63  import org.apache.hadoop.hbase.HBaseConfiguration;
64  import org.apache.hadoop.hbase.HConstants;
65  import org.apache.hadoop.hbase.HRegionInfo;
66  import org.apache.hadoop.hbase.HServerInfo;
67  import org.apache.hadoop.hbase.HTableDescriptor;
68  import org.apache.hadoop.hbase.KeyValue;
69  import org.apache.hadoop.hbase.RemoteExceptionHandler;
70  import org.apache.hadoop.hbase.regionserver.HRegion;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.ClassSize;
73  import org.apache.hadoop.hbase.util.FSUtils;
74  import org.apache.hadoop.hbase.util.Threads;
75  import org.apache.hadoop.io.Writable;
76  
77  import com.google.common.util.concurrent.NamingThreadFactory;
78  
79  /**
 * HLog stores all the edits to the HStore.  It's the hbase write-ahead-log
81   * implementation.
82   *
83   * It performs logfile-rolling, so external callers are not aware that the
84   * underlying file is being rolled.
85   *
86   * <p>
87   * There is one HLog per RegionServer.  All edits for all Regions carried by
88   * a particular RegionServer are entered first in the HLog.
89   *
90   * <p>
 * Each HRegion is identified by a unique <code>long</code>. HRegions do
92   * not need to declare themselves before using the HLog; they simply include
93   * their HRegion-id in the <code>append</code> or
94   * <code>completeCacheFlush</code> calls.
95   *
96   * <p>
97   * An HLog consists of multiple on-disk files, which have a chronological order.
98   * As data is flushed to other (better) on-disk structures, the log becomes
99   * obsolete. We can destroy all the log messages for a given HRegion-id up to
100  * the most-recent CACHEFLUSH message from that HRegion.
101  *
102  * <p>
103  * It's only practical to delete entire files. Thus, we delete an entire on-disk
104  * file F when all of the messages in F have a log-sequence-id that's older
105  * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
106  * a message in F.
107  *
108  * <p>
109  * Synchronized methods can never execute in parallel. However, between the
110  * start of a cache flush and the completion point, appends are allowed but log
111  * rolling is not. To prevent log rolling taking place during this period, a
112  * separate reentrant lock is used.
113  *
114  * <p>To read an HLog, call {@link #getReader(org.apache.hadoop.fs.FileSystem,
115  * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
116  *
117  */
118 public class HLog implements Syncable {
119   static final Log LOG = LogFactory.getLog(HLog.class);
120   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
121   static final byte [] METAROW = Bytes.toBytes("METAROW");
122   private final FileSystem fs;
123   private final Path dir;
124   private final Configuration conf;
125   private final LogRollListener listener;
126   private boolean logRollRequested;
127   private final long optionalFlushInterval;
128   private final long blocksize;
129   private final int flushlogentries;
130   private final String prefix;
131   private final Path oldLogDir;
132   private final List<LogActionsListener> actionListeners =
133       Collections.synchronizedList(new ArrayList<LogActionsListener>());
134 
135 
136   private static Class<? extends Writer> logWriterClass;
137   private static Class<? extends Reader> logReaderClass;
138 
139   private OutputStream hdfs_out;     // OutputStream associated with the current SequenceFile.writer
140   private int initialReplication;    // initial replication factor of SequenceFile.writer
141   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
142   final static Object [] NO_ARGS = new Object []{};
143 
144   /** Name of file that holds recovered edits written by the wal log splitting
145    * code, one per region
146    */
147   public static final String RECOVERED_EDITS = "recovered.edits";
148 
149   // used to indirectly tell syncFs to force the sync
150   private boolean forceSync = false;
151 
152   public interface Reader {
153     void init(FileSystem fs, Path path, Configuration c) throws IOException;
154     void close() throws IOException;
155     Entry next() throws IOException;
156     Entry next(Entry reuse) throws IOException;
157     void seek(long pos) throws IOException;
158     long getPosition() throws IOException;
159   }
160 
161   public interface Writer {
162     void init(FileSystem fs, Path path, Configuration c) throws IOException;
163     void close() throws IOException;
164     void sync() throws IOException;
165     void append(Entry entry) throws IOException;
166     long getLength() throws IOException;
167   }
168 
169   /*
170    * Current log file.
171    */
172   Writer writer;
173 
174   /*
175    * Map of all log files but the current one.
176    */
177   final SortedMap<Long, Path> outputfiles =
178     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
179 
180   /*
181    * Map of regions to first sequence/edit id in their memstore.
182    */
183   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
184     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
185 
186   private volatile boolean closed = false;
187 
188   private final AtomicLong logSeqNum = new AtomicLong(0);
189 
190   // The timestamp (in ms) when the log file was created.
191   private volatile long filenum = -1;
192 
193   //number of transactions in the current Hlog.
194   private final AtomicInteger numEntries = new AtomicInteger(0);
195 
196   // If > than this size, roll the log. This is typically 0.95 times the size
197   // of the default Hdfs block size.
198   private final long logrollsize;
199 
200   // This lock prevents starting a log roll during a cache flush.
201   // synchronized is insufficient because a cache flush spans two method calls.
202   private final Lock cacheFlushLock = new ReentrantLock();
203 
204   // We synchronize on updateLock to prevent updates and to prevent a log roll
205   // during an update
206   // locked during appends
207   private final Object updateLock = new Object();
208 
209   private final boolean enabled;
210 
211   /*
212    * If more than this many logs, force flush of oldest region to oldest edit
213    * goes to disk.  If too many and we crash, then will take forever replaying.
214    * Keep the number of logs tidy.
215    */
216   private final int maxLogs;
217 
218 
219   /**
220    * Pattern used to validate a HLog file name
221    */
222   private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
223 
224   static byte [] COMPLETE_CACHE_FLUSH;
225   static {
226     try {
227       COMPLETE_CACHE_FLUSH =
228         "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
229     } catch (UnsupportedEncodingException e) {
230       assert(false);
231     }
232   }
233 
234   // For measuring latency of writes
235   private static volatile long writeOps;
236   private static volatile long writeTime;
237   // For measuring latency of syncs
238   private static volatile long syncOps;
239   private static volatile long syncTime;
240 
241   public static long getWriteOps() {
242     long ret = writeOps;
243     writeOps = 0;
244     return ret;
245   }
246 
247   public static long getWriteTime() {
248     long ret = writeTime;
249     writeTime = 0;
250     return ret;
251   }
252 
253   public static long getSyncOps() {
254     long ret = syncOps;
255     syncOps = 0;
256     return ret;
257   }
258 
259   public static long getSyncTime() {
260     long ret = syncTime;
261     syncTime = 0;
262     return ret;
263   }
264 
265   /**
266    * HLog creating with a null actions listener.
267    *
268    * @param fs filesystem handle
269    * @param dir path to where hlogs are stored
270    * @param oldLogDir path to where hlogs are archived
271    * @param conf configuration to use
272    * @param listener listerner used to request log rolls
273    * @throws IOException
274    */
275   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
276               final Configuration conf, final LogRollListener listener)
277   throws IOException {
278     this(fs, dir, oldLogDir, conf, listener, null, null);
279   }
280 
281   /**
282    * Create an edit log at the given <code>dir</code> location.
283    *
284    * You should never have to load an existing log. If there is a log at
285    * startup, it should have already been processed and deleted by the time the
286    * HLog object is started up.
287    *
288    * @param fs filesystem handle
289    * @param dir path to where hlogs are stored
290    * @param oldLogDir path to where hlogs are archived
291    * @param conf configuration to use
292    * @param listener listerner used to request log rolls
293    * @param actionListener optional listener for hlog actions like archiving
294    * @param prefix should always be hostname and port in distributed env and
295    *        it will be URL encoded before being used.
296    *        If prefix is null, "hlog" will be used
297    * @throws IOException
298    */
299   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
300               final Configuration conf, final LogRollListener listener,
301               final LogActionsListener actionListener, final String prefix)
302   throws IOException {
303     super();
304     this.fs = fs;
305     this.dir = dir;
306     this.conf = conf;
307     this.listener = listener;
308     this.flushlogentries =
309       conf.getInt("hbase.regionserver.flushlogentries", 1);
310     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
311       this.fs.getDefaultBlockSize());
312     // Roll at 95% of block size.
313     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
314     this.logrollsize = (long)(this.blocksize * multi);
315     this.optionalFlushInterval =
316       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
317     if (fs.exists(dir)) {
318       throw new IOException("Target HLog directory already exists: " + dir);
319     }
320     fs.mkdirs(dir);
321     this.oldLogDir = oldLogDir;
322     if (!fs.exists(oldLogDir)) {
323       fs.mkdirs(this.oldLogDir);
324     }
325     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
326     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
327     LOG.info("HLog configuration: blocksize=" + this.blocksize +
328       ", rollsize=" + this.logrollsize +
329       ", enabled=" + this.enabled +
330       ", flushlogentries=" + this.flushlogentries +
331       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
332     if (actionListener != null) {
333       addLogActionsListerner(actionListener);
334     }
335     // If prefix is null||empty then just name it hlog
336     this.prefix = prefix == null || prefix.isEmpty() ?
337         "hlog" : URLEncoder.encode(prefix, "UTF8");
338     // rollWriter sets this.hdfs_out if it can.
339     rollWriter();
340 
341     // handle the reflection necessary to call getNumCurrentReplicas()
342     this.getNumCurrentReplicas = null;
343     if(this.hdfs_out != null) {
344       try {
345         this.getNumCurrentReplicas = this.hdfs_out.getClass().
346           getMethod("getNumCurrentReplicas", new Class<?> []{});
347         this.getNumCurrentReplicas.setAccessible(true);
348       } catch (NoSuchMethodException e) {
349         // Thrown if getNumCurrentReplicas() function isn't available
350       } catch (SecurityException e) {
351         // Thrown if we can't get access to getNumCurrentReplicas()
352         this.getNumCurrentReplicas = null; // could happen on setAccessible()
353       }
354     }
355     if(this.getNumCurrentReplicas != null) {
356       LOG.info("Using getNumCurrentReplicas--HDFS-826");
357     } else {
358       LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
359     }
360   }
361 
362   /**
363    * @return Current state of the monotonically increasing file id.
364    */
365   public long getFilenum() {
366     return this.filenum;
367   }
368 
369   /**
370    * Called by HRegionServer when it opens a new region to ensure that log
371    * sequence numbers are always greater than the latest sequence number of the
372    * region being brought on-line.
373    *
374    * @param newvalue We'll set log edit/sequence number to this value if it
375    * is greater than the current value.
376    */
377   public void setSequenceNumber(final long newvalue) {
378     for (long id = this.logSeqNum.get(); id < newvalue &&
379         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
380       // This could spin on occasion but better the occasional spin than locking
381       // every increment of sequence number.
382       LOG.debug("Change sequence number from " + logSeqNum + " to " + newvalue);
383     }
384   }
385 
386   /**
387    * @return log sequence number
388    */
389   public long getSequenceNumber() {
390     return logSeqNum.get();
391   }
392 
393   // usage: see TestLogRolling.java
394   OutputStream getOutputStream() {
395     return this.hdfs_out;
396   }
397 
398   /**
399    * Roll the log writer. That is, start writing log messages to a new file.
400    *
401    * Because a log cannot be rolled during a cache flush, and a cache flush
402    * spans two method calls, a special lock needs to be obtained so that a cache
403    * flush cannot start when the log is being rolled and the log cannot be
404    * rolled during a cache flush.
405    *
406    * <p>Note that this method cannot be synchronized because it is possible that
407    * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
408    * start which would obtain the lock on this but block on obtaining the
409    * cacheFlushLock and then completeCacheFlush could be called which would wait
410    * for the lock on this and consequently never release the cacheFlushLock
411    *
412    * @return If lots of logs, flush the returned regions so next time through
413    * we can clean logs. Returns null if nothing to flush.
414    * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
415    * @throws IOException
416    */
417   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
418     // Return if nothing to flush.
419     if (this.writer != null && this.numEntries.get() <= 0) {
420       return null;
421     }
422     byte [][] regionsToFlush = null;
423     this.cacheFlushLock.lock();
424     try {
425       if (closed) {
426         return regionsToFlush;
427       }
428       synchronized (updateLock) {
429         // Clean up current writer.
430         Path oldFile = cleanupCurrentWriter(this.filenum);
431         this.filenum = System.currentTimeMillis();
432         Path newPath = computeFilename();
433         this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
434         this.initialReplication = fs.getFileStatus(newPath).getReplication();
435 
436         // Can we get at the dfsclient outputstream?  If an instance of
437         // SFLW, it'll have done the necessary reflection to get at the
438         // protected field name.
439         this.hdfs_out = null;
440         if (this.writer instanceof SequenceFileLogWriter) {
441           this.hdfs_out =
442             ((SequenceFileLogWriter)this.writer).getDFSCOutputStream();
443         }
444 
445         LOG.info((oldFile != null?
446             "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
447             this.numEntries.get() +
448             ", filesize=" +
449             this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
450           "New hlog " + FSUtils.getPath(newPath));
451         // Tell our listeners that a new log was created
452         if (!this.actionListeners.isEmpty()) {
453           for (LogActionsListener list : this.actionListeners) {
454             list.logRolled(newPath);
455           }
456         }
457         // Can we delete any of the old log files?
458         if (this.outputfiles.size() > 0) {
459           if (this.lastSeqWritten.size() <= 0) {
460             LOG.debug("Last sequence written is empty. Deleting all old hlogs");
461             // If so, then no new writes have come in since all regions were
462             // flushed (and removed from the lastSeqWritten map). Means can
463             // remove all but currently open log file.
464             for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
465               archiveLogFile(e.getValue(), e.getKey());
466             }
467             this.outputfiles.clear();
468           } else {
469             regionsToFlush = cleanOldLogs();
470           }
471         }
472         this.numEntries.set(0);
473         this.logRollRequested = false;
474       }
475     } finally {
476       this.cacheFlushLock.unlock();
477     }
478     return regionsToFlush;
479   }
480 
481   /**
482    * Get a reader for the WAL.
483    * @param fs
484    * @param path
485    * @param conf
486    * @return A WAL reader.  Close when done with it.
487    * @throws IOException
488    */
489   public static Reader getReader(final FileSystem fs,
490     final Path path, Configuration conf)
491   throws IOException {
492     try {
493       if (logReaderClass == null) {
494         logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl",
495                 SequenceFileLogReader.class, Reader.class);
496       }
497 
498       HLog.Reader reader = logReaderClass.newInstance();
499       reader.init(fs, path, conf);
500       return reader;
501     } catch (IOException e) {
502       throw e;
503     }
504     catch (Exception e) {
505       throw new IOException("Cannot get log reader", e);
506     }
507   }
508 
509   /**
510    * Get a writer for the WAL.
511    * @param path
512    * @param conf
513    * @return A WAL writer.  Close when done with it.
514    * @throws IOException
515    */
516   public static Writer createWriter(final FileSystem fs,
517       final Path path, Configuration conf)
518   throws IOException {
519     try {
520       if (logWriterClass == null) {
521         logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
522                 SequenceFileLogWriter.class, Writer.class);
523       }
524       HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
525       writer.init(fs, path, conf);
526       return writer;
527     } catch (Exception e) {
528       IOException ie = new IOException("cannot get log writer");
529       ie.initCause(e);
530       throw ie;
531     }
532   }
533 
534   /*
535    * Clean up old commit logs.
536    * @return If lots of logs, flush the returned region so next time through
537    * we can clean logs. Returns null if nothing to flush.
538    * @throws IOException
539    */
540   private byte [][] cleanOldLogs() throws IOException {
541     Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
542     // Get the set of all log files whose final ID is older than or
543     // equal to the oldest pending region operation
544     TreeSet<Long> sequenceNumbers =
545       new TreeSet<Long>(this.outputfiles.headMap(
546         (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
547     // Now remove old log files (if any)
548     int logsToRemove = sequenceNumbers.size();
549     if (logsToRemove > 0) {
550       if (LOG.isDebugEnabled()) {
551         // Find associated region; helps debugging.
552         byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
553         LOG.debug("Found " + logsToRemove + " hlogs to remove " +
554           " out of total " + this.outputfiles.size() + "; " +
555           "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
556           " from region " + Bytes.toString(oldestRegion));
557       }
558       for (Long seq : sequenceNumbers) {
559         archiveLogFile(this.outputfiles.remove(seq), seq);
560       }
561     }
562 
563     // If too many log files, figure which regions we need to flush.
564     byte [][] regions = null;
565     int logCount = this.outputfiles.size() - logsToRemove;
566     if (logCount > this.maxLogs && this.outputfiles != null &&
567         this.outputfiles.size() > 0) {
568       regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(),
569         this.lastSeqWritten);
570       StringBuilder sb = new StringBuilder();
571       for (int i = 0; i < regions.length; i++) {
572         if (i > 0) sb.append(", ");
573         sb.append(Bytes.toStringBinary(regions[i]));
574       }
575       LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
576         this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
577         sb.toString());
578     }
579     return regions;
580   }
581 
582   /**
583    * Return regions (memstores) that have edits that are less than the passed
584    * <code>oldestWALseqid</code>.
585    * @param oldestWALseqid
586    * @param regionsToSeqids
587    * @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
588    * necessarily in order).  Null if no regions found.
589    */
590   static byte [][] findMemstoresWithEditsOlderThan(final long oldestWALseqid,
591       final Map<byte [], Long> regionsToSeqids) {
592     //  This method is static so it can be unit tested the easier.
593     List<byte []> regions = null;
594     for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
595       if (e.getValue().longValue() < oldestWALseqid) {
596         if (regions == null) regions = new ArrayList<byte []>();
597         regions.add(e.getKey());
598       }
599     }
600     return regions == null?
601       null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
602   }
603 
604   /*
605    * @return Logs older than this id are safe to remove.
606    */
607   private Long getOldestOutstandingSeqNum() {
608     return Collections.min(this.lastSeqWritten.values());
609   }
610 
611   private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
612     byte [] oldestRegion = null;
613     for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
614       if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
615         oldestRegion = e.getKey();
616         break;
617       }
618     }
619     return oldestRegion;
620   }
621 
622   /*
623    * Cleans up current writer closing and adding to outputfiles.
624    * Presumes we're operating inside an updateLock scope.
625    * @return Path to current writer or null if none.
626    * @throws IOException
627    */
628   private Path cleanupCurrentWriter(final long currentfilenum)
629   throws IOException {
630     Path oldFile = null;
631     if (this.writer != null) {
632       // Close the current writer, get a new one.
633       try {
634         this.writer.close();
635       } catch (IOException e) {
636         // Failed close of log file.  Means we're losing edits.  For now,
637         // shut ourselves down to minimize loss.  Alternative is to try and
638         // keep going.  See HBASE-930.
639         FailedLogCloseException flce =
640           new FailedLogCloseException("#" + currentfilenum);
641         flce.initCause(e);
642         throw e;
643       }
644       if (currentfilenum >= 0) {
645         oldFile = computeFilename();
646         this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
647       }
648     }
649     return oldFile;
650   }
651 
652   private void archiveLogFile(final Path p, final Long seqno) throws IOException {
653     Path newPath = getHLogArchivePath(this.oldLogDir, p);
654     LOG.info("moving old hlog file " + FSUtils.getPath(p) +
655       " whose highest sequence/edit id is " + seqno + " to " +
656       FSUtils.getPath(newPath));
657     this.fs.rename(p, newPath);
658     if (!this.actionListeners.isEmpty()) {
659       for (LogActionsListener list : this.actionListeners) {
660         list.logArchived(p, newPath);
661       }
662     }
663   }
664 
665   /**
666    * This is a convenience method that computes a new filename with a given
667    * file-number.
668    * @return Path
669    */
670   protected Path computeFilename() {
671     if (filenum < 0) {
672       throw new RuntimeException("hlog file number can't be < 0");
673     }
674     return new Path(dir, prefix + "." + filenum);
675   }
676 
677   /**
678    * Shut down the log and delete the log directory
679    *
680    * @throws IOException
681    */
682   public void closeAndDelete() throws IOException {
683     close();
684     FileStatus[] files = fs.listStatus(this.dir);
685     for(FileStatus file : files) {
686       fs.rename(file.getPath(),
687           getHLogArchivePath(this.oldLogDir, file.getPath()));
688     }
689     LOG.debug("Moved " + files.length + " log files to " +
690         FSUtils.getPath(this.oldLogDir));
691     fs.delete(dir, true);
692   }
693 
694   /**
695    * Shut down the log.
696    *
697    * @throws IOException
698    */
699   public void close() throws IOException {
700     cacheFlushLock.lock();
701     try {
702       synchronized (updateLock) {
703         this.closed = true;
704         if (LOG.isDebugEnabled()) {
705           LOG.debug("closing hlog writer in " + this.dir.toString());
706         }
707         this.writer.close();
708       }
709     } finally {
710       cacheFlushLock.unlock();
711     }
712   }
713 
714    /** Append an entry to the log.
715    *
716    * @param regionInfo
717    * @param logEdit
718    * @param now Time of this edit write.
719    * @throws IOException
720    */
721   public void append(HRegionInfo regionInfo, WALEdit logEdit,
722     final long now,
723     final boolean isMetaRegion)
724   throws IOException {
725     byte [] regionName = regionInfo.getRegionName();
726     byte [] tableName = regionInfo.getTableDesc().getName();
727     this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
728   }
729 
730   /**
731    * @param now
732    * @param regionName
733    * @param tableName
734    * @return New log key.
735    */
736   protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, long now) {
737     return new HLogKey(regionName, tableName, seqnum, now);
738   }
739 
740 
741 
742   /** Append an entry to the log.
743    *
744    * @param regionInfo
745    * @param logEdit
746    * @param logKey
747    * @throws IOException
748    */
749   public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
750   throws IOException {
751     if (this.closed) {
752       throw new IOException("Cannot append; log is closed");
753     }
754     byte [] regionName = regionInfo.getRegionName();
755     synchronized (updateLock) {
756       long seqNum = obtainSeqNum();
757       logKey.setLogSeqNum(seqNum);
758       // The 'lastSeqWritten' map holds the sequence number of the oldest
759       // write for each region (i.e. the first edit added to the particular
760       // memstore). When the cache is flushed, the entry for the
761       // region being flushed is removed if the sequence number of the flush
762       // is greater than or equal to the value in lastSeqWritten.
763       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
764       doWrite(regionInfo, logKey, logEdit);
765       this.numEntries.incrementAndGet();
766     }
767 
768     // sync txn to file system
769     this.sync(regionInfo.isMetaRegion());
770   }
771 
772   /**
773    * Append a set of edits to the log. Log edits are keyed by regionName,
774    * rowname, and log-sequence-id.
775    *
776    * Later, if we sort by these keys, we obtain all the relevant edits for a
777    * given key-range of the HRegion (TODO). Any edits that do not have a
778    * matching COMPLETE_CACHEFLUSH message can be discarded.
779    *
780    * <p>
781    * Logs cannot be restarted once closed, or once the HLog process dies. Each
782    * time the HLog starts, it must create a new log. This means that other
783    * systems should process the log appropriately upon each startup (and prior
784    * to initializing HLog).
785    *
786    * synchronized prevents appends during the completion of a cache flush or for
787    * the duration of a log roll.
788    *
789    * @param info
790    * @param tableName
791    * @param edits
792    * @param now
793    * @throws IOException
794    */
795   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
796     final long now)
797   throws IOException {
798     if (edits.isEmpty()) return;
799     
800     byte[] regionName = info.getRegionName();
801     if (this.closed) {
802       throw new IOException("Cannot append; log is closed");
803     }
804     synchronized (this.updateLock) {
805       long seqNum = obtainSeqNum();
806       // The 'lastSeqWritten' map holds the sequence number of the oldest
807       // write for each region (i.e. the first edit added to the particular
808       // memstore). . When the cache is flushed, the entry for the
809       // region being flushed is removed if the sequence number of the flush
810       // is greater than or equal to the value in lastSeqWritten.
811       this.lastSeqWritten.putIfAbsent(regionName, seqNum);
812       HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
813       doWrite(info, logKey, edits);
814       this.numEntries.incrementAndGet();
815     }
816     // sync txn to file system
817     this.sync(info.isMetaRegion());
818   }
819 
820   public void hflush() throws IOException {
821     if (this.closed) {
822       return;
823     }
824     try {
825       long now = System.currentTimeMillis();
826       this.writer.sync();
827       synchronized (this) {
828         this.syncTime += System.currentTimeMillis() - now;
829         this.syncOps++;
830         this.forceSync = false;
831       }
832       synchronized (this.updateLock) {
833         if (!logRollRequested) {
834           checkLowReplication();
835           if (this.writer.getLength() > this.logrollsize) {
836             requestLogRoll();
837           }
838         }
839       }
840 
841     } catch (IOException e) {
842       LOG.fatal("Could not append. Requesting close of hlog", e);
843       requestLogRoll();
844       throw e;
845     }
846   }
847 
848   private synchronized void checkLowReplication() {
849     // if the number of replicas in HDFS has fallen below the initial   
850     // value, then roll logs.   
851     try {
852       int numCurrentReplicas = getLogReplication();
853       if (numCurrentReplicas != 0 &&
854           numCurrentReplicas < this.initialReplication) {
855         LOG.warn("HDFS pipeline error detected. " +
856             "Found " + numCurrentReplicas + " replicas but expecting " +
857             this.initialReplication + " replicas. " +
858             " Requesting close of hlog.");
859         requestLogRoll();
860         logRollRequested = true;
861       }
862     } catch (Exception e) {
863        LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
864                 " still proceeding ahead...");
865     }
866   }
867 
868   /**
869    * This method gets the datanode replication count for the current HLog.
870    *
871    * If the pipeline isn't started yet or is empty, you will get the default
872    * replication factor.  Therefore, if this function returns 0, it means you
873    * are not properly running with the HDFS-826 patch.
874    *
875    * @throws Exception
876    */
877   int getLogReplication() throws Exception {
878     if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
879       Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
880       if (repl instanceof Integer) {
881         return ((Integer)repl).intValue();
882       }
883     }
884     return 0;
885   }
886 
887   boolean canGetCurReplicas() {
888     return this.getNumCurrentReplicas != null;
889   }
890 
891   public void hsync() throws IOException {
892     // Not yet implemented up in hdfs so just call hflush.
893     hflush();
894   }
895 
896   private synchronized void requestLogRoll() {
897     if (this.listener != null && !logRollRequested) {
898       this.listener.logRollRequested();
899     }
900   }
901 
902   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
903   throws IOException {
904     if (!this.enabled) {
905       return;
906     }
907     try {
908       long now = System.currentTimeMillis();
909       this.writer.append(new HLog.Entry(logKey, logEdit));
910       long took = System.currentTimeMillis() - now;
911       writeTime += took;
912       writeOps++;
913       if (took > 1000) {
914         LOG.warn(Thread.currentThread().getName() + " took " + took +
915           "ms appending an edit to hlog; editcount=" + this.numEntries.get());
916       }
917     } catch (IOException e) {
918       LOG.fatal("Could not append. Requesting close of hlog", e);
919       requestLogRoll();
920       throw e;
921     }
922   }
923 
924   /** @return How many items have been added to the log */
925   int getNumEntries() {
926     return numEntries.get();
927   }
928 
929   /**
930    * Obtain a log sequence number.
931    */
932   private long obtainSeqNum() {
933     return this.logSeqNum.incrementAndGet();
934   }
935 
936   /** @return the number of log files in use */
937   int getNumLogFiles() {
938     return outputfiles.size();
939   }
940 
941   /**
942    * By acquiring a log sequence ID, we can allow log messages to continue while
943    * we flush the cache.
944    *
945    * Acquire a lock so that we do not roll the log between the start and
946    * completion of a cache-flush. Otherwise the log-seq-id for the flush will
947    * not appear in the correct logfile.
948    *
949    * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)}
950    * (byte[], byte[], long)}
951    * @see #completeCacheFlush(byte[], byte[], long, boolean)
952    * @see #abortCacheFlush()
953    */
954   public long startCacheFlush() {
955     this.cacheFlushLock.lock();
956     return obtainSeqNum();
957   }
958 
959   /**
960    * Complete the cache flush
961    *
962    * Protected by cacheFlushLock
963    *
964    * @param regionName
965    * @param tableName
966    * @param logSeqId
967    * @throws IOException
968    */
969   public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
970     final long logSeqId,
971     final boolean isMetaRegion)
972   throws IOException {
973     try {
974       if (this.closed) {
975         return;
976       }
977       synchronized (updateLock) {
978         long now = System.currentTimeMillis();
979         WALEdit edit = completeCacheFlushLogEdit();
980         HLogKey key = makeKey(regionName, tableName, logSeqId,
981             System.currentTimeMillis());
982         this.writer.append(new Entry(key, edit));
983         writeTime += System.currentTimeMillis() - now;
984         writeOps++;
985         this.numEntries.incrementAndGet();
986         Long seq = this.lastSeqWritten.get(regionName);
987         if (seq != null && logSeqId >= seq.longValue()) {
988           this.lastSeqWritten.remove(regionName);
989         }
990       }
991       // sync txn to file system
992       this.sync(isMetaRegion);
993 
994     } finally {
995       this.cacheFlushLock.unlock();
996     }
997   }
998 
999   public void sync() throws IOException {
1000     sync(false);
1001   }
1002 
1003   public void sync(boolean force) throws IOException {
1004     hflush();
1005   }
1006 
1007 
1008   private WALEdit completeCacheFlushLogEdit() {
1009     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
1010       System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
1011     WALEdit e = new WALEdit();
1012     e.add(kv);
1013     return e;
1014   }
1015 
1016   /**
1017    * Abort a cache flush.
1018    * Call if the flush fails. Note that the only recovery for an aborted flush
1019    * currently is a restart of the regionserver so the snapshot content dropped
1020    * by the failure gets restored to the memstore.
1021    */
1022   public void abortCacheFlush() {
1023     this.cacheFlushLock.unlock();
1024   }
1025 
1026   /**
1027    * @param family
1028    * @return true if the column is a meta column
1029    */
1030   public static boolean isMetaFamily(byte [] family) {
1031     return Bytes.equals(METAFAMILY, family);
1032   }
1033 
1034   /**
1035    * Split up a bunch of regionserver commit log files that are no longer
1036    * being written to, into new files, one per region for region to replay on
1037    * startup. Delete the old log files when finished.
1038    *
1039    * @param rootDir qualified root directory of the HBase instance
1040    * @param srcDir Directory of log files to split: e.g.
1041    *                <code>${ROOTDIR}/log_HOST_PORT</code>
1042    * @param oldLogDir directory where processed (split) logs will be archived to
1043    * @param fs FileSystem
1044    * @param conf Configuration
1045    * @throws IOException will throw if corrupted hlogs aren't tolerated
1046    * @return the list of splits
1047    */
1048   public static List<Path> splitLog(final Path rootDir, final Path srcDir,
1049     Path oldLogDir, final FileSystem fs, final Configuration conf)
1050   throws IOException {
1051 
1052     long millis = System.currentTimeMillis();
1053     List<Path> splits = null;
1054     if (!fs.exists(srcDir)) {
1055       // Nothing to do
1056       return splits;
1057     }
1058     FileStatus [] logfiles = fs.listStatus(srcDir);
1059     if (logfiles == null || logfiles.length == 0) {
1060       // Nothing to do
1061       return splits;
1062     }
1063     LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
1064       srcDir.toString());
1065     splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
1066     try {
1067       LOG.info("Spliting is done. Removing old log dir "+srcDir);
1068       fs.delete(srcDir, false);
1069     } catch (IOException e) {
1070       e = RemoteExceptionHandler.checkIOException(e);
1071       IOException io = new IOException("Cannot delete: " + srcDir);
1072       io.initCause(e);
1073       throw io;
1074     }
1075     long endMillis = System.currentTimeMillis();
1076     LOG.info("hlog file splitting completed in " + (endMillis - millis) +
1077         " millis for " + srcDir.toString());
1078     return splits;
1079   }
1080 
1081   // Private immutable datastructure to hold Writer and its Path.
1082   private final static class WriterAndPath {
1083     final Path p;
1084     final Writer w;
1085     WriterAndPath(final Path p, final Writer w) {
1086       this.p = p;
1087       this.w = w;
1088     }
1089   }
1090 
1091   @SuppressWarnings("unchecked")
1092   public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
1093      return (Class<? extends HLogKey>)
1094        conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
1095   }
1096 
1097   public static HLogKey newKey(Configuration conf) throws IOException {
1098     Class<? extends HLogKey> keyClass = getKeyClass(conf);
1099     try {
1100       return keyClass.newInstance();
1101     } catch (InstantiationException e) {
1102       throw new IOException("cannot create hlog key");
1103     } catch (IllegalAccessException e) {
1104       throw new IOException("cannot create hlog key");
1105     }
1106   }
1107 
1108   /**
1109    * Sorts the HLog edits in the given list of logfiles (that are a mix of edits on multiple regions)
1110    * by region and then splits them per region directories, in batches of (hbase.hlog.split.batch.size)
1111    *
1112    * A batch consists of a set of log files that will be sorted in a single map of edits indexed by region
1113    * the resulting map will be concurrently written by multiple threads to their corresponding regions
1114    *
1115    * Each batch consists of more more log files that are
1116    *  - recovered (files is opened for append then closed to ensure no process is writing into it)
1117    *  - parsed (each edit in the log is appended to a list of edits indexed by region
1118    *    see {@link #parseHLog} for more details)
1119    *  - marked as either processed or corrupt depending on parsing outcome
1120    *  - the resulting edits indexed by region are concurrently written to their corresponding region
1121    *    region directories
1122    *  - original files are then archived to a different directory
1123    *
1124    *
1125    *
1126    * @param rootDir  hbase directory
1127    * @param srcDir   logs directory
1128    * @param oldLogDir directory where processed logs are archived to
1129    * @param logfiles the list of log files to split
1130    * @param fs
1131    * @param conf
1132    * @return
1133    * @throws IOException
1134    */
1135   private static List<Path> splitLog(final Path rootDir, final Path srcDir,
1136     Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
1137     final Configuration conf)
1138   throws IOException {
1139     List<Path> processedLogs = new ArrayList<Path>();
1140     List<Path> corruptedLogs = new ArrayList<Path>();
1141     final Map<byte [], WriterAndPath> logWriters =
1142       Collections.synchronizedMap(
1143         new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
1144     List<Path> splits = null;
1145 
1146     // Number of logs in a read batch
1147     // More means faster but bigger mem consumption
1148     //TODO make a note on the conf rename and update hbase-site.xml if needed
1149     int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
1150      boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
1151 
1152 
1153     try {
1154       int i = -1;
1155       while (i < logfiles.length) {
1156         final Map<byte[], LinkedList<Entry>> editsByRegion =
1157           new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
1158         for (int j = 0; j < logFilesPerStep; j++) {
1159           i++;
1160           if (i == logfiles.length) {
1161             break;
1162           }
1163           FileStatus log = logfiles[i];
1164           Path logPath = log.getPath();
1165           long logLength = log.getLen();
1166           LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
1167             ": " + logPath + ", length=" + logLength );
1168           try {
1169             recoverFileLease(fs, logPath, conf);
1170             parseHLog(log, editsByRegion, fs, conf);
1171             processedLogs.add(logPath);
1172            } catch (IOException e) {
1173              if (skipErrors) {
1174                LOG.warn("Got while parsing hlog " + logPath +
1175                  ". Marking as corrupted", e);
1176                corruptedLogs.add(logPath);
1177              } else {
1178                throw e;
1179              }
1180           }
1181         }
1182         writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
1183       }
1184       if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
1185         throw new IOException("Discovered orphan hlog after split. Maybe " +
1186           "HRegionServer was not dead when we started");
1187       }
1188       archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
1189     } finally {
1190       splits = new ArrayList<Path>(logWriters.size());
1191       for (WriterAndPath wap : logWriters.values()) {
1192         wap.w.close();
1193         splits.add(wap.p);
1194         LOG.debug("Closed " + wap.p);
1195       }
1196     }
1197     return splits;
1198   }
1199 
1200 
1201   /**
1202    * Utility class that lets us keep track of the edit with it's key
1203    * Only used when splitting logs
1204    */
1205   public static class Entry implements Writable {
1206     private WALEdit edit;
1207     private HLogKey key;
1208 
1209     public Entry() {
1210       edit = new WALEdit();
1211       key = new HLogKey();
1212     }
1213 
1214     /**
1215      * Constructor for both params
1216      * @param edit log's edit
1217      * @param key log's key
1218      */
1219     public Entry(HLogKey key, WALEdit edit) {
1220       super();
1221       this.key = key;
1222       this.edit = edit;
1223     }
1224     /**
1225      * Gets the edit
1226      * @return edit
1227      */
1228     public WALEdit getEdit() {
1229       return edit;
1230     }
1231     /**
1232      * Gets the key
1233      * @return key
1234      */
1235     public HLogKey getKey() {
1236       return key;
1237     }
1238 
1239     @Override
1240     public String toString() {
1241       return this.key + "=" + this.edit;
1242     }
1243 
1244     @Override
1245     public void write(DataOutput dataOutput) throws IOException {
1246       this.key.write(dataOutput);
1247       this.edit.write(dataOutput);
1248     }
1249 
1250     @Override
1251     public void readFields(DataInput dataInput) throws IOException {
1252       this.key.readFields(dataInput);
1253       this.edit.readFields(dataInput);
1254     }
1255   }
1256 
1257   /**
1258    * Construct the HLog directory name
1259    *
1260    * @param info HServerInfo for server
1261    * @return the HLog directory name
1262    */
1263   public static String getHLogDirectoryName(HServerInfo info) {
1264     return getHLogDirectoryName(info.getServerName());
1265   }
1266 
1267   /**
1268    * Construct the HLog directory name
1269    *
1270    * @param serverAddress
1271    * @param startCode
1272    * @return the HLog directory name
1273    */
1274   public static String getHLogDirectoryName(String serverAddress,
1275       long startCode) {
1276     if (serverAddress == null || serverAddress.length() == 0) {
1277       return null;
1278     }
1279     return getHLogDirectoryName(
1280         HServerInfo.getServerName(serverAddress, startCode));
1281   }
1282 
1283   /**
1284    * Construct the HLog directory name
1285    *
1286    * @param serverName
1287    * @return the HLog directory name
1288    */
1289   public static String getHLogDirectoryName(String serverName) {
1290     StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
1291     dirName.append("/");
1292     dirName.append(serverName);
1293     return dirName.toString();
1294   }
1295 
1296   public static boolean validateHLogFilename(String filename) {
1297     return pattern.matcher(filename).matches();
1298   }
1299 
1300   private static Path getHLogArchivePath(Path oldLogDir, Path p) {
1301     return new Path(oldLogDir, p.getName());
1302   }
1303 
1304   /**
1305    * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
1306    *
1307    * @param splitLogsMap map that contains the log splitting result indexed by region
1308    * @param logWriters map that contains a writer per region
1309    * @param rootDir hbase root dir
1310    * @param fs
1311    * @param conf
1312    * @throws IOException
1313    */
1314   private static void writeEditsBatchToRegions(
1315     final Map<byte[], LinkedList<Entry>> splitLogsMap,
1316     final Map<byte[], WriterAndPath> logWriters,
1317     final Path rootDir, final FileSystem fs, final Configuration conf)
1318   throws IOException {
1319     // Number of threads to use when log splitting to rewrite the logs.
1320     // More means faster but bigger mem consumption.
1321     int logWriterThreads =
1322       conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
1323     boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
1324     HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
1325     NamingThreadFactory f  = new NamingThreadFactory(
1326             "SplitWriter-%1$d", Executors.defaultThreadFactory());
1327     ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
1328     for (final byte[] region : splitLogsMap.keySet()) {
1329       Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
1330       writeFutureResult.put(region, threadPool.submit(splitter));
1331     }
1332 
1333     threadPool.shutdown();
1334     // Wait for all threads to terminate
1335     try {
1336       for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
1337         String message = "Waiting for hlog writers to terminate, elapsed " + j * 5 + " seconds";
1338         if (j < 30) {
1339           LOG.debug(message);
1340         } else {
1341           LOG.info(message);
1342         }
1343 
1344       }
1345     } catch(InterruptedException ex) {
1346       LOG.warn("Hlog writers were interrupted, possible data loss!");
1347       if (!skipErrors) {
1348         throw new IOException("Could not finish writing log entries",  ex);
1349         //TODO  maybe we should fail here regardless if skipErrors is active or not
1350       }
1351     }
1352 
1353     for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
1354       try {
1355         entry.getValue().get();
1356       } catch (ExecutionException e) {
1357         throw (new IOException(e.getCause()));
1358       } catch (InterruptedException e1) {
1359         LOG.warn("Writer for region " +  Bytes.toString(entry.getKey()) +
1360                 " was interrupted, however the write process should have " +
1361                 "finished. Throwing up ", e1);
1362         throw (new IOException(e1.getCause()));
1363       }
1364     }
1365   }
1366 
1367   /*
1368    * Parse a single hlog and put the edits in @splitLogsMap
1369    *
1370    * @param logfile to split
1371    * @param splitLogsMap output parameter: a map with region names as keys and a
1372    * list of edits as values
1373    * @param fs the filesystem
1374    * @param conf the configuration
1375    * @throws IOException if hlog is corrupted, or can't be open
1376    */
1377   private static void parseHLog(final FileStatus logfile,
1378     final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
1379     final Configuration conf)
1380   throws IOException {
1381     // Check for possibly empty file. With appends, currently Hadoop reports a
1382     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
1383     // HDFS-878 is committed.
1384     long length = logfile.getLen();
1385     if (length <= 0) {
1386       LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
1387     }
1388     Path path = logfile.getPath();
1389     Reader in;
1390     int editsCount = 0;
1391     try {
1392       in = HLog.getReader(fs, path, conf);
1393     } catch (EOFException e) {
1394       if (length <= 0) {
1395         //TODO should we ignore an empty, not-last log file if skip.errors is false?
1396         //Either way, the caller should decide what to do. E.g. ignore if this is the last
1397         //log in sequence.
1398         //TODO is this scenario still possible if the log has been recovered (i.e. closed)
1399         LOG.warn("Could not open " + path + " for reading. File is empty" + e);
1400         return;
1401       } else {
1402         throw e;
1403       }
1404     }
1405     try {
1406       Entry entry;
1407       while ((entry = in.next()) != null) {
1408         byte[] region = entry.getKey().getRegionName();
1409         LinkedList<Entry> queue = splitLogsMap.get(region);
1410         if (queue == null) {
1411           queue = new LinkedList<Entry>();
1412           splitLogsMap.put(region, queue);
1413         }
1414         queue.addLast(entry);
1415         editsCount++;
1416       }
1417       LOG.debug("Pushed=" + editsCount + " entries from " + path);
1418     } finally {
1419       try {
1420         if (in != null) {
1421           in.close();
1422         }
1423       } catch (IOException e) {
1424         LOG.warn("Close log reader in finally threw exception -- continuing", e);
1425       }
1426     }
1427   }
1428 
1429   private static Callable<Void> createNewSplitter(final Path rootDir,
1430     final Map<byte[], WriterAndPath> logWriters,
1431     final Map<byte[], LinkedList<Entry>> logEntries,
1432     final byte[] region, final FileSystem fs, final Configuration conf) {
1433     return new Callable<Void>() {
1434       public String getName() {
1435         return "Split writer thread for region " + Bytes.toStringBinary(region);
1436       }
1437 
1438       @Override
1439       public Void call() throws IOException {
1440         LinkedList<Entry> entries = logEntries.get(region);
1441         LOG.debug(this.getName()+" got " + entries.size() + " to process");
1442         long threadTime = System.currentTimeMillis();
1443         try {
1444           int editsCount = 0;
1445           WriterAndPath wap = logWriters.get(region);
1446           for (Entry logEntry: entries) {
1447             if (wap == null) {
1448               Path logFile = getRegionLogPath(logEntry, rootDir);
1449               if (fs.exists(logFile)) {
1450                 LOG.warn("Found existing old hlog file. It could be the result of a previous" +
1451                         "failed split attempt. Deleting " + logFile +
1452                         ", length=" + fs.getFileStatus(logFile).getLen());
1453                 fs.delete(logFile, false);
1454               }
1455               Writer w = createWriter(fs, logFile, conf);
1456               wap = new WriterAndPath(logFile, w);
1457               logWriters.put(region, wap);
1458               LOG.debug("Creating writer path=" + logFile +
1459                 " region=" + Bytes.toStringBinary(region));
1460             }
1461             wap.w.append(logEntry);
1462             editsCount++;
1463           }
1464           LOG.debug(this.getName() + " Applied " + editsCount +
1465             " total edits to " + Bytes.toStringBinary(region) +
1466             " in " + (System.currentTimeMillis() - threadTime) + "ms");
1467         } catch (IOException e) {
1468           e = RemoteExceptionHandler.checkIOException(e);
1469           LOG.fatal(this.getName() + " Got while writing log entry to log", e);
1470           throw e;
1471         }
1472         return null;
1473       }
1474     };
1475   }
1476 
1477   /**
1478    * Moves processed logs to a oldLogDir after successful processing
1479    * Moves corrupted logs (any log that couldn't be successfully parsed
1480    * to corruptDir (.corrupt) for later investigation
1481    *
1482    * @param corruptedLogs
1483    * @param processedLogs
1484    * @param oldLogDir
1485    * @param fs
1486    * @param conf
1487    * @throws IOException
1488    */
1489   private static void archiveLogs(final List<Path> corruptedLogs,
1490     final List<Path> processedLogs, final Path oldLogDir,
1491     final FileSystem fs, final Configuration conf)
1492   throws IOException{
1493     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR),
1494       conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
1495 
1496     fs.mkdirs(corruptDir);
1497     fs.mkdirs(oldLogDir);
1498 
1499     for (Path corrupted: corruptedLogs) {
1500       Path p = new Path(corruptDir, corrupted.getName());
1501       LOG.info("Moving corrupted log " + corrupted + " to " + p);
1502       fs.rename(corrupted, p);
1503     }
1504 
1505     for (Path p: processedLogs) {
1506       Path newPath = getHLogArchivePath(oldLogDir, p);
1507       fs.rename(p, newPath);
1508       LOG.info("Archived processed log " + p + " to " + newPath);
1509     }
1510   }
1511 
1512   private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
1513     Path tableDir =
1514       HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
1515     Path regionDir =
1516             HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
1517     return new Path(regionDir, RECOVERED_EDITS);
1518    }
1519 
1520 
1521 
1522 
1523 
1524 
1525 
1526 
1527   public void addLogActionsListerner(LogActionsListener list) {
1528     LOG.info("Adding a listener");
1529     this.actionListeners.add(list);
1530   }
1531 
1532   public boolean removeLogActionsListener(LogActionsListener list) {
1533     return this.actionListeners.remove(list);
1534   }
1535 
1536   private static void usage() {
1537     System.err.println("Usage: java org.apache.hbase.HLog" +
1538         " {--dump <logfile>... | --split <logdir>...}");
1539   }
1540 
1541   /**
1542    * Pass one or more log file names and it will either dump out a text version
1543    * on <code>stdout</code> or split the specified log files.
1544    *
1545    * @param args
1546    * @throws IOException
1547    */
1548   public static void main(String[] args) throws IOException {
1549     if (args.length < 2) {
1550       usage();
1551       System.exit(-1);
1552     }
1553     boolean dump = true;
1554     if (args[0].compareTo("--dump") != 0) {
1555       if (args[0].compareTo("--split") == 0) {
1556         dump = false;
1557 
1558       } else {
1559         usage();
1560         System.exit(-1);
1561       }
1562     }
1563     Configuration conf = HBaseConfiguration.create();
1564     FileSystem fs = FileSystem.get(conf);
1565     final Path baseDir = new Path(conf.get(HConstants.HBASE_DIR));
1566     final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1567     for (int i = 1; i < args.length; i++) {
1568       Path logPath = new Path(args[i]);
1569       if (!fs.exists(logPath)) {
1570         throw new FileNotFoundException(args[i] + " does not exist");
1571       }
1572       if (dump) {
1573         if (!fs.isFile(logPath)) {
1574           throw new IOException(args[i] + " is not a file");
1575         }
1576         Reader log = getReader(fs, logPath, conf);
1577         try {
1578           HLog.Entry entry;
1579           while ((entry = log.next()) != null) {
1580             System.out.println(entry.toString());
1581           }
1582         } finally {
1583           log.close();
1584         }
1585       } else {
1586         if (!fs.getFileStatus(logPath).isDir()) {
1587           throw new IOException(args[i] + " is not a directory");
1588         }
1589         splitLog(baseDir, logPath, oldLogDir, fs, conf);
1590       }
1591     }
1592   }
1593 
1594   public static final long FIXED_OVERHEAD = ClassSize.align(
1595       ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1596       ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1597 
1598 }