非阻塞同步算法实战(三)-LatestResultsProvider

简介: 阅读本文前,需要读者对happens-before比较熟悉,了解非阻塞同步的一些基本概念。本文主要为happens-before法则的灵活运用,和一些解决问题的小技巧,分析问题的方式。

前言

阅读本文前,需要读者对happens-before比较熟悉,了解非阻塞同步的一些基本概念。本文主要为happens-before法则的灵活运用,和一些解决问题的小技巧,分析问题的方式。


背景介绍

原始需求为:本人当时在编写一个正则替换工具,里面会动态地显示所有的匹配结果(包括替换预览),文本、正则表达式、参数,这些数据的其中一项发生了变化,结果就应该被更新,为了提供友好的交互体验,数据变化时,应该是发起一个异步请求,由另一个独立的线程来完成运算,完成后通知UI更新结果。由于是动态显示,所以提交会非常频繁。


需求描述

需要这样一个工具类,允许用户频繁地提交数据(本文之后以“submit”表示该操作)和更新结果(本文之后以“update”表示该操作),submit时,如果当前有进行中的运算,则应该取消,使用新参数执行新的运算;update时,如果当前没有进行中的运算(处于阻塞状态),并且当前结果不是最新的,则唤醒该线程,使用当前的新数据,执行新的运算。此处之所以分为submit和update两个方法,是为了支持手动更新,即点击更新按钮时,才更新结果。


此外,出于练手的原因,也出于编写一个功能全面,更实用的工具的目的,我还加入了一些额外的需求:

1、引入多线程场景,update和submit均可由多个线程同时发起,该工具类应设计成线程安全的。

2、允许延迟执行运算,如果延时内执行submit,仅重新计算延时。如果运算不方便取消,在短时间频繁submit的场景下,延时会是一个很好的应对办法。

3、允许设置一个最大延迟时间,作为延迟开启运算的补充。当长时间频繁submit时,会形成这样的局面,一直未进入运算环节,新结果计算不出来,上一次计算结果却是很早以前的。如果需要显示一个较新但不是最新的结果,最大延迟时间将会很有用。

4、提供主动取消方法,主动取消正在进行的运算。

5、update时,允许等待运算完成,同时也可设置超时时间。当主动取消、超时、完成了当前或更(更加的意思)新的数据对应的运算时,结束等待。


需求交待完了,有兴趣有精力的读者,可以先试着思考下怎么实现。


问题分析

该工具应该维护一个状态字段,这样才能在发起某个操作时,根据所处的状态作出正确的动作,如:如果当前不处于停止状态(或者主动取消状态,原因见下文),执行update就不需要唤醒运算线程。简单分析可知,至少应该有这样几种状态:

1、停止状态:当前没有运算任务,线程进入阻塞状态,主动取消和运算完成后,进入该状态

2、延迟状态:设置了延迟开启运算时,进入运算前,处于该状态

3、运算状态:正在执行运算

4、主动取消状态:当发起主动取消时,进入该状态

5、新任务状态:当时有新的运算任务时,进入该状态,然后重新进入运算状态


延迟

再来看一下延迟,如果延迟500毫秒,就每次sleep(500),那么期间再submit怎么办?将它唤醒然后重新sleep(500)吗?显然不行,成本太大了。


我有一个小技巧:将500分成多个合适的等份,使用一个计数器,每次sleep一个等份,计数器加1,如果发起submit,仅把计数器置0即可,虽然看起来线程的状态切换变多了,但应对频繁重置时,它更稳定。虽然时间上会上下波动一个等份,但此处并不需要多么精确。


现在还面临这样一个问题,如何知道当前是处于延迟状态并计数器置0?取出状态值进行判断,然后置0,这方法显然不行,因为置0的时候,可能状态已经变了,所以你无法知道该操作是否生效了。


我想到的办法是,再引入一个延迟重置状态。如果处于该状态,则下一次计数器加1时,将计数器重置,状态变更是可以知道成功与否的。


状态变更

