1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
23
24 import java.io.DataInput;
25 import java.io.DataOutput;
26 import java.io.EOFException;
27 import java.io.FileNotFoundException;
28 import java.io.IOException;
29 import java.io.OutputStream;
30 import java.io.UnsupportedEncodingException;
31 import java.lang.reflect.Method;
32 import java.net.URLEncoder;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.SortedMap;
40 import java.util.TreeMap;
41 import java.util.TreeSet;
42 import java.util.concurrent.Callable;
43 import java.util.concurrent.ConcurrentSkipListMap;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.ThreadPoolExecutor;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.atomic.AtomicLong;
51 import java.util.concurrent.locks.Condition;
52 import java.util.concurrent.locks.Lock;
53 import java.util.concurrent.locks.ReentrantLock;
54 import java.util.regex.Pattern;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58 import org.apache.hadoop.conf.Configuration;
59 import org.apache.hadoop.fs.FileStatus;
60 import org.apache.hadoop.fs.FileSystem;
61 import org.apache.hadoop.fs.Path;
62 import org.apache.hadoop.fs.Syncable;
63 import org.apache.hadoop.hbase.HBaseConfiguration;
64 import org.apache.hadoop.hbase.HConstants;
65 import org.apache.hadoop.hbase.HRegionInfo;
66 import org.apache.hadoop.hbase.HServerInfo;
67 import org.apache.hadoop.hbase.HTableDescriptor;
68 import org.apache.hadoop.hbase.KeyValue;
69 import org.apache.hadoop.hbase.RemoteExceptionHandler;
70 import org.apache.hadoop.hbase.regionserver.HRegion;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.util.ClassSize;
73 import org.apache.hadoop.hbase.util.FSUtils;
74 import org.apache.hadoop.hbase.util.Threads;
75 import org.apache.hadoop.io.Writable;
76
77 import com.google.common.util.concurrent.NamingThreadFactory;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 public class HLog implements Syncable {
/** Logger shared by all HLog instances. */
static final Log LOG = LogFactory.getLog(HLog.class);

/** Column family of the marker edits HLog writes itself (e.g. cache-flush markers). */
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");

/** Row of the marker edits HLog writes itself. */
static final byte [] METAROW = Bytes.toBytes("METAROW");

private final FileSystem fs;

/** Directory holding this instance's live hlog files. */
private final Path dir;

private final Configuration conf;

/** Told when this log wants to be rolled; may be null. */
private final LogRollListener listener;

/** Set once a low-replication roll was requested; cleared by rollWriter. */
private boolean logRollRequested;

private final long optionalFlushInterval;

private final long blocksize;

private final int flushlogentries;

/** File-name prefix of the hlog files created in dir. */
private final String prefix;

/** Directory that rolled hlogs are moved into when archived. */
private final Path oldLogDir;

/** Listeners told about rolls and archivals. */
private final List<LogActionsListener> actionListeners =
  Collections.synchronizedList(new ArrayList<LogActionsListener>());

// Writer/Reader implementations, looked up from configuration on first use
// and then cached for the whole JVM.
private static Class<? extends Writer> logWriterClass;
private static Class<? extends Reader> logReaderClass;

// Stream underneath the current writer (only when it is a
// SequenceFileLogWriter), and the reflective getNumCurrentReplicas handle
// (HDFS-826) used to detect a degraded write pipeline.
private OutputStream hdfs_out;
private int initialReplication;
private Method getNumCurrentReplicas;
final static Object [] NO_ARGS = new Object[0];

/** Name used for recovered (split) edits. */
public static final String RECOVERED_EDITS = "recovered.edits";

private boolean forceSync = false;

/** Reads {@link Entry} records back out of an hlog file. */
public interface Reader {
  void init(FileSystem fs, Path path, Configuration c) throws IOException;
  void close() throws IOException;
  Entry next() throws IOException;
  Entry next(Entry reuse) throws IOException;
  void seek(long pos) throws IOException;
  long getPosition() throws IOException;
}

/** Appends {@link Entry} records to an hlog file. */
public interface Writer {
  void init(FileSystem fs, Path path, Configuration c) throws IOException;
  void close() throws IOException;
  void sync() throws IOException;
  void append(Entry entry) throws IOException;
  long getLength() throws IOException;
}

/** Writer for the hlog currently being appended to. */
Writer writer;

/** Rolled hlogs still in dir, keyed by the sequence number recorded at roll time. */
final SortedMap<Long, Path> outputfiles =
  Collections.synchronizedSortedMap(new TreeMap<Long, Path>());

/**
 * Region name to the first sequence id appended for it since its last
 * completed cache flush (set with putIfAbsent on append, removed on flush).
 */
private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
  new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);

private volatile boolean closed = false;

