/**
 * Handles a read-ready event on `key`.
 *
 * Bytes are read from the key's channel into the [[Receive]] attached to the key, creating a
 * new `BoundedByteBufferReceive(maxRequestSize)` on first use. That way a request that arrives
 * over several reads keeps accumulating in the same buffer.
 *
 * Outcomes:
 *  - negative byte count: the key is closed;
 *  - request complete: it is wrapped in a `RequestChannel.Request` and passed to
 *    `requestChannel.sendRequest`, the attachment is cleared, and OP_READ is removed from the
 *    interest set without waking the selector;
 *  - otherwise: the key stays registered for OP_READ and the selector is woken up so reading
 *    can continue.
 */
def read(key: SelectionKey): Unit = {
  // ...... (statements elided in this excerpt)
  val socketChannel = channelFor(key)
  // Reuse the partially filled Receive left on the key by an earlier call, or start a new one.
  val receive: Receive = key.attachment match {
    case null =>
      val fresh = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(fresh)
      fresh
    case pending => pending.asInstanceOf[Receive]
  }
  val bytesRead = receive.readFrom(socketChannel)
  val address = socketChannel.socket.getRemoteSocketAddress()
  if (bytesRead < 0) {
    close(key)
  } else if (receive.complete) {
    val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer,
      startTimeMs = time.milliseconds, remoteAddress = address)
    requestChannel.sendRequest(req)
    key.attach(null)
    // explicitly reset interest ops to not READ, no need to wake up the selector just yet
    key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
  } else {
    // more reading to be done
    key.interestOps(SelectionKey.OP_READ)
    wakeup()
  }
}
因为Kafka消息的前四个字节(一个int)表示后续消息的大小(size),所以首先读取size,再据此把一条完整的消息读取出来。 如果读取出了一个完整的Request,则将它放入requestChannel中。 具体的Kafka消息格式可以参考《A Guide To The Kafka Protocol》。
我们再来看看write方法的实现:
/**
 * Handles a write-ready event on `key`.
 *
 * Writes as much of the attached `RequestChannel.Response` as the socket accepts.
 *  - Once the send is complete, the request's metrics are updated, the attachment is cleared,
 *    and the key is switched back to OP_READ for the next request.
 *  - Otherwise the key stays registered for OP_WRITE and the selector is woken up so the
 *    write can resume.
 *
 * @throws IllegalStateException if the key has no response, or no response send, attached
 */
def write(key: SelectionKey): Unit = {
  val socketChannel = channelFor(key)
  val response = key.attachment().asInstanceOf[RequestChannel.Response]
  // A null attachment and a null send are the same misuse; report both with one clear error
  // instead of letting the first one surface as a NullPointerException.
  val responseSend = Option(response)
    .flatMap(r => Option(r.responseSend))
    .getOrElse(throw new IllegalStateException("Registered for write interest but no response attached to key."))
  // The byte count is not needed: progress is tracked by the send itself via `complete`.
  responseSend.writeTo(socketChannel)
  if (responseSend.complete) {
    response.request.updateRequestMetrics()
    key.attach(null)
    key.interestOps(SelectionKey.OP_READ)
  } else {
    key.interestOps(SelectionKey.OP_WRITE)
    wakeup()
  }
}
/**
 * Appends the messages of a produce request to the local logs and reports the per-partition
 * outcome through `responseCallback`.
 *
 * - If `isValidRequiredAcks(requiredAcks)` is false, nothing is appended. Every partition in
 *   `messagesPerPartition` is answered with `Errors.INVALID_REQUIRED_ACKS` and the unknown
 *   first offset.
 * - Otherwise the messages are appended via `appendToLocalLog`. Then:
 *   - if `delayedRequestRequired(...)` holds, a `DelayedProduce` is built and handed to
 *     `delayedProducePurgatory.tryCompleteElseWatch`, keyed by the request's partitions;
 *   - otherwise `responseCallback` is invoked at once with each partition's response status.
 *
 * NOTE(review): the parameter list is elided ("......") in this excerpt. The body references
 * timeout, requiredAcks, internalTopicsAllowed, messagesPerPartition and responseCallback,
 * presumably all parameters; confirm against the real signature.
 * NOTE(review): this excerpt lost its line breaks and the spaces after `new`/`def`
 * (e.g. `newDelayedProduce`, `defappendMessages`). As a result each line comment below also
 * swallows the statements that follow it on the same line. Restore the original formatting
 * before compiling.
 */
