Java并发编程学习12-任务取消(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【5月更文挑战第6天】本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容

java-concurrency-logo.png

引言

《任务取消》由于篇幅较多,拆分了两篇来介绍各种实现取消和中断的机制,以及如何编写任务和服务,使它们能对取消请求做出响应。

如何理解任务是可取消的 ?

如果外部代码能在某个任务正常完成之前将其置入 “完成” 状态,那么这个任务就被认为是可取消的。

大多数任务,我们都希望让它们运行直到结束,或者让它们自行停止。但是也有很多原因,导致我们需要取消这些任务,如下所示:

  • 用户请求取消。用户点击图形界面程序中的 “取消” 按钮,或者通过管理接口来发出取消请求,例如 JMXJava Management Extensions,即 Java 管理扩展)。
  • 有时间限制的操作。某个应用程序需要在有限时间内搜索问题空间,并在这个时间内选择最佳的解决方案。当计时器超时时,需要取消所有正在搜索的任务。
  • 应用程序事件。应用程序对某个问题空间进行分解并搜索,从而使不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案时,所有其他仍在搜索的任务都将被取消。
  • 错误。网页爬虫程序搜索相关的页面,并将页面或摘要数据保存到硬盘。当一个爬虫任务发生错误时(例如,磁盘空间已满),那么所有搜索任务都会取消,此时可能会记录它们的当前状态,以便稍后重新启动。
  • 关闭。当一个程序或服务关闭时,必须对正在处理和等待处理的工作执行某种操作。在平缓的关闭过程中,当前正在执行的任务将继续执行直到完成,而在立即关闭过程中,当前的任务则可能取消。

主要内容

1. 取消策略

当我们需要取消任务时,该怎么操作呢?

Java 中没有一种安全的抢占式方式来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一种 协作机制,使请求取消的任务和代码都遵循一种协商好的协议。

下面我们来看一下如下示例【使用 volatile 类型的域来保存取消状态】:

@ThreadSafe
public class PrimeGenerator implements Runnable {
   
   
    @GuardedBy("this")
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    // 为了使这个过程能可靠地工作,标志 cancelled 必须为 volatile 类型
    private volatile boolean cancelled;

    public void run() {
   
   
        BigInteger p = BigInteger.ONE;
        while(!cancelled) {
   
   
            LOGGER.debug("before = {}", p);
            p = p.nextProbablePrime();
            LOGGER.debug("prime = {}", p);
            synchronized (this) {
   
   
                primes.add(p);
            }
        }
    }

    public void cancel() {
   
   
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
   
   
        return new ArrayList<BigInteger>(primes);
    }
}

如上示例 PrimeGenerator 将会持续地枚举素数,直到它被取消。它使用了一种协作机制,cancel 方法将设置 cancelledtrue,任务在 搜索下一个素数之前 会检查这个标志,如果标志为 true,则任务将会自行结束。

下面我们再来看一下如下示例【让上面的素数生成器运行 1 秒钟后取消】:

public class PrimeTest {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(PrimeTest.class);

    @Test
    public void primeTest() throws InterruptedException {
   
   
        LOGGER.debug("Primes = {}", aSecondOfPrimes());
    }

    /**
     * 一个仅运行一秒钟的素数生成器
     */
    public List<BigInteger> aSecondOfPrimes() throws InterruptedException {
   
   
        PrimeGenerator generator = new PrimeGenerator();
        new Thread(generator).start();
        try {
   
   
            LOGGER.debug("sleep start");
            SECONDS.sleep(1);
        } finally {
   
   
            generator.cancel();
        }
        return generator.get();
    }
}

实际运行中,上述代码并不会刚好在运行 1 秒后停止,因为在请求取消的时刻和 run 方法中循环执行下一次检查之间可能存在延迟。cancel 方法由 finally 块调用,从而确保即使在调用 sleep 时被中断也能取消素数生成器的执行。如果 cancel 没有被调用,那么搜索素数的线程将永远运行下去。