/** Last sequence number handed out. */
private final AtomicLong logSeqNum = new AtomicLong(0);

/** Timestamp naming the current hlog file; -1 until the first roll. */
private volatile long filenum = -1;

/** Entries appended to the current hlog since it was opened. */
private final AtomicInteger numEntries = new AtomicInteger(0);

/** Size at which a roll is requested (blocksize times the roll multiplier). */
private final long logrollsize;

/** Held across a whole cache flush and across rolls/close. */
private final Lock cacheFlushLock = new ReentrantLock();

/** Guards sequence-number assignment, writes and writer replacement. */
private final Object updateLock = new Object();

private final boolean enabled;

/** Rolled-hlog count above which memstore flushes are forced. */
private final int maxLogs;

private static final Pattern pattern = Pattern.compile(".*\\.\\d*");

/** Value of the marker edit written when a cache flush completes. */
static byte [] COMPLETE_CACHE_FLUSH;
static {
  byte [] marker = null;
  try {
    marker = "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
  } catch (UnsupportedEncodingException e) {
    assert false;
  }
  COMPLETE_CACHE_FLUSH = marker;
}

// Process-wide write/sync metrics. Each getter returns the value accumulated
// since its previous call and resets it.
// NOTE(review): the read and the reset are separate steps, so updates made
// between them are lost.
private static volatile long writeOps;
private static volatile long writeTime;

private static volatile long syncOps;
private static volatile long syncTime;

public static long getWriteOps() {
  final long snapshot = writeOps;
  writeOps = 0;
  return snapshot;
}

public static long getWriteTime() {
  final long snapshot = writeTime;
  writeTime = 0;
  return snapshot;
}

public static long getSyncOps() {
  final long snapshot = syncOps;
  syncOps = 0;
  return snapshot;
}

