hdfs balance的一些解读和问题记录

##balance过程的解读

  1. 准备工作
    获取的namenode

  2. 调用Balancer.run L1422,开始迭代
    一个循环(除了2.1的返回结果ReturnStatus.IN_PROGRESS 继续之外,其他都结束)

2.1 ReturnStatus r = b.run L1352

​ 2.1.1 initNode获取需要移动的字节数bytesLeftToMove
​ 计算平均使用率
​ 计算出过载和未充分利用的节点需要移动的字节数,两者选取较大值(已排除在与平均值相差在threshold内的节点)
​ 2.1.2 chooseNode获取决定要移动的字节
chooseDatanodes: 三种匹配类型Matcher选取(同一组,同一机架,剩下的),每一类型做如下类型操作

​ 1. 处理过载到未好好利用的。 2. 处理过载到使用少的。 3. 处理多使用到少使用

​ chooseCandidate对于每个源节点,选个候选节点(如果能符合匹配规则Matcher就选择他)
​ matchSourceWithTargetToMove选择两者能移动的少的字节数,形成NodeTask
​ 2.1.3 dispatchBlockMoves L1103启动线程去移动数据,处理NodeTask
​ {两个条件会结束noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS 和Time.now()- startTime > MAX_ITERATION_TIME}
​ dispatchBlocks L614 为每个source启动一个线程去移动数据(线程放入dispatcherExecutor线程池),然后等待回复
​ 此处一轮迭代有时间限制
​ dispatcherExecutor.submit(source.new BlockMoveDispatcher());(调用dispatchBlocks())
​ chooseNextBlockToMove 获取下一步需要移动block,选择的代理节点是拥有此block但是传递到target比较好。
​ 此处有一个5个任务的限制,难怪增大timeout的时间之后不会quota is exceeded***
​ chooseBlockAndProxy 选择块和代理节点
​ isGoodBlockCandidate 选择好的块
​ chooseProxySource 在选中的块中选择一个和target比较好的location
​ scheduleBlockMove 发送到代理节点并给代理节点的发送复制请求
​ dispatch() 发送数据(此处看代码不是异步处理,果然不是)
​ 在目的节点DataXceiver.replaceBlock方法接受来自proxy的block发送流
​ 在此处有balanceThrottler限制
​ filterMovedBlocks 过滤
​ 判断需要不需要更多的block
​ waitForMoveCompletion()等待所有targets里的pending任务结束(如果没有结束此处会等待)
​ shouldContinue如果移动字节数(dispatchBlockMoves结果)大于零或等于零次数少于 MAX_NOT_CHANGED_ITERATIONS就是in_progress
2.2 resetData清楚数据,为再来一次循环做准备
2.3 根据r判断是否结束,判断条件在上面

简单概括

--> 计算那些需要移动的节点 
--> 在此节点中选择想要移动的block 
--> 对此block选择一个proxy(此proxy也有这个block的副本,并且传输起来比较好) 
--> 建立任务(向target节点发送replaceBlock请求,target节点向proxy节点发送copyBlock请求)拷贝数据

问题与解决

  1. balancer节点发送请求会有超时,在日志文件中报Read timed out -> 日志会报线程数超出配额 -> 提早退出balance过程,这种情况是balancer节点和target节点之间的rpc断开连接。只要改大超时设置就可以了(不清楚为啥不把超时统一到那个配置,并且hdfs把那些异常处理不打印问题,这个处理方式也很奇怪,可能新版本有改进)。

HBase源码系列之server端的get请求处理

scanner的层次

一个Region对应一个RegionScannerImpl


  • RegionScannerImpl (下面缩进代表了层级)

    • StoreScanner (组成storeHeap 和 joinedHeap, 这俩都是KeyValueHeap, 用的是region的Comparator)

      • MemStoreScanner
      • StoreFileScanner

      ​ (MemStoreScanner 和 StoreFileScanner 组成heap ,也是一个KeyValueHeap, 用的是store的Comparator)