一个可取消的任务必须拥有取消策略,在该策略中需要详细定义取消操作的三步骤:

  • How。应用程序的其他代码如何(How)请求取消该任务。
  • When。任务在何时(When)检查是否已经请求了取消。
  • What。在响应取消请求时应该执行哪些(What)操作。

上述素数生成器 PrimeGenerator 就使用了一种简单的取消策略:

客户代码通过 调用 cancel 来请求取消,PrimeGenerator 在 每次搜索素数前 首先检查是否存在取消请求,如果存在则 退出

2. 线程中断

介绍中断之前,我们首先来分析一下,上述素数生成器使用的取消机制目前存在的问题:

  • 任务的退出过程仍然需要花费一定的时间。
  • 任务中如果调用了一个阻塞的方法(如 BlockingQueue.put),它可能永远不会检查取消方法,从而永远不会结束。

下面我们再来看一下如下示例:

public class BrokenPrimeProducer extends Thread {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(BrokenPrimeProducer.class);

    private final BlockingQueue<BigInteger> queue;

    private volatile boolean cancelled = false;

    public BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
   
   
        this.queue = queue;
    }

    @Override
    public void run() {
   
   
        try {
   
   
            BigInteger p = BigInteger.ONE;
            while (!cancelled) {
   
   
                LOGGER.debug("before = {}", p);
                queue.put(p = p.nextProbablePrime());
                LOGGER.debug("prime = {}", p);
            }
        } catch (InterruptedException e) {
   
   
            LOGGER.error("InterruptedException");
        }
    }

    public void cancel() {
   
   
        cancelled = true;
        LOGGER.debug("cancel");
    }
}

public class PrimeConsumer {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(PrimeConsumer.class);

    private static final int BOUND = 100;

    private int times = 0; // 消费次数

    public void consumePrimes() throws InterruptedException {
   
   
        BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<>(BOUND);
        BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
        producer.start();
        try {
   
   
            while (needMorePrimes())
                consume(primes.take());
        } finally {
   
   
            producer.cancel();
        }
    }

    private void consume(BigInteger value) {
   
   
        times++;
        LOGGER.debug1(new Object() {
   
   }, "value = {}", value);
    }

    private boolean needMorePrimes() throws InterruptedException {
   
   
        return true;
    }
}

如上示例中,生产者线程 BrokenPrimeProducer 生成素数,并存放到阻塞队列 queue 中【其中 queue 由生产者的构造方法传入】。消费者 PrimeConsumerconsumePrimes 方法从生产者中获取素数并处理。

如果生产者的生产速度超过了消费者的处理速度,队列将被填满,其 put 方法也会一直阻塞下去。

我们来思考下遇到上述情况,如果消费者想取消生产者任务,又该怎么办呢?

消费者可以调用生产者的 cancel 方法来设置 cancelled 标志,但是因为消费者已经停止从队列中取素数,而阻塞队列 queue 的 put 方法将一直保持阻塞状态,导致生产者任务无法从阻塞的 put 方法中恢复过来,也就永远不会再检查 cancelled 标志,从而无法取消生产者任务。

下面我们来修改下消费者的 needMorePrimes 方法, 再用测试类 PrimeConsumerTest 来直观地演示下上述情况:

private boolean needMorePrimes() throws InterruptedException {
   
   
    Thread.sleep(1000); // 等待1s 再判断
    return times < 5;
}

public class PrimeConsumerTest {
   
   

    public static void main(String[] args) throws InterruptedException {
   
   
        PrimeConsumer consumer = new PrimeConsumer();
        consumer.consumePrimes();
    }
}

运行结果如下:

image.png

从上图可以看到,当消费次数达到 5 次后,消费者不再从队列中取素数并打印出来,从代码看它后面直接进入 finally 方法,并且调用生产者的 cancel 方法准备去取消生产者任务,但是生产者线程的打印日志除了 cancel 外,就一直保持 before 那,并且看程序也没有结束掉,说明此时生产者在 put 方法上一直阻塞着。