有些状态的变更是有条件的,比如说当前处于取消状态,就不能把它转为运算状态,运算状态只能由新任务状态、延迟状态(延迟完成后执行运算)或延迟重置状态转入。这种场景正好跟CAS一致,所以,使用一个AtomicInteger来表示状态。


分析下各状态之间的转换,可以得出下面的状态变更图:

image.png

蓝色的a(bcd)|(e)f线路为停止状态下,发起一次update,运算完重新回到停止的过程,开启延迟时是bcd,否则是e。

红色的线j表示超过了最大延迟时间,退出延迟,进入运算状态(也可以是d)。


绿色的线ghi(包括a)表示:如果发起了submit或update,状态应该怎么改变。如果处于延迟重置、新任务则不需要进行任何操作;如果处于延迟状态,则转为延迟重置即可;如果处于运算状态,则可能使用了旧参数,应该转为新任务;如果为主动取消或停止状态,并且是调用update方法,则转为新任务,并且可能处于阻塞状态,应该唤醒该线程。


黑色的线l表示,可在任意状态下发起主动取消,进入该状态。然后通知等待线程后,转入停止状态,对应紫色的k,如果在停止状态下发起主动取消,则仅转为主动取消状态,不会通知等待线程。所以当线程阻塞时,可能处于停止状态或者主动取消状态。


顺序问题

上面已经分析到,当submit时,应该把延迟转为延迟重置、或运算转为新任务,这两个尝试的顺序是不是也有讲究呢?


是的,因为正常执行流程a(bcd)|(e)f中,运算状态在延迟状态之后,假如先尝试运算转为新任务,可能此时为延迟状态,故失败,再尝试延迟转为延迟重置时,状态在这期间从刚才的延迟转为了运算,故两次尝试都失败了,本应该重置延迟的,却什么也没干,这是错误的。而将两次尝试顺序调换一下,只要状态为延迟或运算,那么两次状态转换尝试中,一定有一次会成功。


之后的代码中还有多处类似的顺序细节。


解决方案

下面给出完整的代码,除去等待运算完成那部分,其它地方均为wait-free级别的实现。

calculateResult是具体执行运算的方法;上文中的submit对应代码里的updateParametersVersion方法,上文中的update对应剩余几个update方法。

updateAndWait方法中,使用了上一篇中讲到的BoundlessCyclicBarrier,其维护的版本号就是参数的版本号ParametersVersion。

/**

* @author trytocatch@163.com

* @date 2013-2-2

*/

