使用python实现一个用户态协程

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
注册配置 MSE Nacos/ZooKeeper,118元/月
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 【6月更文挑战第28天】本文探讨了如何在Python中实现类似Golang中协程(goroutines)和通道(channels)的概念。文章最后提到了`wait_for`函数在处理超时和取消操作中的作

简介

本文使用python模拟golang的协程功能。

通过模拟Golang的CSP(Communicating Sequential Processes)模型,作者创建了一个名为Queuey的类来实现无阻塞的放入和取出操作。
Queuey使用锁来同步访问,并提供了同步和异步的接口。
异步操作利用了Python的async/await语法。
通过检查调用堆栈来决定使用同步还是异步方法。
示例展示了如何在协程中使用这个类进行异步生产者-消费者模式。
mandala曼德罗符号.png

1 并发的使用场景

有很多地方需要并发地实现访问或提供服务,可以使用到异步的方式编程,其中协程是一个流行的方式。在golang中 启动协程很容易,这里将在python中也实现类似的功能。

channel 用于在协程直接通信,并且可以根据channle条件退出协程,一个缓冲channel定义如下

chanNoSize = make(chan int)

chanWithSize = make(chan int,2)

go的并发模型是一种被称之为CSP的类型,在CSP 模型中 chanel是第一类对象, 它不关注发送消息的实体。

而关注与发送消息使用的channel communicating sequential Process 简称CSP顺序通信进程,是一种用户态和系统级线程控制的混合编程模型。

csp可以被认为是一种形式语言,用于描述系统中的互动模式.

在go内部channel实现了以下几个功能:

     qcount 保存队列的项目/数据计数

     dataqsize 是循环队列大小。 这用于缓冲通道场景,也就是make的第二参数
     elemsize 是通道相对单个元素的大小
     buf 是我们使用缓冲通道时存储数据的实际循环队列
     closed 表示通道是否关闭。 

语法 close(chanWithSize) 默认为0,关闭时设置为1.

chanel 是被单独创建并且可以在进程之间传递。

通信模式类似于 boss-worker模式。 一个实体通过将消息发送到 channel
监听这个channel的实体处理时,两个实体之间是匿名的,

2 实现python版本的协程

而python这类线程 默认的开发方式为 线程内存同步方式.
我们可以使用一些内部函数,实现类型go的协程,稍微会复杂一些。

我们定义一个协程通信管理队列,类似于channel

    class Queuey():
        def __init__(self, maxsize):
            self.mutex = Lock()
            self.maxsize = maxsize
            self.items = list()
            self.getters = list()
            self.putters = list()

实现一个可冲入锁对象 self.mutex 一个锁对象是一个同步基元。
要创建一个锁 的方法是: -调用 threading.Lock()。 有以下方法。

  acquire() -- 锁定,可能会阻塞,直到获得锁。
  release() -- 解除对锁的锁定
  locked() -- 测试该锁是否被锁定

锁不属于锁定它的线程;另一个线程可以 解锁它。 如果一个线程试图锁定一个它已经锁定的锁 将会阻塞,直到另一个线程将其解锁。 死锁可能会随之产生。

acquire(blocking=True, timeout=-1) -> bool
(acquisition_lock()是一个弃用的同义词)

锁定该锁。 没有参数,如果该锁已经被 锁定(即使是被同一个线程),等待另一个线程释放 锁,一旦获得锁,则返回True。

有参数时,只有当参数为 True 时才会阻塞。

并且返回值反映了是否获得了锁。

阻塞操作是可中断的。

3 无阻塞放入和取出

先获取到锁, 如果结果队列items中不为空,那么就在待处理的 待返回对象中取出一个,唤醒一个 左部待添加的 fut 元素,并将其设置为 True, 返回队列 左部 一个元素,同时 错误信息返回None,表示没有错误(对应go的nil)

def get_noblock(self):
    with self.mutex:
        if self.items: 
            if self.putters:
                self.putters.pop(0).set_result(True)

            return self.items.pop(0), None
        else:
            fut = Future()
            self.getters.append(fut)
            return fut, None

