开发者社区> 问答> 正文

java 并行处理-更新值-仅一个线程更新该值-等待并继续执行

用例:

我正在编写一个主动的会话更新程序;用例适用于并行API调用或顺序API调用拦截API调用并检查令牌是否有效。

如果令牌有效,则继续调用。如果令牌无效,则首先更新令牌,然后继续进行API调用。

这对于顺序调用来说是一条小路,但是对于并行调用,我想避免DDOS对重复的令牌调用使用服务器。

相反,我有以下流程:

假设我的令牌更新程序拦截了五个并行调用的突发事件,第一个调用(线程)去更新令牌,所有其他四个将等待,直到第一个调用更新了令牌。令牌更新后,其他线程将继续正常执行,因此跳过令牌更新。

这里的另一点是,请求可能会在T = 1的5个API调用中突发出现,而在T = 2的10个API调用中突发出现,因此我也对重置值感兴趣(例如在executeLatch中)

现在,我为实现此目的而编写的代码已加载了框架和行话,为了简单起见,我想出了下面的代码框架:

class SingleUpdater {

private val lock = ReentrantLock()
private val executionLatch = AtomicBoolean(false)

// Say valueToBeUpdated is my session token 
private val valueToBeUpdated = AtomicInteger(0)

private val awaitingThreadsCounter = AtomicInteger(0)

// Method to emulate bursts of threads 
fun start() {
    for (i in 0 until 100) {
        Thread(this::execute).start()
        Thread(this::execute).start()
        Thread(this::execute).start()
    }

    // Toggle burst emulator 
    Thread.sleep(3000)

    for (i in 0 until 1) {
        Thread(this::execute).start()
        Thread(this::execute).start()
        Thread(this::execute).start()
    }

}


private fun execute() {

    println("Trying to acquire lock ${Thread.currentThread().name}")
    try {
        awaitingThreadsCounter.set(awaitingThreadsCounter.get() + 1)
        println("============================= awaitingThreadsCounter = ${awaitingThreadsCounter.get()} ============================= ")
        lock.lock()
        println("I am Thread ${Thread.currentThread().name} I have acquired lock")
        if (executionLatch.get()) {
            continueWithRestOfExecution()
        } else {
            doUpdate()
            continueWithRestOfExecution()
        }

    } catch (e: Exception) {
        e.printStackTrace()

    } finally {
        println("Released by ${Thread.currentThread().name}")
        lock.unlock()

    }

}

private fun continueWithRestOfExecution() {
    if (awaitingThreadsCounter.get() >= 0) awaitingThreadsCounter.set(awaitingThreadsCounter.get() - 1)
    println("I am Thread: ${Thread.currentThread().name} and I am executing continueWithRestOfExecution")
    if (awaitingThreadsCounter.get() == 0) {
        println(" =============================  BALANCED  ============================= ")
        executionLatch.set(false)
    }
}


private fun doUpdate() {
    try {
        Thread.sleep(3000)
        valueToBeUpdated.set(valueToBeUpdated.get() + 1)
        println("=============================== Value is: $valueToBeUpdated ${Thread.currentThread().name} ===============================")
        executionLatch.set(true)
    } catch (ex: Exception) {
        ex.printStackTrace()
    }
}

} 如果您有更好的实施方法,请在此处帮助分析边缘情况。

展开
收起
被纵养的懒猫 2019-10-09 16:38:28 601 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Spring Cloud Alibaba - 重新定义 Java Cloud-Native 立即下载
The Reactive Cloud Native Arch 立即下载
多IO线程优化版 立即下载