当前位置: 首页 > news >正文

世界上最简单的无等待算法(getAndIncrement)

本文基于compareandswap指令完成一个无等待并发算法。
根据维基百科,它的定义如下:

An algorithm is wait-free if every operation has a bound on the number of steps the algorithm will take before the operation completes.

本文的方法参考了Wait-free queues with multiple enqueuers and dequeuers,A methodology for creating fast wait-free data structures,A practical wait-free simulation for lock-free data structures。
它的目标非常简单,就是把一个值通过原子指令CMPXCHG加N,然后返回原来的值。
lock-free版本如下:

    public final int getAndIncrement(int add) {
        for (;;) {
            int current = get();
            int next = current + add;
            if (compareAndSet(current, next))
                return current;
        }
    }

那么,这里的方式是,compareAndSet失败了重试。并且导致它失败的那一方必定取得了进展。
这里的问题在于,究竟需要重试多少次compareAndSet才能成功呢?
假如一直处于竞争环境下,这个重试次数是没有上限的。这里也就是饥饿的问题。
有没有办法给出一个上限,哪怕这个上限比较大
这种并发的性质被称为wait-freedom。
先给出一个我实现好了的代码,之后再来讲思路:

    public static int getAndIncrement(int index, int add) {
        //fast-path, 最多MAX次。
        int count = MAX;
        for(;;) {
            ValueObj valueObj_ = valueObj;
            if(valueObj_.threadObj == null) {
                ValueObj valueObjNext = new ValueObj(valueObj_.value + add, null);
                if(casValueObj(valueObj_, valueObjNext)) {
                    StateObj myState = states[index];
                    //前进一步,每assistStep,尝试一个帮助。
                    if(((++myState.steps) & myState.assistStep) == 0){
                        long helpThread = myState.index;
                        help(helpThread);
                        //下一个协助的对象。
                        ++myState.index;
                    }
                    return valueObj_.value;
                }
                Thread.yield();Thread.yield();Thread.yield();Thread.yield();
            } else {
                helpTransfer(valueObj_);
            }
            
            if(--count == 0)
                break;
        }
//        System.out.println("here " + inter.incrementAndGet());
        for(int j = 0; j < bigYields; ++j)
            Thread.yield();
        
        //slow-path,将自己列为被帮助对象。
        ThreadObj myselfObj = new ThreadObj(new ThreadObj.WrapperObj(null, false), add);
        setThreadObj(index, myselfObj);
        //开始帮助自己
        ValueObj result = help(index);
        setThreadObj(index, null);
        return result.value;
    }
    
    // valueObj->threadObj->wrapperObj->valueObj。
    // step 1-3,每一个步骤都不会阻塞其他步骤。
    // 严格遵守以下顺序: 
    // step 1: 通过将ValueObj指向ThreadObj:
    //         atomic: (value, null)->(value, ThreadObj)来锚定该值                      //确定该value归ThreadObj对应线程所有。
    // step 2: 通过将ThreadObj包裹的WrapperObj,
    //         atomic: 从(null, false)更新为(valueObj, true)来更新状态的同时传递value    //对应线程通过isFinish判定操作已完成。
    // step 3: 更新ValueObj,提升value,同时设置ThreadObj为null:
    //         atomic: (value, ThreadObj)->(value+1, null)完成收尾动作                 //此时value值回到了没有被线程锚定的状态,也可以看做step1之前的状态。
    private static ValueObj help(long helpIndex) {
        helpIndex = helpIndex % N;
        ThreadObj helpObj = getThreadObj(helpIndex);
        ThreadObj.WrapperObj wrapperObj;
        if(helpObj == null || helpObj.wrapperObj == null)
            return null;
        //判定句,是否该线程对应的操作未完成,(先取valueObj,再取isFinish,这很重要)。
        ValueObj valueObj_ = valueObj;
        while(!(wrapperObj = helpObj.wrapperObj).isFinish) {
            /*ValueObj valueObj_ = valueObj;*/
            if(valueObj_.threadObj == null) {
                ValueObj intermediateObj = new ValueObj(valueObj_.value, helpObj);
                //step1
                if(!casValueObj(valueObj_, intermediateObj)) {
                    valueObj_ = valueObj;
                    continue;
                }
                //step1: 锚定该ValueObj,接下来所有看到该valueObj的线程,都会一致地完成一系列操作.
                valueObj_ = intermediateObj;
            }
            //完成ValueObj、ThreadObj中的WrapperObj的状态迁移。
            helpTransfer(valueObj_);
            valueObj_ = valueObj;
        }
        valueObj_ = wrapperObj.value;
        helpValueTransfer(valueObj_);
        //返回锚定的valueObj。
        return valueObj_;
    }
    
    private static void helpTransfer(ValueObj valueObj_) {
        ThreadObj.WrapperObj wrapperObj = valueObj_.threadObj.wrapperObj;
        //step2: 先完成ThreadObj的状态迁移,WrapperObj(valueObj,true)分别表示(值,完成),原子地将这两个值喂给threadObj。
        if(!wrapperObj.isFinish) {
            ThreadObj.WrapperObj wrapValueFiniash = new ThreadObj.WrapperObj(valueObj_, true);
            valueObj_.threadObj.casWrapValue(wrapperObj, wrapValueFiniash);
        }
        //step3: 最后完成ValueObj上的状态迁移
        helpValueTransfer(valueObj_);
    }
    
    private static ValueObj helpValueTransfer(ValueObj valueObj_) {
        if(valueObj_ == valueObj) {
            ValueObj valueObjNext = new ValueObj(valueObj_.value + valueObj_.threadObj.add, null);
            casValueObj(valueObj_, valueObjNext);
        }
        return valueObj_;
    }

