Slow NATS Go Subscribers

Heads-up for anyone using the Go NATS client: creating a subscription with Subscribe or QueueSubscribe will not set up a worker pool. As far as I can tell, either call only sets up a handler backed by a single goroutine.

What’s Your Evidence?

I set up a small experiment using a single sender and a single subscriber. The sender publishes an incrementing integer to a subject once every 50 milliseconds. The subscriber subscribes to that subject and prints the integer when it’s passed to the handler. To simulate work, the handler then sleeps for a second before returning.

Here’s the source code of the sender:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("nats://127.0.0.1:4222")
    if err != nil {
        log.Fatalf("cannot create nats: %v", err)
    }
    defer nc.Close()

    i := 0
    for {
        fmt.Printf("Sent: %v\n", i)
        nc.Publish("slow.horses", []byte(fmt.Sprintf("%d", i)))
        i++

        time.Sleep(50 * time.Millisecond)
    }
}

And here’s the source code for the subscriber:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("nats://127.0.0.1:4222")
    if err != nil {
        log.Fatalf("cannot create nats: %v", err)
    }
    defer nc.Close()

    nc.QueueSubscribe("slow.horses", "main", func(msg *nats.Msg) {
        fmt.Printf("Received: %s\n", string(msg.Data))
        time.Sleep(1 * time.Second)
        msg.Ack()
    })

    <-make(chan int)
}

If you were to launch these at the same time, you’d see this:

If the client were dispatching messages across multiple goroutines, I’d expect to see the subscriber printing a few times a second. But that’s not what I see here.

Now granted, the client seems to do a decent job of queuing the messages in some way. I can’t see them being queued at the NATS broker itself, since the core pub-sub messaging scheme doesn’t support that; I believe you’d need JetStream for that behaviour. I think they’re being queued by the client itself. The documentation makes reference to a pending limit, which I understand controls how many messages the client will keep in an in-memory queue. If the pending limit is reached, the NATS client should call the error handler to indicate that it’s dropping the queued messages because the subscription handler can’t keep up.
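For reference, here’s a rough sketch of how both can be wired up with the Go client: the nats.ErrorHandler connection option for the error callback, and SetPendingLimits on the subscription for the per-subscription limit. The limit of 1 message and the log wording are my own choices, picked to mirror the experiment described below:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    // Log client-side errors, such as the slow-consumer error raised when
    // the pending limit is exceeded and queued messages are dropped.
    nc, err := nats.Connect("nats://127.0.0.1:4222",
        nats.ErrorHandler(func(_ *nats.Conn, sub *nats.Subscription, err error) {
            if sub != nil {
                log.Printf("nats error on %q: %v", sub.Subject, err)
            } else {
                log.Printf("nats error: %v", err)
            }
        }))
    if err != nil {
        log.Fatalf("cannot create nats: %v", err)
    }
    defer nc.Close()

    sub, err := nc.QueueSubscribe("slow.horses", "main", func(msg *nats.Msg) {
        fmt.Printf("Received: %s\n", string(msg.Data))
        time.Sleep(1 * time.Second)
    })
    if err != nil {
        log.Fatalf("cannot subscribe: %v", err)
    }

    // Keep at most 1 message in the client's in-memory queue (no byte limit).
    sub.SetPendingLimits(1, -1)

    <-make(chan int)
}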

To confirm this, I brought the pending limit of the subscription down to 1 message, and added an error handler to log any errors the client would raise. This was the result:

Ok, So How Should This Be Fixed?

I can think of three ways to do this:

  • Set up multiple subscription handlers.

  • Have the subscriber send messages to a worker pool via a channel.

  • A hybrid of the two approaches.

Below are the results of my experimentation.

Multiple Subscriptions

Multiple subscription handlers should be possible thanks to NATS queue groups, where each message is delivered to just one member of a group of subscriptions sharing the same queue name. It acts as a form of load balancing that comes built in to NATS.

In theory it should be possible to set up 10 subscription handlers for a single client and have it keep up with the rate of messages coming from the sender:

for i := 0; i < 10; i++ {
    nc.QueueSubscribe("slow.horses", "main", func(msg *nats.Msg) {
        fmt.Printf("Received: %s\n", string(msg.Data))
        time.Sleep(1 * time.Second)
        msg.Ack()
    })
}

This is the result:

It’s better than a single subscription handler, but it’s still having a bit of trouble keeping up. And it looks like you’re governed by how NATS balances the load across the subscriptions. If you take a look at this still from the GIF above, you can see that message 8 was handled while the other subscriptions were handling messages in the 20s.

The messages are handled out of order as well, but that’s to be expected.

Worker Pool

What about using a worker pool? I went back to a single subscription handler and created a worker pool of 10 goroutines backed by a buffered channel with 50 slots.

msgs := make(chan *nats.Msg, 50)
for i := 0; i < 10; i++ {
    go func() {
        for msg := range msgs {
            fmt.Printf("Received: %s\n", string(msg.Data))
            time.Sleep(1 * time.Second)
            msg.Ack()
        }
    }()
}

nc.QueueSubscribe("slow.horses", "main", func(msg *nats.Msg) {
    msgs <- msg
})

Here were the results of that:

I’m glad this actually worked. I was afraid that acknowledging the message only after the wait would have prevented either the broker or the subscription handler from pulling the next message, in case the client wanted to maintain some form of FIFO ordering. But that doesn’t seem to be the case.

The messages do seem to be coming in order now (although I wouldn’t want to rely on this behaviour), but the consumer seemed to be handling messages in a more “halting” fashion. I attribute this to the 10 goroutines picking up a message as soon as it’s available, only to be blocked while the workers are “doing the work” (i.e. sleeping).

Does it make sense to have the buffered channel? It’s something I tend to do whenever I build such a construct in Go, but it might be enough to just let the NATS consumer do the message queuing itself; it’s certainly fast enough. If I add a log message to the subscription handler when the message is pulled, but before it’s sent to the channel, it looks like the subscription handler can pull messages at a decent rate, and is only “locked” to the speed of the worker pool when the channel buffer fills up.
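For what it’s worth, this is roughly what that extra logging looked like; the “Pulled” wording is my own, and msgs is the buffered channel from the worker pool snippet above:

nc.QueueSubscribe("slow.horses", "main", func(msg *nats.Msg) {
    // Log the moment the client hands the message to the handler, before
    // the worker pool has picked it up. The send only blocks once the
    // 50-slot buffer is full.
    fmt.Printf("Pulled: %s\n", string(msg.Data))
    msgs <- msg
})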

The Hybrid Approach

Finally, the hybrid approach, where both a worker pool and multiple subscription handlers are used at the same time:

msgs := make(chan *nats.Msg, 50)
for i := 0; i < 10; i++ {
    go func() {
        for msg := range msgs {
            fmt.Printf("Received: %s\n", string(msg.Data))
            time.Sleep(1 * time.Second)
            msg.Ack()
        }
    }()
}

for i := 0; i < 5; i++ {
    nc.QueueSubscribe("slow.horses", "main", func(msg *nats.Msg) {
        msgs <- msg
    })
}

Here’s how that looked:

I was honestly a little surprised at the results, given how much it resembled a single subscription handler backed by a worker pool. I guess I shouldn’t have been, though. All the subscribers are doing is pulling the message from their queue and placing it on the channel.

I should also note that although the GIF gives the impression that the messages are pulled in order, that’s actually not the case. If you look at the screenshot of the experiment when I terminated it, you can see that the messages were added to the channel out of order.

So does having multiple subscribers give you anything? It’s actually hard to tell. I added the pull log message again, this time including the subscription handler number in square brackets (a sketch of the change follows):
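Roughly, that meant capturing the loop index in each subscription handler; the variable name and log format here are my guesses, not the exact code from the experiment:

for i := 0; i < 5; i++ {
    handlerNum := i // capture the loop variable for the closure
    nc.QueueSubscribe("slow.horses", "main", func(msg *nats.Msg) {
        fmt.Printf("[%d] Pulled: %s\n", handlerNum, string(msg.Data))
        msgs <- msg
    })
}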

It didn’t seem like much of a gain. In fact, if you were to compare this with the “pull” results from earlier, there’s not much of a difference at all:

It’s clear that we got parallel execution when we had multiple subscription handlers doing the work inline, so each one must be running on a separate goroutine. But I suspect that benefit is lost when all the subscription handlers are doing is putting the message onto a channel.

So What’s The Best Approach To Speed Things Up?

It will ultimately depend on your system, but based on my experimentation, I think either approach would work:

  • Having multiple subscription handlers with the same queue group, or,

  • Using a single subscription handler that sends messages to a worker pool.

It all comes down to where you’d prefer to do your queueing. It seems to me that the NATS client’s queue is good enough, but if it were up to me, I’d probably stick with a single subscription handler sending messages to a worker pool, just to avoid the overhead of the NATS broker managing multiple subscriptions for a single process.

In either case, I’m not convinced it makes sense to go with the hybrid approach. It might be that if you were to go spelunking through the NATS client code, you’d find a reason to use it, but from my experiments, I don’t see the benefit myself.