线程-线程池1

简介: 今天我们说说线程池,线程池为什么来呢?

带着问题去思考!大家好!

今天我们说说线程池,线程池为什么来呢?

  之前我们讲过线程,它的创建和协作的几种方式。花费极少的时间来完成创建很多异步操作。创建线程是昂贵的操作,所以为每个短暂的异步操作创建线程会产生显著的开销

  那么为了解决这一问题,有一个常用的方式叫做池,线程池可以成功的适用于任何需要大量短暂的开销大的资源情形。事先分配一定的资源,将这些资源放到资源池中。每次需要新的资源,只需要从池中获取一个,不需要创建新的。

  通过System.Threading.ThreadPool类型可以使用线程池。线程池是受.NET通用语言运行时(CLR)管理的。每个CLR都有一个线程池的实例,ThreadPool类型拥有一个QueueUserWorkItem静态方法,该静态方法接受一个委托。方法被调用后,委托会进入内部队列中,如果池中没有任何线程,将创建一个新的工作线程并将队列中第一个委托放入到该工作线程中。

注意:保持线程中的操作都是短暂的。

  停止:停止线程池中放置新操作时,线程池最终会删除一定时间后过期的不再使用的线程。这将释放所有那些不再需要的系统资源。

线程池中的的工作线程都是后台线程。这意味着当所有的前台线程(主线程)完成后,所有的后台线程将停止工作。

在线程池中使用委托

这里我们将说到异步编程模型(Asynchronous Programming Model,简称APM)的方式。

