当前位置: 首页 > news >正文

Apache Kafka源码剖析:第7篇 日志存储系列2-FileMessageSet

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

本节主要讲解FileMessageSet

Kafka使用FileMessageSet来管理日志文件,对应磁盘上的一个真正的文件。

一开始没找到这个类
后来下了0.10版本看了下,才知道在0.11版本这个类没了,不知道是删除了还是转移了。

在0.10.0中,Kafka使用FileMessageSet管理日志文件。

它对应着磁盘上的一个真正的日志文件。

看一下类的继承关系

@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {
/**
 * A set of messages with offsets. A message set has a fixed serialized form, though the container
 * for the bytes could be either in-memory or on disk. The format of each message is
 * as follows:
 * 8 byte message offset number
 * 4 byte size containing an integer N
 * N message bytes as described in the Message class
 */
abstract class MessageSet extends Iterable[MessageAndOffset] {

MessageSet中保存的数据格式分为3部分:

1) 8字节的offset

2) 4字节的size表示data的大小

3) data表示真正的数据

1)+2)--->LogOverhead

这个从上面的注释也可以看得出来

/**
 * A set of messages with offsets. A message set has a fixed serialized form, though the container
 * for the bytes could be either in-memory or on disk. The format of each message is
 * as follows:
 * 8 byte message offset number
 * 4 byte size containing an integer N
 * N message bytes as described in the Message class
 */

不用关注细节,只要知道这里面包含了哪些信息点就足够了,协议都是人定的!

下面开始聊Message的格式,我们打开这个类

/**
 * Constants related to messages
 */
object Message {

  /**
   * The current offset and size for all the fixed-length fields
   */
  val CrcOffset = 0
  val CrcLength = 4
  val MagicOffset = CrcOffset + CrcLength
  val MagicLength = 1
  val AttributesOffset = MagicOffset + MagicLength
  val AttributesLength = 1
  // Only message format version 1 has the timestamp field.
  val TimestampOffset = AttributesOffset + AttributesLength
  val TimestampLength = 8
  val KeySizeOffset_V0 = AttributesOffset + AttributesLength
  val KeySizeOffset_V1 = TimestampOffset + TimestampLength
  val KeySizeLength = 4
  val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
  val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
  val ValueSizeLength = 4

下面是各个字段的含义

crc---4个字节,消息的校验码,防止消息错误

magic---1个字节,魔数,取值为0或者1,影响了消息的长度和格式。

attributes:1字节,消息的属性,比如压缩类型,时间戳类型,创建时间/追加时间等。

读者可以理解为一些meta信息点。

timestamp: 时间戳信息,由上面的字段负责解释具体含义。

key:你懂的,不解释

value:你懂的,不解释

------------------------------------------------------------------------------

有2个比较关键的方法:

1)writeTo---写消息
  /**
   * Write some of this set to the given channel.
   * @param destChannel The channel to write to.
   * @param writePosition The position in the message set to begin writing from.
   * @param size The maximum number of bytes to write
   * @return The number of bytes actually written.
   */
  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {

2)
  /**
   * Provides an iterator over the message/offset pairs in this set
   */
  def iterator: Iterator[MessageAndOffset]
提供迭代器,顺序读取MessageSet中的消息

这个很好理解,写了就要读,MessageSet具有顺序写入和读取的特性。

在Kafka里,一切都是顺序IO

---上面聊的是MessageSet抽象类以及其中保存消息的格式,我们开始分析FileMessageSet实现类

先看下类的定义

/**
 * An on-disk message set. An optional start and end position can be applied to the message set
 * which will allow slicing a subset of the file.
 * @param file The file name for the underlying log data
 * @param channel the underlying file channel used
 * @param start A lower bound on the absolute position in the file from which the message set begins
 * @param end The upper bound on the absolute position in the file at which the message set ends
 * @param isSlice Should the start and end parameters be used for slicing?
 */
@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {

1)file: java.io.File类型,指向真正的磁盘上的日志文件

2)channel:用于读写此日志文件,我们都知道文件可以拿到channel,如果你写过MappedByteBuffer,你自然知道我说的是啥

3)start/end: FileMessageSet除了表示一个完整的日志文件,还可以表示日志文件分片,start/end就是这个分片的起始位置和结束位置,

4)isSlice:表示此FileMessageSet是否为日志文件的分片

5)_size: FileMessageSet大小,单位字节。如果是分片,就表示分片大小,否则表示FileMessageSet大小。

 

在操作系统里,进行文件的预分配可以提高写操作的性能。

上面解释了预分配的概念!

 

