这么一搞,再也不怕线程打架了

简介: 假如我们需要处理一个文本文件,里面有 100万行数据,需要对每条数据做处理,比如将每行数据的数字做一个运算,放入到另一个文件里。

假如我们需要处理一个文本文件,里面有 100万行数据,需要对每条数据做处理,比如将每行数据的数字做一个运算,放入到另一个文件里。

最简单的办法就是打开文件,逐行读取,每读取一行,对这一行做下处理,添加到目标文件中,再回来读取下一行。

这就是线性处理方式,假如处理一行数据需要 0.1 秒,那么用线性处理方式就需要:


10万秒,即大概 28个小时


显然对我们来说,这个时间有点长,有没用办法缩短呢?

当然有办法,那就是用 多线程 处理!

为什么呢?是因为多线程是提高效率,实现更有效程序的必然状态。

比如,需要处理大量的数据,需要响应多样的请求,需要与慢速的处理过程交互等等,都需要用到线程编程。

但是,线程这个概念不太好理解,用起来也总是不方便,而且容易出错,一方面是因为,我们的思路是线性的,另一方面是多线程本身有很多需要掌握的概念,学习理解难度比较高。

今天我将分享一下我在工作中是如何利用多线程技术,提速增效的。

对于前面那个例子,可以将原来的一个处理流程,分解为多个,例如之前的处理可以分解为:

读取行、做运算、存文件 三个自流程。

这样的话,相当于将只能一个人做的工作,可以让更多的人来做,从而形成类似的流水线效应,如图所示:


3.jpg

流水线


这是一张 CPU 处理指令的流水线示意图,可以看到在 t3 和 t4 的时间,四个工作在同时进行。

那么用多线程,就可以使我们的三个工作出现同时运行的状态,提升效率,比如先读取一行,然后再处理数据的同时,读取下一行,如此往复。

是不是感觉很好?

先别着急,首先需要解决一个问题 ——

如何避免重复读和跳读

重复读指的是,一个以上线程读取到了同一条数据;

跳读指的是,有些数据行没有任何线程处理。

这里介绍一个帮助我处理了很多多线程问题的方法,一个数据源类。


多线程数据源类

数据源类,就是将数据集中管理,然后以线程安全的方式为多线程程序提供数据。

注意:并非最佳方法,但很实用

废话不多说,直接看代码:


import threading
class DataSource:
    def __init__(self, dataFileName, startLine=0, maxcount=None):
        self.dataFileName = dataFileName
        self.startLine = startLine  # 第一行行号为1
        self.line_index = startLine # 当前读取位置
        self.maxcount = maxcount  # 读取最大行数
        self.lock = threading.RLock() # 同步锁        
        self.__data__ = open(self.dataFileName, 'r', encoding= 'utf-8')
        for i in range(self.startLine):
            l = self.__data__.readline()
    def getLine(self):
        self.lock.acquire()
        try:
            if self.maxcount is None or self.line_index < (self.startLine + self.maxcount):
                line = self.__data__.readline()
                if line:
                    self.line_index += 1
                    return True, line
                else:
                    return False, None
            else:
                return False, None
        except Exception as e:
            return False, "处理出错:" + e.args
        finally:
            self.lock.release()
    def __del__(self):
        if not self.__data__.closed:
            self.__data__.close()
            print("关闭数据源:", self.dataFileName)


  • __init__ 初始化方法,接受 3 个参数
    lock 属性是一个同步锁,以便在多线程读取不出现冲突
  • dataFileName 是数据文件路径
  • startLine 开始读取行,对于大文件需要分配处理时特别有用,
  • maxcount 读取最大行数,通过和 startLine 配合可以读取指定部分的数据,默认为全部读取
  • getLine 方法,每次调用会返回一个元组,包含状态和得到的,数据
  • __del__ 方法会在对象销毁时调用,在此记录当前处理位置

这样就是可以应用在多线程程序中,承担读取待处理记录的任务了。


业务处理

例如核心处理程序如下:


import time
def process(worker_id, datasource):
    count = 0
    while True:
        status, data = datasource.getLine()
        if status:
            print(">>> 线程[%d] 获得数据, 正在处理……" % worker_id)
            time.sleep(3) # 等待3秒模拟处理过程
            print(">>> 线程[%d] 处理数据 完成" % worker_id)
            count += 1
        else:
            break # 退出循环
    print(">>> 线程[%d] 结束, 共处理[%d]条数据" % (worker_id, count))


  • 参数 worker_id 是线程号,用于区分输出消息
  • 参数 datasourceDataSource 的实例,作为各线程的共享数据源
  • count 用于记录当前线程处理的记录数
  • 用一个死循环,驱动反复处理,直到读取没数据可读


组装

线程组装部分就也很简单:


import threading
def main():
    datasource = DataSource('data.txt') 
    workercount = 10 # 开启的线程数,注意:并非越多越快哦
    workers = []
    for i in range(workercount):
        worker = threading.Thread(target=process, args=(i+1, datasource))
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()


  • 先初始化一个 DataSource
  • workercount 为需要创建的线程数,在实际应用中可以通过配置或者参数提供,另外不是线程越多越好,一般设置为CPU核心数的两倍即可
  • threading.Thread 是线程类,可以实例化一个线程,target 参数是线程处理方法,这里就是前面定义的 process 方法,args 为提供给处理方法的参数
  • 线程的 start 方法是启动线程,因为创建不等于启动,start 是个异步方法,调用会瞬间完成
  • join 方法是等待线程处理完成,是同步方法,只有线程真正处理完成才会结束


