Building a concurrent Pub/Sub (Publish/Subscribe) system is an excellent way to deepen your understanding of Go's concurrency primitives, particularly channels. In a Pub/Sub system, publishers send messages to a topic, and subscribers receive messages from topics they're interested in, all concurrently without directly communicating with each other.
Understanding Go Channels
Go channels are typed conduits for passing values between goroutines. They suit a pub/sub system well because several goroutines can send to and receive from the same channel safely, without any extra locking around the channel itself.
// Basic channel creation
messages := make(chan string)
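Creating a channel involves specifying the type of data it will carry. In the example above, messages is a channel that carries strings. Because no buffer size is given, the channel is unbuffered: a send blocks until another goroutine is ready to receive.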
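As a minimal sketch of a channel in use, the program below sends a few strings from one goroutine and receives them in another. The helper name relay is an assumption made for illustration and is not part of the pub/sub code in this article.

```go
package main

import "fmt"

// relay is a hypothetical helper: it starts a goroutine that sends each input
// on an unbuffered channel, then closes the channel to signal completion.
func relay(inputs []string) <-chan string {
	messages := make(chan string) // unbuffered: each send waits for a receiver
	go func() {
		defer close(messages)
		for _, in := range inputs {
			messages <- in
		}
	}()
	return messages
}

func main() {
	// range keeps receiving until the channel is closed.
	for msg := range relay([]string{"first", "second"}) {
		fmt.Println(msg)
	}
}
```

With a single sender, values arrive in the order they were sent, so this prints first and then second.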
Developing the Pub/Sub System
To construct our pub/sub system, we'll need:
- A Message Bus to manage channels for each topic.
- Publishers that send messages to a topic.
- Subscribers that listen for messages on topics.
Creating a Message Bus
The message bus keeps track of all topics and their subscribers using a map from each topic name to a slice of channels, one channel per subscriber.
type MessageBus struct {
	topics map[string][]chan string
}

func NewMessageBus() *MessageBus {
	return &MessageBus{
		topics: make(map[string][]chan string),
	}
}
Next, we add a method that subscribes to a topic.
func (mb *MessageBus) Subscribe(topic string) <-chan string {
	ch := make(chan string)
	mb.topics[topic] = append(mb.topics[topic], ch)
	return ch
}
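The Subscribe method registers a new unbuffered channel for the given topic and returns it as a receive-only channel (<-chan string), so a subscriber can read from it but cannot send on it or close it. Note that the topics map is not protected by a lock, so as written Subscribe and Publish must not be called from different goroutines at the same time.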
Developing the Publisher
The publisher will send messages to a topic, and the message bus will propagate them to all registered subscribers for that topic.
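func (mb *MessageBus) Publish(topic, msg string) {
	if chans, found := mb.topics[topic]; found {
		for _, ch := range chans {
			// Send in a separate goroutine so that a slow subscriber
			// does not block the publisher or the other subscribers.
			go func(c chan string) {
				c <- msg
			}(ch)
		}
	}
}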
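The Publish method sends a message to all channels registered under a topic. Because the channels are unbuffered, each send blocks until its subscriber receives, so every send runs in its own goroutine to keep Publish from stalling on a slow subscriber. This has two consequences worth knowing: messages from separate Publish calls may reach a subscriber in a different order than they were published, and if a subscriber stops reading, the goroutines waiting to send to it never finish.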
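One alternative to a goroutine per send is to give each subscriber a buffered channel and attempt a non-blocking send with select and default. The sketch below is an assumption-laden illustration rather than a drop-in replacement: tryPublish is a hypothetical helper, the buffer sizes are arbitrary, and a message is silently dropped for any subscriber whose buffer is full.

```go
package main

import "fmt"

// tryPublish is a hypothetical helper: it attempts a non-blocking send to every
// subscriber channel and reports how many of them accepted the message.
func tryPublish(subs []chan string, msg string) int {
	delivered := 0
	for _, ch := range subs {
		select {
		case ch <- msg:
			delivered++
		default:
			// Buffer is full and nobody is receiving: drop for this subscriber.
		}
	}
	return delivered
}

func main() {
	fast := make(chan string, 2) // room for two queued messages
	slow := make(chan string, 1) // room for one queued message
	subs := []chan string{fast, slow}

	fmt.Println(tryPublish(subs, "first"))  // prints 2
	fmt.Println(tryPublish(subs, "second")) // prints 1: slow's buffer is full
	fmt.Println(<-fast, <-fast)             // prints "first second"
}
```

This keeps per-subscriber ordering and never blocks the publisher, at the cost of losing messages for subscribers that fall behind. Whether that trade-off is acceptable depends on the application.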
Setting Up the Subscriber
Subscribers read messages from their channel as they arrive. Let's implement a subscriber function to illustrate this.
func subscriber(id int, topic string, ch <-chan string) {
	for msg := range ch {
		fmt.Printf("Subscriber %d received on %s: %s\n", id, topic, msg)
	}
}
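This subscriber function listens on its channel and prints each message as it arrives. The range loop ends only when the channel is closed. Since this message bus never closes its channels, each subscriber goroutine keeps waiting until the program exits.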
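To illustrate how a range loop over a channel terminates, here is a small sketch in which the sender closes the channel after its last message. The drain helper is a hypothetical stand-in for the subscriber loop that collects messages instead of printing them.

```go
package main

import "fmt"

// drain is a hypothetical helper that mirrors the subscriber loop: it collects
// messages and returns once the channel has been closed and emptied.
func drain(ch <-chan string) []string {
	var got []string
	for msg := range ch {
		got = append(got, msg)
	}
	return got
}

func main() {
	ch := make(chan string, 2)
	ch <- "Hello, World!"
	ch <- "Go Concurrency is Awesome!"
	close(ch) // values already buffered are still delivered after close

	fmt.Println(len(drain(ch))) // prints 2
}
```

Only the sending side should close a channel, because sending on a closed channel panics.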
Bringing It All Together
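Finally, here's how to wire everything up in a main function. To compile, place all of the snippets in one file that starts with package main and imports the fmt and time packages.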
func main() {
	bus := NewMessageBus()

	sub1 := bus.Subscribe("topic1")
	sub2 := bus.Subscribe("topic1")

	go subscriber(1, "topic1", sub1)
	go subscriber(2, "topic1", sub2)

	bus.Publish("topic1", "Hello, World!")
	bus.Publish("topic1", "Go Concurrency is Awesome!")

	// Wait to allow message processing
	time.Sleep(time.Second * 1)
}
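This main function sets up a message bus, subscribes two subscribers to a topic, and publishes two messages. The time.Sleep call gives the subscriber goroutines time to print before the program exits. It is a convenience for the demo rather than a guarantee: nothing actually confirms that every message was processed, and each subscriber may print the two messages in either order.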
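A more dependable way to wait is to close the subscriber channels after publishing and use a sync.WaitGroup to block until every subscriber goroutine has returned. The following is a minimal sketch under simplifying assumptions: deliverAndWait is a hypothetical helper, it bypasses the MessageBus type and sends synchronously, and it counts messages instead of printing them.

```go
package main

import (
	"fmt"
	"sync"
)

// deliverAndWait is a hypothetical helper: it starts one goroutine per
// subscriber, sends every message to every subscriber, closes the channels,
// and returns the total number of messages received.
func deliverAndWait(subscribers int, msgs []string) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		received int
	)
	chans := make([]chan string, subscribers)
	for i := range chans {
		chans[i] = make(chan string)
		wg.Add(1)
		go func(ch <-chan string) {
			defer wg.Done()
			for range ch {
				mu.Lock()
				received++
				mu.Unlock()
			}
		}(chans[i])
	}
	// Send synchronously so each subscriber sees the messages in order.
	for _, m := range msgs {
		for _, ch := range chans {
			ch <- m
		}
	}
	// Closing each channel ends that subscriber's range loop.
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait() // blocks until every subscriber goroutine has returned
	return received
}

func main() {
	n := deliverAndWait(2, []string{"Hello, World!", "Go Concurrency is Awesome!"})
	fmt.Println(n) // prints 4: two messages for each of two subscribers
}
```

Sending synchronously means one slow subscriber delays the rest, so this sketch trades the isolation of the goroutine-per-send approach for deterministic ordering and a clean shutdown.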
By leveraging Go's channels and goroutines, we get a simple Pub/Sub system in a few dozen lines. It is a foundation rather than a production design: it has no locking, no unsubscribe or shutdown path, and no ordering guarantee. Experiment with different data types, subscription models, and error handling to build on it.