Building a Concurrent Pub/Sub System with Go Channels

Last updated: November 27, 2024

Building a concurrent Pub/Sub (Publish/Subscribe) system is an excellent way to deepen your understanding of Go's concurrency primitives, particularly channels. In a Pub/Sub system, publishers send messages to a topic, and subscribers receive messages from topics they're interested in, all concurrently without directly communicating with each other.

Understanding Go Channels

Go channels are typed conduits that goroutines use to pass values to one another. They suit a concurrent pub/sub system well because a single channel can safely have multiple concurrent senders and receivers without additional locking.

// Basic channel creation
messages := make(chan string)

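Creating a channel involves specifying the type of data it will carry. In the example above, messages is a channel that carries strings. Because no buffer size is given, the channel is unbuffered: a send blocks until another goroutine is ready to receive.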
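To make the blocking behaviour concrete, here is a minimal sketch of one goroutine sending on an unbuffered channel while another receives. The relay helper is a hypothetical name introduced only for this illustration; it is not part of the pub/sub system built below.

```go
package main

import "fmt"

// relay (hypothetical helper) sends each input on an unbuffered channel from
// a separate goroutine and returns the values in the order they were received.
func relay(inputs []string) []string {
	messages := make(chan string)

	go func() {
		for _, in := range inputs {
			messages <- in // blocks until the receiver is ready
		}
		close(messages) // tells the receiver that no more values will arrive
	}()

	var received []string
	for msg := range messages { // the loop ends once the channel is closed
		received = append(received, msg)
	}
	return received
}

func main() {
	fmt.Println(relay([]string{"ping", "pong"})) // prints [ping pong]
}
```

With a single sender and a single receiver, values arrive in the order they were sent.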

Developing the Pub/Sub System

To construct our pub/sub system, we'll need:

  • A Message Bus to manage channels for each topic.
  • Publishers that send messages to a topic.
  • Subscribers that listen for messages on topics.

Creating a Message Bus

The message bus keeps track of all topics and their subscribers using a map from each topic name to a slice of subscriber channels.

type MessageBus struct {
    topics map[string][]chan string
}

func NewMessageBus() *MessageBus {
    return &MessageBus{
        topics: make(map[string][]chan string),
    }
}

Next, we add a method that lets a subscriber register for a topic.

func (mb *MessageBus) Subscribe(topic string) <-chan string {
    ch := make(chan string)
    mb.topics[topic] = append(mb.topics[topic], ch)
    return ch
}

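The Subscribe method registers a new channel for the given topic and returns it as a receive-only channel for the subscriber to read from. Note that the topics map is not protected by a lock, so as written Subscribe must not be called concurrently with itself or with Publish.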
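If subscriptions can happen from several goroutines at once, the map needs protection. The following is a sketch of one way to do that with sync.RWMutex. SafeBus, NewSafeBus, and SubscriberCount are hypothetical names used here to keep the variant separate from the article's MessageBus.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeBus is a hypothetical variant of MessageBus whose map is guarded by a
// read/write mutex so that it can be used from multiple goroutines.
type SafeBus struct {
	mu     sync.RWMutex
	topics map[string][]chan string
}

func NewSafeBus() *SafeBus {
	return &SafeBus{topics: make(map[string][]chan string)}
}

// Subscribe registers a new channel under the topic while holding the write lock.
func (b *SafeBus) Subscribe(topic string) <-chan string {
	ch := make(chan string)
	b.mu.Lock()
	defer b.mu.Unlock()
	b.topics[topic] = append(b.topics[topic], ch)
	return ch
}

// SubscriberCount reports how many channels are registered for a topic.
// It only reads the map, so the shared read lock is enough.
func (b *SafeBus) SubscriberCount(topic string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.topics[topic])
}

func main() {
	bus := NewSafeBus()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			bus.Subscribe("topic1") // concurrent registrations are now safe
		}()
	}
	wg.Wait()
	fmt.Println(bus.SubscriberCount("topic1")) // prints 100
}
```

A publish method on such a type would take the read lock while it copies or iterates over the slice of channels.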

Developing the Publisher

The publisher will send messages to a topic, and the message bus will propagate them to all registered subscribers for that topic.

func (mb *MessageBus) Publish(topic, msg string) {
    if chans, found := mb.topics[topic]; found {
        for _, ch := range chans {
            go func(c chan string) {
                c <- msg
            }(ch)
        }
    }
}

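The Publish method sends a message to every channel registered under a topic. Each send runs in its own goroutine so that a slow subscriber cannot block the publisher or the other subscribers. The trade-off is that delivery order is not guaranteed: two messages published back to back may reach the same subscriber in either order. In addition, if a subscriber never reads from its channel, the goroutine trying to send to it stays blocked for the life of the program.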
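One alternative worth experimenting with is to give each subscriber a buffered channel and attempt a non-blocking send using select with a default case. The sketch below assumes that dropping a message for a subscriber whose buffer is full is acceptable; tryPublish and the buffer sizes are hypothetical choices for illustration, not part of the article's design.

```go
package main

import "fmt"

// tryPublish (hypothetical helper) attempts a non-blocking send to every
// subscriber channel and returns how many subscribers accepted the message.
func tryPublish(subs []chan string, msg string) int {
	delivered := 0
	for _, ch := range subs {
		select {
		case ch <- msg:
			delivered++
		default:
			// The buffer is full: drop the message for this subscriber
			// instead of blocking the publisher.
		}
	}
	return delivered
}

func main() {
	fast := make(chan string, 2) // room for two pending messages
	slow := make(chan string, 1) // room for only one
	subs := []chan string{fast, slow}

	fmt.Println(tryPublish(subs, "first"))  // prints 2
	fmt.Println(tryPublish(subs, "second")) // prints 1, slow's buffer is full
	fmt.Println(<-fast, <-fast)             // prints first second
}
```

Because the sends happen in the publisher's own goroutine, each subscriber sees the messages it accepts in publish order, and no extra goroutines are created. The cost is that a slow subscriber can miss messages.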

Setting Up the Subscriber

Subscribers read messages from their channel as they arrive. Let's implement a subscriber function to illustrate this.

func subscriber(id int, topic string, ch <-chan string) {
    for msg := range ch {
        fmt.Printf("Subscriber %d received on %s: %s\n", id, topic, msg)
    }
}

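This subscriber function receives messages from its channel in a loop and prints each one as it arrives. A range loop over a channel ends only when the channel is closed; since the message bus never closes its channels, each subscriber keeps running until the program exits.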
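If a subscriber should be able to stop on its own, one common approach is to select on a context.Context alongside the message channel. The following sketch assumes that pattern; consume and demo are hypothetical helpers, and the count is returned rather than printed so the behaviour is easy to check.

```go
package main

import (
	"context"
	"fmt"
)

// consume (hypothetical) reads from ch until ctx is cancelled or ch is
// closed, and returns the number of messages it received.
func consume(ctx context.Context, ch <-chan string) int {
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n // asked to stop
		case _, ok := <-ch:
			if !ok {
				return n // channel was closed
			}
			n++
		}
	}
}

// demo (hypothetical) starts one consumer, sends msgs on an unbuffered
// channel, cancels the context, and returns the consumer's count.
func demo(msgs []string) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan string)
	result := make(chan int)
	go func() { result <- consume(ctx, ch) }()

	for _, m := range msgs {
		ch <- m // returns only after the consumer has received m
	}
	cancel() // tell the consumer to stop
	return <-result
}

func main() {
	fmt.Println(demo([]string{"a", "b"})) // prints 2
}
```

Since every send on the unbuffered channel completes before cancel is called, the consumer has already counted all messages by the time it observes the cancellation.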

Bringing It All Together

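Finally, here's how to wire everything up in a main function. To compile, the file also needs a package main declaration and imports of the fmt and time packages.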

func main() {
    bus := NewMessageBus()
    sub1 := bus.Subscribe("topic1")
    sub2 := bus.Subscribe("topic1")

    go subscriber(1, "topic1", sub1)
    go subscriber(2, "topic1", sub2)

    bus.Publish("topic1", "Hello, World!")
    bus.Publish("topic1", "Go Concurrency is Awesome!")

    // Wait to allow message processing
    time.Sleep(time.Second * 1)
}

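This main function sets up a message bus, subscribes two subscribers to a topic, and publishes two messages. The time.Sleep call gives the subscriber goroutines time to print before main returns, since a Go program exits as soon as main does. It is a simplification: sleeping does not guarantee that every message has been processed, and because each send runs in its own goroutine, a subscriber may print the two messages in either order.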
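A more deterministic way to wait is to close the subscriber channels when publishing is finished and track the subscriber goroutines with a sync.WaitGroup. The sketch below makes two assumptions that differ from the code above: Publish sends synchronously, so that every send has completed before the channels are closed, and a hypothetical Close method is added to the bus. The run helper is also hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

type MessageBus struct {
	topics map[string][]chan string
}

func NewMessageBus() *MessageBus {
	return &MessageBus{topics: make(map[string][]chan string)}
}

func (mb *MessageBus) Subscribe(topic string) <-chan string {
	ch := make(chan string)
	mb.topics[topic] = append(mb.topics[topic], ch)
	return ch
}

// Publish sends synchronously in this sketch (unlike the version above), so
// no send can still be in flight when Close runs.
func (mb *MessageBus) Publish(topic, msg string) {
	for _, ch := range mb.topics[topic] {
		ch <- msg
	}
}

// Close (hypothetical addition) closes every subscriber channel, which ends
// each subscriber's range loop.
func (mb *MessageBus) Close() {
	for topic, chans := range mb.topics {
		for _, ch := range chans {
			close(ch)
		}
		delete(mb.topics, topic)
	}
}

// run (hypothetical) starts n subscribers on one topic, publishes msgs, waits
// for the subscribers to finish, and returns the total number of deliveries.
func run(n int, msgs []string) int {
	bus := NewMessageBus()
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0

	for i := 1; i <= n; i++ {
		ch := bus.Subscribe("topic1")
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for msg := range ch {
				fmt.Printf("Subscriber %d received on topic1: %s\n", id, msg)
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(i)
	}

	for _, m := range msgs {
		bus.Publish("topic1", m)
	}
	bus.Close()
	wg.Wait() // returns once every subscriber loop has ended
	return total
}

func main() {
	fmt.Println("deliveries:", run(2, []string{"Hello, World!", "Go Concurrency is Awesome!"}))
}
```

With synchronous sends each subscriber receives the messages in publish order, at the cost of the publisher waiting for the slowest subscriber.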

With only channels and goroutines, we have built a compact Pub/Sub system in a few dozen lines. It is a foundation rather than a finished design: it has no locking around the topic map, no way to unsubscribe or shut down, and no ordering guarantee. Experiment with different data types, subscription models, and error handling to extend it.
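As a starting point for trying other data types, the bus can be parameterised over its message type with generics (Go 1.18 or later). This is a sketch under assumptions: Bus, NewBus, and the Event payload type are hypothetical names, the channel buffer of 1 is arbitrary, and Publish sends synchronously for brevity.

```go
package main

import "fmt"

// Bus is a hypothetical generic variant of MessageBus that carries values of
// any type T instead of only strings.
type Bus[T any] struct {
	topics map[string][]chan T
}

func NewBus[T any]() *Bus[T] {
	return &Bus[T]{topics: make(map[string][]chan T)}
}

func (b *Bus[T]) Subscribe(topic string) <-chan T {
	ch := make(chan T, 1) // buffer of 1 so the single publish below does not block
	b.topics[topic] = append(b.topics[topic], ch)
	return ch
}

// Publish sends synchronously; it blocks if a subscriber's buffer is full.
func (b *Bus[T]) Publish(topic string, msg T) {
	for _, ch := range b.topics[topic] {
		ch <- msg
	}
}

// Event is an example payload type, assumed here for illustration.
type Event struct {
	ID   int
	Name string
}

func main() {
	bus := NewBus[Event]()
	sub := bus.Subscribe("orders")
	bus.Publish("orders", Event{ID: 1, Name: "created"})
	fmt.Println(<-sub) // prints {1 created}
}
```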

Series: Concurrency and Synchronization in Go