先获取到锁, 如果结果队列items中 小于缓冲大小 maxsize,那么添加到item处理队列中,再进行 getters 操作,如果getters队列不为空,那么从items队列的左部获取一个值,并设置结果到getters左部。

如果结果队列已经达到最大缓冲大小,那么将待返回结果 future 添加到 putters 队列(如果结果队列有对象需要返回时将从这里取得并返回)。

   def put_noblock(self, item):
        with self.mutex:  
            if len(self.items) < self.maxsize:
                self.items.append(item) 
                if self.getters:
                    self.getters.pop(0).set_result(
                        self.items.pop(0)
                    )
            else:
                fut = Future()
                self.putters.append(fut)
                return fut

获取的任务返回状态默认是pending。在无阻塞的操作函数完成后,基于此,我们可以实现同步 或 异步地操作协程。

  • 同步存和同步取

         def get_sync(self):
              item, fut = self.get_noblock()
              if fut:
                  item = fut.result()
              return item
    
        def put_sync(self, item):
              while True:
                  fut = self.put_noblock(item)
                  if fut is None:
                      return
                  fut.result()
    
  • 异步取和存

     async def get_async(self):
                 item, fut = self.get_noblock()
                 if fut:
                     item = await wait_for(wrap_future(fut), None)
                 return item
    
     async def put_async(self, item):
         while True:
             fut = self.put_noblock(item)
             if fut is None:
                 return
             await wait_for(wrap_future(fut), None)
    

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def get(self):
        if sys._getframe(2).f_code.co_flags & 0x380: 
            print(f"get async item")
            return self.get_async()
        else:  # 不是协程,使用同步的方式 取值
            print(f"get sync item:")

            return self.get_sync() if len(self.items) > 0 else None

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def put(self, item):
        if sys._getframe(2).f_code.co_flags & 0x380:  
            print(f"put async item:", item)
            return self.put_async(item)
        else:   
            print(f"put sync item:", item)
            return self.put_sync(item)

判断一个函数是否异步和协程的方式,在python中可以使用一个内置的方法,

    sys._getframe(2).f_code.co_flags & 0x380

这使python知道函数需要被协程一样的使用, 一种进入函数的方式的判断,在py3中需要在 调用函数 之前加 await 关键字

sys._getframe 从调用栈中返回一个框架对象。 如果给定了可选的整数深度,则返回比堆栈顶部调用次数少的那个框架对象。

调用堆栈的顶部以下的框架对象。 如果这个深度超过了调用 栈,ValueError将被引发。 默认的深度是0,返回 调用堆栈顶部的框架。

返回的 CodeType 中代码标记如果 为 十进制 896, 这个函数应该被用于内部和专门的目的 仅用于内部和专门用途。

4 在函数 使用

最后在函数中使用这个类, 在 线程中执行 异步取值

async def aproducer(q, n):
    for i in range(n):

        await q.put(i)

    await q.put(None)

async def aconsumer(q):

    while True:

        item = await q.get() 
        if item is None:
            break
        print("Async Got:", item)

5 小结

我们可以使用 wait_for 实现类似的功能。 它等待单个Future或coroutine完成,有超时参数可以设置。

Coroutine将被包裹在Task中。 返回Future或coroutine的结果。 当超时发生时。 它取消任务并引发TimeoutError。
为了避免任务取消,将其包裹在shield()中。 如果等待被取消了,任务也被取消了。

这个函数是一个coroutine。

下一节,我们完成同步存和异步取,异步存和异步取,并对此函数的做示例,与go的实现做对比。

