Monday 13 May 2013

Parallel and ordered

One of Go's greatest features is its built-in support for concurrency. It makes it easy to express concurrent algorithms without the third-party libraries or similar workarounds needed in other programming languages. Combined with Go's multi-core capabilities, this makes it easy to write algorithms that perform several tasks in parallel, efficiently. For example, I often write algorithms that perform a single task over a series of elements or records, such as processing a group of input files.

In Go, a naive and simplified way to accomplish this kind of task would look something like this:
1:  package main  
2:    
3:  import (  
4:       "fmt"  
5:       "runtime"  
6:       "time"  
7:  )  
8:    
9:  func init() {  
10:       runtime.GOMAXPROCS(4)  
11:  }  
12:    
13:  func send(ch chan int, i int) {  
14:       time.Sleep(2 * time.Second) // Processing goes here  
15:       // The output is sent via a specialized channel  
16:       ch <- i  
17:  }  
18:    
19:  func main() {  
20:       ch := make(chan int, 4)  
21:       fired := 0  
22:       for _, i := range []int{0, 1, 2, 3} {  
23:            fired++  
24:            go send(ch, i)  
25:       }  
26:    
27:  LOOP:  
28:       for {  
29:            select {  
30:            case v := <-ch:  
31:                 fired--  
32:                 fmt.Println(v)  
33:            default:  
34:                 if fired == 0 {  
35:                      break LOOP  
36:                 } else {  
37:                      continue  
38:                 }  
39:            }  
40:       }  
41:  }  
42:    

If you run the program above, you get something like this:
 $ go build goChans.go && time ./goChans   
 1  
 0  
 3  
 2  
 real     0m2.006s  
 user     0m2.004s  
 sys     0m0.000s  

There are two things to note here:
  • The four goroutines fired in line 24 effectively run in parallel (the total run time of the program is roughly 2 seconds, even though each of the four workers slept for 2 seconds).
  • The output is not ordered: the order in which the goroutines finish determines the order in which their values are printed.
This last point means that in real programs, the time each goroutine spends computing determines the order of the program's overall output. This may or may not be a problem, depending on your requirements.

If you want the order to be preserved, you have to tweak the program a bit. Here I present two options that rely on channels to do the job. Both helped me better understand how channels work and how to use them effectively.



Solution 1: Synchronization channels


The first alternative uses specialized channels to synchronize how the results are passed down the main output channel:
1:  package main  
2:    
3:  import (  
4:       "fmt"  
5:       "runtime"  
6:       "time"  
7:  )  
8:    
9:  func init() {  
10:       runtime.GOMAXPROCS(4)  
11:  }  
12:    
13:  func send(ch, prevc, nextc chan int, i int) {  
14:       time.Sleep(2 * time.Second)  
15:       <-prevc // wait for prev goroutine to send its result  
16:       ch <- i  
17:       nextc <- 0 // notify next goroutine  
18:  }  
19:    
20:  func main() {  
21:       ch := make(chan int, 4)  
22:       fired := 0  
23:       prevc := make(chan int)  
24:       origc := prevc  
25:       nextc := make(chan int)  
26:       for _, i := range []int{0, 1, 2, 3} {  
27:            fired++  
28:            go send(ch, prevc, nextc, i)  
29:            prevc = nextc  
30:            nextc = make(chan int)  
31:       }  
32:       origc <- 0  
33:    
34:  LOOP:  
35:       for {  
36:            select {  
37:            case v := <-ch:  
38:                 fired--  
39:                 fmt.Println(v)  
40:            default:  
41:                 if fired == 0 {  
42:                      break LOOP  
43:                 } else {  
44:                      continue  
45:                 }  
46:            }  
47:       }  
48:  }  
49:    

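These specialized channels link the goroutines in a chain: each goroutine does its computation, waits for a signal from the previous goroutine (prevc), stores its value in the main output channel, and then sends a signal to the next goroutine (nextc) telling it that the output channel is available. This is repeated down the chain. The signal sent by the last goroutine is never received; that is harmless here only because the program exits once all four values have been printed.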

The chain is started in line 32, where main sends on the first synchronization channel (origc) after all the goroutines have already been spawned.

Note that, because all the goroutines run in parallel, the computation (in this case the sleep) is still done in parallel; only the populating of the output channel is synchronized:
 $ go build goChans_ordered1.go && time ./goChans_ordered1   
 0  
 1  
 2  
 3  
 real     0m2.006s  
 user     0m0.004s  
 sys     0m0.000s  


Solution 2: Pre-booking slots


Another approach to achieve the same result is to "book" a place in the final output channel as we create the goroutines:

1:  package main  
2:    
3:  import (  
4:       "fmt"  
5:       "runtime"  
6:       "time"  
7:  )  
8:    
9:  func init() {  
10:       runtime.GOMAXPROCS(4)  
11:  }  
12:    
13:  func send(ch chan int, i int) {  
14:       time.Sleep(2 * time.Second)  
15:       ch <- i  
16:  }  
17:    
18:  func main() {  
19:       ch := make(chan chan int, 4)  
20:       for _, i := range []int{0, 1, 2, 3} {  
21:            chval := make(chan int)  
22:            ch <- chval  
23:            go send(chval, i)  
24:       }  
25:       close(ch)  
26:    
27:       for sch := range ch {  
28:            select {  
29:            case v := <-sch:  
30:                 fmt.Println(v)  
31:            }  
32:       }  
33:  }  
34:    

Note the initialization of the output channel in line 19. Here we create a channel of channels: instead of ints, the main output channel now carries unbuffered channels of ints. Each of these channels is created (line 21) and sent to the main output channel (line 22) immediately before its goroutine is spawned (line 23). By the time all the goroutines are spawned, every inner channel is already queued in the outer channel, so we can close it (line 25): ranging over a closed channel still delivers the values left in its buffer before the loop ends. The four inner (underlying) channels remain open, so we can still select on them. This simplifies the code that collects the data.

I really like the mental picture of this solution: just before creating each worker, we attach a "hose" connecting the worker to the output channel. Once all the hoses are in place, we can close the output channel and wait for the results to come out of the hoses. Because we range over the hoses in order (line 27), the output is also ordered, as we wanted.
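Generalized to any number of inputs, the hose approach has one constraint worth stating: main sends every hose before it receives anything, so the outer channel's buffer must hold all of them (the program above uses capacity 4 for four inputs). This sketch, with the hypothetical name `orderedHoses`, sizes the buffer from the input and uses a plain receive, which does the same job as a `select` with a single case. Giving each hose a buffer of 1 is an assumption of this sketch that lets a worker finish without waiting for the reader.

```go
package main

import "fmt"

// orderedHoses (hypothetical name) books one result channel ("hose") per
// input in a channel of channels, in input order, before starting the
// worker for that input.
func orderedHoses(inputs []int, work func(int) int) []int {
	// The outer buffer must hold every hose: main sends all of them before
	// it starts receiving, so a smaller buffer would deadlock.
	hoses := make(chan chan int, len(inputs))
	for _, in := range inputs {
		hose := make(chan int, 1) // lets the worker exit without waiting for the reader
		hoses <- hose
		go func(v int, h chan<- int) { h <- work(v) }(in, hose)
	}
	close(hoses) // no more hoses; the buffered ones are still delivered by range

	results := make([]int, 0, len(inputs))
	for h := range hoses { // hoses come out in the order they were booked
		results = append(results, <-h)
	}
	return results
}

func main() {
	square := func(v int) int { return v * v }
	fmt.Println(orderedHoses([]int{1, 2, 3, 4}, square)) // [1 4 9 16]
}
```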

If this channel-of-channels data structure scares you, there is a slightly different approach that may be a bit simpler: instead of a channel of channels, create a slice and append the channels to it as the goroutines are fired (i.e. replacing the main output channel with a slice).