首先全局作用域有个值ValueObj,一种自然的想法是ValueObj应该值包含一个int值value,图[1]:
图片描述
如图所示,无锁并发的版本无非就是从一个值原子地变化为另一个+1的值。
但是我们要的是wait-free,势必涉及到线程之间的互助。如何在线程之间传递信息从而实现协助呢?
我们给ValueObj增加一个字段threadObj,那么上图的版本现在可以视为,图[2]:
图片描述
那么现在增加了一个数据ThreadObj(后面会解释如何使用),ValueObj会有两种状态:

  1. (Value, null)
  2. (Value, threadObj)

那么好,现在是两种情况:

  1. (value, null),代表这个value我可以去竞争。
  2. (value, threadObj),代表这个value已经被threadObj对应的线程预定了,剩余线程达成共识。

所以情况2我们能做什么呢?
这里就涉及到无等待算法的核心技术,help,通俗地说,就是多个线程间的相互协助。
我们会先将value,原子地喂给threadObj,再将(value, threadObj)原子地迁移到(value+1, null)。
这里的顺序很重要,因为假如线程A先迁移了(value, threadObj)->(value+1, nulll),那么剩下的其他所有线程都无法再看到threadObj,也就是说尽管value已经增加了,但是threadObj对应线程被锁住了。只能等待线程A将value喂给threadObj。
ThreadObj中包裹着WrapperObj,WrapperObj中初始化为(null, false),将会原子地转变为(ValueObj, ture),从而ThreadObj拿到了ValueObj。
所以这里一共是三个steps:
step 1: (value, null) atomic-> (value, threadObj)
step 2: threadObj.WrapperObj(null, false) atomic-> WrapperObj(valueObj, true)
stpe 3: (value, trheadObj) atomic-> (value + 1, null)
如图所示[3]:
图片描述

其中灰框黑字标识了关键的3个step.
图[3]所示是一种通用的无等待构造技术。
对于其中的ThreadObj以及每个线程协助其他线程的策略states,可以通过数组来构造,具体可在代码里看到。
最后我们的程序结构,如图[4]:
图片描述
最后给出完整的可执行代码 WaitFreeGetAndIncrement:

https://github.com/Pslydhh/Ps...

or WaitFreeAtomic:

https://github.com/Pslydhh/Wa...

,每个loop,100个线程每个操作100000次。
https://github.com/Pslydhh/Ps...
参考文献:
0:Wait-free synchronization
1:Wait-free queues with multiple enqueuers and dequeuers
2:A methodology for creating fast wait-free data structures
3:A practical wait-free simulation for lock-free data structures

相关文章:

  • 项目Alpha冲刺Day1
  • RHEL6基础三十二之系统默认语言修改
  • [转]建行B2B支付回调参数乱码现象解析
  • 制作简易无限魔方
  • 【技巧篇】解决悬浮的header、footer遮挡内容的处理技巧
  • 聚集索引:三级阶梯SQL Server索引
  • es 5 数组reduce方法记忆
  • 用vlan划分实现全网互通,并隔离c1,c3和c2,c4
  • Nginx 目录配置详解
  • 详解最大似然估计(MLE)、最大后验概率估计(MAP),以及贝叶斯公式的理解...
  • linux之理解文件系统上的复制,移动,删除
  • Linux运维 第四阶段 (三) MySQL的SQL语句
  • C# GetSchema Get List of Table 获取数据库中所有的表名以及表中的纪录条数的方法
  • XML技术-Schema约束-Dom4j-Xpath详解
  • 从windows server的文件服务到分布式文件服务(二)
  • ES6指北【2】—— 箭头函数
  • 《Javascript数据结构和算法》笔记-「字典和散列表」
  • 【跃迁之路】【735天】程序员高效学习方法论探索系列(实验阶段492-2019.2.25)...
  • C++回声服务器_9-epoll边缘触发模式版本服务器
  • ES6语法详解(一)
  • ESLint简单操作
  • learning koa2.x
  • macOS 中 shell 创建文件夹及文件并 VS Code 打开
  • SpringBoot 实战 (三) | 配置文件详解
  • Sublime text 3 3103 注册码
  • 笨办法学C 练习34:动态数组
  • 从零开始在ubuntu上搭建node开发环境
  • 前端面试题总结
  • 如何编写一个可升级的智能合约
  • 深入体验bash on windows,在windows上搭建原生的linux开发环境,酷!
  • 为视图添加丝滑的水波纹
  • 《TCP IP 详解卷1:协议》阅读笔记 - 第六章
  • Prometheus VS InfluxDB
  • 长三角G60科创走廊智能驾驶产业联盟揭牌成立,近80家企业助力智能驾驶行业发展 ...
  • 进程与线程(三)——进程/线程间通信
  • ![CDATA[ ]] 是什么东东
  • ###C语言程序设计-----C语言学习(6)#
  • #define与typedef区别
  • (2015)JS ES6 必知的十个 特性
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第2节(共同的基类)
  • (Redis使用系列) Springboot 使用redis实现接口Api限流 十
  • (二)linux使用docker容器运行mysql
  • (二)换源+apt-get基础配置+搜狗拼音
  • (六)vue-router+UI组件库
  • (全部习题答案)研究生英语读写教程基础级教师用书PDF|| 研究生英语读写教程提高级教师用书PDF
  • (转) Face-Resources
  • (转)人的集合论——移山之道
  • ****Linux下Mysql的安装和配置
  • *setTimeout实现text输入在用户停顿时才调用事件!*
  • .Net CF下精确的计时器
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...
  • .net6使用Sejil可视化日志
  • /bin/rm: 参数列表过长"的解决办法
  • @Autowired自动装配
  • @ModelAttribute使用详解