目录
相关文章
|
3天前
|
开发者 Python
探索Python中的异步编程:理解Asyncio和协程
【9月更文挑战第18天】在Python的世界中,异步编程是一个强大而神秘的概念。它像是一把双刃剑,掌握得好可以大幅提升程序的效率和性能;使用不当则可能让代码变得难以维护和理解。本文将带你一探究竟,通过深入浅出的方式介绍Python中asyncio库和协程的基本概念、使用方法及其背后的原理,让你对异步编程有一个全新的认识。
|
27天前
|
数据处理 调度 开发者
解密Python的异步编程:协程与事件循环的实战应用
在现代应用程序开发中,异步编程已经成为提高性能和响应速度的关键技术。Python的异步编程通过协程和事件循环提供了高效处理并发任务的能力。本文将深入探讨Python中异步编程的核心概念,包括协程的基本用法、事件循环的工作机制以及如何在实际项目中应用这些技术。通过对比同步和异步编程的性能差异,读者将能够理解异步编程的优势,并学会如何在Python中实现高效的异步任务处理。
|
2月前
|
数据库 开发者 Python
实战指南:用Python协程与异步函数优化高性能Web应用
【7月更文挑战第15天】Python的协程与异步函数优化Web性能,通过非阻塞I/O提升并发处理能力。使用aiohttp库构建异步服务器,示例代码展示如何处理GET请求。异步处理减少资源消耗,提高响应速度和吞吐量,适用于高并发场景。掌握这项技术对提升Web应用性能至关重要。
71 10
|
2月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
【7月更文挑战第15天】Python 3.5+引入的协程和异步函数革新了并发编程。协程,轻量级线程,由程序控制切换,降低开销。异步函数是协程的高级形式,允许等待异步操作。通过`asyncio`库,如示例所示,能并发执行任务,提高I/O密集型任务效率,实现并发而非并行,优化CPU利用率。理解和掌握这些工具对于构建高效网络应用至关重要。
41 6
|
2月前
|
大数据 数据处理 API
性能飞跃:Python协程与异步函数在数据处理中的高效应用
【7月更文挑战第15天】在大数据时代,Python的协程和异步函数解决了同步编程的性能瓶颈问题。同步编程在处理I/O密集型任务时效率低下,而Python的`asyncio`库支持的异步编程利用协程实现并发,通过`async def`和`await`避免了不必要的等待,提升了CPU利用率。例如,从多个API获取数据,异步方式使用`aiohttp`并发请求,显著提高了效率。掌握异步编程对于高效处理大规模数据至关重要。
41 4
|
2月前
|
设计模式 机器学习/深度学习 测试技术
设计模式转型:从传统同步到Python协程异步编程的实践与思考
【7月更文挑战第15天】探索从同步到Python协程异步编程的转变,异步处理I/O密集型任务提升效率。async/await关键词定义异步函数,asyncio库管理事件循环。面对挑战,如思维转变、错误处理和调试,可通过逐步迁移、学习资源、编写测试和使用辅助库来适应。通过实践和学习,开发者能有效优化性能和响应速度。
43 3
|
2月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
43 2
|
2月前
|
Python
从零到一:构建Python异步编程思维,掌握协程与异步函数
【7月更文挑战第15天】Python异步编程提升效率,通过协程与异步函数实现并发。从async def定义异步函数,如`say_hello()`,使用`await`等待异步操作。`asyncio.run()`驱动事件循环。并发执行任务,如`asyncio.gather()`同时处理`fetch_data()`任务,降低总体耗时。入门异步编程,解锁高效代码。
51 1
|
1月前
|
Python
python 协程 自定义互斥锁
【8月更文挑战第6天】这段代码展示了如何在Python的异步编程中自定义一个互斥锁(`CustomMutex`类)。该类通过`asyncio.Lock`实现,并提供`acquire`和`release`方法来控制锁的获取与释放。示例还包含了使用此自定义锁的场景:两个任务(`task1`和`task2`)尝试按序获取锁执行操作,直观地演示了互斥锁的作用。这有助于理解Python协程中互斥锁的自定义实现及其基本用法。
|
1月前
|
大数据 API 调度
Python中的异步编程:理解asyncio模块与协程
在现代编程中,异步编程越来越重要,特别是在处理大规模并发任务时。Python的asyncio模块提供了强大的工具来实现异步操作,其中协程是其核心机制之一。本文将深入探讨asyncio模块的基本概念、如何编写和管理异步任务,以及协程的工作原理和实际应用。