channel通道(goroutine之间通信的管道)
如果说 goroutine 是 Go语言程序的并发体的话,那么 channels 就是它们之间的通信机制。一个 channels 是一个通信机制,它可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int。
Go语言提倡使用通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
这里通信的方法就是使用通道(channel),如下图所示
在地铁站、食堂、洗手间等公共场所人很多的情况下,大家养成了排队的习惯,目的也是避免拥挤、插队导致的低效的资源使用和交换过程。代码与数据也是如此,多个 goroutine 为了争抢数据,势必造成执行的低效率,使用队列的方式是最高效的,channel 就是一种队列一样的结构。
# 1. 通道的基本使用
# 通道的特性
Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。
通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。
# 管道(channel)特质介绍
管道本质就是一个数据结构-队列
数据是先进先出
自身线程安全,多协程访问时,不需要加锁,channel本身就是线程安全的
管道有类型的,一个string的管道只能存放string类型数据
# 声明通道类型
通道本身需要一个类型进行修饰,就像切片类型需要标识元素类型。通道的元素类型就是在其内部传输的数据类型,声明如下:
var 通道变量 chan 通道类型
通道类型:通道内的数据类型。
通道变量:保存通道的变量。
chan 类型的空值是 nil,声明后需要配合 make 后才能使用。
# 创建通道
通道是引用类型,需要使用 make 进行创建,格式如下:
通道实例 := make(chan 数据类型,容量)
数据类型:通道内传输的元素类型。
通道实例:通过make创建的通道句柄。
容量:创建通道占用容量
# 示例
ch1 := make(chan int) // 创建一个整型类型的通道
ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式
type Equip struct{ /* 一些字段 */ }
ch2 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip
intChan = make(chan int,3) //通过make初始化:管道可以存放3个int类型的数据
2
3
4
5
# 使用通道发送数据
通道创建后,就可以使用通道进行发送和接收操作。
# 格式
通道的发送使用特殊的操作符<-
,将数据通过通道发送的格式为:
通道变量 <- 值
通道变量:通过make创建好的通道实例。
值:可以是变量、常量、表达式或者函数返回值等。值的类型必须与ch通道的元素类型一致。
# 通过通道发送数据的例子
使用 make 创建一个通道后,就可以使用<-
向通道发送数据,代码如下:
// 创建一个空接口通道
ch := make(chan interface{})
// 将0放入通道中
ch <- 0
// 将hello字符串放入通道中
ch <- "hello"
2
3
4
5
6
# 发送将持续阻塞直到数据被接收
把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,代码如下:
package main
func main() {
// 创建一个整型通道
ch := make(chan int)
// 尝试将0通过通道发送
ch <- 0
}
// fatal error: all goroutines are asleep - deadlock!
2
3
4
5
6
7
8
9
报错的意思是:运行时发现所有的 goroutine(包括main)都处于等待 goroutine。也就是说所有 goroutine 中的 channel 并没有形成发送和接收对应的代码
# 代码示例
package main
import "fmt"
type cer struct {
name string
age int
}
func main() {
// 示例一: 创建通道类型为int 容量为3
channel := make(chan int, 3)
channel <- 20
channel <- 10
num := 99
channel <- num
//channel <- num // 超过通道定义的容量就会报错: fatal error: all goroutines are asleep - deadlock!
fmt.Println(channel)
// 示例二: 创建一个结构体通道
ch := make(chan cer, 2)
ch <- cer{name: "小明", age: 29}
ch <- cer{name: "小虎", age: 29}
fmt.Println(ch)
}
/*
0xc0000b6080
0xc0000c6000
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 使用通道接收数据
通道接收同样使用 <- 操作符,通道接收有如下特性:
① 通道的收发操作在不同的两个 goroutine 间进行。
由于通道的数据在没有接收方处理时,数据发送方会持续阻塞,因此通道的接收必定在另外一个 goroutine 中进行。
② 接收将持续阻塞直到发送方发送数据。
如果接收方接收时,通道中没有发送方发送数据,接收方也会发生阻塞,直到发送方发送数据为止。
③ 每次接收一个元素。
通道一次只能接收一个数据元素。
通道的数据接收一共有以下 4 种写法。
# 阻塞接收数据
阻塞模式接收数据时,将接收变量作为<-
操作符的左值,格式如下:
data := <-ch
执行该语句时将会阻塞,直到接收到数据并赋值给 data 变量。
# 非阻塞接收数据
使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下:
data, ok := <-ch
data:表示接收到的数据。未接收到数据时,data 为通道类型的零值。
ok:表示是否接收到数据。
非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计时器 channel 进行,可以参见后面的内容。
# 接收任意数据,忽略接收的数据
阻塞接收数据后,忽略从通道返回的数据,格式如下:
<-ch
执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。
# 循环接收
通道的数据接收可以借用 for range 语句进行多个元素的接收操作,格式如下:
for data := range ch {
}
2
通道 ch 是可以进行遍历的,遍历的结果就是接收到的数据。数据类型就是通道的数据类型。通过 for 遍历获得的变量只有一个,即上面例子中的 data。
管道支持for-range的方式进行遍历,请注意两个细节(不关闭管道的化遍历到最后一个会出错)
在遍历时,如果管道没有关闭,则会出现deadlock的错误
在遍历时,如果管道已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。
# 代码示例
package main
import "fmt"
type cher struct {
name string
age int
}
func main() {
ch := make(chan cher, 4)
ch <- cher{name: "小明", age: 29}
ch <- cher{name: "小辣", age: 29}
ch <- cher{name: "小虎", age: 29}
ch <- cher{name: "小谢", age: 29}
// 方式一: 阻塞接收数据
v1 := <-ch
fmt.Println(v1)
// 方式二: 非阻塞接收数据
v2, ok := <-ch
fmt.Println(v2, ok) // 未接收到返回零值,和false
// 方式三: 接收任意数据,忽略接收的数据
<-ch
// 方式四: 循环接收 管道支持for-range的方式进行遍历,请注意两个细节
// 1)在遍历时,如果管道没有关闭,则会出现deadlock的错误
// 2)在遍历时,如果管道已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。
close(ch)
for v3 := range ch {
fmt.Println(v3)
}
}
/*
{小明 29}
{小辣 29} true
{小谢 29}
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 管道的关闭
使用内置函数close可以关闭管道,当管道关闭后,就不能再向管道写数据了,但是仍然可以从该管道读取数据。
# 代码示例
package main
import (
"fmt"
)
func main() {
//定义管道 、 声明管道
var intChan chan int
//通过make初始化:管道可以存放3个int类型的数据
intChan = make(chan int, 3)
//在管道中存放数据:
intChan <- 10
intChan <- 20
//关闭管道:
close(intChan)
//再次写入数据:--->报错
//intChan<- 30
//当管道关闭后,读取数据是可以的:
num := <-intChan
fmt.Println(num)
}
/*
10
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 2. 协程和通道协同工作的案例
# 案例需求
请完成协程和管道协同工作的案例,具体要求:
开启一个writeData协程,向管道中写入10个整数.
开启一个readData协程,从管道中读取writeData写入的数据。
注意: writeData和readDate操作的是同一个管道
主线程需要等待writeData和readDate协程都完成工作才能退出
# 原理图
# 代码示例
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup //只定义无需赋值
// 写:
func writeData(intChan chan int) {
defer wg.Done()
for i := 1; i <= 10; i++ {
intChan <- i
fmt.Println("写入的数据为:", i)
time.Sleep(time.Second)
}
//管道关闭:
close(intChan)
}
// 读:
func readData(intChan chan int) {
defer wg.Done()
//遍历:
for v := range intChan {
fmt.Println("读取的数据为:", v)
time.Sleep(time.Second)
}
}
func main() { //主线程
//写协程和读协程共同操作同一个管道-》定义管道:
intChan := make(chan int, 10)
wg.Add(2)
//开启读和写的协程:
go writeData(intChan)
go readData(intChan)
//主线程一直在阻塞,什么时候wg减为0了,就停止
wg.Wait()
fmt.Println("执行完毕")
}
/*
写入的数据为: 1
读取的数据为: 1
写入的数据为: 2
读取的数据为: 2
写入的数据为: 3
读取的数据为: 3
写入的数据为: 4
读取的数据为: 4
写入的数据为: 5
读取的数据为: 5
写入的数据为: 6
读取的数据为: 6
写入的数据为: 7
读取的数据为: 7
写入的数据为: 8
读取的数据为: 8
写入的数据为: 9
读取的数据为: 9
写入的数据为: 10
读取的数据为: 10
执行完毕
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 3. 单向通道(只读/只写)
Go语言单向通道——通道中的单行道 (opens new window)
Go语言的类型系统提供了单方向的 channel 类型,顾名思义,单向 channel 就是只能用于写入或者只能用于读取数据。当然 channel 本身必然是同时支持读写的,否则根本没法用。
假如一个 channel 真的只能读取数据,那么它肯定只会是空的,因为你没机会往里面写数据。同理,如果一个 channel 只允许写入数据,即使写进去了,也没有丝毫意义,因为没有办法读取到里面的数据。所谓的单向 channel 概念,其实只是对 channel 的一种使用限制。
# 单向通道的声明格式
我们在将一个 channel 变量传递到一个函数时,可以通过将其指定为单向 channel 变量,从而限制该函数中可以对此 channel 的操作,比如只能往这个 channel 中写入数据,或者只能从这个 channel 读取数据。
单向 channel 变量的声明非常简单,只能写入数据的通道类型为 chan<- ,只能读取数据的通道类型为 <-chan ,格式如下:
只读
var 通道实例 <-chan 元素类型 // 只能读取数据的通道
只写
var 通道实例 chan<- 元素类型 // 只能写入数据的通道
元素类型:通道包含的元素类型。
通道实例:声明的通道变量。
# 使用 make 创建通道时,也可以创建一个只写入或只读取的通道
ch := make(<-chan int)
var chReadOnly <-chan int = ch
<-chReadOnly
2
3
# time包中的单向通道
time 包中的计时器会返回一个 timer 实例,代码如下:
timer := time.NewTimer(time.Second)
timer的Timer类型定义如下:
type Timer struct {
C <-chan Time
r runtimeTimer
}
2
3
4
第 2 行中 C 通道的类型就是一种只能读取的单向通道。如果此处不进行通道方向约束,一旦外部向通道写入数据,将会造成其他使用到计时器的地方逻辑产生混乱。
因此,单向通道有利于代码接口的严谨性。
# 代码示例
package main
import (
"fmt"
)
func main() {
//默认情况下,管道是双向的--》可读可写:
//var intChan1 chan int
//声明为只写:
var intChan2 chan<- int // 管道具备<- 只写性质
intChan2 = make(chan int, 3)
intChan2 <- 20
//num := <-intChan2 // 读取报错
fmt.Println("intChan2:", intChan2)
//声明为只读:
var intChan3 <-chan int // 管道具备<- 只读性质
if intChan3 != nil {
num1 := <-intChan3
fmt.Println("num1:", num1)
}
//intChan3<- 30 报错
}
/*
intChan2: 0xc00007a080
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 4. 无缓冲的通道
Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。
如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
阻塞指的是由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足才解除阻塞。
同步指的是在两个或多个协程(线程)之间,保持数据内容一致性的机制。
下图展示两个 goroutine 如何利用无缓冲的通道来共享一个值。
本质就是:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换
// 这个示例程序展示如何用无缓冲的通道来模拟:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换
// 2 个goroutine 间的网球比赛
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// wgp 用来等待程序结束
var wgp sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
// main 是所有Go 程序的入口
func main() {
// 创建一个无缓冲的通道
court := make(chan int)
// 计数加 2,表示要等待两个goroutine
wgp.Add(2)
// 启动两个选手
go player("Nadal", court)
go player("Djokovic", court)
// 发球
court <- 1
// 等待游戏结束
wgp.Wait()
fmt.Println("游戏结束")
}
// player 模拟一个选手在打网球
func player(name string, court chan int) {
// 在函数退出时调用Done 来通知main 函数工作已经完成
defer wgp.Done()
for {
// 等待球被击打过来
ball, ok := <-court
if !ok {
// 如果通道被关闭,我们就赢了
fmt.Printf("Player %s Won\n", name)
return
}
// 选随机数,然后用这个数来判断我们是否丢球
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
// 关闭通道,表示我们输了
close(court)
return
}
// 显示击球数,并将击球数加1
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
// 将球打向对手
court <- ball
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 无缓冲的通道-阻塞
如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
阻塞指的是由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足才解除阻塞。
package main
import (
"fmt"
"sync"
"time"
)
var wgroup1 sync.WaitGroup //只定义无需赋值
// 写:
func writeData2(intChan chan int) {
defer wgroup1.Done()
for i := 1; i <= 10; i++ {
intChan <- i
fmt.Println("写入的数据为:", i)
time.Sleep(time.Second * 5)
}
close(intChan)
}
// 读:
func readData2(intChan chan int) {
defer wgroup1.Done()
//遍历:
for v := range intChan {
fmt.Println("读取的数据为:", v)
time.Sleep(time.Second)
}
}
func main() { //主线程
// 读写不一致读取协程发送阻塞(5秒写一次,一秒读取一次,这样就导致读取协程发送阻塞)
intChan := make(chan int)
wgroup1.Add(2)
//开启读和写的协程:
go writeData2(intChan)
go readData2(intChan)
//主线程一直在阻塞,什么时候wgroup1减为0了,就停止
wgroup1.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 5. 带缓冲的通道
Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。
在无缓冲通道的基础上,为通道增加一个有限大小的存储空间形成带缓冲通道。带缓冲通道在发送时无需等待接收方接收即可完成发送过程,并且不会发生阻塞,只有当存储空间满时才会发生阻塞。同理,如果缓冲通道中有数据,接收时将不会发生阻塞,直到通道中没有数据可读时,通道将会再度阻塞。
# 创建带缓冲通道
通道实例 := make(chan 通道类型, 缓冲大小)
通道类型:和无缓冲通道用法一致,影响通道发送和接收的数据类型。
缓冲大小:决定通道最多可以保存的元素数量。
通道实例:被创建出的通道实例。
# 示例
package main
import "fmt"
func main() {
// 创建一个3个元素缓冲大小的整型通道
ch := make(chan int, 3)
// 查看当前通道的大小
fmt.Println(len(ch))
// 发送3个整型元素到通道
ch <- 1
ch <- 2
ch <- 3
// 查看当前通道的大小
fmt.Println(len(ch))
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 带缓冲的通道-阻塞
带缓冲通道在很多特性上和无缓冲通道是类似的。无缓冲通道可以看作是长度永远为 0 的带缓冲通道。因此根据这个特性,带缓冲通道在下面列举的情况下依然会发生阻塞:
带缓冲通道被填满时,尝试再次发送数据时发生阻塞。
带缓冲通道为空时,尝试接收数据时发生阻塞。
一直在读取,一直在写入就会发送阻塞
package main
import (
"fmt"
"sync"
)
var wgroup sync.WaitGroup //只定义无需赋值
// 写:
func writeData1(intChan chan int) {
defer wgroup.Done()
for i := 1; i <= 10; i++ {
intChan <- i
fmt.Println("写入的数据为:", i)
}
close(intChan)
}
// 读:
func readData1(intChan chan int) {
defer wgroup.Done()
for v := range intChan {
fmt.Println("读取的数据为:", v)
//time.Sleep(time.Second)
}
}
func main() { //主线程
intChan := make(chan int, 10)
wgroup.Add(2)
go writeData1(intChan) // 一直在读
//go readData1(intChan) // 一直在写
wgroup.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 为什么Go语言对通道要限制长度而不提供无限长度的通道?
我们知道通道(channel)是在两个 goroutine 间通信的桥梁。使用 goroutine 的代码必然有一方提供数据,一方消费数据。当提供数据一方的数据供给速度大于消费方的数据处理速度时,如果通道不限制长度,那么内存将不断膨胀直到应用崩溃。因此,限制通道的长度有利于约束数据提供方的供给速度,供给数据量必须在消费方处理量+通道长度的范围内,才能正常地处理数据
# 6. 超时机制 select
Go语言没有提供直接的超时处理机制,所谓超时可以理解为当我们上网浏览一些网站时,如果一段时间之后不作操作,就需要重新登录。
那么我们应该如何实现这一功能呢,这时就可以使用 select 来设置超时。
虽然 select 机制不是专门为超时而设计的,却能很方便的解决超时问题,因为 select 的特点是只要其中有一个 case 已经完成,程序就会继续往下执行,而不会考虑其他 case 的情况。
超时机制本身虽然也会带来一些问题,比如在运行比较快的机器或者高速的网络上运行正常的程序,到了慢速的机器或者网络上运行就会出问题,从而出现结果不一致的现象,但从根本上来说,解决死锁问题的价值要远大于所带来的问题。
# 语法格式
select 的用法与 switch 语言非常类似,由 select 开始一个新的选择块,每个选择条件由 case 语句来描述。
与 switch 语句相比,select 有比较多的限制,其中最大的一条限制就是每个 case 语句里必须是一个 IO 操作,大致的结构如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
2
3
4
5
6
7
8
在一个 select 语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。
如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。
如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有如下两种可能的情况:
如果给出了 default 语句,那么就会执行 default 语句,同时程序的执行会从 select 语句后的语句中恢复;
如果没有 default 语句,那么 select 语句将被阻塞,直到至少有一个通信可以进行下去。
# 代码示例
package main
import (
"fmt"
"time"
)
func main() {
// 往一个通道写入数据,如果超过3秒没有写入就往quit退出通道添加一个true然后在退出程序
// 定义二个通道
ch := make(chan int)
quit := make(chan bool)
// 新开一个协程
go func() {
for {
select {
case num := <-ch:
fmt.Println("num = ", num)
case <-time.After(3 * time.Second):
fmt.Println("超时")
quit <- true
}
}
}()
// 每秒写入数据
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
// 读取quit通道数据判断程序是否可以退出
_, isTrue := <-quit
if isTrue {
fmt.Println("程序结束")
}
}
/*
num = 0
num = 1
num = 2
num = 3
num = 4
超时
程序结束
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47