privatedelegatestring RunOnThreadPool(outint threadId);         privatestaticvoid Callback(IAsyncResult ar)         {             Console.WriteLine("Starting a callback...");             Console.WriteLine("State passed to a callback:{0}", ar.AsyncState);             Console.WriteLine("Is thread pool thread {0}", Thread.CurrentThread.IsThreadPoolThread);             Console.WriteLine("Thread pool worker thread id:{0}", Thread.CurrentThread.ManagedThreadId);         }         privatestaticstring Test(outint threadId)         {             Console.WriteLine("Starting....");             Console.WriteLine("Is thraed pool thread :{0}", Thread.CurrentThread.IsThreadPoolThread);             Thread.Sleep(TimeSpan.FromSeconds(2));             threadId = Thread.CurrentThread.ManagedThreadId;             returnstring.Format("Thread pool worker thread id was:{0}", threadId);         }         staticvoid Main(string[] args)         {             int threadId = 0;             RunOnThreadPool poolDelegate = Test;             var t = new Thread(() => Test(out threadId));             t.Start();             t.Join();             Console.WriteLine("Thread id:{0}", threadId);             //使用APM方式,进行异步调用,            IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delege asynchronous call");             r.AsyncWaitHandle.WaitOne();             string result = poolDelegate.EndInvoke(out threadId, r);             Console.WriteLine("Thread pool worker thread id:{0}", threadId);             Console.WriteLine(result);             Thread.Sleep(TimeSpan.FromSeconds(2));         }

 

运行结果:

.NET CORE不支持BeginInvoke

image.png

首先开始我们使用以前的方式创建一个线程,然后启动它并等待完成。而TheadPool为开始线程池任务输出的信息,Callback为APM模式运行任务结束后,执行的回调方法。

使用BeginOperationName/EndOperationName方法和.NET中的IAsyncResult对象等方式称为异步编程模型(或APM模式),这样的方法称为异步方法,但在现代编程中,更推荐使用任务并行库(Task Parallel Libary,TPL)来组织异步API

 

向线程池中放入异步操作

 

privatestaticvoid AsyncOperation(object state)         {             Console.WriteLine("Operation state:{0}",state??"(null)");             Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);             Thread.Sleep(TimeSpan.FromSeconds(2));         }         staticvoid Main(string[] args)         {             constint x = 1;             constint y = 2;             conststring lambdaState = "lambda state 2";             ThreadPool.QueueUserWorkItem(AsyncOperation);             Thread.Sleep(TimeSpan.FromSeconds(1));             ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");             Thread.Sleep(TimeSpan.FromSeconds(1));             ThreadPool.QueueUserWorkItem(state => {                 Console.WriteLine("Operation state:{0}", state);                 Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);                 Thread.Sleep(TimeSpan.FromSeconds(2));             }, "lambda state");             ThreadPool.QueueUserWorkItem(_ => {                 Console.WriteLine("Operation state:{0},{1}", x+y,lambdaState);                 Console.WriteLine("Worker thread id :{0}", Thread.CurrentThread.ManagedThreadId);                 Thread.Sleep(TimeSpan.FromSeconds(2));             }, "lambda state");         }

定义一个方法,首先接受单个object类型的参数,通过QueueUserWorkItem方法将该方法放到线程池.接着再次放入该方法,但是这次给方法调用传入一个状态对象,该对象将作为状态参数给AsynchronousOperation方法,

闭包机制,无需传递lambda表达式的状态,闭包更灵活,允许我们向异步操作传递一个以上的对象而且这些对象具有静态类型。

线程池与并行度

staticvoid UseThreads(int numberOfOperations)         {             using (var countdown=new CountdownEvent(numberOfOperations))             {                 Console.WriteLine("Scheduling work by creating threads");                 for (int i = 0; i < numberOfOperations; i++)                 {                     var thread = new Thread(()=> {                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);                         Thread.Sleep(TimeSpan.FromSeconds(0.1));                         countdown.Signal();//向 CountdownEvent 注册信号,同时减小 CurrentCount 的值。                    });                     thread.Start();                 }                 countdown.Wait();                 Console.WriteLine();             }         }         staticvoid UseThreadPool(int numberOfOperations)         {             using (var countdown=new CountdownEvent(numberOfOperations))             {                 Console.WriteLine("Stating work on a threadpool");                 for (int i = 0; i < numberOfOperations; i++)                 {                     ThreadPool.QueueUserWorkItem(_=> {                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);                         Thread.Sleep(TimeSpan.FromSeconds(0.1));                         countdown.Signal();//向 CountdownEvent 注册信号,同时减小 CurrentCount 的值。                    });                 }                 countdown.Wait();                 Console.WriteLine();             }                    }         staticvoid Main(string[] args)         {             constint numberOfOperations = 500;             var sw = new Stopwatch();             sw.Start();             UseThreads(numberOfOperations);             sw.Stop();             Console.WriteLine("Execution time using thraeds:{0}",sw.ElapsedMilliseconds);             sw.Reset();             sw.Start();             UseThreadPool(numberOfOperations);             sw.Stop();             Console.WriteLine("Execution time using thraeds:{0}", sw.ElapsedMilliseconds);         }

当程序启动的时候,创建很多不同的线程,每个线程运行一个操作,该操作打印出线程id并阻塞线程100毫秒,我们创建500个线程,虽然使用了205毫秒,但是线程消耗了大量的操作系统资源。

然后我们执行同样的任务,只不过不为每个操作创建一个线程,将它们放入到线程池当中。线程池在快结束的时候创建更多的线程,但是仍然花费了更多的时间,在我的机器上是接近9秒,我们为操作系统节省了内存和线程数,但是执行时间长了

image.png

image.png

image.png

实现一个取消

staticvoid AsyncOperation1(CancellationToken token)         {             Console.WriteLine("Starting the first task");             for (int i = 0; i < 5; i++)             {                 if (token.IsCancellationRequested)                 {                     Console.WriteLine("The first task has been canceled");                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1));             }             Console.WriteLine("The first task has completed succesfuly");         }         staticvoid AsyncOperation2(CancellationToken token)         {             try            {                 Console.WriteLine("Starting the first task");                 for (int i = 0; i < 5; i++)                 {                     token.ThrowIfCancellationRequested();                     Thread.Sleep(TimeSpan.FromSeconds(1));                 }                 Console.WriteLine("The second task has completed succesfuly");             }             catch (OperationCanceledException)             {                 Console.WriteLine("The second task has been canceld");             }         }         privatestaticvoid AsyncOperation3(CancellationToken token)         {             bool cancellationFlag = false;             token.Register(() => cancellationFlag = true);             Console.WriteLine("Starting the third task");             for (int i = 0; i < 5; i++)             {                 if (cancellationFlag)                 {                     Console.WriteLine("The third task has been canceld");                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1));             }             Console.WriteLine("The third task has been succesfuly");         }         staticvoid Main(string[] args)         {             using (var cts=new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_=>AsyncOperation1(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             using (var cts = new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             using (var cts = new CancellationTokenSource())             {                 CancellationToken token = cts.Token;                 ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token));                 Thread.Sleep(TimeSpan.FromSeconds(2));                 cts.Cancel();             }             Thread.Sleep(TimeSpan.FromSeconds(2));         }

image.png

这里我们用了CancellationTokenSource和CancellationToken两个新类,它们在.NET4.0被引入,这里使用了三种方式来实现取消标记功能。

第一种使用轮询来检查CancellationToken.IsCancellationRequested属性。如果该属性为true,则说明操作需要被取消,

第二种是抛出一个OperationCancelledException异常。这允许在操作之外控制取消过程,需要取消操作时,通过操作之外的代码来处理。

最后一种是注册一个回调函数,当操作被取消时,在线程池调用该回调函数,这允许链式传递一个取消逻辑到另一个异步操作中

 

 

在线程池中使用等待事件处理器及超时

static void RunOperations(TimeSpan workOperationTimeOut)         {             using (var evt = new ManualResetEvent(false))             using (var cts=new CancellationTokenSource())             {                 Console.WriteLine("Registering timeout operation....");                 var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state, isTimedOut) => WorkOperationWait(cts, isTimedOut), null, workOperationTimeOut, true);                 Console.WriteLine("Statrting long running operation...");                 ThreadPool.QueueUserWorkItem(_ => WorkOperation(cts.Token, evt));                 Thread.Sleep(workOperationTimeOut.Add(TimeSpan.FromSeconds(2)));                 worker.Unregister(evt);             }         }         private static void WorkOperationWait(CancellationTokenSource cts, bool isTimedOut)         {             if(isTimedOut)             {                 cts.Cancel();                 Console.WriteLine("Worker operation timed out and was canceled");             }             else            {                 Console.WriteLine("Worker operation successed");             }         }         private static void WorkOperation(CancellationToken token,ManualResetEvent evt)         {             for (int i = 0; i < 6; i++)             {                 if(token.IsCancellationRequested)                 {                     return;                 }                 Thread.Sleep(TimeSpan.FromSeconds(1)) ;             }             evt.Set();         }         static void Main(string[] args)         {             RunOperations(TimeSpan.FromSeconds(5));             RunOperations(TimeSpan.FromSeconds(7));         }


image.png

线程池还有一个有用的方法:ThreadPool.RegisterWaitForSingleObject.该方法允许我们将回调函数放入线程池中的队列中。当提供的等待事件处理器收到信号或发生超时时,该回调函数将被调用,这允许我们为线程池中的操作实现超时功能。

首先按顺序向线程池中放入一个耗时长的操作,它运行6秒,一旦完成,会设置一个ManualResetEvent信号类。其他情况。比如取消,则操作会被丢弃、

注册第二个异步操作,当从ManualResetEvent对象接受一个信号后,该异步操作会被调用。如果一个操作顺利完成,会设置该信号量。另一种情况是第一个操作还未完成就已经超时,如果发生该情况,会使用CancellationToken来取消第一个操作。

最后,为操作提供5秒的超时时间是不够的,这是因为操作会花费6秒来完成,只能取消该操作。所以如果提供7秒的超时时间是可行的。该操作会顺利完成。

 

相关实践学习
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
本场景将自定义告警信息同时分发至多个通知渠道的需求,例如短信、电子邮件及钉钉群组等。通过采用轻量消息队列(原 MNS)的主题模型的HTTP订阅方式,并结合应用实时监控服务提供的自定义集成能力,使得您能够以简便的配置方式实现上述多渠道同步通知的功能。
相关文章
|
2月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
192 2
|
10月前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
483 60
【Java并发】【线程池】带你从0-1入门线程池
|
11月前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
8月前
|
Java
线程池是什么?线程池在实际工作中的应用
总的来说,线程池是一种有效的多线程处理方式,它可以提高系统的性能和稳定性。在实际工作中,我们需要根据任务的特性和系统的硬件能力来合理设置线程池的大小,以达到最佳的效果。
252 18
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
544 1
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
733 64
|
10月前
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
317 38
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
1768 31
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
407 4

热门文章

最新文章