kotlin通道讲解

简介: kotlin通道讲解

概览


关于通道你可以理解为生产者消费者模式,类似BlockQueue,一端生成数据发送数据,另外一边接收数据消费数据


通道基础


构建简单通道

我们构建一个简单的通道

本例中product和consumer两个方法都传入协程作用域和通道对象两个参数,

其中product单开一个协程中每隔100ms向通道中发送一条自增的int类型数据,发四条

consumer中也单开一个协程,协程中开启while循环,如果通道发送端没有关闭数据就一直轮询获取通道中的最新值打印出来

后面章节中的代码都在这个的基础上进行改动

  1. 代码
fun main() {
    channel1()
}
fun channel1() = runBlocking {
    val channel = Channel<Int>()
    product(this, channel)
    consumer(this, channel)
}
/**
 * 发送数据
 */
fun product(scope: CoroutineScope, channel: Channel<Int>) {
    var count = 0
    scope.launch {
        while (count < 4) {
            channel.send(count++)
            delay(100)
        }
    }
}
/**
 * 消费数据
 */
fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
    scope.launch {
        while (!channel.isClosedForSend) {
            delay(100)
            println(channel.receive())
        }
    }
}
复制代码
  1. 日志
0
1
2
3
复制代码


使用consume系列方法消费数据

消费者中可以使用consumeEachIndexed和consumeEach两个方法消费数据。

fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
        scope.launch {
            channel.consumeEach {
                println(it)
            }
        }
    }
复制代码


关闭与迭代通道

通道可以调用Channel.close来关闭通道,这样当我们不需要通道的时候可以主动关闭通道。

当我们调用Channel.close关闭通道的时候,如果通道中有消费者没有消费完成的数据,那么消费者会继续消费完剩余数据

  1. 代码
class Channel2{
    fun channel1() = runBlocking {
        val channel = Channel<Int>()
        product(this, channel)
        consumer(this, channel)
    }
    /**
     * 发送数据
     */
    fun product(scope: CoroutineScope, channel: Channel<Int>) {
        var count = 0
        scope.launch {
            while (count < 4) {
                channel.send(count++)
                delay(100)//延迟100ms
            }
            channel.close()//这里如果调用了close程序可以正常完成,如果不调用程序不能正常完成
        }
    }
    /**
     * 消费数据
     */
    fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
        scope.launch {
            channel.consumeEach {
                delay(200)//延迟200ms,因为生产者那里延迟了100ms,所以消费者的的存活时间是远低于生产者的
                println(it)
            }
        }
    }
}
复制代码


  1. 日志

不关闭通道日志:

image.png

关闭通道日志:

image.png

本例代码中我们必须在生产者发送完数据后主动调用close关闭通道,否则通道会一直阻止程序结束


使用 produce 函数构建管道


我们可以通过produce函数便携的构建一个生产者,如下所示,我们将produce函数构建为CoroutineScope的扩展函数,然后就可以直接在runBlocking中直接调用了。

管道可以被取消

  1. 代码
fun CoroutineScope.produce()=produce<Int> {
    for (i in 1..4){
        send(i)
    }
}
fun main()= runBlocking {
    produce().consumeEach {
        println(it)
    }
}
复制代码
  1. 日志

image.png


带缓冲的通道


  1. 代码
class BufferChannel {
    private val channel = Channel<Int>(4)
    private val scope = CoroutineScope(Dispatchers.Default)
    private var count = 0
    fun product() {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }
    fun consumer() {
        scope.launch {
            channel.consumeEach {
                println("消费数据:$it")
            }
        }
    }
}
fun main() = runBlocking {
    BufferChannel().apply {
        product()
        consumer()
    }
    println("执行结束")
}
复制代码
  1. 日志
执行结束
消费数据:0
消费数据:1
消费数据:2
消费数据:3
复制代码


带缓冲的通道


无缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送先被调用,则它将被挂起直到接收被调用, 如果接收先被调用,它将被挂起直到发送被调用。

Channel和produce都可以指定一个缓冲,当缓冲没有满的时候生产者和消费者都去缓冲中操作数据,当缓冲满的时候才会进行阻塞

目前这个场景不好模拟,我只能先放上来代码了

  1. 代码
class BufferChannel {
    private val channel = Channel<Int>(5)
    private var count = 0
    fun product(scope: CoroutineScope) {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }
    fun consumer(scope: CoroutineScope) {
        scope.launch {
           while (!channel.isClosedForSend){
               println(channel.receive())
           }
        }
    }
}
fun main() = runBlocking {
    BufferChannel().let {
        it.product(this)
        it.consumer(this)
    }
    println("执行结束")
}
复制代码
  1. 日志
执行结束
0
1
2
3
4
5
6
7
8
9



相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
6月前
|
Go 开发者
Go 并发编程基础:无缓冲与有缓冲通道
本章深入探讨Go语言中通道(Channel)的两种类型:无缓冲通道与有缓冲通道。无缓冲通道要求发送和接收必须同步配对,适用于精确同步和信号通知;有缓冲通道通过内部队列实现异步通信,适合高吞吐量和生产者-消费者模型。文章通过示例对比两者的行为差异,并分析死锁风险及使用原则,帮助开发者根据场景选择合适的通道类型以实现高效并发编程。
|
网络协议 文件存储 Windows
Windows Server 2019 FTP服务器搭建
Windows Server 2019 FTP服务器搭建
411 0
|
存储 人工智能 边缘计算
云计算大势所趋:从基础到未来,探析发展趋势
云计算大势所趋:从基础到未来,探析发展趋势
|
Web App开发 资源调度 前端开发
electron 中如何安装或更新 vuejs-devtool 最新稳定版
electron 中如何安装或更新 vuejs-devtool 最新稳定版
|
监控 Linux Shell
|
安全 Unix Linux
CentOS介绍
【5月更文挑战第6天】CentOS介绍
1636 3
|
网络协议 网络性能优化 定位技术
ip呼叫是什么意思?
ip呼叫是什么意思?
|
存储 运维 安全
开源盛行:为什么学习国产达梦数据库?
开源盛行:为什么学习国产达梦数据库?
557 0
|
Java 索引
深入浅出JVM(五)之Java中方法调用
深入浅出JVM(五)之Java中方法调用
|
物联网 云栖大会 决策智能
从田间到市场 阿里云数智农业2.0发布
阿里云数智农业2.0提供了生产、流通、市场、服务的全链路支持。
1269 15
从田间到市场 阿里云数智农业2.0发布