然后还是直接上源码吧

  /**
   * Create a file message set with no slicing, and with initFileSize and preallocate.
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
   * If it's new file and preallocate is true, end will be set to 0.  Otherwise set to Int.MaxValue.
   */
  def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
      this(file,
        channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
        start = 0,
        end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
        isSlice = false)

openChannel的具体实现如下:

/**
   * Open a channel for the given file
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
   * @param file File path
   * @param mutable mutable
   * @param fileAlreadyExists File already exists or not
   * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
   * @param preallocate Pre allocate file or not, gotten from configuration.
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (mutable) {
      if (fileAlreadyExists)
        new RandomAccessFile(file, "rw").getChannel()
      else {
        if (preallocate) {
          val randomAccessFile = new RandomAccessFile(file, "rw")
          randomAccessFile.setLength(initFileSize)
          randomAccessFile.getChannel()
        }
        else
          new RandomAccessFile(file, "rw").getChannel()
      }
    }
    else
      new FileInputStream(file).getChannel()
  }
}

比较简单,一个是是否只读,一个是是否预先分配空间。

---------------------------------------------------------------------------------------------

初始化时,会设置channel的position位置

下面我们来分析读写过程

这个主要是看append函数

我们可以理解为初始化了一个FileMessageSet,然后就可以调用append函数进行写了,当然每次都是在文件的最后进行文件IO.
顺序性嘛!
  /**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeFullyTo(channel)
    _size.getAndAdd(written)
  }

writeFullyTo就是调用channel直接写文件了

下面讲查找文件,函数是searchFor()

/**
 * The mapping between a logical log offset and the physical position
 * in some log file of the beginning of the message set entry with the
 * given offset.
 */
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey = offset
  override def indexValue = position.toLong
}

也就是标记了position和offset的值!

还有1个writeTo方法,是将数据写入到别的channel.

 

转载于:https://my.oschina.net/qiangzigege/blog/1509660

相关文章:

  • Linux常见命令总结
  • 作用域
  • 二 APPIUM Android自动化 环境搭建(转)
  • [20170713] 无法访问SQL Server
  • elaselasticsearch分片交互过程
  • Visual Studio 2017使用
  • springboot devtools 嵌套jar 序列化错误 java.lang.ClassCastException 完美解决方案
  • lab5打卡
  • 折腾了几个月,终于调教出一架可以抢车位的无人机。然而…
  • Vue实战(四)登录/注册页的实现
  • TROUBLESHOOTING GUIDE TNS-12518 TNS listener could not hand off client connection
  • 数据结构--zkw线段树
  • GraphicsStatsService之1-dump数据的实现
  • Nginx(转)
  • react-create-app
  • Angular2开发踩坑系列-生产环境编译
  • Laravel 中的一个后期静态绑定
  • React-flux杂记
  • VUE es6技巧写法(持续更新中~~~)
  • 不上全站https的网站你们就等着被恶心死吧
  • 对话:中国为什么有前途/ 写给中国的经济学
  • 发布国内首个无服务器容器服务,运维效率从未如此高效
  • 排序算法之--选择排序
  • 如何解决微信端直接跳WAP端
  • 深入体验bash on windows,在windows上搭建原生的linux开发环境,酷!
  • 时间复杂度与空间复杂度分析
  • 使用common-codec进行md5加密
  • 试着探索高并发下的系统架构面貌
  • 探索 JS 中的模块化
  • 跳前端坑前,先看看这个!!
  • 我看到的前端
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • RDS-Mysql 物理备份恢复到本地数据库上
  • 好程序员大数据教程Hadoop全分布安装(非HA)
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • # Java NIO(一)FileChannel
  • # MySQL server 层和存储引擎层是怎么交互数据的?
  • #{}和${}的区别?
  • #【QT 5 调试软件后,发布相关:软件生成exe文件 + 文件打包】
  • #include到底该写在哪
  • #Linux(make工具和makefile文件以及makefile语法)
  • (14)学习笔记:动手深度学习(Pytorch神经网络基础)
  • (2.2w字)前端单元测试之Jest详解篇
  • (MIT博士)林达华老师-概率模型与计算机视觉”
  • (超简单)使用vuepress搭建自己的博客并部署到github pages上
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (附源码)基于SpringBoot和Vue的厨到家服务平台的设计与实现 毕业设计 063133
  • (简单) HDU 2612 Find a way,BFS。
  • (转载)深入super,看Python如何解决钻石继承难题
  • **CI中自动类加载的用法总结
  • .htaccess配置重写url引擎
  • .java 指数平滑_转载:二次指数平滑法求预测值的Java代码
  • .Net - 类的介绍
  • .NET CLR基本术语
  • .Net Core 中间件验签