对scanner预处理(裁剪等)

  1. 获取此region中store里的mem和file能组成的scanner,并做一定程度的过滤(rang,bf和ts)

    获取所有的scanner

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //对此region中每个store(列族)都需要获取scanner
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    //此方法是获取了scanner,过滤了一些scanner,并且还粗略的检索到需要的位置。
    //此scanner是一个store对应的
    KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
    if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
    } else {
    //比如有filter的情况
    joinedScanners.add(scanner);
    }
    }
    //生成俩KeyValueHeap,此处的比较器不知道啥意思。真他妈尴尬
    initializeKVHeap(scanners, joinedScanners, region);

    1.1 跟入store.getScanner方法,现在次问题只在一个store中讨论。store间的协调后面会提到。

    1
    2
    3
    4
    5
    6
    //此方法和1.1.4节中想呼应。
    this.store.addChangedReaderObserver(this);
    //获取所有的file,打开文件,做一些过滤等。
    List<KeyValueScanner> scanners = getScannersNoCompaction();
    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);
    resetKVHeap(scanners, store.getComparator());

    1.1.1 getScannersNoCompaction()里面有重要的方法如下。

    1
    2
    selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
    isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));

    ​ 主要两层,store.getScanners中是获取所有的file,打开文件。

    ​ selectScannersFrom主要做过滤,主要是如下方法

    1
    2
    3
    kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)
    此方法两个分支,mem和file值得关注,此处就不说了。反正用了一些range和bf(bf只针对单行和单行列族)
    bf的过滤有一个判断(!scan.isGetScan()),如果是get,并且isStartRowAndEqualsStopRow为true才走bf。

    1.1.2 seekScanners

    ​ bf判断,时间判断,伪造kv使其跳过此个检索吧。

    1.1.3 resetKVHeap

    ​ 组织成一个KeyValueHeap,比较器有代表时间的id等。

    1.1.4另外如下方法也会让检索重新打开scanner,一般是有compact,bulkload等操作。需要去重新,应该为更改了一些东西。

    1
    notifyChangedReadersObservers()
  2. 裁剪的storefile和block的方式

    我个人觉得总共几个部分,包括time裁剪,key裁剪,bf和索引

    获取scanner分两个,一个memstore的,另一个是file的。

    获取有关file的Scanner如下:

    1
    2
    3
    4
    this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);

    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
    cacheBlocks, usePread, isCompaction, false, matcher, readPt);

    获取有关mem的Scanner如下:

    1
    memStoreScanners = this.memstore.getScanners(readPt);

    然后在返回之前做了筛选,注意那个selectScannerFrom方法

    1
    2
    selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
    isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));

    selectScannerFrom还是分两个分支,有两个实现,file的和mem的。

    file的过滤分三种,time,key和bf。file过滤实现如下:

    1
    2
    3
    4
    public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
    return reader.passesTimerangeFilter(scan.getTimeRange(), oldestUnexpiredTS)
    && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
    }

    memstore的实现如下:

    1
    2
    3
    4
    5
    6
    7
    public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
    TimeRange timeRange = scan.getTimeRange();
    return (timeRangeTracker.includesTimeRange(timeRange) ||
    snapshotTimeRangeTracker.includesTimeRange(timeRange)) &&
    (Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax())
    >= oldestUnexpiredTS);
    }

    从文件格式看,主要是几个过滤,主要代码如下

    主要的实现在如下方法中

    1
    2
    3
    BlockWithScanInfo blockWithScanInfo =
    indexReader.loadDataBlockWithScanInfo(key, offset, length, block, 、
    cacheBlocks, pread, isCompaction);

    在HFileReaderV2中,由索引过滤和blockcache的实现,在

    根据索引信息过滤block

    1
    int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);

    方法实现如下

    1
    2
    3
    4
    5
    public int rootBlockContainingKey(final byte[] key, int offset,
    int length) {
    int pos = Bytes.binarySearch(blockKeys, key, offset, length,
    comparator);
    ....}

    如果需要读取,会先从blockcache中读取,减少io。blcokcache中读取block,代码如下

    1
    2
    3
    block = cachingBlockReader.readBlock(currentOffset,
    currentOnDiskSize, shouldCache, pread, isCompaction, true,
    expectedBlockType);

    两处使用bf索引过滤,一处是isLazy==true时

    1
    boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.requestSeek(KeyValue kv, boolean forward, boolean useBloom)

    bf的粒度是chunk。