那么我们如何改造生产者,能够在 put 方法阻塞的情况下,支持消费者取消生产者任务呢?

想要实现这种功能,就不得不提到接下来要讲到的 线程中断

其实在笔者前面的《阻塞队列》博文中,曾简单介绍了阻塞方法与中断方法,大家可以快速去回顾一下。

说到线程中断,就不得不提到 Thread 类,下面简单介绍下:

每个线程都有一个 boolean 类型的中断状态。

当中断线程时,该线程的中断状态将被设置为 true

线程中还包含了中断线程、查询线程中断状态的方法,如下所示:

  • interrupt 方法能中断目标线程。
  • isInterrupted 方法能返回目标线程的中断状态。
  • 静态的 interrupted 方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

注意:

  • 调用 interrupt 方法并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。
  • 因为静态的 interrupted 方法会清除当前线程的中断状态。如果调用它时返回了 true,那么除非你想屏蔽这个中断,否则必须对它进行处理----可以抛出 InterruptedException,或者通过再次调用 interrupt 来恢复中断状态【可以结合《阻塞队列》“阻塞方法与中断方法” 那块的内容进行思考】。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。如果不触发 InterruptedException,那么中断状态将一直保持,直到明确地清除中断状态。

我们知道 Java 类库中的一些阻塞库方法支持中断。例如 Thread.sleepObject.wait 等,它们都会检查中断状态,并且在发现中断时提前返回。

它们在响应中断时执行的操作包括:

  • 清除中断状态
  • 抛出 InterruptedException,表示阻塞操作由于中断而提前结束。

如果任务代码能够响应中断,那么可以使用中断作为取消机制,并且利用 Java 类库中提供的中断支持。

通过上面的了解,我们现在可以使用中断来请求取消生产者任务,以解决 BrokenPrimeProducer 存在的问题。

下面来看一下改造后的生产者的代码示例:

public class PrimeProducer extends Thread {
   
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(PrimeProducer.class);

    private final BlockingQueue<BigInteger> queue;

    public PrimeProducer(BlockingQueue<BigInteger> queue) {
   
   
        this.queue = queue;
    }

    @Override
    public void run() {
   
   
        try {
   
   
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted()) {
   
   
                LOGGER.debug("before = {}", p);
                queue.put(p = p.nextProbablePrime());
                LOGGER.debug("prime = {}", p);
            }
        } catch (InterruptedException e) {
   
   
            // 允许线程退出
            LOGGER.error("InterruptedException");
        }
    }

    public void cancel() {
   
   
        interrupt(); // 中断线程
        LOGGER.debug("interrupt");
    }
}

public class PrimeConsumer {
   
   

    public void consumePrimes() throws InterruptedException {
   
   
        // 。。。
        PrimeProducer producer = new PrimeProducer(primes);
        producer.start();
        // 。。。
    }

    // 。。。
}

上述生产者 PrimeProducer 在每次循环中,都有两个位置可以检测出中断:

  1. 阻塞的 put 方法调用中。
  2. while 循环的判断条件中。

当然因为这里调用阻塞的 put 方法,while 循环条件中显式的检测也可以去掉。但如果加上这段的话,可以使 PrimeProducer 对中断具有更高的响应性。

然后我们用自测类 PrimeConsumerNewTest 来演示一下使用中断来请求取消:

public class PrimeConsumerNewTest {
   
   
    public static void main(String[] args) throws InterruptedException {
   
   
        PrimeConsumer consumer = new PrimeConsumer();
        consumer.consumePrimes();
    }
}

运行结果如下:

image.png

从上图可以看到,当消费次数达到 5 次后,消费者不再从队列中取素数并打印出来,从代码看它后面直接进入 finally 方法,并且调用生产者的 cancel 方法,这里可以看到生产者日志打印了 interrupt,发出了中断请求;在发出中断请求之前,我们也从日志中看到生产最后停在了 before 处,说明此时生产者在 put 方法上阻塞着;而发出中断请求之后,put 方法响应了中断,并抛出了 InterruptedException,从日志看就是打印了 InterruptedException

