Microservice background jobs

API services, especially low-latency ones, often resort to background jobs: they ingest data quickly, then aggregate and flush it to backend services in an optimal way.

Adding background jobs to our service

In our stats microservice, we issue insert queries for each request directly against the database. The database supports extended inserts, meaning we could flush thousands of rows at the same time and actually increase throughput, since adding 10,000 rows to the database with a single query is significantly faster than adding 1 row 10,000 times over multiple connections.
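
To illustrate, here’s a minimal sketch of such a bulk insert (assuming MySQL-style ? placeholders, a hypothetical incoming table, and the strings and sqlx imports; none of these names come from our actual schema):

// row is an illustrative stand-in for whatever we collect per request.
type row struct {
	Property string
	ID       string
}

// bulkInsert writes all collected rows with a single extended INSERT,
// instead of issuing one INSERT per row.
func bulkInsert(db *sqlx.DB, rows []row) error {
	if len(rows) == 0 {
		return nil
	}
	values := make([]string, 0, len(rows))
	args := make([]interface{}, 0, len(rows)*2)
	for _, r := range rows {
		values = append(values, "(?, ?)")
		args = append(args, r.Property, r.ID)
	}
	query := "INSERT INTO incoming (property, id) VALUES " + strings.Join(values, ", ")
	_, err := db.Exec(query, args...)
	return err
}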

To take advantage of the bulk insert features of the database, we need to start a goroutine that will periodically aggregate the collected data and flush it to the database. How often this runs is up to you, but if you care about data loss due to service restarts or shutdowns, you’ll need to set a reasonably low interval.

To facilitate creating background jobs (and other possible initialization), we can extend our Server codegen to satisfy the following interface:

Init(context.Context) error
Shutdown()

The Init function is there to take the global cancellation context and start any background jobs that (hopefully) react to this context as well. When the service shuts down, your background jobs can stop accepting new data and flush whatever was collected to the database. The Shutdown function is there so the main program can wait for the background jobs’ flush to complete.

To sum up: Init() starts the job, context controls job cancellation, and Shutdown is there to wait for the cleanup if needed.

--- a/templates/server_server.go.tpl
+++ b/templates/server_server.go.tpl
@@ -13,4 +13,15 @@ type Server struct {
        db *sqlx.DB
 }
 
+// Init can start background jobs
+func (*Server) Init(_ context.Context) error {
+       // Any background jobs go here,
+       // ctx has global signal cancellation
+       return nil
+}
+
+// Shutdown is a cleanup hook after SIGTERM
+func (*Server) Shutdown() {
+}

Let’s modify the template for main.go to call Init and Shutdown appropriately. We also start the service itself as a goroutine and wait for context cancellation in main. As soon as the context is cancelled, srv.Shutdown() is called.

--- a/templates/cmd_main.go.tpl
+++ b/templates/cmd_main.go.tpl
@@ -43,9 +43,14 @@ func main() {
        if err != nil {
                log.Fatalf("Error in service.New(): %+v", err)
        }
-
+       if err := srv.Init(ctx); err != nil {
+               log.Fatal(err)
+       }
        twirpHandler := ${SERVICE}.New${SERVICE_CAMEL}ServiceServer(srv, internal.NewServerHooks())
 
        log.Println("Starting service on port :3000")
-       http.ListenAndServe(":3000", internal.WrapAll(twirpHandler))
+       go http.ListenAndServe(":3000", internal.WrapAll(twirpHandler))
+       <-ctx.Done()
+
+       srv.Shutdown()
 }

Do we really need Init?

Looking at the code, the Init() function may be redundant. Wire already takes ctx; we just need to see if we can start a goroutine from there. To find out what wire can do in our case, let’s try to modify wire.go:

func New(ctx context.Context) (*Server, error) {
	s := new(Server)
	wire.Build(
		inject.Inject,
		wire.Struct(new(Server), "*"),
	)
	if err := s.Background(); err != nil {
		return nil, err
	}
	return s, nil
}

Unfortunately this results in the following error message:

wire: a call to wire.Build indicates that this function is an injector, but injectors must consist of only the wire.Build call and an optional return

Another option would be to create a dependency for the background job. This way we would just add a field to the Server{} struct and let wire invoke the job’s constructor, providing dependencies to it as well.
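
Sketched out, that could look like the following; Background and NewBackground are hypothetical names, not code we’re committing to:

// Background is a job that wire provides like any other dependency.
type Background struct {
	db *sqlx.DB
}

// NewBackground starts the job goroutine as part of construction.
func NewBackground(ctx context.Context, db *sqlx.DB) (*Background, error) {
	job := &Background{db: db}
	go job.run(ctx)
	return job, nil
}

func (job *Background) run(ctx context.Context) {
	<-ctx.Done() // the actual work loop would live here
}

With NewBackground listed as a provider, wire.Struct(new(Server), "*") would fill a background field on Server{} for us.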

Starting up the background job would be taken care of, but we’d still need to issue the Shutdown signal somewhere. The main questions we should answer are:

  • Do we want the background job(s) to have their own API?
  • Is the responsibility shared between Server{} and Background{}?

These are tough questions. From the perspective of a microservice, the complete implementation serves a single responsibility. From the perspective of individual components, a background service is distinctly different from the API layer, and the relationship between them may be 1:N.

If we implement multiple background jobs, the microservice Shutdown() function will be just a proxy for the individual background jobs’ shutdowns. If we don’t, it may turn into spaghetti code, which is already a smell of shared responsibility.

If we implemented the background jobs on the same level as the Server{}, we wouldn’t need to re-specify dependencies for each job. That might seem like a good idea until you realize that it again blurs the boundary between what the microservice needs and what a particular background job needs. If we’ve learned anything, it’s that we need to favour explicit declarations over implicit scope. That way it’s clear exactly what the background jobs need.

Since Init would only add a level of indirection, blurring the responsibilities and dependencies between the background job and the microservice API, let’s remove Init() and implement our background job with the single responsibility principle in mind.

As we make this decision, we also realize that replacing the implementation of the background job becomes trivial: we only need to satisfy whatever interface we create for the background job API. If this was merged with Server{}, it would, at best, be problematic to isolate and refactor at some later point.
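
To illustrate, that interface could be as small as a single method; the name below is hypothetical, not something we’ll commit to yet:

// Waiter is a placeholder name for the background job contract:
// anything we can wait on. context.Context already satisfies it.
type Waiter interface {
	Done() <-chan struct{}
}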

Let’s remove Init from our generators and create a background job.

The background job

It’s good to follow some naming conventions here, to ensure that our background job is designed with a single responsibility in mind. With that, I quote Effective Go - Interface names:

By convention, one-method interfaces are named by the method name plus an -er suffix or similar modification to construct an agent noun: Reader, Writer, Formatter, CloseNotifier etc.

We are not creating a generic interface, but since the struct has a specific utility here, we can resort to the same naming convention for it. We will create a Flusher.

Under server/stats/ create flusher.go:

package stats

import (
	"context"
	"log"
	"time"

	"github.com/jmoiron/sqlx"
	"go.uber.org/atomic"
)

We’re importing go.uber.org/atomic for their wrappers around stdlib sync/atomic. They provide wrapped data types that make it easier to work with values in a thread-safe way.
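
For example, compare the wrapped atomic.Bool to the equivalent sync/atomic integer juggling:

enabled := atomic.NewBool(true) // go.uber.org/atomic
enabled.Store(false)            // vs. atomic.StoreInt32(&flag, 0)
if enabled.Load() {             // vs. atomic.LoadInt32(&flag) == 1
	// accept new data
}

With that in place, here’s the Flusher itself: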

// Flusher is a context-driven background data flush job
type Flusher struct {
	context.Context
	finish func()

	enabled *atomic.Bool

	db *sqlx.DB
}

Our Flusher{} struct embeds context.Context so we don’t need to write our own Shutdown function, but can invoke <-flusher.Done() to wait for it to gracefully finish. This is a local cancellation context.

// NewFlusher creates a *Flusher
func NewFlusher(ctx context.Context, db *sqlx.DB) (*Flusher, error) {
	job := &Flusher{
		db:      db,
		enabled: atomic.NewBool(true),
	}
	job.Context, job.finish = context.WithCancel(context.Background())
	go job.run(ctx)
	return job, nil
}

When creating the flusher, we set the dependencies, enable pushing data into the flusher (enabled=true), and set the cancellation context.
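
We haven’t written the intake side yet; a possible shape for it, assuming Flusher{} also grows a sync.Mutex, a queue slice and some Row type (all of these names are illustrative, not code from this chapter):

// Push queues rows for the next flush; once the flusher is
// disabled during shutdown, it silently drops new data.
func (job *Flusher) Push(rows ...Row) {
	if !job.enabled.Load() {
		return // shutting down: stop accepting new data
	}
	job.mu.Lock()
	job.queue = append(job.queue, rows...)
	job.mu.Unlock()
}

With intake sketched out, here’s the run loop itself: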

func (job *Flusher) run(ctx context.Context) {
	log.Println("Started background job")

	defer job.finish()

	ticker := time.NewTicker(time.Second)

	for {
		select {
		case <-ticker.C:
			job.flush()
			continue
		case <-ctx.Done():
			log.Println("Got cancel")
			job.enabled.Store(false)
			job.flush()
		}
		break
	}

	log.Println("Exiting Run")
}

The background job itself listens for the global context cancellation (ctx.Done) and, when it’s received, disables pushing new data to the internal queue. We also create a ticker to run our flush job once a second, and use defer to invoke the local context cancellation as soon as we exit the function.

func (job *Flusher) flush() {
	log.Println("Background flush")
}

At this point, our flusher is a no-op; we just print something to the log to see that everything is working as it should. Let’s adjust server.go to include the flusher as a dependency, and create our Shutdown function:

// Server implements stats.StatsService
type Server struct {
	db *sqlx.DB

	sonyflake *sonyflake.Sonyflake
	flusher   *Flusher
}

// Shutdown is a cleanup hook after SIGTERM
func (s *Server) Shutdown() {
	<-s.flusher.Done()
}

All we need to do now is some housekeeping.

Housekeeping

We need to add NewFlusher to wire.go as a provider in the wire.Build call. Since wire.go is auto-generated, we need to modify our Makefile and create a conditional generator script (the fix goes under the templates.% target):

-       @envsubst < templates/server_wire.go.tpl > server/$(SERVICE)/wire.go
-       @echo "~ server/$(SERVICE)/wire.go"
+       @./templates/server_wire.go.sh

And we create templates/server_wire.go.sh and set it as executable:

#!/bin/bash
cd $(dirname $(dirname $(readlink -f $0)))

if [ -z "${SERVICE}" ]; then
	echo "Usage: SERVICE=[name] MODULE=... $0"
	exit 255
fi

OUTPUT="server/${SERVICE}/wire.go"

if [ ! -f "$OUTPUT" ]; then
	envsubst < templates/server_wire.go.tpl > $OUTPUT
	echo "~ $OUTPUT"
fi

With these changes, wire.go will only be written if it doesn’t exist. This means we can now add NewFlusher to wire.go for our stats service.
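
For our stats service, the edited wire.go could then end up looking roughly like this (keeping the provider set from before, with only NewFlusher added):

func New(ctx context.Context) (*Server, error) {
	wire.Build(
		inject.Inject,
		NewFlusher,
		wire.Struct(new(Server), "*"),
	)
	// Return values are ignored; wire generates the real body.
	return nil, nil
}

Since NewFlusher takes a context and a *sqlx.DB and returns (*Flusher, error), wire can fill the flusher field on Server{} without any extra glue.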

Verifying everything works

As you should already be used to by now, run:

  • make,
  • make docker,
  • docker-compose up -d

This will build everything and restart your stats container locally. To verify that everything works, issue docker-compose stop stats and then docker-compose logs stats after some time.

stats_1  | 2020/01/04 23:20:13 Started background job
stats_1  | 2020/01/04 23:20:14 Background flush
stats_1  | 2020/01/04 23:20:15 Background flush
stats_1  | 2020/01/04 23:20:16 Background flush
stats_1  | 2020/01/04 23:20:17 Background flush
stats_1  | 2020/01/04 23:20:18 Got cancel
stats_1  | 2020/01/04 23:20:18 Background flush
stats_1  | 2020/01/04 23:20:18 Exiting Run
stats_1  | 2020/01/04 23:20:18 Done.

We’ve now created a background job runner which we can access from our API implementation. This means we can queue up data in memory and flush it from a background worker, bringing our API edge latency down.