defappendMessages(......) { if (isValidRequiredAcks(requiredAcks)) { val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
// For each partition, record two things:
// - the offset the delayed produce waits for (last appended offset + 1);
// - a response status built from the local append's error code and first offset.
val produceStatus = localProduceResults.map { case (topicAndPartition, result) => topicAndPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status }
// Delayed path: wrap the response in a DelayedProduce (constructed with `timeout`) instead of answering now.
if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) val delayedProduce = newDelayedProduce(timeout, produceMetadata, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation val producerRequestKeys = messagesPerPartition.keys.map(newTopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory // this is because while the delayed produce operation is being created, new // requests may arrive and hence make this operation completable. delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
// Immediate path: answer with each partition's status now.
// Invalid-acks path: answer every partition with INVALID_REQUIRED_ACKS without appending.
} else { // we can respond immediately val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) responseCallback(produceResponseStatus) } } else { // If required.acks is outside accepted range, something is wrong with the client // Just return an error and don't handle the request at all val responseStatus = messagesPerPartition.map { case (topicAndPartition, messageSet) => (topicAndPartition -> ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, LogAppendInfo.UnknownLogAppendInfo.firstOffset)) } responseCallback(responseStatus) } }
To publish a message to a partition, the client first finds the leader of the partition from ZooKeeper and sends the message to the leader. The leader writes the message to its local log. Each follower constantly pulls new messages from the leader using a single socket channel. That way, the follower receives all messages in the same order as written in the leader. The follower writes each received message to its own log and sends an acknowledgment back to the leader. Once the leader receives the acknowledgment from all replicas in the ISR (in-sync replicas), the message is committed. The leader then advances the HW (high watermark) and sends an acknowledgment to the client. For better performance, each follower sends an acknowledgment as soon as the message is written to memory. So, for each committed message, we guarantee that the message is stored in memory on multiple replicas. However, there is no guarantee that any replica has persisted the committed message to disk. Given that correlated failures are relatively rare, this approach gives us a good balance between response time and durability. In the future, we may consider adding options that provide even stronger guarantees. The leader also periodically broadcasts the HW to all followers. This broadcast can be piggybacked on the responses to the followers' fetch requests. From time to time, each replica checkpoints its HW to disk.
/**
 * Applies topic-level config overrides announced by change notifications.
 *
 * For every notification whose change number is greater than `lastExecutedChange`:
 *  - the notification znode under `ZkUtils.TopicConfigChangesPath` is read to obtain the topic;
 *  - if this broker holds logs for that topic, the topic's overrides from ZooKeeper are layered
 *    over the default log config, and the resulting `LogConfig` is installed on each of the
 *    topic's logs;
 *  - obsolete notifications are then purged.
 * `lastExecutedChange` advances for every newer notification, whether or not it applied here.
 *
 * @param notifications names of change-notification znodes (child names, joined to the
 *                      changes path below)
 */
private def processConfigChanges(notifications: Seq[String]): Unit = {
  // Strips one pair of surrounding double quotes; the znode data is presumably the topic name
  // as a JSON string. Short or unquoted data is returned as-is instead of throwing
  // StringIndexOutOfBoundsException.
  def dequote(json: String): String =
    if (json.length >= 2 && json.head == '"' && json.last == '"') json.substring(1, json.length - 1)
    else json

  if (notifications.nonEmpty) {
    val now = time.milliseconds
    // This broker's logs grouped by topic name. Built strictly so lookups don't recompute it.
    val logsByTopic = logManager.logsByTopicPartition.toBuffer
      .groupBy(_._1.topic)
      .map { case (topic, entries) => topic -> entries.map(_._2) }
    for (notification <- notifications) {
      val changeId = changeNumber(notification)
      if (changeId > lastExecutedChange) {
        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
        val (jsonOpt, _) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
        jsonOpt.foreach { json =>
          val topic = dequote(json)
          logsByTopic.get(topic).foreach { topicLogs =>
            // Combine the default properties with the overrides in ZK to create the new LogConfig.
            val props = new java.util.Properties(logManager.defaultConfig.toProps)
            props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
            val logConfig = LogConfig.fromProps(props)
            topicLogs.foreach(log => log.config = logConfig)
            info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
            purgeObsoleteNotifications(now, notifications)
          }
        }
        lastExecutedChange = changeId
      }
    }
  }
}