🐹 Go - Multiple Goroutines
Updated at 2014-03-09 21:43
Using channels to communicate between goroutines.
package main
import (
"fmt"
"time"
)
// Writes integers to the channel.
func MrWriter(numberChan chan<- int) {
numberChan <- 100
}
// Reads integers from the channel.
func MsReader(numberChan <-chan int) {
i := <-numberChan
fmt.Printf("Someone sent me %d.\n", i)
}
func main() {
numberChan := make(chan int)
go MsReader(numberChan)
go MrWriter(numberChan)
time.Sleep(2 * time.Second)
fmt.Print("End.")
// => Someone sent me 100. End.
}
Channel counter approach. Acting after multiple goroutines complete while using a channel and a count.
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
numberChan := make(chan int)
for i := 0; i < 10; i++ {
go func(i int) {
sleep := time.Duration(rand.Intn(1000))
time.Sleep(sleep * time.Millisecond)
numberChan <- i
}(i)
}
for i := 0; i < 10; i++ {
fmt.Println(<-numberChan)
}
}
Wait group approach. Acting after multiple goroutines have complete while using a wait group.
package main
import (
"fmt"
"sync"
"time"
)
func main() {
messages := make(chan int)
var waitGroup sync.WaitGroup
waitGroup.Add(3)
// You could use `waitGroup.Add(1)` three times if you want.
go func() {
defer waitGroup.Done() // do when exits the function
time.Sleep(time.Second * 3)
messages <- 1
}()
go func() {
defer waitGroup.Done() // do when exits the function
time.Sleep(time.Second * 2)
messages <- 2
}()
go func() {
defer waitGroup.Done() // do when exits the function
time.Sleep(time.Second * 1)
messages <- 3
}()
go func() {
for i := range messages {
fmt.Println(i)
}
}()
waitGroup.Wait()
fmt.Println("All Done!")
// => 3, 2, 1, All Done
}
Starting multiple goroutines at the same time. You can kill goroutines using quit channels.
package main
import (
"fmt"
"time"
)
func worker(receiveToStart chan bool) {
<-receiveToStart
fmt.Print("x")
}
func main() {
closeToStart := make(chan bool)
for i := 0; i < 100; i++ {
go worker(closeToStart)
}
close(closeToStart)
time.Sleep(time.Second)
}
Worker pool pattern. Creating a goroutine worker pool.
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "processing", j)
time.Sleep(time.Second)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
workerCount := 3
jobCount := 10
for w := 1; w <= workerCount; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= jobCount; j++ {
jobs <- j
}
close(jobs) // These are all the jobs.
for a := 1; a <= jobCount; a++ {
result := <-results
fmt.Println("got result", result)
}
}
Worker pool using wait groups.
package main
import (
"fmt"
"sync"
"time"
)
func main() {
workGroup := sync.WaitGroup{}
workQueue := make(chan string)
workings := []string{"gorilla", "banana", "cat", "dog"}
workerCount := 3
for id := 0; id < workerCount; id++ {
// define outside of the goroutine so reaching .Done doesn't
// pass through if goroutines take their time to start
workGroup.Add(1)
go doIt(id, workQueue, &workGroup)
}
// queue all items to work queue
for i := 0; i < len(workings); i++ {
workQueue <- workings[i]
}
// close the queue after all
// values have been received
close(workQueue)
// wait for works to finish
fmt.Println("main is waiting...")
workGroup.Wait()
fmt.Println("all done!")
}
func doIt(workerId int, workQueue <-chan string, workGroup *sync.WaitGroup) {
defer workGroup.Done()
fmt.Printf("[%v] is running\n", workerId)
for item := range workQueue {
fmt.Printf("[%v] working on %v\n", workerId, item)
time.Sleep(time.Second)
fmt.Printf("[%v] done with %v\n", workerId, item)
}
fmt.Printf("[%v] is dead\n", workerId)
return
}
Worker pool using with multiple work queues. Note that this only works if the channels are unbuffered.
package main
import (
"fmt"
"sync"
"time"
)
func main() {
closeToKillPool := make(chan struct{})
workQueue1 := make(chan string)
workQueue2 := make(chan string)
workings1 := []string{"car", "plane", "bike", "motorcycle", "house"}
workings2 := []string{"flag", "morale", "taxes"}
workGroup := sync.WaitGroup{}
workerCount := 3
for id := 0; id < workerCount; id++ {
workGroup.Add(1)
go doIt(id, workQueue1, workQueue2, &workGroup, closeToKillPool)
}
queueGroup := sync.WaitGroup{}
queueGroup.Add(1)
go queueItems(workQueue1, workings1, &queueGroup)
queueGroup.Add(1)
go queueItems(workQueue2, workings2, &queueGroup)
// After everything is queued,
// we can send the kill signal which is
// caught up by the workers when both work
// queues are empty
queueGroup.Wait()
fmt.Println("main: all jobs queued...")
close(closeToKillPool)
fmt.Println("main: waiting for workers to finish...")
workGroup.Wait()
fmt.Println("all done!")
}
func doIt(workerId int, workQueue1 <-chan string, workQueue2 <-chan string, workGroup *sync.WaitGroup, receiveToDie <-chan struct{}) {
defer workGroup.Done()
fmt.Printf("[%v] IS ALIVE!\n", workerId)
for {
select {
case item := <-workQueue1:
fmt.Printf("[%v] %v: building...\n", workerId, item)
time.Sleep(time.Second * 2)
fmt.Printf("[%v] %v: done \n", workerId, item)
case item := <-workQueue2:
fmt.Printf("[%v] %v: raising...\n", workerId, item)
time.Sleep(time.Second)
fmt.Printf("[%v] %v: done!\n", workerId, item)
case <-receiveToDie:
fmt.Printf("[%v] IS DEAD :(\n", workerId)
return
}
}
}