ruk·si

🐹 Go
Multiple Goroutines

Updated at 2014-03-09 21:43

Using channels to communicate between goroutines.

package main

import (
    "fmt"
    "time"
)

// Writes integers to the channel.
func MrWriter(numberChan chan<- int) {
    numberChan <- 100
}

// Reads integers from the channel.
func MsReader(numberChan <-chan int) {
    i := <-numberChan
    fmt.Printf("Someone sent me %d.\n", i)
}

func main() {
    numberChan := make(chan int)
    go MsReader(numberChan)
    go MrWriter(numberChan)
    time.Sleep(2 * time.Second)
    fmt.Print("End.")
    // => Someone sent me 100. End.
}

Channel counter approach. Acting after multiple goroutines complete while using a channel and a count.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    numberChan := make(chan int)
    for i := 0; i < 10; i++ {
        go func(i int) {
            sleep := time.Duration(rand.Intn(1000))
            time.Sleep(sleep * time.Millisecond)
            numberChan <- i
        }(i)
    }
    for i := 0; i < 10; i++ {
        fmt.Println(<-numberChan)
    }
}

Wait group approach. Acting after multiple goroutines have complete while using a wait group.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    messages := make(chan int)
    var waitGroup sync.WaitGroup

    waitGroup.Add(3)
    // You could use `waitGroup.Add(1)` three times if you want.

    go func() {
        defer waitGroup.Done() // do when exits the function
        time.Sleep(time.Second * 3)
        messages <- 1
    }()
    go func() {
        defer waitGroup.Done() // do when exits the function
        time.Sleep(time.Second * 2)
        messages <- 2
    }()
    go func() {
        defer waitGroup.Done() // do when exits the function
        time.Sleep(time.Second * 1)
        messages <- 3
    }()

    go func() {
        for i := range messages {
            fmt.Println(i)
        }
    }()

    waitGroup.Wait()
    fmt.Println("All Done!")
    // => 3, 2, 1, All Done
}

Starting multiple goroutines at the same time. You can kill goroutines using quit channels.

package main

import (
    "fmt"
    "time"
)

func worker(receiveToStart chan bool) {
    <-receiveToStart
    fmt.Print("x")
}

func main() {
    closeToStart := make(chan bool)
    for i := 0; i < 100; i++ {
        go worker(closeToStart)
    }
    close(closeToStart)
    time.Sleep(time.Second)
}

Worker pool pattern. Creating a goroutine worker pool.

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "processing", j)
        time.Sleep(time.Second)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    workerCount := 3
    jobCount := 10

    for w := 1; w <= workerCount; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= jobCount; j++ {
        jobs <- j
    }
    close(jobs) // These are all the jobs.

    for a := 1; a <= jobCount; a++ {
        result := <-results
        fmt.Println("got result", result)
    }
}

Worker pool using wait groups.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    workGroup := sync.WaitGroup{}
    workQueue := make(chan string)
    workings := []string{"gorilla", "banana", "cat", "dog"}

    workerCount := 3
    for id := 0; id < workerCount; id++ {
        // define outside of the goroutine so reaching .Done doesn't
        // pass through if goroutines take their time to start
        workGroup.Add(1)
        go doIt(id, workQueue, &workGroup)
    }

    // queue all items to work queue
    for i := 0; i < len(workings); i++ {
        workQueue <- workings[i]
    }

    // close the queue after all
    // values have been received
    close(workQueue)

    // wait for works to finish
    fmt.Println("main is waiting...")
    workGroup.Wait()
    fmt.Println("all done!")
}

func doIt(workerId int, workQueue <-chan string, workGroup *sync.WaitGroup) {
    defer workGroup.Done()
    fmt.Printf("[%v] is running\n", workerId)
    for item := range workQueue {
        fmt.Printf("[%v] working on %v\n", workerId, item)
        time.Sleep(time.Second)
        fmt.Printf("[%v] done with %v\n", workerId, item)
    }
    fmt.Printf("[%v] is dead\n", workerId)
    return
}

Worker pool using with multiple work queues. Note that this only works if the channels are unbuffered.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    closeToKillPool := make(chan struct{})

    workQueue1 := make(chan string)
    workQueue2 := make(chan string)

    workings1 := []string{"car", "plane", "bike", "motorcycle", "house"}
    workings2 := []string{"flag", "morale", "taxes"}

    workGroup := sync.WaitGroup{}
    workerCount := 3
    for id := 0; id < workerCount; id++ {
        workGroup.Add(1)
        go doIt(id, workQueue1, workQueue2, &workGroup, closeToKillPool)
    }

    queueGroup := sync.WaitGroup{}
    queueGroup.Add(1)
    go queueItems(workQueue1, workings1, &queueGroup)
    queueGroup.Add(1)
    go queueItems(workQueue2, workings2, &queueGroup)

    // After everything is queued,
    // we can send the kill signal which is
    // caught up by the workers when both work
    // queues are empty
    queueGroup.Wait()
    fmt.Println("main: all jobs queued...")
    close(closeToKillPool)

    fmt.Println("main: waiting for workers to finish...")
    workGroup.Wait()
    fmt.Println("all done!")
}

func doIt(workerId int, workQueue1 <-chan string, workQueue2 <-chan string, workGroup *sync.WaitGroup, receiveToDie <-chan struct{}) {
    defer workGroup.Done()
    fmt.Printf("[%v] IS ALIVE!\n", workerId)
    for {
        select {
        case item := <-workQueue1:
            fmt.Printf("[%v] %v: building...\n", workerId, item)
            time.Sleep(time.Second * 2)
            fmt.Printf("[%v] %v: done \n", workerId, item)
        case item := <-workQueue2:
            fmt.Printf("[%v] %v: raising...\n", workerId, item)
            time.Sleep(time.Second)
            fmt.Printf("[%v] %v: done!\n", workerId, item)
        case <-receiveToDie:
            fmt.Printf("[%v] IS DEAD :(\n", workerId)
            return
        }
    }
}

Sources