##balance过程的解读
准备工作
获取的namenode调用Balancer.run L1422,开始迭代
一个循环(除了2.1的返回结果ReturnStatus.IN_PROGRESS 继续之外,其他都结束)
2.1 ReturnStatus r = b.run L1352
2.1.1 initNode获取需要移动的字节数bytesLeftToMove
计算平均使用率
计算出过载和未充分利用的节点需要移动的字节数,两者选取较大值(已排除在与平均值相差在threshold内的节点)
2.1.2 chooseNode获取决定要移动的字节
chooseDatanodes: 三种匹配类型Matcher选取(同一组,同一机架,剩下的),每一类型做如下类型操作
1. 处理过载到未好好利用的。 2. 处理过载到使用少的。 3. 处理多使用到少使用
chooseCandidate对于每个源节点,选个候选节点(如果能符合匹配规则Matcher就选择他)
matchSourceWithTargetToMove选择两者能移动的少的字节数,形成NodeTask
2.1.3 dispatchBlockMoves L1103启动线程去移动数据,处理NodeTask
{两个条件会结束noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS 和Time.now()- startTime > MAX_ITERATION_TIME}
dispatchBlocks L614 为每个source启动一个线程去移动数据(线程放入dispatcherExecutor线程池),然后等待回复
此处一轮迭代有时间限制
dispatcherExecutor.submit(source.new BlockMoveDispatcher());(调用dispatchBlocks())
chooseNextBlockToMove 获取下一步需要移动block,选择的代理节点是拥有此block但是传递到target比较好。
此处有一个5个任务的限制,难怪增大timeout的时间之后不会quota is exceeded***
chooseBlockAndProxy 选择块和代理节点
isGoodBlockCandidate 选择好的块
chooseProxySource 在选中的块中选择一个和target比较好的location
scheduleBlockMove 发送到代理节点并给代理节点的发送复制请求
dispatch() 发送数据(此处看代码不是异步处理,果然不是)
在目的节点DataXceiver.replaceBlock方法接受来自proxy的block发送流
在此处有balanceThrottler限制
filterMovedBlocks 过滤
判断需要不需要更多的block
waitForMoveCompletion()等待所有targets里的pending任务结束(如果没有结束此处会等待)
shouldContinue如果移动字节数(dispatchBlockMoves结果)大于零或等于零次数少于 MAX_NOT_CHANGED_ITERATIONS就是in_progress
2.2 resetData清楚数据,为再来一次循环做准备
2.3 根据r判断是否结束,判断条件在上面
简单概括
--> 计算那些需要移动的节点
--> 在此节点中选择想要移动的block
--> 对此block选择一个proxy(此proxy也有这个block的副本,并且传输起来比较好)
--> 建立任务(向target节点发送replaceBlock请求,target节点向proxy节点发送copyBlock请求)拷贝数据
问题与解决
- balancer节点发送请求会有超时,在日志文件中报Read timed out -> 日志会报线程数超出配额 -> 提早退出balance过程,这种情况是balancer节点和target节点之间的rpc断开连接。只要改大超时设置就可以了(不清楚为啥不把超时统一到那个配置,并且hdfs把那些异常处理不打印问题,这个处理方式也很奇怪,可能新版本有改进)。