In Go, a naive and simplified way to accomplish this kind of task would look something like this:
1: package main
2:
3: import (
4:     "fmt"
5:     "runtime"
6:     "time"
7: )
8:
9: func init() {
10:     runtime.GOMAXPROCS(4)
11: }
12:
13: func send(ch chan int, i int) {
14:     time.Sleep(2e9) // Processing goes here
15:     // The output is sent via a specialized channel
16:     ch <- i
17: }
18:
19: func main() {
20:     ch := make(chan int, 4)
21:     fired := 0
22:     for _, i := range []int{0, 1, 2, 3} {
23:         fired++
24:         go send(ch, i)
25:     }
26:
27: LOOP:
28:     for {
29:         select {
30:         case v := <-ch:
31:             fired--
32:             fmt.Println(v)
33:         default:
34:             if fired == 0 {
35:                 break LOOP
36:             } else {
37:                 continue
38:             }
39:         }
40:     }
41: }
If you run the program above, you get something like this:
$ go build goChans.go && time ./goChans
1
0
3
2
real 0m2.006s
user 0m2.004s
sys 0m0.000s
There are two things to note here:
- The four goroutines fired in line 24 effectively run in parallel (the total running time of the program is roughly 2 seconds even though each worker slept for 2 seconds).
- The output is not ordered: the order in which the goroutines finish determines the order in which their messages are printed.
If you want the order to be preserved, you have to tweak the program a bit. Here I will present two options that rely on channels to do the task. Both helped me better understand how channels work and how to use them effectively.
Solution 1: Synchronization channels
The first alternative uses specialized channels to synchronize how the results are passed down the main output channel:
1: package main
2:
3: import (
4:     "fmt"
5:     "runtime"
6:     "time"
7: )
8:
9: func init() {
10:     runtime.GOMAXPROCS(4)
11: }
12:
13: func send(ch, prevc, nextc chan int, i int) {
14:     time.Sleep(2e9)
15:     <-prevc // wait for prev goroutine to send its result
16:     ch <- i
17:     nextc <- 0 // notify next goroutine
18: }
19:
20: func main() {
21:     ch := make(chan int, 4)
22:     fired := 0
23:     prevc := make(chan int)
24:     origc := prevc
25:     nextc := make(chan int)
26:     for _, i := range []int{0, 1, 2, 3} {
27:         fired++
28:         go send(ch, prevc, nextc, i)
29:         prevc = nextc
30:         nextc = make(chan int)
31:     }
32:     origc <- 0
33:
34: LOOP:
35:     for {
36:         select {
37:         case v := <-ch:
38:             fired--
39:             fmt.Println(v)
40:         default:
41:             if fired == 0 {
42:                 break LOOP
43:             } else {
44:                 continue
45:             }
46:         }
47:     }
48: }
These specialized channels connect the goroutines in a chain: each goroutine waits on the channel it shares with the previous one, and when the first one finishes its computation, it puts its value in the main output channel and then signals the next goroutine that the output channel is available. This is repeated for all the goroutines.
The chain is set in motion in line 32, after all the goroutines have been spawned, by sending a value on the first synchronization channel.
Note that, because all the goroutines run in parallel, the computation (in this case the sleep) is done in parallel, and only the population of the output channel is synchronized:
$ go build goChans_ordered1.go && time ./goChans_ordered1
0
1
2
3
real 0m2.006s
user 0m0.004s
sys 0m0.000s
Solution 2: Pre-booking slots
Another approach to achieve the same result is to "book" a place in the final output channel as we create the goroutines:
1: package main
2:
3: import (
4:     "fmt"
5:     "runtime"
6:     "time"
7: )
8:
9: func init() {
10:     runtime.GOMAXPROCS(4)
11: }
12:
13: func send(ch chan int, i int) {
14:     time.Sleep(2e9)
15:     ch <- i
16: }
17:
18: func main() {
19:     ch := make(chan chan int, 4)
20:     for _, i := range []int{0, 1, 2, 3} {
21:         chval := make(chan int)
22:         ch <- chval
23:         go send(chval, i)
24:     }
25:     close(ch)
26:
27:     for sch := range ch {
28:         select {
29:         case v := <-sch:
30:             fmt.Println(v)
31:         }
32:     }
33: }
Note the initialization of the output channel in line 19: we are creating a channel of channels. Instead of accepting ints, the main output channel now accepts unbuffered channels of ints. Each of these channels is created (line 21) and sent to the main output channel (line 22) immediately before its goroutine is spawned (line 23). This means that by the time all the goroutines have been spawned, the output channel is already full and we can close it (line 25), while the four inner channels are still open and we can select on them (with a single case, the select in line 28 behaves like a plain receive). This simplifies the code that collects the data.
I like the mental representation of this solution a lot: just before creating each worker, we attach a "hose" that connects the worker to the output channel. Once all the hoses are in place, we can close the output channel and wait for the results to come out of the hoses. Because we range over the hoses in order (line 27), the output is also ordered, as we wanted.
If this channel-of-channels data structure scares you, there is a slightly different approach that may be a bit simpler: instead of creating a channel of channels, you can create a slice and append the channels to it as the goroutines are fired (i.e. replacing the main output channel with a slice).