DFSOutputStream分析

private void waitAndQueueCurrentPacket()
调用者(以下方法会调用waitAndQueueCurrentPacket):

- private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
- protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
- public synchronized void close()

public class DFSOutputStream extends FSOutputSummer

1
2
3
4
5
6
7
implements Syncable, CanSetDropBehind {
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>(); // dataQueue是数据队列,用于保存等待发送给datanode的数据包
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); // ackQueue是确认队列,保存还没有被datanode确认接收的数据包
private DataStreamer streamer; // streamer线程,不停的从dataQueue中取出数据包,发送给datanode
...
}

DataStreamer的主要字段:
/**
 * Streamer thread (excerpt): repeatedly takes packets from dataQueue and sends
 * them to the datanodes. Only some fields are shown; the rest is elided ("...").
 */
class DataStreamer extends Daemon {
private volatile boolean streamerClosed = false;
private ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream; // socket output stream (client -> datanode), used to send data to the datanode
private DataInputStream blockReplyStream; // socket input stream (datanode -> client), used to receive ack packets from the datanode
private ResponseProcessor response = null; // response thread that receives the feedback (acks) returned by the datanodes
...
}

数据粒度:block > packet > chunk。一个block由多个packet组成,一个packet包含多个chunk,每个chunk都带有自己的checksum。

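主要看看writeChunk方法:

  1. 构造currentPacket,向其中写入data和checksum,将该packet的chunk数加1,并把bytesCurBlock增加本次写入数据的长度。
  2. 如果packet中的chunk数已经足够多(packet已满),或者block已经写满,则调用waitAndQueueCurrentPacket把当前packet放入发送队列。
  3. 如果block的大小已经达到上限(bytesCurBlock == blockSize),则再发送一个空的、标记为该block最后一个包(lastPacketInBlock)的packet,并将bytesCurBlock重置为0,开始写下一个block。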

其中一个关键步骤是把写满的packet加入缓存队列。下面的waitAndQueueCurrentPacket负责判断当前packet能否加入发送给datanode的缓存队列(dataQueue),必要时先等待。lastQueuedSeqno保存了最后一个入队packet的序号。
    /**
     * 如果队列中没有足够空间就等待,然后调用queueCurrentPacket把currentPacket加入dataQueue。
     */
    /**
     * Waits while the combined size of dataQueue and ackQueue is above MAX_PACKETS
     * (and the stream is not closed), then queues currentPacket via queueCurrentPacket().
     *
     * @throws IOException presumably from checkClosed() when the stream has been
     *     closed — TODO confirm; a ClosedChannelException is caught below and
     *     handled in code elided from this excerpt
     */
    private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
    try {
    // If queue is full, then wait till we have enough space
    // ("full" counts both queues: packets waiting to be sent plus packets awaiting an ack)
    while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
    try {
    // Releases the dataQueue monitor until another thread calls dataQueue.notifyAll()
    dataQueue.wait();
    } catch (InterruptedException e) {
    // Restore the interrupt flag and stop waiting. Control still falls through to
    // checkClosed()/queueCurrentPacket(), so the packet is queued even if the
    // queues are still over the limit.
    Thread.currentThread().interrupt();
    break;
    }
    }
    checkClosed();
    queueCurrentPacket();
    } catch (ClosedChannelException e) {
    ...
    }
    }
    }
    /**
     * 把currentPacket添加到dataQueue,设置lastQueuedSeqno,并唤醒所有等待dataQueue的线程。
     */
    /**
     * Appends currentPacket to the tail of dataQueue, clears currentPacket and
     * wakes every thread waiting on dataQueue. Does nothing when there is no
     * current packet.
     *
     * The packet's sequence number is recorded in lastQueuedSeqno, which
     * waitForAckedSeqno uses to decide whether all queued data has been written
     * and acknowledged.
     */
    private void queueCurrentPacket() {
      synchronized (dataQueue) {
        if (currentPacket != null) {
          dataQueue.addLast(currentPacket);
          lastQueuedSeqno = currentPacket.seqno;
          final boolean debugOn = DFSClient.LOG.isDebugEnabled();
          if (debugOn) {
            DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
          }
          // The queue now owns the packet; a fresh one will be built for the next write.
          currentPacket = null;
          dataQueue.notifyAll();
        }
      }
    }

DataStreamer线程负责处理被添加到缓存队列dataQueue中的数据包:将它们发送给datanode,然后移入ackQueue。它的while循环中还包含processDatanodeError的处理。

下面是DataStreamer发送循环中的关键片段:
// Excerpt from the DataStreamer send loop (presumably its run() method; the
// surrounding loop and the declaration of `one` are not shown).
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
// Only non-heartbeat packets are taken off the head of dataQueue and tracked in
// ackQueue until acknowledged; a heartbeat packet is not removed from dataQueue
// here (presumably it was never queued there — TODO confirm).
if (!one.isHeartbeatPacket()) {
dataQueue.removeFirst();
ackQueue.addLast(one);
// Wake any threads waiting on dataQueue.
dataQueue.notifyAll();
}
}
// write out data to remote datanode
// The write to blockStream (client -> datanode socket stream) happens outside the
// dataQueue lock, so other threads can use the queues while the packet is sent.
try {
one.writeTo(blockStream);
blockStream.flush();
}
// NOTE(review): the catch/finally clause of this try is not shown in the excerpt.

还需要一个线程来接收datanode返回的ack,即ResponseProcessor。它的主要处理逻辑如下:
// Excerpt from the ResponseProcessor loop: handle one ack from the datanode pipeline.
// Read the next ack from the datanodes and get its sequence number.
ack.readFields(blockReplyStream);
long seqno = ack.getSeqno();
// Look at the oldest unacknowledged packet, i.e. the head of ackQueue
// (not dataQueue: packets are moved to ackQueue once they are sent).
Packet one;
synchronized (dataQueue) {
  one = ackQueue.getFirst();
}
// Acks must arrive in the same order the packets were sent.
if (one.seqno != seqno) {
  // Expected seqno goes right after "Expecting seqno" so the message reads correctly.
  throw new IOException("ResponseProcessor: Expecting seqno " + one.seqno
      + " for block " + block + " but received " + seqno);
}
// Remove the acknowledged packet from the head of ackQueue. ackQueue is
// accessed under the dataQueue lock, as elsewhere in this class.
synchronized (dataQueue) {
  lastAckedSeqno = seqno; // read by waitForAckedSeqno
  ackQueue.removeFirst();
  // Wake threads waiting on dataQueue, e.g. a writer in waitAndQueueCurrentPacket
  // that is blocked because dataQueue + ackQueue was over the limit.
  dataQueue.notifyAll();
}

// In flushInternal()/flushOrSync(): remember the seqno of the last queued packet,
// then waitForAckedSeqno(toWaitFor) waits until all queued data has been acknowledged.
toWaitFor = lastQueuedSeqno;
flushInternal和flushOrSync会先执行上面这句记录toWaitFor,再调用waitForAckedSeqno(toWaitFor),等待直到所有已入队的packet都收到datanode的ack(lastAckedSeqno由ResponseProcessor更新)。