Microservice background jobs
API services, especially low-latency ones, often resort to background jobs: the request path only ingests data quickly, while a background job aggregates it and flushes it to backend services efficiently.
Adding background jobs to our service
In our stats microservice, we issue an insert query directly to the database for each request. The database supports extended inserts, meaning we could flush thousands of rows at once and increase throughput: adding 10,000 rows with a single query is significantly faster than adding one row 10,000 times over multiple connections.
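A minimal sketch of what an extended insert looks like, assuming MySQL-style ? placeholders and a hypothetical hits table with property and remote_ip columns. buildBulkInsert is a hypothetical helper, not part of sqlx; the resulting query and arguments could be passed to db.Exec(query, args...) in a single round trip. Databases and drivers limit statement size and placeholder count, so large batches are usually split into chunks.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// buildBulkInsert builds one extended INSERT covering all rows and returns the
// flattened argument list. It uses MySQL-style "?" placeholders; table and
// column names are assumed to be trusted constants, never user input.
func buildBulkInsert(table string, columns []string, rows [][]any) (string, []any, error) {
	if len(columns) == 0 || len(rows) == 0 {
		return "", nil, errors.New("need at least one column and one row")
	}
	// One "(?,?,...)" group per row, with one ? per column.
	group := "(" + strings.TrimSuffix(strings.Repeat("?,", len(columns)), ",") + ")"

	var sb strings.Builder
	fmt.Fprintf(&sb, "INSERT INTO %s (%s) VALUES ", table, strings.Join(columns, ", "))
	args := make([]any, 0, len(rows)*len(columns))
	for i, row := range rows {
		if len(row) != len(columns) {
			return "", nil, fmt.Errorf("row %d has %d values, want %d", i, len(row), len(columns))
		}
		if i > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString(group)
		args = append(args, row...)
	}
	return sb.String(), args, nil
}

func main() {
	query, args, err := buildBulkInsert("hits", []string{"property", "remote_ip"}, [][]any{
		{"news", "10.0.0.1"},
		{"blog", "10.0.0.2"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(query)
	fmt.Println(len(args), "arguments")
}
```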
To take advantage of the database’s bulk insert features, we need to start a goroutine that periodically aggregates the collected data and flushes it to the database. How often this runs is up to you, but if you care about data loss due to service restarts or shutdowns, you’ll need to set a reasonably low interval.
To facilitate creating background jobs (and other possible initialization), we can extend our Server codegen to satisfy the following interface:
Init(context.Context) error
Shutdown()
The Init function takes the global cancellation context and starts any background jobs, which (hopefully) react to this context as well. When the service shuts down, your background jobs can stop accepting new data and flush whatever has been collected to the database. The Shutdown function lets the main program wait for the background jobs’ flush to complete.
To sum up: Init() starts the job, context controls job cancellation, and Shutdown is there to wait for the cleanup if needed.
--- a/templates/server_server.go.tpl
+++ b/templates/server_server.go.tpl
@@ -13,4 +13,15 @@ type Server struct {
db *sqlx.DB
}
+// Init can start background jobs
+func (*Server) Init(_ context.Context) error {
+ // Any background jobs go here,
+ // ctx has global signal cancellation
+ return nil
+}
+
+// Shutdown is a cleanup hook after SIGTERM
+func (*Server) Shutdown() {
+}
Let’s modify the template for main.go to call Init and Shutdown appropriately. We also start the service itself as a goroutine and wait for context cancellation in main. As soon as the context is cancelled, srv.Shutdown() is called.
--- a/templates/cmd_main.go.tpl
+++ b/templates/cmd_main.go.tpl
@@ -43,9 +43,14 @@ func main() {
if err != nil {
log.Fatalf("Error in service.New(): %+v", err)
}
-
+ if err := srv.Init(ctx); err != nil {
+ log.Fatal(err)
+ }
twirpHandler := ${SERVICE}.New${SERVICE_CAMEL}ServiceServer(srv, internal.NewServerHooks())
log.Println("Starting service on port :3000")
- http.ListenAndServe(":3000", internal.WrapAll(twirpHandler))
+ go http.ListenAndServe(":3000", internal.WrapAll(twirpHandler))
+ <-ctx.Done()
+
+ srv.Shutdown()
}
Do we really need Init?
Looking at the code, the Init() function may be redundant. Wire already takes ctx, so we just need to check whether we can start a goroutine from there. Let’s see what Wire can do in our case by modifying wire.go:
func New(ctx context.Context) (*Server, error) {
s := new(Server)
wire.Build(
inject.Inject,
wire.Struct(new(Server), "*"),
)
if err := s.Background(); err != nil {
return nil, err
}
return s, nil
}
Unfortunately this results in the following error message:
wire: a call to wire.Build indicates that this function is an injector, but injectors must consist of only the wire.Build call and an optional return
Another option would be to create a dependency for the background job. This way you would just add a field to the Server{} struct, and let Wire invoke the job’s constructor and provide its dependencies as well.
Starting the background job would be taken care of, but we’d still need to issue the Shutdown signal. The main questions we should answer are:
- Do we want the background job(s) to have their own API?
- Is the responsibility between Server{} and Background{} shared?
These are tough questions. From the perspective of a microservice, the complete implementation serves a single responsibility. From the perspective of individual components, a background service is distinctly different from the API layer, and the relationship between them may be 1:N.
If we implement multiple background jobs as separate components, the microservice Shutdown() function will just be a proxy for the individual jobs’ shutdowns. If we don’t, we may end up with spaghetti code, which is already a smell of shared responsibility.
If we implement the background jobs on the same level as the Server{}, we wouldn’t need to re-specify dependencies for each job. That might seem like a good idea until you realize that it again blurs the boundary between what the microservice needs and what a particular background job needs. If we’ve learned anything, it’s that we should favour explicit declarations over implicit scope. That way it is clear exactly what each background job needs.
As we would only be adding a level of indirection with Init, and blurring the various responsibilities and dependencies between the background job and the microservice API, let’s remove Init() and implement our background job with the single responsibility principle in mind.
As we make this decision, we also realize that replacing the implementation of the background job is trivial: we only need to satisfy whatever interface we create for the background job API. If it were merged with Server{}, it would, at best, be problematic to isolate and refactor later.
Let’s remove Init from our generators and create a background job.
The background job
It’s good to follow some naming conventions here, to ensure that our background job is designed with a single responsibility in mind. To that end, here is a quote from Effective Go, in the section on interface names:
By convention, one-method interfaces are named by the method name plus an -er suffix or similar modification to construct an agent noun: Reader, Writer, Formatter, CloseNotifier etc.
We are not creating a generic interface, but as the struct has a specific purpose here, we can apply the same naming convention to our struct. We will create a Flusher.
Under server/stats/ create flusher.go:
package stats
import (
"context"
"log"
"time"
"github.com/jmoiron/sqlx"
"go.uber.org/atomic"
)
We’re importing go.uber.org/atomic for its wrappers around the stdlib sync/atomic package. They provide wrapped data types that make it easier to work with values in a thread-safe way.
// Flusher is a context-driven background data flush job
type Flusher struct {
context.Context
finish func()
enabled *atomic.Bool
db *sqlx.DB
}
Our Flusher{} struct embeds context.Context, so we don’t need to write our own Shutdown function; instead we can invoke <-flusher.Done() to wait for it to finish gracefully. This is a local cancellation context.
// NewFlusher creates a *Flusher
func NewFlusher(ctx context.Context, db *sqlx.DB) (*Flusher, error) {
job := &Flusher{
db: db,
enabled: atomic.NewBool(true),
}
job.Context, job.finish = context.WithCancel(context.Background())
go job.run(ctx)
return job, nil
}
When creating the flusher, we set the dependencies, enable pushing data into the flusher (enabled=true), and set the cancellation context.
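func (job *Flusher) run(ctx context.Context) {
	log.Println("Started background job")
	// Cancel the local context on exit, which unblocks <-job.Done()
	defer job.finish()

	// Flush once a second; stop the ticker to release its resources on exit
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			job.flush()
		case <-ctx.Done():
			log.Println("Got cancel")
			// Stop accepting new data, then flush whatever was collected
			job.enabled.Store(false)
			job.flush()
			log.Println("Exiting Run")
			return
		}
	}
}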
The background job itself listens for the global context cancellation (ctx.Done) and, once it is received, disables pushing new data to the internal queue and performs a final flush. We also create a ticker to run our flush job once a second, and use defer to cancel the local context as soon as we exit the function.
func (job *Flusher) flush() {
log.Println("Background flush")
}
At this point our flusher is a no-op, and we just print something to the log to see that everything is working as it should. Let’s adjust server.go to include the flusher as a dependency, and create our Shutdown function:
// Server implements stats.StatsService
type Server struct {
db *sqlx.DB
sonyflake *sonyflake.Sonyflake
flusher *Flusher
}
// Shutdown is a cleanup hook after SIGTERM
func (s *Server) Shutdown() {
<-s.flusher.Done()
}
All we need to do now is some housekeeping.
Housekeeping
We need to add NewFlusher to wire.go as a wire.Build parameter. Since wire.go is generated from a template, we need to modify our Makefile and create a conditional generator script so our changes aren’t overwritten (the fix goes under the templates.% target):
- @envsubst < templates/server_wire.go.tpl > server/$(SERVICE)/wire.go
- @echo "~ server/$(SERVICE)/wire.go"
+ @./templates/server_wire.go.sh
Then we create templates/server_wire.go.sh and make it executable:
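#!/bin/bash
# Run from the repository root (the parent of templates/)
cd "$(dirname "$(dirname "$(readlink -f "$0")")")"
if [ -z "${SERVICE}" ]; then
	echo "Usage: SERVICE=[name] MODULE=... $0"
	exit 255
fi
OUTPUT="server/${SERVICE}/wire.go"
# Only generate wire.go if it doesn't exist yet, so manual edits are kept
if [ ! -f "$OUTPUT" ]; then
	envsubst < templates/server_wire.go.tpl > "$OUTPUT"
	echo "~ $OUTPUT"
fi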
With these changes, wire.go will only be written if it doesn’t exist. This means
we can now add NewFlusher
to wire.go for our stats service.
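For reference, a sketch of what the stats service’s injector in wire.go might look like with NewFlusher added. It assumes the injector shape from the earlier wire.go example, with inject.Inject providing *sqlx.DB and the other Server dependencies, and ctx coming from New’s own parameter; the file’s existing imports and build tag stay as they are.

```go
func New(ctx context.Context) (*Server, error) {
	wire.Build(
		inject.Inject,
		// NewFlusher(ctx, db) provides the *Flusher for Server.flusher
		NewFlusher,
		// Fill every field of Server from the providers above
		wire.Struct(new(Server), "*"),
	)
	// Wire generates the real body; this return is only a placeholder
	return nil, nil
}
```

Because NewFlusher returns an error, Wire propagates it through New’s error return in the generated code.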
Verifying everything works
As you should be used to by now, run make, make docker and docker-compose up -d.
This will build everything and restart your stats container locally.
To check that everything works, wait a few seconds, then run docker-compose stop stats followed by docker-compose logs stats:
stats_1 | 2020/01/04 23:20:13 Started background job
stats_1 | 2020/01/04 23:20:14 Background flush
stats_1 | 2020/01/04 23:20:15 Background flush
stats_1 | 2020/01/04 23:20:16 Background flush
stats_1 | 2020/01/04 23:20:17 Background flush
stats_1 | 2020/01/04 23:20:18 Got cancel
stats_1 | 2020/01/04 23:20:18 Background flush
stats_1 | 2020/01/04 23:20:18 Exiting Run
stats_1 | 2020/01/04 23:20:18 Done.
We have now created a background job runner that we can access from our API implementation. This means we can queue up data in RAM and flush it from a background worker, bringing our API edge latency down.