private void waitAndQueueCurrentPacket()
调用者:
1 | private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags) |
public class DFSOutputStream extends FSOutputSummer
1 | implements Syncable, CanSetDropBehind { |
1 | class DataStreamer extends Daemon { |
block > packet > chunk(带chunksum)
主要看看writeChunk方法。
- 构造currentPacket,写入data和checksum,并且的chunk数++,bytesCurBlock增加数据的长度。
- 如果chunk数足够多。则调用waitAndQueueCurrentPacket
- 如果如果block的大小足够大的话,
其中一个处理时添加chunk到缓存中。一下具体处理为判断是否可以添加入发送给datanode的缓存中。lastQueuedSeqno保存了最后一个需要处理的序号。
/**
- 如果没有足够空间话就等待,然后
*/
/**1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private void waitAndQueueCurrentPacket() throws IOException {
synchronized (dataQueue) {
try {
// If queue is full, then wait till we have enough space
while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
try {
dataQueue.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
checkClosed();
queueCurrentPacket();
} catch (ClosedChannelException e) {
...
}
}
} - 添加dataQueue,设置lastQueuedSeqno
*/
1
2
3
4
5
6
7
8
9
10
11
12private void queueCurrentPacket() {
synchronized (dataQueue) {
if (currentPacket == null) return;
dataQueue.addLast(currentPacket);
lastQueuedSeqno = currentPacket.seqno; //在waitForAckedSeqno方法使用,判断是否所有数据都被写入并接受到ack了。
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
}
currentPacket = null;
dataQueue.notifyAll();
}
}
DataStreamer是处理添加入缓存dataQueue的数据。将他们发送然后添加到ackQueue中。
在while中有processDatanodeError的处理。
1 | synchronized (dataQueue) { |
toWaitFor = lastQueuedSeqno;
waitForAckedSeqno(toWaitFor)在flushInternal和flushOrSync会调用。