Handling concurrency efficiently is essential for building scalable, responsive systems. Go supports this with goroutines and channels, and two common patterns built on them are fan-out and fan-in. This article walks through both patterns with Go examples, showing how they solve typical concurrent processing problems.
Understanding Fan-out
The fan-out pattern spreads a workload across multiple goroutines. In Go, this usually means starting several goroutines that all receive from the same channel, so each incoming value is handled by exactly one of them. The pattern is useful when the work can be done in parallel and you want to take advantage of multi-core processors.
Fan-out Example
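package main

import (
	"fmt"
	"sync"
)

// worker reads jobs from the shared jobs channel until it is closed and
// sends one result per job on the results channel.
func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, j)
		results <- j * 2 // simulate processing by doubling the input
		fmt.Printf("Worker %d finished job %d\n", id, j)
	}
}

func main() {
	const numJobs = 5
	// Both channels are buffered to hold every job and every result, so
	// sends on them never block, even though main reads the results only
	// after all workers have finished.
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Fan out: start three workers that all receive from the same jobs channel.
	var wg sync.WaitGroup
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(w)
	}

	// Queue the jobs, then close the channel so the workers' range loops end.
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Wait for every worker before closing results; closing earlier could
	// make a worker panic by sending on a closed channel.
	wg.Wait()
	close(results)

	for result := range results {
		fmt.Printf("Result: %d\n", result)
	}
}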
Understanding Fan-in
The fan-in pattern funnels multiple input streams into a single channel, collecting results from several processing pipelines. This pattern simplifies aggregating data from concurrent operations.
Fan-in Example
package main

import (
	"fmt"
	"sync"
)

// merge combines multiple input channels into a single output channel.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs.
	// output copies values from c to out until c is closed, then calls wg.Done.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are done.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Each input channel is buffered so it can be filled before anyone reads.
	chan1 := make(chan int, 2)
	chan2 := make(chan int, 2)
	chan3 := make(chan int, 2)
	for i := 1; i <= 2; i++ {
		chan1 <- i
		chan2 <- i * 10
		chan3 <- i * 100
	}
	// Close the inputs so the output goroutines inside merge can finish.
	close(chan1)
	close(chan2)
	close(chan3)

	// Fan in: read every value from the single merged channel.
	for n := range merge(chan1, chan2, chan3) {
		fmt.Println(n)
	}
}
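In this simple example, the merge function collects integer values from three input channels and combines them into a single channel for further processing. Because each input channel is forwarded by its own goroutine, the order in which values arrive on the merged channel is not guaranteed.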
Combining Fan-in and Fan-out
In practice, fan-out and fan-in are often used together to balance concurrent workloads: input data is distributed across multiple workers, and their results are then merged back into a single coherent stream.
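The combination described above can be sketched as a small pipeline. This is a minimal illustration under stated assumptions, not a definitive implementation: the helper names generate, double, and run are hypothetical and chosen only for this example, and merge follows the same approach as the fan-in example. The results are sorted before being returned purely so that the output is deterministic; the order in which values arrive from the workers is not.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// generate (hypothetical helper) sends nums on a channel, then closes it.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// double (hypothetical helper) is one fan-out worker: it reads from the
// shared input channel and emits each value times two on its own channel.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

// merge is the fan-in stage: it forwards values from every input channel
// to one output channel and closes it once all inputs are drained.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// run (hypothetical helper) wires the stages together: one source, numWorkers
// workers sharing that source, and a single merged result stream.
func run(numWorkers int, nums ...int) []int {
	in := generate(nums...)

	// Fan out: every worker receives from the same input channel.
	workers := make([]<-chan int, numWorkers)
	for i := range workers {
		workers[i] = double(in)
	}

	// Fan in: collect all worker outputs from one channel.
	var results []int
	for n := range merge(workers...) {
		results = append(results, n)
	}
	sort.Ints(results) // arrival order is not deterministic, so sort for display
	return results
}

func main() {
	fmt.Println(run(3, 1, 2, 3, 4, 5)) // prints [2 4 6 8 10]
}
```

Each worker here owns its output channel and closes it when the shared input is exhausted, which lets merge decide when the combined stream is complete without any extra signalling.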