public static long getSyncTime() {
  final long snapshot = syncTime;
  syncTime = 0;
  return snapshot;
}
264
265
266
267
268
269
270
271
272
273
274
275 public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
276 final Configuration conf, final LogRollListener listener)
277 throws IOException {
278 this(fs, dir, oldLogDir, conf, listener, null, null);
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299 public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
300 final Configuration conf, final LogRollListener listener,
301 final LogActionsListener actionListener, final String prefix)
302 throws IOException {
303 super();
304 this.fs = fs;
305 this.dir = dir;
306 this.conf = conf;
307 this.listener = listener;
308 this.flushlogentries =
309 conf.getInt("hbase.regionserver.flushlogentries", 1);
310 this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
311 this.fs.getDefaultBlockSize());
312
313 float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
314 this.logrollsize = (long)(this.blocksize * multi);
315 this.optionalFlushInterval =
316 conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
317 if (fs.exists(dir)) {
318 throw new IOException("Target HLog directory already exists: " + dir);
319 }
320 fs.mkdirs(dir);
321 this.oldLogDir = oldLogDir;
322 if (!fs.exists(oldLogDir)) {
323 fs.mkdirs(this.oldLogDir);
324 }
325 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
326 this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
327 LOG.info("HLog configuration: blocksize=" + this.blocksize +
328 ", rollsize=" + this.logrollsize +
329 ", enabled=" + this.enabled +
330 ", flushlogentries=" + this.flushlogentries +
331 ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
332 if (actionListener != null) {
333 addLogActionsListerner(actionListener);
334 }
335
336 this.prefix = prefix == null || prefix.isEmpty() ?
337 "hlog" : URLEncoder.encode(prefix, "UTF8");
338
339 rollWriter();
340
341
342 this.getNumCurrentReplicas = null;
343 if(this.hdfs_out != null) {
344 try {
345 this.getNumCurrentReplicas = this.hdfs_out.getClass().
346 getMethod("getNumCurrentReplicas", new Class<?> []{});
347 this.getNumCurrentReplicas.setAccessible(true);
348 } catch (NoSuchMethodException e) {
349
350 } catch (SecurityException e) {
351
352 this.getNumCurrentReplicas = null;
353 }
354 }
355 if(this.getNumCurrentReplicas != null) {
356 LOG.info("Using getNumCurrentReplicas--HDFS-826");
357 } else {
358 LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
359 }
360 }
361
362
363
364
365 public long getFilenum() {
366 return this.filenum;
367 }
368
369
370
371
372
373
374
375
376
377 public void setSequenceNumber(final long newvalue) {
378 for (long id = this.logSeqNum.get(); id < newvalue &&
379 !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
380
381
382 LOG.debug("Change sequence number from " + logSeqNum + " to " + newvalue);
383 }
384 }
385
386
387
388
389 public long getSequenceNumber() {
390 return logSeqNum.get();
391 }
392
393
394 OutputStream getOutputStream() {
395 return this.hdfs_out;
396 }
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
418
419 if (this.writer != null && this.numEntries.get() <= 0) {
420 return null;
421 }
422 byte [][] regionsToFlush = null;
423 this.cacheFlushLock.lock();
424 try {
425 if (closed) {
426 return regionsToFlush;
427 }
428 synchronized (updateLock) {
429
430 Path oldFile = cleanupCurrentWriter(this.filenum);
431 this.filenum = System.currentTimeMillis();
432 Path newPath = computeFilename();
433 this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
434 this.initialReplication = fs.getFileStatus(newPath).getReplication();
435
436
437
438
439 this.hdfs_out = null;
440 if (this.writer instanceof SequenceFileLogWriter) {
441 this.hdfs_out =
442 ((SequenceFileLogWriter)this.writer).getDFSCOutputStream();
443 }
444
445 LOG.info((oldFile != null?
446 "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
447 this.numEntries.get() +
448 ", filesize=" +
449 this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
450 "New hlog " + FSUtils.getPath(newPath));
451
452 if (!this.actionListeners.isEmpty()) {
453 for (LogActionsListener list : this.actionListeners) {
454 list.logRolled(newPath);
455 }
456 }
457
458 if (this.outputfiles.size() > 0) {
459 if (this.lastSeqWritten.size() <= 0) {
460 LOG.debug("Last sequence written is empty. Deleting all old hlogs");
461
462
463
464 for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
465 archiveLogFile(e.getValue(), e.getKey());
466 }
467 this.outputfiles.clear();
468 } else {
469 regionsToFlush = cleanOldLogs();
470 }
471 }
472 this.numEntries.set(0);
473 this.logRollRequested = false;
474 }
475 } finally {
476 this.cacheFlushLock.unlock();
477 }
478 return regionsToFlush;
479 }
480
481
482
483
484
485
486
487
488
489 public static Reader getReader(final FileSystem fs,
490 final Path path, Configuration conf)
491 throws IOException {
492 try {
493 if (logReaderClass == null) {
494 logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl",
495 SequenceFileLogReader.class, Reader.class);
496 }
497
498 HLog.Reader reader = logReaderClass.newInstance();
499 reader.init(fs, path, conf);
500 return reader;
501 } catch (IOException e) {
502 throw e;
503 }
504 catch (Exception e) {
505 throw new IOException("Cannot get log reader", e);
506 }
507 }
508
509
510
511
512
513
514
515
516 public static Writer createWriter(final FileSystem fs,
517 final Path path, Configuration conf)
518 throws IOException {
519 try {
520 if (logWriterClass == null) {
521 logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
522 SequenceFileLogWriter.class, Writer.class);
523 }
524 HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
525 writer.init(fs, path, conf);
526 return writer;
527 } catch (Exception e) {
528 IOException ie = new IOException("cannot get log writer");
529 ie.initCause(e);
530 throw ie;
531 }
532 }
533
534
535
536
537
538
539
540 private byte [][] cleanOldLogs() throws IOException {
541 Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
542
543
544 TreeSet<Long> sequenceNumbers =
545 new TreeSet<Long>(this.outputfiles.headMap(
546 (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
547
548 int logsToRemove = sequenceNumbers.size();
549 if (logsToRemove > 0) {
550 if (LOG.isDebugEnabled()) {
551
552 byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
553 LOG.debug("Found " + logsToRemove + " hlogs to remove " +
554 " out of total " + this.outputfiles.size() + "; " +
555 "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
556 " from region " + Bytes.toString(oldestRegion));
557 }
558 for (Long seq : sequenceNumbers) {
559 archiveLogFile(this.outputfiles.remove(seq), seq);
560 }
561 }
562
563
564 byte [][] regions = null;
565 int logCount = this.outputfiles.size() - logsToRemove;
566 if (logCount > this.maxLogs && this.outputfiles != null &&
567 this.outputfiles.size() > 0) {
568 regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(),
569 this.lastSeqWritten);
570 StringBuilder sb = new StringBuilder();
571 for (int i = 0; i < regions.length; i++) {
572 if (i > 0) sb.append(", ");
573 sb.append(Bytes.toStringBinary(regions[i]));
574 }
575 LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
576 this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
577 sb.toString());
578 }
579 return regions;
580 }
581
582
583
584
585
586
587
588
589
590 static byte [][] findMemstoresWithEditsOlderThan(final long oldestWALseqid,
591 final Map<byte [], Long> regionsToSeqids) {
592
593 List<byte []> regions = null;
594 for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
595 if (e.getValue().longValue() < oldestWALseqid) {
596 if (regions == null) regions = new ArrayList<byte []>();
597 regions.add(e.getKey());
598 }
599 }
600 return regions == null?
601 null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
602 }
603
604
605
606
607 private Long getOldestOutstandingSeqNum() {
608 return Collections.min(this.lastSeqWritten.values());
609 }
610
611 private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
612 byte [] oldestRegion = null;
613 for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
614 if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
615 oldestRegion = e.getKey();
616 break;
617 }
618 }
619 return oldestRegion;
620 }
621
622
623
624
625
626
627
628 private Path cleanupCurrentWriter(final long currentfilenum)
629 throws IOException {
630 Path oldFile = null;
631 if (this.writer != null) {
632
633 try {
634 this.writer.close();
635 } catch (IOException e) {
636
637
638
639 FailedLogCloseException flce =
640 new FailedLogCloseException("#" + currentfilenum);
641 flce.initCause(e);
642 throw e;
643 }
644 if (currentfilenum >= 0) {
645 oldFile = computeFilename();
646 this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
647 }
648 }
649 return oldFile;
650 }
651
652 private void archiveLogFile(final Path p, final Long seqno) throws IOException {
653 Path newPath = getHLogArchivePath(this.oldLogDir, p);
654 LOG.info("moving old hlog file " + FSUtils.getPath(p) +
655 " whose highest sequence/edit id is " + seqno + " to " +
656 FSUtils.getPath(newPath));
657 this.fs.rename(p, newPath);
658 if (!this.actionListeners.isEmpty()) {
659 for (LogActionsListener list : this.actionListeners) {
660 list.logArchived(p, newPath);
661 }
662 }
663 }
664
665
666
667
668
669
670 protected Path computeFilename() {
671 if (filenum < 0) {
672 throw new RuntimeException("hlog file number can't be < 0");
673 }
674 return new Path(dir, prefix + "." + filenum);
675 }
676
677
678
679
680
681
682 public void closeAndDelete() throws IOException {
683 close();
684 FileStatus[] files = fs.listStatus(this.dir);
685 for(FileStatus file : files) {
686 fs.rename(file.getPath(),
687 getHLogArchivePath(this.oldLogDir, file.getPath()));
688 }
689 LOG.debug("Moved " + files.length + " log files to " +
690 FSUtils.getPath(this.oldLogDir));
691 fs.delete(dir, true);
692 }
693
694
695
696
697
698
699 public void close() throws IOException {
700 cacheFlushLock.lock();
701 try {
702 synchronized (updateLock) {
703 this.closed = true;
704 if (LOG.isDebugEnabled()) {
705 LOG.debug("closing hlog writer in " + this.dir.toString());
706 }
707 this.writer.close();
708 }
709 } finally {
710 cacheFlushLock.unlock();
711 }
712 }
713
714
715
716
717
718
719
720
721 public void append(HRegionInfo regionInfo, WALEdit logEdit,
722 final long now,
723 final boolean isMetaRegion)
724 throws IOException {
725 byte [] regionName = regionInfo.getRegionName();
726 byte [] tableName = regionInfo.getTableDesc().getName();
727 this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
728 }
729
730
731
732
733
734
735
736 protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, long now) {
737 return new HLogKey(regionName, tableName, seqnum, now);
738 }
739
740
741
742
743
744
745
746
747
748
749 public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
750 throws IOException {
751 if (this.closed) {
752 throw new IOException("Cannot append; log is closed");
753 }
754 byte [] regionName = regionInfo.getRegionName();
755 synchronized (updateLock) {
756 long seqNum = obtainSeqNum();
757 logKey.setLogSeqNum(seqNum);
758
759
760
761
762
763 this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
764 doWrite(regionInfo, logKey, logEdit);
765 this.numEntries.incrementAndGet();
766 }
767
768
769 this.sync(regionInfo.isMetaRegion());
770 }
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795 public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
796 final long now)
797 throws IOException {
798 if (edits.isEmpty()) return;
799
800 byte[] regionName = info.getRegionName();
801 if (this.closed) {
802 throw new IOException("Cannot append; log is closed");
803 }
804 synchronized (this.updateLock) {
805 long seqNum = obtainSeqNum();
806
807
808
809
810
811 this.lastSeqWritten.putIfAbsent(regionName, seqNum);
812 HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
813 doWrite(info, logKey, edits);
814 this.numEntries.incrementAndGet();
815 }
816
817 this.sync(info.isMetaRegion());
818 }
819
820 public void hflush() throws IOException {
821 if (this.closed) {
822 return;
823 }
824 try {
825 long now = System.currentTimeMillis();
826 this.writer.sync();
827 synchronized (this) {
828 this.syncTime += System.currentTimeMillis() - now;
829 this.syncOps++;
830 this.forceSync = false;
831 }
832 synchronized (this.updateLock) {
833 if (!logRollRequested) {
834 checkLowReplication();
835 if (this.writer.getLength() > this.logrollsize) {
836 requestLogRoll();
837 }
838 }
839 }
840
841 } catch (IOException e) {
842 LOG.fatal("Could not append. Requesting close of hlog", e);
843 requestLogRoll();
844 throw e;
845 }
846 }
847
848 private synchronized void checkLowReplication() {
849
850
851 try {
852 int numCurrentReplicas = getLogReplication();
853 if (numCurrentReplicas != 0 &&
854 numCurrentReplicas < this.initialReplication) {
855 LOG.warn("HDFS pipeline error detected. " +
856 "Found " + numCurrentReplicas + " replicas but expecting " +
857 this.initialReplication + " replicas. " +
858 " Requesting close of hlog.");
859 requestLogRoll();
860 logRollRequested = true;
861 }
862 } catch (Exception e) {
863 LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
864 " still proceeding ahead...");
865 }
866 }
867
868
869
870
871
872
873
874
875
876
877 int getLogReplication() throws Exception {
878 if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
879 Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
880 if (repl instanceof Integer) {
881 return ((Integer)repl).intValue();
882 }
883 }
884 return 0;
885 }
886
887 boolean canGetCurReplicas() {
888 return this.getNumCurrentReplicas != null;
889 }
890
891 public void hsync() throws IOException {
892
893 hflush();
894 }
895
896 private synchronized void requestLogRoll() {
897 if (this.listener != null && !logRollRequested) {
898 this.listener.logRollRequested();
899 }
900 }
901
902 protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
903 throws IOException {
904 if (!this.enabled) {
905 return;
906 }
907 try {
908 long now = System.currentTimeMillis();
909 this.writer.append(new HLog.Entry(logKey, logEdit));
910 long took = System.currentTimeMillis() - now;
911 writeTime += took;
912 writeOps++;
913 if (took > 1000) {
914 LOG.warn(Thread.currentThread().getName() + " took " + took +
915 "ms appending an edit to hlog; editcount=" + this.numEntries.get());
916 }
917 } catch (IOException e) {
918 LOG.fatal("Could not append. Requesting close of hlog", e);
919 requestLogRoll();
920 throw e;
921 }
922 }
923
924
925 int getNumEntries() {
926 return numEntries.get();
927 }
928
929
930
931
932 private long obtainSeqNum() {
933 return this.logSeqNum.incrementAndGet();
934 }
935
936
937 int getNumLogFiles() {
938 return outputfiles.size();
939 }
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954 public long startCacheFlush() {
955 this.cacheFlushLock.lock();
956 return obtainSeqNum();
957 }
958
959
960
961
962
963
964
965
966
967
968
969 public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
970 final long logSeqId,
971 final boolean isMetaRegion)
972 throws IOException {
973 try {
974 if (this.closed) {
975 return;
976 }
977 synchronized (updateLock) {
978 long now = System.currentTimeMillis();
979 WALEdit edit = completeCacheFlushLogEdit();
980 HLogKey key = makeKey(regionName, tableName, logSeqId,
981 System.currentTimeMillis());
982 this.writer.append(new Entry(key, edit));
983 writeTime += System.currentTimeMillis() - now;
984 writeOps++;
985 this.numEntries.incrementAndGet();
986 Long seq = this.lastSeqWritten.get(regionName);
987 if (seq != null && logSeqId >= seq.longValue()) {
988 this.lastSeqWritten.remove(regionName);
989 }
990 }
991
992 this.sync(isMetaRegion);
993
994 } finally {
995 this.cacheFlushLock.unlock();
996 }
997 }
998
999 public void sync() throws IOException {
1000 sync(false);
1001 }
1002
1003 public void sync(boolean force) throws IOException {
1004 hflush();
1005 }
1006
1007
1008 private WALEdit completeCacheFlushLogEdit() {
1009 KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
1010 System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
1011 WALEdit e = new WALEdit();
1012 e.add(kv);
1013 return e;
1014 }
1015
1016
1017
1018
1019
1020
1021
1022 public void abortCacheFlush() {
1023 this.cacheFlushLock.unlock();
1024 }
1025
1026
1027
1028
1029
1030 public static boolean isMetaFamily(byte [] family) {
1031 return Bytes.equals(METAFAMILY, family);
1032 }
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048 public static List<Path> splitLog(final Path rootDir, final Path srcDir,
1049 Path oldLogDir, final FileSystem fs, final Configuration conf)
1050 throws IOException {
1051
1052 long millis = System.currentTimeMillis();
1053 List<Path> splits = null;
1054 if (!fs.exists(srcDir)) {
1055
1056 return splits;
1057 }
1058 FileStatus [] logfiles = fs.listStatus(srcDir);
1059 if (logfiles == null || logfiles.length == 0) {
1060
1061 return splits;
1062 }
1063 LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
1064 srcDir.toString());
1065 splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
1066 try {
1067 LOG.info("Spliting is done. Removing old log dir "+srcDir);
1068 fs.delete(srcDir, false);
1069 } catch (IOException e) {
1070 e = RemoteExceptionHandler.checkIOException(e);
1071 IOException io = new IOException("Cannot delete: " + srcDir);
1072 io.initCause(e);
1073 throw io;
1074 }
1075 long endMillis = System.currentTimeMillis();
1076 LOG.info("hlog file splitting completed in " + (endMillis - millis) +
1077 " millis for " + srcDir.toString());
1078 return splits;
1079 }
1080
1081
1082 private final static class WriterAndPath {
1083 final Path p;
1084 final Writer w;
1085 WriterAndPath(final Path p, final Writer w) {
1086 this.p = p;
1087 this.w = w;
1088 }
1089 }
1090
1091 @SuppressWarnings("unchecked")
1092 public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
1093 return (Class<? extends HLogKey>)
1094 conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
1095 }
1096
1097 public static HLogKey newKey(Configuration conf) throws IOException {
1098 Class<? extends HLogKey> keyClass = getKeyClass(conf);
1099 try {
1100 return keyClass.newInstance();
1101 } catch (InstantiationException e) {
1102 throw new IOException("cannot create hlog key");
1103 } catch (IllegalAccessException e) {
1104 throw new IOException("cannot create hlog key");
1105 }
1106 }
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135 private static List<Path> splitLog(final Path rootDir, final Path srcDir,
1136 Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
1137 final Configuration conf)
1138 throws IOException {
1139 List<Path> processedLogs = new ArrayList<Path>();
1140 List<Path> corruptedLogs = new ArrayList<Path>();
1141 final Map<byte [], WriterAndPath> logWriters =
1142 Collections.synchronizedMap(
1143 new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
1144 List<Path> splits = null;
1145
1146
1147
1148
1149 int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
1150 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
1151
1152
1153 try {
1154 int i = -1;
1155 while (i < logfiles.length) {
1156 final Map<byte[], LinkedList<Entry>> editsByRegion =
1157 new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
1158 for (int j = 0; j < logFilesPerStep; j++) {
1159 i++;
1160 if (i == logfiles.length) {
1161 break;
1162 }
1163 FileStatus log = logfiles[i];
1164 Path logPath = log.getPath();
1165 long logLength = log.getLen();
1166 LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
1167 ": " + logPath + ", length=" + logLength );
1168 try {
1169 recoverFileLease(fs, logPath, conf);
1170 parseHLog(log, editsByRegion, fs, conf);
1171 processedLogs.add(logPath);
1172 } catch (IOException e) {
1173 if (skipErrors) {
1174 LOG.warn("Got while parsing hlog " + logPath +
1175 ". Marking as corrupted", e);
1176 corruptedLogs.add(logPath);
1177 } else {
1178 throw e;
1179 }
1180 }
1181 }
1182 writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
1183 }
1184 if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
1185 throw new IOException("Discovered orphan hlog after split. Maybe " +
1186 "HRegionServer was not dead when we started");
1187 }
1188 archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
1189 } finally {
1190 splits = new ArrayList<Path>(logWriters.size());
1191 for (WriterAndPath wap : logWriters.values()) {
1192 wap.w.close();
1193 splits.add(wap.p);
1194 LOG.debug("Closed " + wap.p);
1195 }
1196 }
1197 return splits;
1198 }
1199
1200
1201
1202
1203
1204
1205 public static class Entry implements Writable {
1206 private WALEdit edit;
1207 private HLogKey key;
1208
1209 public Entry() {
1210 edit = new WALEdit();
1211 key = new HLogKey();
1212 }
1213
1214
1215
1216
1217
1218
1219 public Entry(HLogKey key, WALEdit edit) {
1220 super();
1221 this.key = key;
1222 this.edit = edit;
1223 }
1224
1225
1226
1227
1228 public WALEdit getEdit() {
1229 return edit;
1230 }
1231
1232
1233
1234
1235 public HLogKey getKey() {
1236 return key;
1237 }
1238
1239 @Override
1240 public String toString() {
1241 return this.key + "=" + this.edit;
1242 }
1243
1244 @Override
1245 public void write(DataOutput dataOutput) throws IOException {
1246 this.key.write(dataOutput);
1247 this.edit.write(dataOutput);
1248 }
1249
1250 @Override
1251 public void readFields(DataInput dataInput) throws IOException {
1252 this.key.readFields(dataInput);
1253 this.edit.readFields(dataInput);
1254 }
1255 }
1256
1257
1258
1259
1260
1261
1262
1263 public static String getHLogDirectoryName(HServerInfo info) {
1264 return getHLogDirectoryName(info.getServerName());
1265 }
1266
1267
1268
1269
1270
1271
1272
1273
1274 public static String getHLogDirectoryName(String serverAddress,
1275 long startCode) {
1276 if (serverAddress == null || serverAddress.length() == 0) {
1277 return null;
1278 }
1279 return getHLogDirectoryName(
1280 HServerInfo.getServerName(serverAddress, startCode));
1281 }
1282
1283
1284
1285
1286
1287
1288
1289 public static String getHLogDirectoryName(String serverName) {
1290 StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
1291 dirName.append("/");
1292 dirName.append(serverName);
1293 return dirName.toString();
1294 }
1295
1296 public static boolean validateHLogFilename(String filename) {
1297 return pattern.matcher(filename).matches();
1298 }
1299
1300 private static Path getHLogArchivePath(Path oldLogDir, Path p) {
1301 return new Path(oldLogDir, p.getName());
1302 }
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314 private static void writeEditsBatchToRegions(
1315 final Map<byte[], LinkedList<Entry>> splitLogsMap,
1316 final Map<byte[], WriterAndPath> logWriters,
1317 final Path rootDir, final FileSystem fs, final Configuration conf)
1318 throws IOException {
1319
1320
1321 int logWriterThreads =
1322 conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
1323 boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
1324 HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
1325 NamingThreadFactory f = new NamingThreadFactory(
1326 "SplitWriter-%1$d", Executors.defaultThreadFactory());
1327 ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
1328 for (final byte[] region : splitLogsMap.keySet()) {
1329 Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
1330 writeFutureResult.put(region, threadPool.submit(splitter));
1331 }
1332
1333 threadPool.shutdown();
1334
1335 try {
1336 for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
1337 String message = "Waiting for hlog writers to terminate, elapsed " + j * 5 + " seconds";
1338 if (j < 30) {
1339 LOG.debug(message);
1340 } else {
1341 LOG.info(message);
1342 }
1343
1344 }
1345 } catch(InterruptedException ex) {
1346 LOG.warn("Hlog writers were interrupted, possible data loss!");
1347 if (!skipErrors) {
1348 throw new IOException("Could not finish writing log entries", ex);
1349
1350 }
1351 }
1352
1353 for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
1354 try {
1355 entry.getValue().get();
1356 } catch (ExecutionException e) {
1357 throw (new IOException(e.getCause()));
1358 } catch (InterruptedException e1) {
1359 LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) +
1360 " was interrupted, however the write process should have " +
1361 "finished. Throwing up ", e1);
1362 throw (new IOException(e1.getCause()));
1363 }
1364 }
1365 }
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377 private static void parseHLog(final FileStatus logfile,
1378 final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
1379 final Configuration conf)
1380 throws IOException {
1381
1382
1383
1384 long length = logfile.getLen();
1385 if (length <= 0) {
1386 LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
1387 }
1388 Path path = logfile.getPath();
1389 Reader in;
1390 int editsCount = 0;
1391 try {
1392 in = HLog.getReader(fs, path, conf);
1393 } catch (EOFException e) {
1394 if (length <= 0) {
1395
1396
1397
1398
1399 LOG.warn("Could not open " + path + " for reading. File is empty" + e);
1400 return;
1401 } else {
1402 throw e;
1403 }
1404 }
1405 try {
1406 Entry entry;
1407 while ((entry = in.next()) != null) {
1408 byte[] region = entry.getKey().getRegionName();
1409 LinkedList<Entry> queue = splitLogsMap.get(region);
1410 if (queue == null) {
1411 queue = new LinkedList<Entry>();
1412 splitLogsMap.put(region, queue);
1413 }
1414 queue.addLast(entry);
1415 editsCount++;
1416 }
1417 LOG.debug("Pushed=" + editsCount + " entries from " + path);
1418 } finally {
1419 try {
1420 if (in != null) {
1421 in.close();
1422 }
1423 } catch (IOException e) {
1424 LOG.warn("Close log reader in finally threw exception -- continuing", e);
1425 }
1426 }
1427 }
1428
1429 private static Callable<Void> createNewSplitter(final Path rootDir,
1430 final Map<byte[], WriterAndPath> logWriters,
1431 final Map<byte[], LinkedList<Entry>> logEntries,
1432 final byte[] region, final FileSystem fs, final Configuration conf) {
1433 return new Callable<Void>() {
1434 public String getName() {
1435 return "Split writer thread for region " + Bytes.toStringBinary(region);
1436 }
1437
1438 @Override
1439 public Void call() throws IOException {
1440 LinkedList<Entry> entries = logEntries.get(region);
1441 LOG.debug(this.getName()+" got " + entries.size() + " to process");
1442 long threadTime = System.currentTimeMillis();
1443 try {
1444 int editsCount = 0;
1445 WriterAndPath wap = logWriters.get(region);
1446 for (Entry logEntry: entries) {
1447 if (wap == null) {
1448 Path logFile = getRegionLogPath(logEntry, rootDir);
1449 if (fs.exists(logFile)) {
1450 LOG.warn("Found existing old hlog file. It could be the result of a previous" +
1451 "failed split attempt. Deleting " + logFile +
1452 ", length=" + fs.getFileStatus(logFile).getLen());
1453 fs.delete(logFile, false);
1454 }
1455 Writer w = createWriter(fs, logFile, conf);
1456 wap = new WriterAndPath(logFile, w);
1457 logWriters.put(region, wap);
1458 LOG.debug("Creating writer path=" + logFile +
1459 " region=" + Bytes.toStringBinary(region));
1460 }
1461 wap.w.append(logEntry);
1462 editsCount++;
1463 }
1464 LOG.debug(this.getName() + " Applied " + editsCount +
1465 " total edits to " + Bytes.toStringBinary(region) +
1466 " in " + (System.currentTimeMillis() - threadTime) + "ms");
1467 } catch (IOException e) {
1468 e = RemoteExceptionHandler.checkIOException(e);
1469 LOG.fatal(this.getName() + " Got while writing log entry to log", e);
1470 throw e;
1471 }
1472 return null;
1473 }
1474 };
1475 }
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489 private static void archiveLogs(final List<Path> corruptedLogs,
1490 final List<Path> processedLogs, final Path oldLogDir,
1491 final FileSystem fs, final Configuration conf)
1492 throws IOException{
1493 final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR),
1494 conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
1495
1496 fs.mkdirs(corruptDir);
1497 fs.mkdirs(oldLogDir);
1498
1499 for (Path corrupted: corruptedLogs) {
1500 Path p = new Path(corruptDir, corrupted.getName());
1501 LOG.info("Moving corrupted log " + corrupted + " to " + p);
1502 fs.rename(corrupted, p);
1503 }
1504
1505 for (Path p: processedLogs) {
1506 Path newPath = getHLogArchivePath(oldLogDir, p);
1507 fs.rename(p, newPath);
1508 LOG.info("Archived processed log " + p + " to " + newPath);
1509 }
1510 }
1511
1512 private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
1513 Path tableDir =
1514 HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
1515 Path regionDir =
1516 HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
1517 return new Path(regionDir, RECOVERED_EDITS);
1518 }
1519
1520
1521
1522
1523
1524
1525
1526
1527 public void addLogActionsListerner(LogActionsListener list) {
1528 LOG.info("Adding a listener");
1529 this.actionListeners.add(list);
1530 }
1531
1532 public boolean removeLogActionsListener(LogActionsListener list) {
1533 return this.actionListeners.remove(list);
1534 }
1535
1536 private static void usage() {
1537 System.err.println("Usage: java org.apache.hbase.HLog" +
1538 " {--dump <logfile>... | --split <logdir>...}");
1539 }
1540
1541
1542
1543
1544
1545
1546
1547
1548 public static void main(String[] args) throws IOException {
1549 if (args.length < 2) {
1550 usage();
1551 System.exit(-1);
1552 }
1553 boolean dump = true;
1554 if (args[0].compareTo("--dump") != 0) {
1555 if (args[0].compareTo("--split") == 0) {
1556 dump = false;
1557
1558 } else {
1559 usage();
1560 System.exit(-1);
1561 }
1562 }
1563 Configuration conf = HBaseConfiguration.create();
1564 FileSystem fs = FileSystem.get(conf);
1565 final Path baseDir = new Path(conf.get(HConstants.HBASE_DIR));
1566 final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1567 for (int i = 1; i < args.length; i++) {
1568 Path logPath = new Path(args[i]);
1569 if (!fs.exists(logPath)) {
1570 throw new FileNotFoundException(args[i] + " does not exist");
1571 }
1572 if (dump) {
1573 if (!fs.isFile(logPath)) {
1574 throw new IOException(args[i] + " is not a file");
1575 }
1576 Reader log = getReader(fs, logPath, conf);
1577 try {
1578 HLog.Entry entry;
1579 while ((entry = log.next()) != null) {
1580 System.out.println(entry.toString());
1581 }
1582 } finally {
1583 log.close();
1584 }
1585 } else {
1586 if (!fs.getFileStatus(logPath).isDir()) {
1587 throw new IOException(args[i] + " is not a directory");
1588 }
1589 splitLog(baseDir, logPath, oldLogDir, fs, conf);
1590 }
1591 }
1592 }
1593
1594 public static final long FIXED_OVERHEAD = ClassSize.align(
1595 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1596 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1597
1598 }