怎么还蹦出来个 “ 数据管道 ”

简介: 怎么还蹦出来个 “ 数据管道 ”

创建数据处理管道


问题核心:★★   口感:天津肉包子


问题


你想以数据管道 (类似 Unix 管道) 的方式迭代处理数据。比如,你有个大量的数据  需要处理,但是不能将它们一次性放入内存中。


解决方案


生成器函数是一个实现管道机制的好办法。

假定你要处理一个非常大的  日志文件目录:


foo/
access-log-012007.gz
access-log-022007.gz
access-log-032007.gz
...
access-log-012008
bar/
access-log-092007.bz2
...
access-log-022008


假设每个日志文件包含这样的数据:

    124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
    61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
    ...
    


    为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器

    就像这样:

      import os
      import fnmatch
      import gzip
      import bz2
      import re
      def gen_find(filepat, top):
            '''
            查找目录树中与外壳通配符模式匹配的所有文件名
            '''
            for path, dirlist, filelist in os.walk(top):
                for name in fnmatch.filter(filelist, filepat):
                    yield os.path.join(path,name)
      def gen_opener(filenames):
            '''
            每次打开一个文件名序列,生成一个file对象。在进行下一次迭代时,文件将立即关闭。
            '''
            for filename in filenames:
                if filename.endswith('.gz'):
                    f = gzip.open(filename, 'rt')
                elif filename.endswith('.bz2'):
                    f = bz2.open(filename, 'rt')
                else:
                    f = open(filename, 'rt')
                yield f
                f.close()
      def gen_concatenate(iterators):
          '''
          将一个迭代器序列链接到一个单独的序列中
          '''
          for it in iterators:
              yield from it
      def gen_grep(pattern, lines):
          '''
          在一个行序列中查找regex模式
          '''
          pat = re.compile(pattern)
              for line in lines:
                  if pat.search(line):
                        yield line


      现在你可以很容易的将这些函数连起来创建一个处理管道。比如,为了查找包含单  词  Python 的所有日志行.

      你可以这样做:

        lognames = gen_find('access-log*', 'www')
        files = gen_opener(lognames)
        lines = gen_concatenate(files)
        pylines = gen_grep('(?i)python', lines)
        for line in pylines:
            print(line)
        


        如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。比如,下面这个版本计算出传输的字节数并计算其总和。


        lognames = gen_find('access-log*', 'www')
        files = gen_opener(lognames)
        lines = gen_concatenate(files)
        pylines = gen_grep('(?i)python', lines)
        bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
        bytes = (int(x) for x in bytecolumn if x != '-')
        print('Total', sum(bytes))


        结论


        以管道方式处理数据可以用来解决各类其他问题,包括解析读取实时数据定时轮询等。

        为了理解上述代码,重点是要明白yield 语句作为数据的生产者而 for 循环语句  作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个单独的数据元  素传递给迭代处理管道的下一阶段。

        在例子最后部分sum() 函数是最终的程序驱动者,每次从生成器管道中提取出一个元素。这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就  很容易编写和维护它们了。

        事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小  的内存

        在调用 gen_concatenate() 函数的时候你可能会有些不太明白。这个函数的目的  是将输入序列拼接成一个很长的行序列。itertools.chain() 函数同样有类似的功能,  但是它需要将所有可迭代对象最为参数传入。

        在上面这个例子中,你可能会写类似这样  的语句 lines = itertools.chain(*files) ,这将导致 gen_opener() 生成器被提前  全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭  代步骤时文件就关闭了,因此 chain() 在这里不能这样使用,,当然上面的方案可以避免这种情况。

        相关文章
        |
        3月前
        |
        负载均衡 安全 Java
        并行流的人生
        【10月更文挑战第8天】
        31 1
        java流是指在Java中用来读写数据的一组有序的数据序列,它可以将数据从一个地方带到另一个地方。java流分为输入流和输出流,输入流是从源读取数据的流,而输出流是将数据写入到目的地的流。Java流又可以分为字节流和字符流,字节流读取的最小单位是一个字节(1byte=8bit),而字符流一次可以读取一个字符(1char = 2byte = 16bit)。Java流还可以分为节点流和处理流,节点流是直接从一个源读写数据的流(这个流没有经过包装和修饰),处理流是在对节点流封装的基础上的一种流。
        135 0
        |
        8月前
        |
        前端开发 API
        18_管道——转换
        18_管道——转换
        47 0
        |
        8月前
        |
        Linux Shell
        使用输入输出重定向与管道命令——管道
        使用输入输出重定向与管道命令——管道。
        57 0
        |
        SQL JavaScript 前端开发
        开始使用流
        Java 8 中的 Stream 俗称为流,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念 Stream 用于对集合对象进行各种非常便利、高效的聚合操作,或者大批量数据操作 Stream API 借助于 Lambda 表达式,极大的提高编程效率和程序可读性 同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势 通过下面的例子我们可以初步体会到使用 Stream 处理集合的便利性
        63 1
        I/O流
        IO流:I的全称是Input,O的全称是Output。表示读取,流可以看做是程序传输数据的通道。 作用:解决程序请求资源,输出资源的问题。
        62 0
        |
        存储 Java
        流及其相关操作
        流及其相关操作
        |
        Java 数据库
        I/O 流总结
        I/O 流总结
        102 0
        |
        负载均衡
        进程间通信--管道
        进程间通信--管道