3. 中断策略

正如任务中应该包含取消策略一样,线程同样应该包含中断策略。

那该如何理解中断策略呢?

中断策略规定了当线程发现中断请求时,应该做哪些工作,哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。

其中最合理的中断策略是什么呢?

当线程发现中断请求后,就尽快退出,在必要时进行清理,并通知某个所有者该线程已经退出。

当然,除上外还可以建立其他的中断策略,如 暂停服务重新开始服务

我们知道任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使 “非所有者” 代码也可以做出响应。

这也就是为什么大多数可阻塞的库函数都只是抛出 InterruptedException 作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作

当检查到中断请求时,任务并不需要放弃所有的操作,它可以推迟处理中断请求,并直到某个更合适的时刻。因此就需要记住中断请求,并在完成当前任务后抛出 InterruptedException 或者 表示已收到中断请求。

无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将 InterruptedException 传递给调用者外还需要下执行其他操作,那么应该在捕获 InterruptedException 之后恢复中断状态:

    Thread.currentThread().interrupt();

线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如 关闭方法

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

4. 响应中断

在笔者前面的《阻塞队列》博文中,当调用可中断的阻塞函数时,例如 Thread.sleepBlockingQueue.put 等,有两种常见的方法可用于处理 InterruptedException :传递 InterruptedException 和 恢复中断。

不过需要注意的是,你不能在 catch 块中捕获到 InterruptedException 异常却不做任何处理,除非在你的代码中实现了线程的中断策略。

当然细心的小伙伴就会说了,上文中的生产者线程 PrimeProducer 不就捕获了 InterruptedException 异常却不做任何处理。这里需要解释下,虽然 PrimeProducer 屏蔽了中断,但因为它已经知道线程将要结束,并且在调用栈中已经没有上层代码需要知道中断信息。上述只是一类特殊的情况,由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获 InterruptedException 时恢复状态,可参考如下示例:

    public Task getNextTask(BlockingQueue<Task> queue) {
   
   
        boolean interrupted = false;
        try {
   
   
            while (true) {
   
   
                try {
   
   
                    return queue.take();
                } catch (InterruptedException e) {
   
   
                    interrupted = true;
                    // 重新尝试
                }
            }
        } finally {
   
   
            if (interrupted) 
                Thread.currentThread().interrupt(); // 恢复中断
        }
    }

如果过早地设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已被设置时会立即抛出 InterruptedException。(通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快地响应中断)。

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么就不应该调用那些执行时间较长并且不响应中断的方法。

在取消过程中可能涉及除了中断状态之外的其他状态,中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。(当访问这些信息时,要确保使用同步。)

例如,当一个由 ThreadPoolExecutor 拥有的工作者线程检测到中断时,它会检查线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则它可能创建一个新线程将线程池恢复到合理的规模。

总结

本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容,下篇将要介绍如何编写任务和服务,使它们能对取消请求做出响应。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
14天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
68 17
|
27天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
1月前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
64 12
|
27天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
146 2
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
63 3
|
7月前
|
Java C++
关于《Java并发编程之线程池十八问》的补充内容
【6月更文挑战第6天】关于《Java并发编程之线程池十八问》的补充内容
56 5
|
4月前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
6月前
|
安全 Java 开发者
Java中的并发编程:深入理解线程池
在Java的并发编程中,线程池是管理资源和任务执行的核心。本文将揭示线程池的内部机制,探讨如何高效利用这一工具来优化程序的性能与响应速度。通过具体案例分析,我们将学习如何根据不同的应用场景选择合适的线程池类型及其参数配置,以及如何避免常见的并发陷阱。
64 1
|
6月前
|
监控 Java
Java并发编程:深入理解线程池
在Java并发编程领域,线程池是提升应用性能和资源管理效率的关键工具。本文将深入探讨线程池的工作原理、核心参数配置以及使用场景,通过具体案例展示如何有效利用线程池优化多线程应用的性能。

热门文章

最新文章