扩展

通过这样的方式,帮我处理了很多实际的业务,比如爬取关键字信息,合并数据等等。

如果处理的数据不是文本文件,只要修改一下 DataSource 的 getLine 实现就可以了,比如数据源来自数据库等。

另外,上面的 DataSource 并非最优的,只是起到了规范读取接口,防止数据误读的作用,完全谈不上性能最优。

那么如何实现更优呢,这里提供一个思路就是,使用生产者消费者模型,利用 队列,以及 预读取 技术来实现更优的数据源类。

例如,DataSource 中,是逐行读取的,可以采用预读取,即提前读取一些数据,当线程需要数据时,先给出预读取的,等预读取的数据消费到一定量时,再异步读取一部分。

这样的好处是,各个线程不必等待 IO 时间(简单理解为从文件或者网络读取的等待时间)。

如何实现呢,可以了解一下队列(queue)的概念,Python 中提供了两种队列,同步队列 queue[1]队列集[2]

想想具体应该怎么做呢?欢迎在留言区写下你的方法和建议。


总结

今天分享了一个在实际工作中用到的,多线程处理数据的例子,例子虽然简单,但很实用,已经帮助我了很多重要的工作。

谈一些感悟,Python 的应用并不仅限于数据分析、AI 领域等热门领域,更多的可以应用在于处理日常生活工作中,比如处理数据,代替手工操作,简单运算等。

我们知道,学会一个东西最好的方式是使用,对于 Python 技能来说,也是一样的,多在日常工作中用,多去解决实际问题,不用卯足了劲儿,憋个大招。

祝你在 Python 大道上越走越顺,比心!


参考资料

[1]

同步队列: https://docs.python.org/3/library/queue.html

[2]

队列集: https://docs.python.org/zh-cn/3/library/asyncio-queue.html

目录
相关文章
|
Java 程序员
IT学不好没什么,大不了躺平
IT学不好没什么,大不了躺平
|
消息中间件 JavaScript 小程序
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
|
存储 机器学习/深度学习 监控
我是傻x,被迫看了 1 天源码,千万别学我!
大家好,我是零一,之前一直很忙,业余时间的输入和输出都 24k铝合金人眼可见 得下降,这不最近上海疫情严重么,算了一下居家办公也已经将近 1个月了,这才有些许时间学习,所以最近也是一直在鼓捣点新东西,不为别的,主要是想再多输入一些新的知识
186 0
我是傻x,被迫看了 1 天源码,千万别学我!
|
设计模式 架构师 Java
为什么有些蛮厉害的人,后来都不咋样了
写这篇文章目的是之前在一篇文章中谈到,我实习那会有个老哥很牛皮,业务能力嘎嘎厉害,但是后面发展一般般,这引起我的思考,最近有个同事发了篇腾讯pcg的同学关于review 相关的文章,里面也谈到架构师的层次,也再次引起我关于架构师的相关思考,接下来我们展开聊聊吧~
164 0
|
芯片
程序人生 - 手上总有静电该怎么处理?
程序人生 - 手上总有静电该怎么处理?
160 0
程序人生 - 手上总有静电该怎么处理?
|
缓存 Java
学会这些,再也不怕面试被问线程知识了
继承Thread类创建线程,重写run方法,实现Runnable接口创建线程,实例化thread类,使用Callable和Future创建线程,使用线程池例如用Executor框架
117 0
|
人工智能 安全 数据挖掘
这么一搞,再也不怕线程打架了
假如我们需要处理一个文本文件,里面有 100万行数据,需要对每条数据做处理,比如将每行数据的数字做一个运算,放入到另一个文件里。 最简单的办法就是打开文件,逐行读取,每读取一行,对这一行做下处理,添加到目标文件中,再回来读取下一行。 这就是线性处理方式,假如处理一行数据需要 0.1 秒,那么用线性处理方式就需要:
224 0
这么一搞,再也不怕线程打架了
|
Java
学弟学妹们,学会霍夫曼编码后,再也不用担心网络带宽了!(2)
学弟学妹们,学会霍夫曼编码后,再也不用担心网络带宽了!
132 0
学弟学妹们,学会霍夫曼编码后,再也不用担心网络带宽了!(2)
|
算法 搜索推荐 程序员
学弟学妹们,学会霍夫曼编码后,再也不用担心网络带宽了!(1)
学弟学妹们,学会霍夫曼编码后,再也不用担心网络带宽了!
124 0
学弟学妹们,学会霍夫曼编码后,再也不用担心网络带宽了!(1)
|
缓存 Java 程序员
一篇文章带你完全了解JAVA线程池,再也不用担心被面试官问了
线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位,我们的程序最终都是由线程进行运作。在Java中,创建和销毁线程的动作是很消耗资源的,因此就出现了所谓“池化资源”技术。线程池是池化资源技术的一个应用,所谓线程池,顾名思义就是预先按某个规定创建若干个可执行线程放入一个容器中(线程池),需要使用的时候从线程池中去取,用完之后不销毁而是放回去,从而减少了线程创建和销毁的次数,达到节约资源的目的。

相关实验场景

更多