publicabstractclassLatestResultsProvider {

   /** update return value */

   publicstaticfinalintUPDATE_FAILED= -1;

   publicstaticfinalintUPDATE_NO_NEED_TO_UPDATE=0;

   publicstaticfinalintUPDATE_SUCCESS=1;

   publicstaticfinalintUPDATE_COMMITTED=2;

   /** update return value */

   /** work states*/

   privatestaticfinalintWS_OFF=0;

   privatestaticfinalintWS_NEW_TASK=1;

   privatestaticfinalintWS_WORKING=2;

   privatestaticfinalintWS_DELAYING=3;

   privatestaticfinalintWS_DELAY_RESET=4;

   privatestaticfinalintWS_CANCELED=5;

   /** work states*/

   privatefinal AtomicInteger workState;

   privateintsleepPeriod=30;

   privatefinal AtomicInteger parametersVersion;

   privatevolatileint updateDelay;// updateDelay>=0

   privatevolatileint delayUpperLimit;

   privatefinal BoundlessCyclicBarrier barrier;

   private Thread workThread;

   /**

    *

    * @param updateDelay unit: millisecond

    * @param delayUpperLimit limit the sum of the delay, disabled

    * while delayUpperLimit<0, unit: millisecond

    */

   publicLatestResultsProvider(int updateDelay, int delayUpperLimit) {

       if (updateDelay < 0)

           this.updateDelay = 0;

       else

           this.updateDelay = updateDelay;

       this.delayUpperLimit = delayUpperLimit;

       barrier = newBoundlessCyclicBarrier(0);

       workState = newAtomicInteger(WS_OFF);

       parametersVersion = newAtomicInteger(0);

       initThread();

   }

   privatevoidinitThread() {

       workThread = newThread("trytocatch's worker") {

           @Override

           publicvoidrun() {

               intsleepCount=0;

               for (;;) {

                   try {

                       while (!workState.compareAndSet(WS_NEW_TASK,

                               updateDelay > 0 ? WS_DELAY_RESET : WS_WORKING)) {

                           if (workState.compareAndSet(WS_CANCELED, WS_OFF)) {

                               barrier.cancel();

                           }

                           LockSupport.park();

                           interrupted();

                       }

                       if (workState.get() == WS_DELAY_RESET) {

                           intdelaySum=0;

                           for (;;) {

                               if (workState.compareAndSet(WS_DELAY_RESET,

                                       WS_DELAYING)) {

                                   sleepCount = (updateDelay + sleepPeriod - 1)

                                           / sleepPeriod;

                               }

                               sleep(sleepPeriod);

                               if (--sleepCount <= 0

                                       && workState.compareAndSet(WS_DELAYING,

                                               WS_WORKING))

                                   break;

                               if (delayUpperLimit >= 0) {

                                   delaySum += sleepPeriod;

                                   if (delaySum >= delayUpperLimit) {

                                       if (!workState.compareAndSet(

                                               WS_DELAYING, WS_WORKING))

                                           workState.compareAndSet(

                                                   WS_DELAY_RESET, WS_WORKING);

                                       break;

                                   }

                               }

                               if (workState.get() != WS_DELAYING

                                       && workState.get() != WS_DELAY_RESET)

                                   break;

                           }

                       }

                       if (isWorking()) {

                           intworkingVersion= parametersVersion.get();

                           try {

                               calculateResult();

                               if (workState.compareAndSet(WS_WORKING, WS_OFF))

                                   barrier.nextCycle(workingVersion);

                           } catch (Throwable t) {

                               t.printStackTrace();

                               workState.set(WS_CANCELED);

                           }

                       }

                   } catch (InterruptedException e) {

                       workState.compareAndSet(WS_DELAYING, WS_CANCELED);

                       workState.compareAndSet(WS_DELAY_RESET, WS_CANCELED);

                   }

               }// for(;;)

           }// run()

       };

       workThread.setDaemon(true);

       workThread.start();

   }

   publicintgetUpdateDelay() {

       return updateDelay;

   }

   /**

    * @param updateDelay

    *            delay time. unit: millisecond

    */

   publicvoidsetUpdateDelay(int updateDelay) {

       this.updateDelay = updateDelay < 0 ? 0 : updateDelay;

   }

   publicintgetDelayUpperLimit() {

       return delayUpperLimit;

   }

   /**

    * @param delayUpperLimit limit the sum of the delay, disabled

    * while delayUpperLimit<0, unit: millisecond

    */

   publicvoidsetDelayUpperLimit(int delayUpperLimit) {

       this.delayUpperLimit = delayUpperLimit;

   }

   publicfinalvoidstopCurrentWorking() {

       workState.set(WS_CANCELED);

   }

   /**

    * @return NO_NEED_TO_UPDATE, COMMITTED

    */

   publicfinalintupdate() {

       if (isResultUptodate())

           return UPDATE_NO_NEED_TO_UPDATE;

       if (workState.compareAndSet(WS_CANCELED, WS_NEW_TASK)

               || workState.compareAndSet(WS_OFF, WS_NEW_TASK))

           LockSupport.unpark(workThread);

       return UPDATE_COMMITTED;

   }

   /**

    * @param timeout

    *            unit:nanoseconds

    * @return FAILED, NO_NEED_TO_UPDATE, SUCCESS

    * @throws InterruptedException

    */

   publicfinalintupdateAndWait(long nanosTimeout)

           throws InterruptedException {

       intnewVersion= parametersVersion.get();

       if (update() == UPDATE_NO_NEED_TO_UPDATE)

           return UPDATE_NO_NEED_TO_UPDATE;

       barrier.awaitWithAssignedVersion(newVersion, nanosTimeout);

       return barrier.getVersion() - newVersion >= 0 ? UPDATE_SUCCESS

               : UPDATE_FAILED;

   }

   /**

    * @return FAILED, NO_NEED_TO_UPDATE, SUCCESS

    * @throws InterruptedException

    */

   publicfinalintupdateAndWait()throws InterruptedException {

       return updateAndWait(0);

   }

   publicfinalbooleanisResultUptodate() {

       return parametersVersion.get() == barrier.getVersion();

   }

   /**

    * be used in calculateResult()

    * @return true: the work state is working, worth to calculate the

    * result absolutely, otherwise you can cancel the current calculation

    */

   protectedfinalbooleanisWorking() {

       return workState.get()==WS_WORKING;

   }

   /**

    * you must call this after update the parameters, and before calling the

    * update

    */

   protectedfinalvoidupdateParametersVersion() {

       intpVersion= parametersVersion.get();

       //CAS failed means that another thread do the same work already

       if (parametersVersion.compareAndSet(pVersion, pVersion + 1))

           if (!workState.compareAndSet(WS_DELAYING, WS_DELAY_RESET))

               workState.compareAndSet(WS_WORKING, WS_NEW_TASK);

   }

   /**

    * implement this to deal with you task

    */

   protectedabstractvoidcalculateResult();

}

