当前位置: 首页 > news >正文

nsqd 源码,写入数据

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

切记: chanel 有有自己的持久化 队列

topic 也有自己的持久化队列,两个是相互独立

给一个topic put 数据的:

// PutMessage writes to the appropriate incoming message channel

func (t *Topic) PutMessage(msg *nsq.Message) error {

    t.RLock()

    defer t.RUnlock()

    if atomic.LoadInt32(&t.exitFlag) == 1 {

        return errors.New("exiting")

    }

    t.incomingMsgChan <- msg

    atomic.AddUint64(&t.messageCount, 1)

    return nil

}


incomingMsgChan  是:incomingMsgChan:   make(chan *nsq.Message, 1),
        memoryMsgChan:     make(chan *nsq.Message, context.nsqd.options.MemQueueSize),
        
 初始化topic 会执行:
 
 t.waitGroup.Wrap(func() { t.router() })
 t.waitGroup.Wrap(func() { t.messagePump() })
 
 func (t *Topic) router() {
    var msgBuf bytes.Buffer
    for msg := range t.incomingMsgChan { //当有数据的时候执行
        select {
        case t.memoryMsgChan <- msg://memoryMsgChan 可以写入
        default:
            err := WriteMessageToBackend(&msgBuf, msg, t.backend)// 默认持久化到硬盘
            if err != nil {
                log.Printf("ERROR: failed to write message to backend - %s", err.Error())
                // theres not really much we can do at this point, you're certainly
                // going to lose messages...
            }
        }
    }

    log.Printf("TOPIC(%s): closing ... router", t.name)
}
 
 WriteMessageToBackend  会调用:
 func (d *DiskQueue) Put(data []byte) error {
    d.RLock()
    defer d.RUnlock()

    if d.exitFlag == 1 {
        return errors.New("exiting")
    }

    d.writeChan <- data 
    return <-d.writeResponseChan
}
 
 初始化一个:NewDiskQueue 会定时执行:
 func (d *DiskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64
    var r chan []byte

    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        count++
        // dont sync all the time :)
        if count == d.syncEvery {
            count = 0
            d.needSync = true
        }

        if d.needSync {
            err = d.sync()
            if err != nil {
                log.Printf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err.Error())
            }
        }

        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    log.Printf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err.Error())
                    d.handleReadError()
                    continue
                }
            }
            r = d.readChan
        } else {
            r = nil
        }

        select {
        // the Go channel spec dictates that nil channel operations (read or write)
        // in a select are skipped, we set r to d.readChan only when there is data to read
        case r <- dataRead:
            d.moveForward()
        case <-d.emptyChan:
            d.emptyResponseChan <- d.deleteAllFiles()
        case dataWrite := <-d.writeChan:  //writeChan   刚刚写入数据的chan
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C:
            d.needSync = true
        case <-d.exitChan:
            goto exit
        }
    }

exit:
    log.Printf("DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()
    d.exitSyncChan <- 1
}


转载于:https://my.oschina.net/u/1388024/blog/213438

相关文章:

  • Java基础加强总结(二)——泛型
  • eclipse fail to create java virtual machine
  • UIImageView -- 选择图片、循环播放
  • JTree/DefaultMutableTreeNode 树形结构
  • 各大银行的收发标准
  • CSS 元素透明
  • 【转】网站布局--瀑布流式布局
  • 使用Python实现Hadoop MapReduce程序
  • centos svn快速搭建
  • 一个IO的传奇一生(8) -- elevator子系统
  • linux:shell脚本格式
  • CSS自定义select下拉选择框(不用其他标签模拟)
  • 关于一级指针和二级指针作为参数的探究
  • 2014年4月15日星期二java学习历程
  • Amazon Workspace DaaS服务快速导读
  • 230. Kth Smallest Element in a BST
  • 78. Subsets
  • chrome扩展demo1-小时钟
  • es的写入过程
  • HTML5新特性总结
  • JavaScript 无符号位移运算符 三个大于号 的使用方法
  • java第三方包学习之lombok
  • Linux gpio口使用方法
  • opencv python Meanshift 和 Camshift
  • pdf文件如何在线转换为jpg图片
  • python_bomb----数据类型总结
  • vagrant 添加本地 box 安装 laravel homestead
  • 看完九篇字体系列的文章,你还觉得我是在说字体?
  • 力扣(LeetCode)22
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 思维导图—你不知道的JavaScript中卷
  • 进程与线程(三)——进程/线程间通信
  • #include到底该写在哪
  • #中国IT界的第一本漂流日记 传递IT正能量# 【分享得“IT漂友”勋章】
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (MIT博士)林达华老师-概率模型与计算机视觉”
  • (编译到47%失败)to be deleted
  • (二)JAVA使用POI操作excel
  • (附源码)ssm航空客运订票系统 毕业设计 141612
  • (免费领源码)python+django+mysql线上兼职平台系统83320-计算机毕业设计项目选题推荐
  • (十三)Maven插件解析运行机制
  • (十一)图像的罗伯特梯度锐化
  • (四)Controller接口控制器详解(三)
  • (算法)Travel Information Center
  • (转)Android学习笔记 --- android任务栈和启动模式
  • (最优化理论与方法)第二章最优化所需基础知识-第三节:重要凸集举例
  • .NET CF命令行调试器MDbg入门(三) 进程控制
  • .NET Core WebAPI中使用swagger版本控制,添加注释
  • .NET Framework杂记
  • .net 开发怎么实现前后端分离_前后端分离:分离式开发和一体式发布
  • .NET 中创建支持集合初始化器的类型
  • // an array of int
  • []常用AT命令解释()
  • [8481302]博弈论 斯坦福game theory stanford week 1
  • [ABP实战开源项目]---ABP实时服务-通知系统.发布模式