// If strictly internal, copy successor's element to p and then make p
// point to successor.
// If p has two children, find its successor, copy the successor's key and
// value into p, and from here on treat the successor as the node to delete.
if (p.left != null && p.right != null) {
    Entry<K, V> s = successor(p);
    p.key = s.key;
    p.value = s.value;
    p = s;
} // p has 2 children
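To see why copying the successor into p is safe, here is a minimal sketch on a plain binary search tree. It assumes integer keys and no red-black rebalancing; `SimpleBst`, `Node`, and `inOrder` are hypothetical names for illustration and are not part of `TreeMap`. The successor of a node with two children is the leftmost node of its right subtree, so it has at most one child and can be spliced out directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal BST (no rebalancing), used only to illustrate the
// "copy the successor, then delete the successor" step.
final class SimpleBst {
    static final class Node {
        int key;
        Node left, right, parent;
        Node(int key, Node parent) { this.key = key; this.parent = parent; }
    }

    Node root;

    void insert(int key) {
        if (root == null) { root = new Node(key, null); return; }
        Node cur = root;
        while (true) {
            if (key < cur.key) {
                if (cur.left == null) { cur.left = new Node(key, cur); return; }
                cur = cur.left;
            } else if (key > cur.key) {
                if (cur.right == null) { cur.right = new Node(key, cur); return; }
                cur = cur.right;
            } else {
                return; // duplicate keys are ignored in this sketch
            }
        }
    }

    Node find(int key) {
        Node cur = root;
        while (cur != null && cur.key != key) {
            cur = key < cur.key ? cur.left : cur.right;
        }
        return cur;
    }

    void delete(int key) {
        Node p = find(key);
        if (p == null) return;

        // Two children: copy the successor's key into p, then delete the
        // successor node instead of p.
        if (p.left != null && p.right != null) {
            Node s = p.right;
            while (s.left != null) s = s.left; // leftmost node of the right subtree
            p.key = s.key;
            p = s;
        }

        // p now has at most one child, so it can be spliced out.
        Node child = (p.left != null) ? p.left : p.right;
        if (child != null) child.parent = p.parent;
        if (p.parent == null) root = child;
        else if (p == p.parent.left) p.parent.left = child;
        else p.parent.right = child;
    }

    List<Integer> inOrder() {
        List<Integer> out = new ArrayList<>();
        collect(root, out);
        return out;
    }

    private static void collect(Node n, List<Integer> out) {
        if (n == null) return;
        collect(n.left, out);
        out.add(n.key);
        collect(n.right, out);
    }
}
```

Deleting the root of a tree built from 50, 30, 70, 20, 40, 60, 80 copies 60 into the root and removes the old 60 leaf, so the in-order sequence stays sorted. The real `TreeMap` additionally restores the red-black properties after the splice, which this sketch leaves out.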
[2019-02-24 16:03:15,121] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/Users/liubinbin/Documents/install/kafka_2.12-2.1.0/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
Once the configuration is in place, starting the broker prints the following in the log:
[2019-02-24 14:06:09,599] INFO Client successfully logged in. (org.apache.zookeeper.Login)
[2019-02-24 14:06:09,600] INFO Client will use DIGEST-MD5 as SASL mechanism. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2019-02-24 14:06:09,657] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will attempt to SASL-authenticate using Login Context section 'Client' (org.apache.zookeeper.ClientCnxn)
[2019-02-24 14:06:09,676] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-02-24 14:06:09,795] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100043c21d70000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-02-24 14:06:09,799] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
try {
  // Find a SyncFuture that actually needs a sync.
  while (true) {
    takeSyncFuture = null;
    // We have to process what we 'take' from the queue
    takeSyncFuture = this.syncFutures.take();
    currentSequence = this.sequence;
    long syncFutureSequence = takeSyncFuture.getTxid();
    if (syncFutureSequence > currentSequence) {
      throw new IllegalStateException("currentSequence=" + currentSequence
          + ", syncFutureSequence=" + syncFutureSequence);
    }
    // See if we can process any syncfutures BEFORE we go sync.
    // Skip sequences that have already been synced, either by another
    // SyncRunner or because they were extra entries added at the time.
    long currentHighestSyncedSequence = highestSyncedTxid.get();
    if (currentSequence < currentHighestSyncedSequence) {
      syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
      // Done with the 'take'. Go around again and do a new 'take'.
      continue;
    }
    break;
  }
  try {
    // Perform the sync.
    writer.sync(useHsync);
    // Update highestSyncedTxid, which records the highest sequence synced so far.
    currentSequence = updateHighestSyncedSequence(currentSequence);
  } catch (IOException e) {
    ...
  } finally {
    // First release what we 'took' from the queue.
    // Release the SyncFuture we took.
    syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
    // Can we release other syncs?
    // Release every SyncFuture whose sequence is below the current one.
    // This step looks like it may be unnecessary.
    syncCount += releaseSyncFutures(currentSequence, lastException);
    if (lastException != null) {
      requestLogRoll();
    } else {
      checkLogRoll();
    }
  }
  postSync(System.nanoTime() - start, syncCount);
} catch (InterruptedException e) {
  ...
}
}
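The take, skip, sync, release flow above can be modeled in a few lines. The following is a simplified single-threaded sketch, not the HBase implementation: `SyncBatchSketch`, `PendingSync`, `runOnce`, and `physicalSyncs` are hypothetical names, the blocking queue is replaced by a plain `ArrayDeque`, and the actual `writer.sync(...)` call is replaced by a counter. It only aims to show how one physical sync can release several waiting requests and how a request already covered by an earlier sync is released without syncing again.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Hypothetical single-threaded model of the sync batching logic.
final class SyncBatchSketch {
    static final class PendingSync {
        final long txid;
        boolean done;
        PendingSync(long txid) { this.txid = txid; }
    }

    private final Queue<PendingSync> pending = new ArrayDeque<>();
    private long highestSyncedTxid = 0;
    int physicalSyncs = 0; // stands in for the number of real sync calls

    void offer(PendingSync f) { pending.add(f); }

    /** Runs one pass for the given sequence and returns how many requests were released. */
    int runOnce(long currentSequence) {
        int released = 0;
        PendingSync taken;
        while (true) {
            taken = pending.poll();
            if (taken == null) return released; // nothing left to do
            if (taken.txid > currentSequence) {
                throw new IllegalStateException("currentSequence=" + currentSequence
                    + ", syncFutureSequence=" + taken.txid);
            }
            // Already covered by an earlier sync: release without syncing again.
            if (currentSequence < highestSyncedTxid) {
                taken.done = true;
                released++;
                continue;
            }
            break;
        }

        // The "real" sync, modeled as a counter.
        physicalSyncs++;
        highestSyncedTxid = Math.max(highestSyncedTxid, currentSequence);

        // Release the request we took, then every other request at or below
        // the sequence that was just synced.
        taken.done = true;
        released++;
        Iterator<PendingSync> it = pending.iterator();
        while (it.hasNext()) {
            PendingSync f = it.next();
            if (f.txid <= currentSequence) {
                f.done = true;
                it.remove();
                released++;
            }
        }
        return released;
    }
}
```

In this model, queuing requests with txids 1, 2, and 3 and calling `runOnce(3)` performs a single sync and releases all three. A later request that arrives with a sequence below the highest synced one is released by the skip branch without another sync.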