代码中,我直接在构造方法里开启了新的线程,一般来说,是不推荐这样做的,但在此处,除非在构造还未完成时就执行update方法,否则不会引发什么问题。


最后,附上该正则替换工具的介绍和下载地址:http://www.cnblogs.com/trytocatch/p/RegexReplacer.html


小结

状态变更非常适合使用非阻塞算法,并且还能够达到wait-free级别。限于篇幅,有些没讲到的细节,请读者借助代码来理解吧,如有疑问,欢迎回复讨论。


系列总结

本实战系列就到此结束了,简单总结下。


非阻塞同步相对于锁同步而言,由代码块,转为了点,是另一种思考方式。

有时,无法做到一步完成,也许可以分成两步完成,同样可以解决问题,ConcurrentLinkedQueue就是这么做的。


如果需要维护多个数据之间的某种一致关系,则可以将它们封装到一个类中,更新时采用更新该类对象的引用的方式。


众所周知,锁同步算法是难以测试的,非阻塞同步算法更加难以测试,我个人认为,其正确性主要靠慎密的推敲和论证。


非阻塞同步算法比锁同步算法要显得更复杂些,如果对性能要求不高,对非阻塞算法掌握得还不太熟练,建议不要使用非阻塞算法,锁同步算法要简洁得多,也更容易维护,如上面所说的,两条看似没有顺序的语句,调换下顺序,可能就会引发BUG。


