zookeeper源码分析(一)

client发送数据到服务器

1
2
单线程发送(SendThread)
main线程和sendThread线程,两线程间如何协调?
1
2
3
4
    // 发出后,放入即等待结果
    private final Queue<Packet> pendingQueue = new ArrayDeque<>();
    // 发送时放入
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();

Packet记录发出报文和接收报文对象,以及相关控制字段:finished表示请求是否结束

相关控制,将数据封装成Packet,放入outgoingQueue队尾,main线程休眠,等待结果

1
2
3
4
5
6
7
8
9
// org.apache.zookeeper.ClientCnxn#submitRequest[main thread]

synchronized (packet) {
// ...
    // Wait for request completion infinitely
    while (!packet.finished) {
        packet.wait();
    }
}

最终的处理结果(在Packet中,成功、连接丢失、超时等),要通知main线程

1
2
3
4
5
6
// org.apache.zookeeper.ClientCnxn#finishPacket[-SendThread]

synchronized (p) {
    p.finished = true;
    p.notifyAll();
}

SendThread的IO处理

两个队列数据的处理(代码org.apache.zookeeper.ClientCnxnSocketNIO#doIO)

通道可写,则outgoingQueue.peek,直到全部写完(全部发往服务器),再outgoingQueue.removeFirstOccurrence,并且pendingQueue.add

通道可读(Nio的处理):

1
2
3
4
// org.apache.zookeeper.ClientCnxnSocket

    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
    protected ByteBuffer incomingBuffer = lenBuffer;

返回的包是两个部分,一是4Byte的int,代表长度,二是该长度的包数据 所以,首先要读全4Byte获取包数据大小,拿到长度以后,在读取第二部分,通过sendThread.readResponse进行包数据的后续处理

单线程模型的特点

请求RequestHeader.xid,标记着客户端请求的顺序,在doIO的时候才赋值的(ClientCnxnSocket::doIO) 所以,xid也是pendingQueue的顺序,也是readResponse的处理顺序,如果不一致,那么这段时间,pendingQueue被清理过,比如网络断开了

zk要保持顺序一致性(sequence consistency),客户端是否重连另一台机器,从而读到”回滚”的数据?

不会,在连接建立后,要执行sendThread.primeConnection();创建session

1
2
3
4
5
// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection

ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
// ...
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));

新建连接后,ConnectRequest对象,这次是addFirst,放在队首的。lastZxid上一次发送操作产生的zxid, 它是严格递增的。它的意义是,zk集群中,标记着唯一一条log。所以服务器收到lastZxid,和自己的比对,如果自己的zxid小, 那么说明自己的数据是落后的,于是拒绝建立session。

1
2
3
4
5
6
7
8
9
10
11
12
// org.apache.zookeeper.server.ZooKeeperServer#processConnectRequest
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
    String msg = "Refusing session request for client "
                 + cnxn.getRemoteSocketAddress()
                 + " as it has seen zxid 0x"
                 + Long.toHexString(connReq.getLastZxidSeen())
                 + " our last zxid is 0x"
                 + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                 + " client must try another server";
    LOG.info(msg);
    throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}