协调一个cf下的检索

  1. 我们讲获取到的scanner组织成heap,然后从heap中获取对应的数据。这个heap需要有一定的组织,比如检索到了数据,需要关闭;对于同一个key,需要在所有的scanner中获取到等

    获取到scanner之后就开始获取数据结果了。

    1
    scanner.next(results);

    里面一堆逻辑的,实际获取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
    byte[] currentRow, int offset, short length) throws IOException {
    KeyValue nextKv;
    do {
    heap.next(results, limit - results.size());
    if (limit > 0 && results.size() == limit) {
    return KV_LIMIT;
    }
    nextKv = heap.peek();
    } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
    return nextKv;
    }

StoreFileScanner中检索相关的处理

。。。

协调不同cf下的检索

暂时不看,暂时在业务中不需要。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69





===================== 以下太乱 ======================

hbase检索在region中的检索流程,其中主要需要分析的是在一个store中的检索的操作,分两个层面:一个是到scanner的上层的处理,另一个是scanner的内部的处理(包括men和file)两种。

## 处理scanner

## scanner内部

### memscanner

要从memstore中获取想要的数据,感觉不会很难,因为memstore是个有一定顺序。

### filescanner

对于filestore的检索,hbase提供了集中方式来加速检索。

1. bloomfilter的过滤,见”处理scanner“
2. index的过滤
3. blockcache的使用








## 对一个get的请求怎么判断结束

boolean org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl.nextInternal(List<Cell> results, int limit)

只执行一次。一个keyvalue只会在一个block中,其实并不需要再去检索,只要检索一次就可以了,感觉是对的。没想到前几天想的有问题。



## scan判断结束

scan的判断结束