相关文章
|
2月前
|
算法 数据可视化 测试技术
HNSW算法实战:用分层图索引替换k-NN暴力搜索
HNSW是一种高效向量检索算法,通过分层图结构实现近似最近邻的对数时间搜索,显著降低查询延迟。相比暴力搜索,它在保持高召回率的同时,将性能提升数十倍,广泛应用于大规模RAG系统。
209 10
HNSW算法实战:用分层图索引替换k-NN暴力搜索
|
7月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
2月前
|
机器学习/深度学习 缓存 算法
微店关键词搜索接口核心突破:动态权重算法与语义引擎的实战落地
本文详解微店搜索接口从基础匹配到智能推荐的技术进阶路径,涵盖动态权重、语义理解与行为闭环三大创新,助力商家提升搜索转化率、商品曝光与用户留存,实现技术驱动的业绩增长。
|
4月前
|
机器学习/深度学习 算法 文件存储
神经架构搜索NAS详解:三种核心算法原理与Python实战代码
神经架构搜索(NAS)正被广泛应用于大模型及语言/视觉模型设计,如LangVision-LoRA-NAS、Jet-Nemotron等。本文回顾NAS核心技术,解析其自动化设计原理,探讨强化学习、进化算法与梯度方法的应用与差异,揭示NAS在大模型时代的潜力与挑战。
1095 6
神经架构搜索NAS详解:三种核心算法原理与Python实战代码
|
2月前
|
存储 人工智能 算法
从零掌握贪心算法Java版:LeetCode 10题实战解析(上)
在算法世界里,有一种思想如同生活中的"见好就收"——每次做出当前看来最优的选择,寄希望于通过局部最优达成全局最优。这种思想就是贪心算法,它以其简洁高效的特点,成为解决最优问题的利器。今天我们就来系统学习贪心算法的核心思想,并通过10道LeetCode经典题目实战演练,带你掌握这种"步步为营"的解题思维。
|
3月前
|
机器学习/深度学习 资源调度 算法
遗传算法模型深度解析与实战应用
摘要 遗传算法(GA)作为一种受生物进化启发的优化算法,在复杂问题求解中展现出独特优势。本文系统介绍了GA的核心理论、实现细节和应用经验。算法通过模拟自然选择机制,利用选择、交叉、变异三大操作在解空间中进行全局搜索。与梯度下降等传统方法相比,GA不依赖目标函数的连续性或可微性,特别适合处理离散优化、多目标优化等复杂问题。文中详细阐述了染色体编码、适应度函数设计、遗传操作实现等关键技术,并提供了Python代码实现示例。实践表明,GA的成功应用关键在于平衡探索与开发,通过精心调参维持种群多样性同时确保收敛效率
|
3月前
|
机器学习/深度学习 边缘计算 人工智能
粒子群算法模型深度解析与实战应用
蒋星熠Jaxonic是一位深耕智能优化算法领域多年的技术探索者,专注于粒子群优化(PSO)算法的研究与应用。他深入剖析了PSO的数学模型、核心公式及实现方法,并通过大量实践验证了其在神经网络优化、工程设计等复杂问题上的卓越性能。本文全面展示了PSO的理论基础、改进策略与前沿发展方向,为读者提供了一份详尽的技术指南。
粒子群算法模型深度解析与实战应用
|
10月前
|
人工智能 编解码 算法
DeepSeek加持的通义灵码2.0 AI程序员实战案例:助力嵌入式开发中的算法生成革新
本文介绍了通义灵码2.0 AI程序员在嵌入式开发中的实战应用。通过安装VS Code插件并登录阿里云账号,用户可切换至DeepSeek V3模型,利用其强大的代码生成能力。实战案例中,AI程序员根据自然语言描述快速生成了C语言的base64编解码算法,包括源代码、头文件、测试代码和CMake编译脚本。即使在编译错误和需求迭代的情况下,AI程序员也能迅速分析问题并修复代码,最终成功实现功能。作者认为,通义灵码2.0显著提升了开发效率,打破了编程语言限制,是AI编程从辅助工具向工程级协同开发转变的重要标志,值得开发者广泛使用。
8835 71
DeepSeek加持的通义灵码2.0 AI程序员实战案例:助力嵌入式开发中的算法生成革新
|
存储 缓存 算法
前端算法:优化与实战技巧的深度探索
【10月更文挑战第21天】前端算法:优化与实战技巧的深度探索
228 1
|
大数据 UED 开发者
实战演练:利用Python的Trie树优化搜索算法,性能飙升不是梦!
在数据密集型应用中,高效搜索算法至关重要。Trie树(前缀树/字典树)通过优化字符串处理和搜索效率成为理想选择。本文通过Python实战演示Trie树构建与应用,显著提升搜索性能。Trie树利用公共前缀减少查询时间,支持快速插入、删除和搜索。以下为简单示例代码,展示如何构建及使用Trie树进行搜索与前缀匹配,适用于自动补全、拼写检查等场景,助力提升应用性能与用户体验。
279 2

热门文章

最新文章