Optimizing requests with a queue

With our background jobs interface in place, we can create an API that will queue our incoming data and flush it to the database at a defined frequency. Flushing our data in bulk will give us performance advantages.

The Queue

A basic Queue in Go is a slice. To extend the queue, you can call append, which allocates a larger slice if required. Since this is not thread safe, we need to use locking to ensure that the queue is modified safely.

// Queue provides a queuing structure for Incoming{}
type Queue struct {
	sync.RWMutex
	values []*Incoming
}

// NewQueue creates a new *Queue instance
func NewQueue() *Queue {
	return &Queue{
		values: make([]*Incoming, 0),
	}
}

We also need to hold several Queues to split up incoming write requests. If we only used one queue, we would have high lock contention between all the clients trying to write to the queue, as well as delays because of the flusher which needs to clear out the queue as it’s written to the database.

// NewQueues creates a slice of *Queue instances
func NewQueues(size int) []*Queue {
	result := make([]*Queue, size)
	for i := 0; i < size; i++ {
		result[i] = NewQueue()
	}
	return result
}

The queue needs a few functions, in order to write and flush data from the queue:

  • Push(*Incoming) error - add a new item to the queue, return error if any,
  • Clear() []*Incoming - return the current items in the queue and clear it,
  • Length() int - report the current queued item count

The Push function should return an error when the queue is full. It is up to you to limit the queue size, so that your application doesn’t eat up all your memory in case of traffic spikes or a delay in flushing the data to the database.

There is no universally reasonable limit, since it depends on your available memory and traffic, so the implementation below doesn’t enforce one and always returns nil.

// Push adds a new item to the queue
func (p *Queue) Push(item *Incoming) error {
	p.Lock()
	defer p.Unlock()
	p.values = append(p.values, item)
	return nil
}
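
As a minimal sketch of what such a limit could look like, the variant below rejects writes once a fixed item count is reached. The names BoundedQueue, NewBoundedQueue and errQueueFull are hypothetical and not part of the original service, and Incoming is reduced to a stand-in struct so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Incoming is a stand-in for the real row type (assumption).
type Incoming struct{ ID int }

// errQueueFull is a hypothetical sentinel error for a full queue.
var errQueueFull = errors.New("queue: full")

// BoundedQueue is a hypothetical Queue variant with a hard item limit.
type BoundedQueue struct {
	sync.RWMutex
	values []*Incoming
	limit  int
}

// NewBoundedQueue creates a queue that holds at most limit items.
func NewBoundedQueue(limit int) *BoundedQueue {
	return &BoundedQueue{values: make([]*Incoming, 0), limit: limit}
}

// Push adds an item, or returns errQueueFull when the limit is reached.
func (p *BoundedQueue) Push(item *Incoming) error {
	p.Lock()
	defer p.Unlock()
	if len(p.values) >= p.limit {
		return errQueueFull
	}
	p.values = append(p.values, item)
	return nil
}

func main() {
	q := NewBoundedQueue(2)
	fmt.Println(q.Push(&Incoming{ID: 1})) // <nil>
	fmt.Println(q.Push(&Incoming{ID: 2})) // <nil>
	fmt.Println(q.Push(&Incoming{ID: 3})) // queue: full
}
```

A client that receives this error can retry later, which is the simplest form of backpressure.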

Flushing the queue to the database requires us to read the current items of the queue, and clear them for any future calls to Push(). The current implementation keeps memory usage low, but allocates memory when values are pushed to the queue.

// Clear returns current queue items and clears it
func (p *Queue) Clear() (result []*Incoming) {
	length := p.Length()

	p.Lock()
	defer p.Unlock()

	result, p.values = p.values[:length], p.values[length:]
	return
}

And finally - a utility function to get the size of the queue:

// Length returns the current queue size
func (p *Queue) Length() int {
	p.RLock()
	defer p.RUnlock()
	return len(p.values)
}
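
To see the three methods working together, here is a small self-contained sketch that pushes from several goroutines and then clears the queue. It repeats the Queue code from above so that it compiles on its own. The Incoming struct and the pushConcurrently helper are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

// Incoming is a stand-in for the real row type (assumption).
type Incoming struct{ ID int }

// Queue mirrors the structure defined above.
type Queue struct {
	sync.RWMutex
	values []*Incoming
}

// Push adds a new item to the queue.
func (p *Queue) Push(item *Incoming) error {
	p.Lock()
	defer p.Unlock()
	p.values = append(p.values, item)
	return nil
}

// Length returns the current queue size.
func (p *Queue) Length() int {
	p.RLock()
	defer p.RUnlock()
	return len(p.values)
}

// Clear returns current queue items and clears it.
func (p *Queue) Clear() (result []*Incoming) {
	length := p.Length()
	p.Lock()
	defer p.Unlock()
	result, p.values = p.values[:length], p.values[length:]
	return
}

// pushConcurrently is a hypothetical helper: it pushes n items,
// one per goroutine, and waits for all of them to finish.
func pushConcurrently(q *Queue, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			_ = q.Push(&Incoming{ID: id})
		}(i)
	}
	wg.Wait()
}

func main() {
	q := &Queue{}
	pushConcurrently(q, 100)
	fmt.Println(q.Length()) // 100

	rows := q.Clear()
	fmt.Println(len(rows), q.Length()) // 100 0
}
```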

In order for a queue to be functional it needs two additional things:

  1. the producer: writing data to the queue,
  2. the consumer: reading data from the queue

The Producer

Our producer is easy to implement. We just need to modify our service Push() function:

-       fields := strings.Join(IncomingFields, ",")
-       named := ":" + strings.Join(IncomingFields, ",:")
-
-       query := fmt.Sprintf("insert into %s (%s) values (%s)", IncomingTable, fields, named)
-       _, err = svc.db.NamedExecContext(ctx, query, row)
-       return new(stats.PushResponse), err
+       return pushResponseDefault, svc.flusher.Push(row)

Basically, instead of calling the database query directly, we now call svc.flusher.Push(*Incoming), which returns a possible error in case the request couldn’t be queued up.

A small change to make our API a bit more optimal is replacing new(stats.PushResponse) with a pre-allocated variable. Put the following snippet just before the Push function in server_push.go:

// Keep returning a single object to avoid allocations
var pushResponseDefault = new(stats.PushResponse)

When implementing the producer in our Flusher struct, we need some additional fields:

	// queueIndex is a key for []queues
	queueIndex *atomic.Uint32
	// queueMask is a masking value for queueIndex -> key
	queueMask uint32
	// queues hold a set of writable queues
	queues []*Queue

Go provides the sync/atomic package in the standard library, which provides atomic operations over some numeric types and pointers. However, we will use the go.uber.org/atomic package that provides convenience types wrapped around the standard library.

We need to update NewFlusher in order to initialize the new fields:

func NewFlusher(ctx context.Context, db *sqlx.DB) (*Flusher, error) {
	queueSize := 1 << 4
	job := &Flusher{
		db:         db,
		enabled:    atomic.NewBool(true),
		queueIndex: atomic.NewUint32(0),
		queueMask:  uint32(queueSize - 1),
		queues:     NewQueues(queueSize),
	}
...

We set queueSize to a power of 2 - shifting the value 1 left by 4 bits means that 1 << 4 is the same as writing 16 (binary 10000). Despite its name, queueSize is the number of queues, not the capacity of each one. If you wanted more queues, you’d shift by a larger number of bits.

The queueMask is the binary masking value - for a queueSize=16, this evaluates to binary 1111. We can use the queueMask over any uint32 value to get only the lower 4 bits of that value.

This is an optimisation for speed. If you had 100 queues, you’d have to use the modulo operator (%100) to get the remainder of the division by that number. We could have used %16, but because 16 is a power of 2, the binary AND with &15 gives the same result and can be slightly faster.
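
The equivalence between masking and modulo only holds for powers of 2, which the following sketch illustrates. The helper names maskIndex and modIndex are made up for this example.

```go
package main

import "fmt"

// maskIndex maps counter onto [0, size) using a bit mask.
// It is only correct when size is a power of 2.
func maskIndex(counter, size uint32) uint32 {
	return counter & (size - 1)
}

// modIndex maps counter onto [0, size) for any non-zero size.
func modIndex(counter, size uint32) uint32 {
	return counter % size
}

func main() {
	const size = 1 << 4 // 16 queues

	fmt.Printf("mask: %04b\n", size-1) // mask: 1111

	// For a power-of-2 size both functions agree.
	for _, c := range []uint32{0, 15, 16, 37} {
		fmt.Println(c, maskIndex(c, size), modIndex(c, size))
	}

	// For a size of 100 the mask no longer matches the modulo.
	fmt.Println(maskIndex(150, 100), modIndex(150, 100)) // 2 50
}
```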

We can now write the following Push() function to take new *Incoming values:

// Push spreads queue writes evenly across all queues
func (job *Flusher) Push(item *Incoming) error {
	if job.enabled.Load() {
		index := job.queueIndex.Inc() & job.queueMask
		return job.queues[index].Push(item)
	}
	return errFlusherDisabled
}

We are using the atomic queueIndex value to designate into which queue the new item should be written. If the flusher isn’t enabled, we return an expected error - we are disabling Push when we are shutting down the process. The client(s) may retry on this error.
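
The round-robin selection can be sketched in isolation. To keep the example free of external dependencies it uses the standard library sync/atomic package instead of go.uber.org/atomic. The picker type and the distribute helper are hypothetical names, and atomic.AddUint32 plays the role of Inc() since both return the incremented value.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// picker hands out queue indexes in round-robin order.
type picker struct {
	index uint32 // incremented atomically on every call to next
	mask  uint32 // number of queues minus one (power of 2 assumed)
}

// next returns the index of the queue that should take the next item.
func (p *picker) next() uint32 {
	return atomic.AddUint32(&p.index, 1) & p.mask
}

// distribute simulates concurrent producers and counts how many
// items each queue would have received.
func distribute(p *picker, queues, goroutines, perGoroutine int) []int64 {
	counts := make([]int64, queues)
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perGoroutine; i++ {
				atomic.AddInt64(&counts[p.next()], 1)
			}
		}()
	}
	wg.Wait()
	return counts
}

func main() {
	p := &picker{mask: 16 - 1}
	// 8 producers x 2000 items = 16000 items over 16 queues.
	counts := distribute(p, 16, 8, 2000)
	fmt.Println(counts) // every queue receives exactly 1000 items
}
```

Because the counter is shared and incremented atomically, the distribution stays even no matter how the goroutines interleave.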

This takes care of writing our producer.

The Consumer

We only need to replace the flush() function in order to implement our queue consumer:

func (job *Flusher) flush() {
	var err error

	fields := strings.Join(IncomingFields, ",")
	named := ":" + strings.Join(IncomingFields, ",:")
	query := fmt.Sprintf("insert into %s (%s) values (%s)", IncomingTable, fields, named)

	var batchInsertSize int
	for k, queue := range job.queues {
		rows := queue.Clear()

		for len(rows) > 0 {
			batchInsertSize = 1000
			if len(rows) < batchInsertSize {
				batchInsertSize = len(rows)
			}
			log.Println("[flush] queue", k, "remaining", len(rows))
			if _, err = job.db.NamedExec(query, rows[:batchInsertSize]); err != nil {
				log.Println("Error when flushing data:", err)
			}
			rows = rows[batchInsertSize:]
		}
	}

	log.Println("[flush] done")
}

As we are using jmoiron/sqlx, we have the capability to issue bulk inserts to the database, just by providing a slice of values for insertion. In order of implementation:

  1. we prepare a query for inserting our data,
  2. we read and clear individual queues of all items,
  3. we flush the read items to the database, 1000 rows at a time
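
The batching in step 3 is independent of the database, so it can be shown on its own. This is a sketch that uses a plain []int in place of []*Incoming, and the batches function is a hypothetical helper rather than something the service defines.

```go
package main

import "fmt"

// batches splits rows into consecutive chunks of at most size items.
// The chunks share the backing array of rows, nothing is copied.
func batches(rows []int, size int) [][]int {
	if size <= 0 {
		return nil
	}
	var out [][]int
	for len(rows) > 0 {
		n := size
		if len(rows) < n {
			n = len(rows)
		}
		out = append(out, rows[:n])
		rows = rows[n:]
	}
	return out
}

func main() {
	rows := make([]int, 2500)
	for _, b := range batches(rows, 1000) {
		fmt.Println(len(b))
	}
	// prints 1000, 1000 and 500 on separate lines
}
```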

In case you have issues with this, try issuing go get github.com/jmoiron/sqlx@master to pull in the latest version of sqlx into your project. Bulk inserts have been implemented only recently, and you might have to use the master version of sqlx.

Benchmarking

Since we already have the tests set up from the previous implementation, let’s compare how much the service is improved with the queuing:

Running 1m test @ http://172.29.0.2:3000/twirp/stats.StatsService/Push
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    12.54ms   21.19ms 416.79ms   91.30%
    Req/Sec     3.80k     1.63k   11.03k    63.05%
  909154 requests in 1.00m, 94.51MB read
Requests/sec:  15129.35
Transfer/sec:      1.57MB

And the previous version without queuing:

Running 1m test @ http://172.22.0.5:3000/twirp/stats.StatsService/Push
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    47.69ms   11.86ms 143.28ms   73.10%
    Req/Sec   527.04     85.10   800.00     73.25%
  125930 requests in 1.00m, 13.09MB read
Requests/sec:   2097.16
Transfer/sec:    223.23KB

Let’s also verify that our data is written:

mysql> select count(*) from incoming;
+----------+
| count(*) |
+----------+
|   909251 |
+----------+
1 row in set (0.06 sec)

The small discrepancy in requests may be explained by wrk closing the connection(s) before they could receive the final response, even though the request was already logged in the database.

So, by queuing our incoming data in memory, we reduced our average latency by about 74% (from 47.69ms to 12.54ms). Our throughput increased from about 2097 to 15129 requests/sec, which is roughly 7.2x.
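
The arithmetic behind those two figures, using the averages from the wrk output above, can be checked with a few lines of Go. The function names reduction and speedup are only illustrative.

```go
package main

import "fmt"

// reduction returns by how many percent after is lower than before.
func reduction(before, after float64) float64 {
	return (before - after) / before * 100
}

// speedup returns how many times larger after is than before.
func speedup(before, after float64) float64 {
	return after / before
}

func main() {
	// Average latency in milliseconds: without queue, with queue.
	fmt.Printf("latency reduction: %.1f%%\n", reduction(47.69, 12.54)) // 73.7%
	// Requests per second: without queue, with queue.
	fmt.Printf("throughput: %.2fx\n", speedup(2097.16, 15129.35)) // 7.21x
}
```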

Notes and pitfalls

The flush process takes its own time when being slammed with benchmark requests:

[flush] begin
[flush] queue 0 rows 23388
[flush] queue 1 rows 24330
[flush] queue 2 rows 24578
[flush] queue 3 rows 23283
[flush] queue 4 rows 22134
[flush] queue 5 rows 20695
[flush] queue 6 rows 19347
[flush] queue 7 rows 17880
[flush] queue 8 rows 16416
[flush] queue 9 rows 14987
[flush] queue 10 rows 13359
[flush] queue 11 rows 11659
[flush] queue 12 rows 9964
[flush] queue 13 rows 7989
[flush] queue 14 rows 6301
[flush] queue 15 rows 4596
[flush] done

We can see that the data is queuing up nicely. The flush for the data does take a while - longer than the 5 second flush interval which we specified. Let’s consider the pitfalls of the current implementation:

  • Memory allocations,
  • Error handling

We don’t limit memory allocations - theoretically, given similar stressful circumstances as our benchmark provides, it is possible for our program to reach resource exhaustion. One way to reach a level of safety is to introduce backpressure when our queues fill up. Backpressure means that new writes to the queue are rejected, until the existing queue is flushed to the database.

This also means that we could pre-allocate the queues, and avoid allocations when adding new data to the queue, by implementing a circular buffer. New items would reuse the slots freed by items that were already flushed, and once the buffer fills up, new writes are rejected.
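
One possible shape for such a buffer is sketched below. It is not the implementation used by the service: Ring, NewRing, Drain and errRingFull are hypothetical names, and Incoming is again a stand-in struct. Push never allocates, while Drain copies the items out so the slots can be reused.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Incoming is a stand-in for the real row type (assumption).
type Incoming struct{ ID int }

// errRingFull is a hypothetical sentinel error signalling backpressure.
var errRingFull = errors.New("ring: full")

// Ring is a fixed-capacity circular buffer, allocated once.
type Ring struct {
	mu    sync.Mutex
	items []*Incoming
	head  int // index of the oldest queued item
	count int // number of queued items
}

// NewRing pre-allocates all slots up front.
func NewRing(capacity int) *Ring {
	return &Ring{items: make([]*Incoming, capacity)}
}

// Push stores the item in the next free slot, or rejects it when full.
func (r *Ring) Push(item *Incoming) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.count == len(r.items) {
		return errRingFull
	}
	r.items[(r.head+r.count)%len(r.items)] = item
	r.count++
	return nil
}

// Drain returns the queued items in insertion order and frees their slots.
func (r *Ring) Drain() []*Incoming {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.count == 0 {
		return nil
	}
	out := make([]*Incoming, 0, r.count)
	for i := 0; i < r.count; i++ {
		idx := (r.head + i) % len(r.items)
		out = append(out, r.items[idx])
		r.items[idx] = nil // drop the reference so it can be collected
	}
	r.head = (r.head + r.count) % len(r.items)
	r.count = 0
	return out
}

func main() {
	r := NewRing(2)
	fmt.Println(r.Push(&Incoming{ID: 1})) // <nil>
	fmt.Println(r.Push(&Incoming{ID: 2})) // <nil>
	fmt.Println(r.Push(&Incoming{ID: 3})) // ring: full
	fmt.Println(len(r.Drain()))           // 2
	fmt.Println(r.Push(&Incoming{ID: 3})) // <nil>
}
```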

Error handling: currently we don’t do any error handling, we just print an error when flushing to the database fails. A possible reason for such a failure would be an unplanned database outage. We could wait and retry the writes, which would give us some protection against data loss. This requires us to implement that circular buffer and backpressure, as resource exhaustion is more likely if we are handling this error scenario.
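
A wait-and-retry loop could look roughly like the following sketch. The retry helper, its doubling delay and the errUnavailable error are assumptions for illustration. In the real flusher, the callback would wrap the NamedExec call for one batch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a hypothetical error simulating a database outage.
var errUnavailable = errors.New("database unavailable")

// retry calls fn up to attempts times and doubles the delay after
// each failure. It returns nil on the first success, otherwise the
// last error.
func retry(attempts int, delay time.Duration, fn func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(5, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errUnavailable // fail the first two attempts
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

While the retries are running the queues keep growing, which is why retrying only makes sense together with a queue limit.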

To sum up: set reasonable queue limits and keep allocations down to make your service more robust in case of unplanned traffic spikes or outages.