```java
while (i < rows) {
// Stop collecting results if maxScannerResultSize is set and we have exceeded it
if ((maxResultSize < Long.MAX_VALUE) &&
(currentScanResultSize >= maxResultSize)) {
builder.setMoreResultsInRegion(true);
break;
}
// Collect values to be returned here
moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
for (Cell cell : values) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
currentScanResultSize += kv.heapSizeWithoutTags();
totalKvSize += kv.getLength();
}
results.add(Result.create(values));
i++;
}
if (!moreRows) {
//结束了
break;
}
values.clear();
}

检索keyvalue

1
int org.apache.hadoop.hbase.io.hfile.HFileReaderV2.AbstractScannerV2.seekTo(byte[] key, int offset, int length, boolean rewind) throws IOException

实现获取block

​ index 去检索

检索到指定位置

bloom index的位置

第一层的root index就是chunk的位置。

但是root index指向的是imtermediate或leaf,理论上来说leaf对应chunk

HFile中除了Data Block需要索引之外,上一篇文章提到过Bloom Block也需要索引,索引结构实际上就是采用了single-level结构,文中Bloom Index Block就是一种Root Index Block。

bloom block 和 data block的索引不一致。bloom block采用单层结构,data block采用多层结构。

bloom block的单层结构的叶子结点是chunk。

get请求读取block的种类和情况

  1. 正常读取block的

get 流程真是复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  public boolean seek(KeyValue key) throws IOException {
if (seekCount != null) seekCount.incrementAndGet();

try {
try {
if(!seekAtOrAfter(hfs, key)) {
LOG.info("liubb seekAtOrAfter false");
close();
return false;
} else {
LOG.info("liubb seekAtOrAfter true");
}

cur = hfs.getKeyValue();

return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
} finally {
realSeekDone = true;
}
} catch (FileNotFoundException e) {
throw e;
} catch (IOException ioe) {
throw new IOException("Could not seek " + this + " to key " + key, ioe);
}
}
里面的seekAtOrAfter很重要。
1
2
3
4
5
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
qcode = optimize(qcode, kv);
switch(qcode) {

此处各种code的解释对于不同storefile的理解也很好。

可以看到一些问题和关注点

  1. RegionScannerImpl里使用StoreScanner 时,各种heap的操作主要需要关注一个记录的组织,因为一条记录的不同列族保存在不同的store中。还要关注什么时候结束。
  2. StoreScanner中的两种scanner(StoreFileScanner可能有多个)需要协调在不同scanner中的不同的版本的问题。还要关注什么时候结束。

HBase源码系列之memstore

主要结构

​ 主要结构包括「kvset,snapshot」,先说snapshot,主要适用于生成HFile时临时存放的kvset的一个快照。更改的snaphost一定条件判断可以生成HFile时调用

1
FlushResult org.apache.hadoop.hbase.regionserver.HRegion.internalFlushcache(HLog wal, long myseqid, MonitoredTask status)

此方法两种情况会调用

​ 一个是关闭时。

1
HRegion.doClose(boolean abort, MonitoredTask status)

​ 另一个时需要flush时。

1
FlushResult org.apache.hadoop.hbase.regionserver.HRegion.internalFlushcache(HLog wal, long myseqid, MonitoredTask status)

flush过程

  1. 准备工作
  2. 获取HRegion.updatesLock的写锁
  3. 获取mvcc事务,并努力前提读点
  4. 调用prepare()
    1. 将snapshot设置为的kvset,并重新生成一个kvset,用于写入。
  5. 释放写锁
  6. 同步wal日志
  7. 等到mvcc结束
  8. flushcache()
  9. commit()
    1. 才更新构造scanner使用的org.apache.hadoop.hbase.regionserver.DefaultStoreFileManager.storefiles

疑问与解释

问题:

最开始,我感觉逻辑有问题,可能会导致在某个时间点(flush)时,读取不到已经写入的数据。

回答:

有三个方面:

  1. 读取数据时会读取snapshot的数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Override
    public synchronized KeyValue next() {
    if (theNext == null) {
    return null;
    }

    final KeyValue ret = theNext;

    // Advance one of the iterators
    if (theNext == kvsetNextRow) {
    kvsetNextRow = getNext(kvsetIt);
    } else {
    snapshotNextRow = getNext(snapshotIt);
    }

    // Calculate the next value
    theNext = getLowest(kvsetNextRow, snapshotNextRow);

    //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
    //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
    // getLowest() + " threadpoint=" + readpoint);
    return ret;
    }
  2. 清除snapshot和更新文件时在一个锁内。

    insertNewFiles方法将刷写成的HFile加入Store的fileManger里storefiles里,此结构用语构造storefilescanner。

    clearSnapshot方法将snapshot设置为一个新的空的结构。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    this.lock.writeLock().lock();
    try {
    this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
    this.memstore.clearSnapshot(set);
    } finally {
    // We need the lock, as long as we are updating the storeFiles
    // or changing the memstore. Let us release it before calling
    // notifyChangeReadersObservers. See HBASE-4485 for a possible
    // deadlock scenario that could have happened if continue to hold
    // the lock.
    this.lock.writeLock().unlock();
    }
  3. 第三个方面,注意到第一个方面读取时或去的scanner方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    byte[] stopRow, long readPt) throws IOException {
    Collection<StoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
    storeFilesToScan =
    this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
    memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
    this.lock.readLock().unlock();
    }
    ....
    }

    snapshot里的数据要么被getScanner引用到要么清空并生成新文件,而且两者互斥(使用的是同一个锁)。这样就可以保证数据能被读取到了。

HBase源码系列之MVCC

关键点

​ KeyValue类中有long mvcc的变量,感觉此处可以跟踪。

​ MultiVersionConsistencyControl在一个Region有一个。

​ MultiVersionConsistencyControl{

​ memstoreRead:能够读取的序列号。

​ memstoreWrite:写操作的序列号。

​ writeQueue:写操作队列}

问题点

就是需要去回答的问题,这些问题十分有助于思考这个大问题

  1. 读取是版本的选择

    在org.apache.hadoop.hbase.regionserver.MemStore.MemStoreScanner.getNext(Iterator it) L777中判断了曲出来的KeyValue是否到了可以读的范围(v.getMvccVersion() <= this.readPoint)。否则掠过

    在构造scanner时会将readpoint传入到scanner,说明能读到的版本在构造开始就定好了。

  2. 写的时候的版本

    w = mvcc.beginMemstoreInsert();

    ​ 申请一个写操作的序列号,使用的变量是memstoreWrite

    mvcc.completeMemstoreInsert(w);

    ​ 标记操作结束,尽量更新memstoreRead,并且通知readWriters(在compaction和flush时会用到,因为这时需要将那时的版本都等待写完才能操作)。

    ​ 等待到此操作完成

  3. 回滚memstore的时候版本的控制

    回滚操作时找kv有版本比较MemStore.java L334。并且会滚后再执行mvcc.completeMemstoreInsert(w);

ps:long org.apache.hadoop.hbase.regionserver.HRegion.getReadpoint(IsolationLevel isolationLevel) L1115用了一个隔离级别。之后写一篇隔离级别的小文,加深理解。

去杭州

大学毕业在北京工作生活了近三年之后,即将回到自己的家乡的省会(杭州)来工作生活。明天就要去杭州正式开始我的杭州的生活了。有诸多感慨,从生活和工作两方面,在此小记一下。

生活

北京的生活方面貌似常被吐槽,具体有空气不好,节奏快等等。但是我对北京的感觉还不错,这是个非常适合学习和工作的城市,这也是个伟大的城市。

北京有十分发达的地铁网络并且有十分发达的文化资源,我在工作日可以非常便捷的去上班,在周末可以非常方便的去逛北京的博物馆,美术馆等,去了一些博物馆之后,可以感受到这个世界有很多值得我们去关注的,真是真心喜欢北京这座城市。相较之下,杭州就没这么好,很多地方没通地铁,哭。当然我隐约的感觉到杭州潜力很大,并且希望自己能参与到其中,能随着这个城市的成长而成长。

在去年清明假期时,一个来杭州呆了一个假期,晚上睡宾馆,白天到处走走停停看看,感觉十分惬意,只是略讨厌那一只下雨的天气。当时第一天就去了浙大的玉泉校区,走在曾经梦想的校园里,有种考上浙大来读书的感觉,当我在留学生食堂吃完晚饭之后听着校园里的音乐,走在校园的一个个角落,我感觉这就是天堂。之后去了杭州图书馆,太子湾,植物园等,可以说这很斌斌。不知道为什么,自己很喜欢这种略文艺的东西,也许以后我会用脚步去丈量杭州的每条路。

工作

毫无疑问,自己真的没法说喜欢在北京的工作。不仅仅是自己感觉没有激情的,而且做的事情也不是未来长期能发展的。去绿湾面试时,被问到在上一家公司的感受时,说了一句“不太开心,只是觉得数据量大,人比较有意思”,我猜这应该是我最真实的感受。

到了杭州开始找工作之后,有两个问题一直在心里徘徊:技术还要做多久?我还能不能在技术深度上去努力达到一个程度了。目前还无解。

最近倒是对python去分析金融信息有一定的兴趣,什么时候要搞一搞。

城市

谈谈对杭州的感觉,据说杭州市政府对应届毕业生来杭工作给很大补助,本人也感觉杭州最近人口的增长太慢了。其次,从面试来看隐约感觉杭州现在吸引了大量的人才,而且环境好,未来估计会好。但是在六月份市政府出了如此蛋疼的入户政策,简直无语。

IT和金融感觉是未来很有前途的。

PS

其实从来也没有在网上发表过自己很内心的想法,经过这几个月,也确实发现自己需要改变,需要有更多的表达,以后要多写一些东西,多发些东西。

写RCFile不停报的ArrayIndexOutOfBoundsException错误

场景:使用flume+自主开发的形式将数据按系统的要求写入hdfs中,并生成相应的索引
最近在一个试运行集群中经常发生数据无法入库情况,然后看了一下系统日志,发现日志在不停的抛出如下异常。

步骤1

登录及其的查看异常(不停歇的报此错)如下

1
2
3
4
java.lang.ArrayIndexOutOfBoundsException: 7107
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:76)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:50)
at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)

首先,我看到此日志,我内心第一反应,这错真是奇葩,没想到定位和解决这个问题用了好几天。

步骤2

看到此异常后,我第一反应是得先找到第一个报错的地方并且查看的此报错的原因。然后我就开始看此处涉及到hdfs的代码,想看看为什么会报错。

1
2
3
4
5
6
7
public void sync() throws IOException {
if (sync != null && lastSyncPos != out.getPos()) {
out.writeInt(SYNC_ESCAPE); // mark the start of the sync
out.write(sync); // write sync
lastSyncPos = out.getPos(); // update lastSyncPos
}
}

RCFile.java中如上代码,程序在out.writeInt就开始报错了,lastSyncPos无法的更新。但是在上层使用如下代码的判断的调用sync。

1
2
3
4
5
private void checkAndWriteSync() throws IOException {
if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
sync();
}
}

好吧,这writeInt报错了,对于这个我真是醉了。

步骤3

发现了此处无法理解的地方后,我将重点转向了第一个报错的地方。因为他是不确定时间报错,我就开始启动程序后,时常的查看日志,终于在之后的某个早上,我在查看日志时,我发现了第一个报错的地方。然后在那上面看到了如下异常。

1
2
3
4
java.io.IOException: All datanodes ...:50010 are bad. Aborting...
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1147)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:945)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:496)

然后在日志中找第一个报次错的地方,还发现了其他的报错

1
2
3
4
5
6
7
[ShortCircuitCache_SlotReleaser] (org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache$SlotReleaser.run:215)  - ShortCircuitCache(0x3fbdc007): failed to release short-circuit shared memory slot Slot(slotIdx=1, shm=DfsClientShm(7bf35643718a2bf0a9d85884afb8f4f8)) by sending ReleaseShortCircuitAccessRequestProto to /var/run/hdfs-sockets/dn.  Closing shared memory segment.

java.net.ConnectException: connect(2) error: Connection refused when trying to connect to '/var/run/hdfs-sockets/dn'

29 七月 2016 10:14:17,390 WARN [ShortCircuitCache_SlotReleaser] (org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.shutdown:380) - EndpointShmManager(10.139.90.136:50010, parent=ShortCircuitShmManager(5c89577b)): error shutting down shm: got IOException calling shutdown(SHUT_RDWR)

java.nio.channels.ClosedChannelException

步骤4

看到这些日志后,我感觉在10:14一定发生了什么,于是我开始寻找datanode中相关时间点日志STARTUP_MSG: Starting DataNode这种datanode启动信息,我内心一万只草泥马飘过。
在此日志的上面我看到了诸如的的日志。

1
2
2016-07-29 10:14:12,374 INFO org.apache.hadoop.util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 4519ms
No GCs detected

然后就是

1
2016-07-29 10:14:36,720 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: STARTUP_MSG:

中间隔了三十秒。
从这种情况看感觉应该是datanode进程死了,然后又重启了。

步骤5

然后我就开始全力找为什么此datanode会挂。列出几点信息,这些信息是感觉有点头疼。

  1. supervisord.log的日志中有2016-07-29 10:14:16,820 INFO exited: 411-hdfs-DATANODE (terminated by SIGKILL; not expected)此时间正好在10:14:12到10:14:36之间。
  2. 之前无聊时看过datanode虚拟机参数,记得有这么个参数。-XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh
    我开始怀疑是不是我这个参数给datanode进程发了kill信号,然后又重新启动。在/var/log/messages中也没发现oom的异常。

步骤6

开始看killparent.sh脚本,发现在调用此脚本时会将一个字符串写入一个文件,但是我在系统中没有发现这个文件,然后我就有点懵逼了。然后我在此脚本中加了一行我自己的代码,等再看看会不会再出现这种情况。

步骤7

跟了一个周末后,发现killparent.sh的脚本没有调用。然后开始在晚上搜索,看了一些hdfs源码,可以确定的是flume的hdfs客户端和datanode之间的连接断了,然后导致datanode报超时错误,最后在flume中报读取流出错。

步骤8

然后开始了两天完全懵逼的时光,在网上看到说可能ulimit对进程的限制,然后就开始在的集群中查看lsof中flume和datanode的链接使用数量。监控了一晚上什么收获都没有,然后就放弃了。

步骤9

然后开始猜测是不是flume进程的某些原因,因为我在成个几次重启的过程中没有重启过datanode,而只重启了flume,但是重启了之后集群就可以开始加载了。然后又开始做尝试,同时看到看tailf过滤datanode日志中的error和warn信息和flume的出错日志。不停等,终于等到了出错,发现只有datanode报连接异常的时候才会导致flume出错,这时候就断定是flume和datanode之间的连接,突然想到可能是gc,想到之前的看过datanode没有full gc,那就可能是flume,看了flumegc日志,尼玛在出错时间点之前还真有一次400多秒的gc。我内心是崩溃的。

步骤10

明天开始尝试将timeout时间再延长,将超时时间设置为3600,跑了周末两天都没什么问题。

暂时告一段落

因为接下来要做的是调整gc参数。