Optimizing requests with a queue
With our background jobs interface in place, we can create an API that will queue our incoming data and flush it to the database at a defined frequency. Flushing our data in bulk will give us performance advantages.
The Queue
A basic queue in Go is a slice. To extend the queue, you can call append, which allocates a larger backing array if required. Since a slice is not thread safe, we need to use locking to ensure that the queue is modified safely.
// Queue provides a queuing structure for Incoming{}
type Queue struct {
    sync.RWMutex

    values []*Incoming
}

// NewQueue creates a new *Queue instance
func NewQueue() *Queue {
    return &Queue{
        values: make([]*Incoming, 0),
    }
}
We also need to hold several Queues to split up incoming write requests. With only one queue, all clients writing to it would contend for the same lock, and the flusher would add further delays, since it has to lock the queue while clearing it out for the database write.
// NewQueues creates a slice of *Queue instances
func NewQueues(size int) []*Queue {
    result := make([]*Queue, size)
    for i := 0; i < size; i++ {
        result[i] = NewQueue()
    }
    return result
}
The queue needs a few functions in order to write and flush data from the queue:
- Push(*Incoming) error - add a new item to the queue, returning an error if any,
- Clear() []*Incoming - return the current items in the queue and clear it,
- Length() int - report the current queued item count
The Push function should return an error when the queue is full. It's up to you to limit the queue size, so that your application doesn't eat up all your memory in case of traffic spikes or a delay in flushing the data to the database. There is no single limit that's reasonable for every workload, so implementing one is left to you.
// Push adds a new item to the queue
func (p *Queue) Push(item *Incoming) error {
    p.Lock()
    defer p.Unlock()
    p.values = append(p.values, item)
    return nil
}
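If you do want to enforce a limit as suggested above, a minimal sketch could look like the following; the maxQueueSize constant and the errQueueFull sentinel are illustrative names (not part of the chapter's code), and the errors import is needed:

// maxQueueSize is an assumed limit; tune it to your memory budget
const maxQueueSize = 100000

var errQueueFull = errors.New("queue is full")

// Push adds a new item to the queue, rejecting it when the queue is full
func (p *Queue) Push(item *Incoming) error {
    p.Lock()
    defer p.Unlock()
    if len(p.values) >= maxQueueSize {
        return errQueueFull
    }
    p.values = append(p.values, item)
    return nil
}

Callers can then decide whether to retry, drop the data, or propagate the error back to the client.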
Flushing the queue to the database requires us to read the current items of the queue and clear it, so that future calls to Push() start with an empty queue. The current implementation keeps memory usage low, but allocates memory whenever values are pushed to the queue.
// Clear returns current queue items and clears it
func (p *Queue) Clear() (result []*Incoming) {
    length := p.Length()
    p.Lock()
    defer p.Unlock()
    result, p.values = p.values[:length], p.values[length:]
    return
}
And finally, a utility function to get the size of the queue:
// Length returns the current queue size
func (p *Queue) Length() int {
    p.RLock()
    defer p.RUnlock()
    return len(p.values)
}
In order for a queue to be functional it needs two additional things:
- the producer: writing data to the queue,
- the consumer: reading data from the queue
The Producer
Our producer is easy to implement; we only need to modify our service's Push() function:
- fields := strings.Join(IncomingFields, ",")
- named := ":" + strings.Join(IncomingFields, ",:")
-
- query := fmt.Sprintf("insert into %s (%s) values (%s)", IncomingTable, fields, named)
- _, err = svc.db.NamedExecContext(ctx, query, row)
- return new(stats.PushResponse), err
+ return pushResponseDefault, svc.flusher.Push(row)
Basically, instead of calling the database query directly, we now call svc.flusher.Push(*Incoming), which returns an error in case the request couldn't be queued up.
A small change to make our API a bit more optimal is replacing new(stats.PushResponse) with a pre-allocated variable. Put the following snippet just before the Push function in server_push.go:
// Keep returning a single object to avoid allocations
var pushResponseDefault = new(stats.PushResponse)
When implementing the producer in our Flusher struct, we need some additional fields:
// queueIndex is a key for []queues
queueIndex *atomic.Uint32
// queueMask is a masking value for queueIndex -> key
queueMask uint32
// queues hold a set of writable queues
queues []*Queue
The standard library's sync/atomic package provides atomic operations over some numeric types and pointers. However, we will use the go.uber.org/atomic package, which wraps the standard library in convenience types.
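As a quick illustration of the difference (a throwaway example, not part of the service code):

package main

import (
    "fmt"
    stdatomic "sync/atomic"

    "go.uber.org/atomic"
)

func main() {
    // standard library: package-level functions over a plain uint32
    var counter uint32
    stdatomic.AddUint32(&counter, 1)
    fmt.Println(stdatomic.LoadUint32(&counter)) // 1

    // go.uber.org/atomic: the value and its operations live on a single type
    n := atomic.NewUint32(0)
    n.Inc()
    fmt.Println(n.Load()) // 1
}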
We need to update NewFlusher in order to initialize the new fields:
func NewFlusher(ctx context.Context, db *sqlx.DB) (*Flusher, error) {
    queueSize := 1 << 4
    job := &Flusher{
        db:         db,
        enabled:    atomic.NewBool(true),
        queueIndex: atomic.NewUint32(0),
        queueMask:  uint32(queueSize - 1),
        queues:     NewQueues(queueSize),
    }
    ...
We set the queueSize to a power of 2 - we are shifting the value 1 left by 4 bits, so 1 << 4 is the same as writing 16 (binary 10000). If you wanted a larger queue, you'd shift by a larger number of bits.

The queueMask is the binary masking value - for queueSize=16 it evaluates to 15 (binary 1111). Applying queueMask to any uint32 value with a binary AND gives us only the lower 4 bits of that value.

This is an optimisation for speed. If your queue size were 100, you'd have to use the modulo operator (%100) to get the remainder of the division by that number. We could have used %16 here as well, but the binary AND with &15 is slightly faster.
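To see the equivalence in isolation (a throwaway check, not part of the service):

// maskDemo shows that, for a power-of-two size, & (size-1) equals % size
func maskDemo() {
    const queueSize = 16
    const queueMask = queueSize - 1 // binary 1111
    for i := uint32(0); i < 100; i++ {
        if i%queueSize != i&queueMask {
            panic("unreachable for power-of-two sizes")
        }
    }
}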
We can now write the following Push() function to take new *Incoming values:
// Push spreads queue writes evenly across all queues
func (job *Flusher) Push(item *Incoming) error {
    if job.enabled.Load() {
        index := job.queueIndex.Inc() & job.queueMask
        return job.queues[index].Push(item)
    }
    return errFlusherDisabled
}
We are using the atomic queueIndex value to designate into which queue the new item should be written. If the flusher isn't enabled, we return an expected error - we disable Push when we are shutting down the process. The client(s) may retry on this error.
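The errFlusherDisabled sentinel belongs to the flusher's existing code from the previous chapter; if you don't have it declared yet, something along these lines will do (the exact message is an assumption):

// errFlusherDisabled is returned by Push while the process is shutting down
var errFlusherDisabled = errors.New("flusher is disabled")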
This takes care of writing our producer.
The Consumer
We only need to replace the flush() function in order to implement our queue consumer:
func (job *Flusher) flush() {
    var err error

    fields := strings.Join(IncomingFields, ",")
    named := ":" + strings.Join(IncomingFields, ",:")
    query := fmt.Sprintf("insert into %s (%s) values (%s)", IncomingTable, fields, named)

    var batchInsertSize int
    for k, queue := range job.queues {
        rows := queue.Clear()
        for len(rows) > 0 {
            batchInsertSize = 1000
            if len(rows) < batchInsertSize {
                batchInsertSize = len(rows)
            }
            log.Println("[flush] queue", k, "remaining", len(rows))
            if _, err = job.db.NamedExec(query, rows[:batchInsertSize]); err != nil {
                log.Println("Error when flushing data:", err)
            }
            rows = rows[batchInsertSize:]
        }
    }
    log.Println("[flush] done")
}
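For context, flush() is driven by the background job loop from the previous chapter. A minimal sketch of such a loop, assuming the 5 second interval mentioned below (this is not the book's exact code, and it needs the context and time imports), might look like:

// runFlushLoop is a sketch of driving flush() on an interval until the
// context is cancelled; the real loop comes from the background jobs interface
func (job *Flusher) runFlushLoop(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            job.flush() // final flush on shutdown
            return
        case <-ticker.C:
            job.flush()
        }
    }
}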
Since we are using jmoiron/sqlx, we can issue bulk inserts to the database just by providing a slice of values for insertion. In order of implementation:
- we prepare a query for inserting our data,
- we read and clear individual queues of all items,
- we flush the read items to the database, 1000 rows at a time
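To illustrate the bulk insert mechanism in isolation - the metric struct and metrics table here are hypothetical, not the service's Incoming type:

// metric is a hypothetical row type, used only to illustrate the mechanism
type metric struct {
    Name  string `db:"name"`
    Value int    `db:"value"`
}

// bulkInsert relies on sqlx expanding the VALUES clause once per slice element,
// binding each struct's fields to the named parameters
func bulkInsert(db *sqlx.DB, rows []metric) error {
    _, err := db.NamedExec("INSERT INTO metrics (name, value) VALUES (:name, :value)", rows)
    return err
}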
In case you have issues with this, try issuing go get github.com/jmoiron/sqlx@master to pull the latest version of sqlx into your project. Bulk inserts have been implemented only recently, and you might have to use the master version of sqlx.
Benchmarking
Since we already have the tests set up from the previous implementation, let's compare how much queuing improves the service:
Running 1m test @ http://172.29.0.2:3000/twirp/stats.StatsService/Push
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 12.54ms 21.19ms 416.79ms 91.30%
Req/Sec 3.80k 1.63k 11.03k 63.05%
909154 requests in 1.00m, 94.51MB read
Requests/sec: 15129.35
Transfer/sec: 1.57MB
And the previous version without queuing:
Running 1m test @ http://172.22.0.5:3000/twirp/stats.StatsService/Push
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 47.69ms 11.86ms 143.28ms 73.10%
Req/Sec 527.04 85.10 800.00 73.25%
125930 requests in 1.00m, 13.09MB read
Requests/sec: 2097.16
Transfer/sec: 223.23KB
Let’s also verify that our data is written:
mysql> select count(*) from incoming;
+----------+
| count(*) |
+----------+
| 909251 |
+----------+
1 row in set (0.06 sec)
The small discrepancy in request counts can be explained by wrk closing connections before receiving the final response for requests that were nonetheless logged in the database.
So, by queuing our incoming data in memory, we reduced our average latency by about 74%, and increased our throughput roughly 7.2x - from about 2,097 to about 15,129 requests per second.
Notes and pitfalls
The flush process takes its own time when the service is being slammed with benchmark requests:
[flush] begin
[flush] queue 0 rows 23388
[flush] queue 1 rows 24330
[flush] queue 2 rows 24578
[flush] queue 3 rows 23283
[flush] queue 4 rows 22134
[flush] queue 5 rows 20695
[flush] queue 6 rows 19347
[flush] queue 7 rows 17880
[flush] queue 8 rows 16416
[flush] queue 9 rows 14987
[flush] queue 10 rows 13359
[flush] queue 11 rows 11659
[flush] queue 12 rows 9964
[flush] queue 13 rows 7989
[flush] queue 14 rows 6301
[flush] queue 15 rows 4596
[flush] done
We can see that the data is queueing up nicely. The flush does take a while - longer than the 5 second flush interval which we specified. Let's consider the pitfalls of the current implementation:
- Memory allocations,
- Error handling
We don't limit memory allocations - under sustained load like our benchmark produces, it is theoretically possible for our program to reach resource exhaustion. One way to add a level of safety is to introduce backpressure when our queues fill up. Backpressure means that new writes to the queue are rejected until the existing queue is flushed to the database.
This also means that we could pre-allocate the queues and avoid allocations when adding new data, by implementing a circular buffer. New items would reuse the slots of already-flushed items, and writes would be rejected once the unflushed items fill the buffer.
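A minimal sketch of that idea, assuming the Incoming type from this chapter - the names, the ErrQueueFull sentinel, and the capacity handling are illustrative, and the sync and errors imports are needed:

// BoundedQueue sketches the circular-buffer idea: a fixed-size backing array,
// no per-Push allocations, and backpressure once the unflushed items fill it
type BoundedQueue struct {
    sync.Mutex
    items []*Incoming // pre-allocated backing array
    head  int         // index of the oldest unflushed item
    count int         // number of unflushed items
}

var ErrQueueFull = errors.New("queue is full")

func NewBoundedQueue(capacity int) *BoundedQueue {
    return &BoundedQueue{items: make([]*Incoming, capacity)}
}

// Push stores the item in the next free slot, or rejects it when full
func (q *BoundedQueue) Push(item *Incoming) error {
    q.Lock()
    defer q.Unlock()
    if q.count == len(q.items) {
        return ErrQueueFull
    }
    q.items[(q.head+q.count)%len(q.items)] = item
    q.count++
    return nil
}

// Clear copies out the unflushed items and marks their slots reusable
func (q *BoundedQueue) Clear() []*Incoming {
    q.Lock()
    defer q.Unlock()
    result := make([]*Incoming, q.count)
    for i := 0; i < q.count; i++ {
        result[i] = q.items[(q.head+i)%len(q.items)]
    }
    q.head = (q.head + q.count) % len(q.items)
    q.count = 0
    return result
}

A production version would likely also reuse a scratch slice in Clear() to avoid the copy allocation, but the sketch shows the core idea: a fixed backing array, and backpressure once it fills up.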
Error handling: currently we don't do any real error handling, we just print an error when flushing to the database fails. A likely cause for such a failure would be an unplanned database outage. We could wait and retry the writes to gain some measure of data durability, but this again requires the circular buffer and backpressure, since resource exhaustion is more likely while we are handling this error scenario.
To sum up: set reasonable queue limits and keep allocations down to make your service more robust in case of unplanned traffic spikes or outages.