Using Beanstalkd With Go and Docker

Using Beanstalkd With Go and Docker

An introduction to using Beanstalkd with Go

2017-12-04

Why Beanstalkd?

There are a lot of Job queues out there, and choosing which one to use is usually a tough choice. I am guessing many of you have been there, spending hours to figure out which technology to use for your next project or feature. listing the pros and cons of each system, putting the emphasis on the ones you really need.
Beanstalkd offers a lightweight fast solution to the queuing problem - it’s transactional, durable and quite simple. on the other hand it has it’s own protocol (so it’s hard to replace), it is scaled “at the client level” which means no support for redundancy and it has no security at all. Oh and it’s open source - which is always good. Note that you can use something like ghostunnel as a security layer.
So, if simplicity and speed are appealing to you and you are running in a closed network (so you don’t need security) Beanstalkd might just be for you!

Take it for a spin

I am going to use Docker for running the server, so if you don’t have docker you can go here. You can also install Beanstalkd locally with the instructions here.
Run Beanstalkd with docker:

$ docker run -d -p 11300:11300 schickling/beanstalkd

Verify that it’s running:


$ docker ps
CONTAINER ID        IMAGE                   COMMAND                 CREATED             STATUS              PORTS                      NAMES
1b67ae4804ca        schickling/beanstalkd   "beanstalkd -p 11300"   3 seconds ago       Up 1 second         0.0.0.0:11300->11300/tcp   zealous_noether

Great! it’s running.
One of the cooler things about Beanstalkd is that it’s protocol is in clear text. So we can kick the tires with telnet.
If you want, you can read the protocol documentation here.

$ telnet 127.0.0.1 11300
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.

The stats command prints out statistical information on the system:

stats
\-\-\-
OK 899
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
cmd-put: 0
cmd-peek: 0
cmd-peek-ready: 0
cmd-peek-delayed: 0
cmd-peek-buried: 0
cmd-reserve: 0
cmd-reserve-with-timeout: 0
cmd-delete: 0
cmd-release: 0
cmd-use: 0
cmd-watch: 0
cmd-ignore: 0
cmd-bury: 0
cmd-kick: 0
cmd-touch: 0
cmd-stats: 1
cmd-stats-job: 0
cmd-stats-tube: 0
cmd-list-tubes: 0
cmd-list-tube-used: 0
cmd-list-tubes-watched: 0
cmd-pause-tube: 0
job-timeouts: 0
total-jobs: 0
max-job-size: 65535
current-tubes: 1
current-connections: 1
current-producers: 0
current-workers: 0
current-waiting: 0
total-connections: 1
pid: 1
version: 1.10
rusage-utime: 0.020000
rusage-stime: 0.003000
uptime: 491
binlog-oldest-index: 0
binlog-current-index: 0
binlog-records-migrated: 0
binlog-records-written: 0
binlog-max-size: 10485760
id: fd7af7e8d3eb1a86
hostname: 1b67ae4804ca

The Queue Flow

There are producers, workers and tubes.
The tubes are named job queues that you can use from both sides. In General - the producers puts jobs in the tubes and the workers process the jobs and mark them as complete. sound like a job queue right?
A jobs has four states:

  1. ready - the job is ready to be consumed by a worker
  2. reserved - one of the workers has reserved the job and it’s in processing
  3. delayed - the job is waiting to be ready
  4. buried - the job is stored in a FIFO list and waiting until one of the workers makes a kick command. more on that below.

The Producer

The producer picks a tube and puts a job in it.
when that happens the producer can decide on a few options:

  • priority - integer, smaller number means more urgent.
  • delay - integer, the number of seconds to wait before the job would be ready to run.
  • time to run - integer, the number of seconds to allow a worker to run the job. if the work is not done in this time period it would be released and another worker could take it.

The Worker

The worker decides which tubes to watch, then issues a reserve command. this will return a fresh reserved job, reserved means that only that particular worker would get the job.
after the job was performed the worker issues a delete command that will remove that job from the queue. if the worker can’t complete the job it can also release it, actually putting the job back on the queue. the worker can also “bury” a job, which puts the job in a “buried” state. the “buried” job would stay buried until one of the workers issues a “kick” command that will make the job “ready” again.

Go Client

Beanstalkd’s protocol is pretty simple, what makes it very tempting to just write a client of your own. that’s not such a bad idea but wait… we shouldn’t reinvent the wheel.. right? well there’s a bunch of client libraries you can use for various programming languages. since I have chosen to use Go, I’ll pick one of the Go client libraries. gobeanstalk looks nice enough, let’s go with that one.

Example App

In your Go application you might use Beanstalkd for sending emails, crunching some data, resizing cat pictures or whatever your application needs. since our purpose is learning we’ll implement an app that saves comments. the producer will push the comments to the queue and the workers will just save the comments to disk.

Note that for the sake of simplicity I’ll put all our files in the main package, you might want to divide your codebase differently. there’s a blog post you can read about how to name and package the elements of your Go program.

Let’s start by defining our data structures and interfaces. the first one will be the comment.

common.go

package main

import "time"

type Comment struct {
	UserName string
	Date time.Time
	Text string
}

We need to be able to pass our comments through Beanstalkd which uses strings for the Job body. we’ll do that with an interface so we can have several implementations (we’ll have only one though) for our protocol:

common.go

type CommentProtocol interface {
	Decode(encodedComment []byte) (*Comment, error)
	Encode(comment *Comment) ([]byte, error)
}

Our implementation of the CommentProtocol interface would use json:
json_protocol.go

package main

import "encoding/json"

type JsonCommentProtocol struct {

}

func (protocol *JsonCommentProtocol) Decode(encodedComment []byte) (*Comment, error) {
	unCodedComment := Comment{}
	err := json.Unmarshal(encodedComment, &unCodedComment)
	if err != nil {
		return nil, err
	}
	return &unCodedComment, nil
}

func (protocol *JsonCommentProtocol) Encode(comment *Comment) ([]byte, error) {
	encodedComment, err := json.Marshal(comment)
	if err != nil {
		return nil, err
	}
	return encodedComment, nil
}

func MakeJsonCommentProtocol() *JsonCommentProtocol {
	return &JsonCommentProtocol{}
}

We also need to define what to do with the data once we get it on the worker side. we’ll use an interface for that:

common.go

type CommentProcessor interface {
	DoProcess(comment *Comment) error
}

And our dummy implementation:

comment_processor.go

package main

import (
	"os"
	"fmt"
	"time"
)

type DiskCommentProcessor struct {
	dir string
}

func (processor *DiskCommentProcessor) DoProcess(comment *Comment) error {
	filePath := fmt.Sprintf("%s/%s_%s.txt", processor.dir, comment.Date.Format(time.RFC3339), comment.UserName)
	commentFile, err := os.Create(filePath)
	if err != nil {
		return err
	}
	defer commentFile.Close()
	commentFile.Write(comment.Text)
	return nil
}

func MakeNewCommentProcessor(dir string) *DiskCommentProcessor {
	return &DiskCommentProcessor{dir: dir}
}

As you can see it doesn’t do much, but it’s enough for us.

Now that we have defined our Comment data structure, how to pass it as text and what to do with it, we can start using Beanstalkd. of course, we need to get the library first.

$ go get github.com/iwanbk/gobeanstalk

Since we want to stay DRY we’ll implement a struct to handle the common functions that both the Producer and Worker will have.
beanstalk_common.go

package main

import (
	"github.com/iwanbk/gobeanstalk"
	"fmt"
	"os"
)

type PapaBeanstalk struct {
	ServerAddress string
	serverConnection *gobeanstalk.Conn
}

func (papa *PapaBeanstalk) Connect() {
	beanstalkConnection, err := gobeanstalk.Dial(papa.ServerAddress)
	if err != nil {
		// do retries or whatever you need
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println("connected!")
	papa.serverConnection = beanstalkConnection
}

func (papa *PapaBeanstalk) Close() {
	if papa.serverConnection != nil {
		papa.serverConnection.Quit()
	}
}

Let’s implement the Producer:

producer.go

package main

import (
	"time"
	"fmt"
)

type Producer struct {
	PapaBeanstalk
	protocol CommentProtocol
}


func (producer *Producer) PutComment(comment *Comment) error {
	body, err := producer.protocol.Encode(comment)
	if err != nil {
		return err
	}
	priority := uint32(10)
	delay := 0 * time.Second
	time_to_run := 20 * time.Second
	jobId, err := producer.serverConnection.Put(body, priority, delay, time_to_run)
	if err != nil {
		return err
	}
	fmt.Println("inserted Job id: ", jobId)
	return nil
}

func MakeNewProducer(serverAdress string, protocol CommentProtocol) *Producer {
	producer := Producer{protocol: protocol}
	producer.ServerAddress = serverAdress
	return &producer
}

As you can see we embed PapaBeanstalk into the Producer struct making the Connect() and Close() functions available to it. If you are a newbie to Go and more accustomed to using inheritance, this would seem a bit odd to you. but the fact is that Go just doesn’t have inheritance and this works just as well.
You probably noticed that we are not using any tube! Beanstalkd defaults to using the tube “default”, so we’re good in the mean time.

Our next step will be implementing the worker:
worker.go

package main

import (
	"fmt"
	"time"
	"github.com/iwanbk/gobeanstalk"
)

type CommentWorker struct {
	PapaBeanstalk
	protocol CommentProtocol
	processor CommentProcessor
}


func (worker *CommentWorker) ProcessJob() {
	fmt.Println("reserving job")
	job, err := worker.serverConnection.Reserve()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("got job Id: ", job.ID)
	comment, err := worker.protocol.Decode(job.Body)
	if err != nil {
		worker.handleError(job, err)
		return
	}
	processError := worker.processor.DoProcess(comment)
	if processError != nil {
		worker.handleError(job, err)
		return
	}
	fmt.Println("processed job id: ", job.ID)
	worker.serverConnection.Delete(job.ID)
}

func (worker *CommentWorker) handleError(job *gobeanstalk.Job, err error) {
	fmt.Println(err)
	priority := uint32(5)
	delay := 0 * time.Second
	worker.serverConnection.Release(job.ID ,priority, delay) // hey I can't handle this
}

func MakeNewWorker(serverAddress string, protocol CommentProtocol, processor CommentProcessor) *CommentWorker {
	worker := CommentWorker{protocol: protocol, processor: processor}
	worker.ServerAddress = serverAddress
	return &worker
}

Here we are using the reserve command, it will block until it has a job to return. then after we got a job we try to decode it and process it. in case of error we release the job back to queue, if we suceeded we delete the job.
When sending the release command you can decide if you want to change the job’s priority, or give it a delay.
If you want or need to poll, as opposed to blocking until a job comes, you can use the “reserve-with-timeout” command that returns an error after the timeout was exceeded.

Now that we have both the Worker and the Producer, we only need the main function to make it work.
main.go

package main

import (
	"time"
	"os"
	"fmt"
)

func ProducerMain() {
	var comments = []Comment{
		{UserName: "some_user", Text:"i love your cat", Date: time.Now()},
		{UserName: "some_other_user", Text:"i prefer dogs", Date: time.Now()},
		{UserName: "another_user", Text:"please close this thread", Date: time.Now()},
		{UserName: "admin", Text:"thread closed - not relevant", Date: time.Now()},
	}
	protocol := MakeJsonCommentProtocol()
	producer := MakeNewProducer("localhost:11300", protocol)
	producer.Connect()
	defer producer.Close()
	for _, comment := range comments {
		producer.PutComment(&comment)
	}
}

func WorkerMain() {
	protocol := MakeJsonCommentProtocol()
	commentsDir := "./comments"
	os.Mkdir(commentsDir, 0777)
	processor := MakeNewCommentProcessor(commentsDir)
	worker := MakeNewWorker("localhost:11300", protocol, processor)
	worker.Connect()
	defer worker.Close()
	for {
		worker.ProcessJob()
	}

}

func printUsage() {
	fmt.Println("Usage: example-app worker/producer")
	os.Exit(1)
}

func main() {
	if len(os.Args) < 2 {
		printUsage()
	}
	if os.Args[1] == "worker" {
		WorkerMain()
	} else if os.Args[1] == "producer" {
		ProducerMain()
	} else {
		printUsage()
	}
}

What do we have here?

  • The ProducerMain function with some dummy data
  • The WorkerMain function with an endless loop
  • The main function that enables us to choose between the ProducerMain and the WorkerMain

Let’s build our new amazing app:

$ go build

The command will build an executable with the directory name in my case example-app

We can use the same executable for the Producer and the Worker.
Open a terminal window and run it with the “producer” argument:

$ ./example-app producer

You should see something like this:

connected!
inserted Job id:  1
inserted Job id:  2
inserted Job id:  3
inserted Job id:  4
connection closed

In another terminal window run it with the “worker” argument:

$ ./example-app worker

You should see something like this:

connected!
reserving job
got job Id:  1
processed job id:  1
reserving job
got job Id:  2
processed job id:  2
reserving job
got job Id:  3
processed job id:  3
reserving job
got job Id:  4
processed job id:  4
reserving job

It’s running an endless loop so type ctrl-C to stop it.

Great! we now have a very simple Beanstalkd application.

But wait… what about those tubes? let’s modify our app to use tubes.

We’ll add a tube variable to the worker struct:
worker.go

type CommentWorker struct {
	PapaBeanstalk
	protocol CommentProtocol
	processor CommentProcessor
	tube string
}

And also add it to the function that makes our Worker structs:
worker.go

func MakeNewWorker(serverAddress string, protocol CommentProtocol, processor CommentProcessor,
								tubeToListenOn string) *CommentWorker {
	worker := CommentWorker{protocol: protocol, processor: processor, tube: tubeToListenOn}
	worker.ServerAddress = serverAddress
	return &worker
}

Now we’ll implement the watch command and use it:
worker.go



func (worker *CommentWorker) watch() error {
	watching, err := worker.serverConnection.Watch(worker.tube)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println("watching ", watching, " tubes")
	return nil
}

func (worker *CommentWorker) Connect() {
	worker.PapaBeanstalk.Connect()
	worker.watch()
}

We are now automatically watching the configured tube whenever the worker connects to the server. the worker is still listening on the default tube, which is fine for us for now. if you want the worker not to listen on the default tube, you need to use the ignore command.
The new updated worker now looks like this:
worker.go

package main

import (
	"fmt"
	"time"
	"github.com/iwanbk/gobeanstalk"
	"os"
)

type CommentWorker struct {
	PapaBeanstalk
	protocol CommentProtocol
	processor CommentProcessor
	tube string
}


func (worker *CommentWorker) ProcessJob() {
	fmt.Println("reserving job")
	job, err := worker.serverConnection.Reserve()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("got job Id: ", job.ID)
	comment, err := worker.protocol.Decode(job.Body)
	if err != nil {
		worker.handleError(job, err)
		return
	}
	processError := worker.processor.DoProcess(comment)
	if processError != nil {
		worker.handleError(job, err)
		return
	}
	fmt.Println("processed job id: ", job.ID)
	worker.serverConnection.Delete(job.ID)
}

func (worker *CommentWorker) watch() error {
	watching, err := worker.serverConnection.Watch(worker.tube)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println("watching ", watching, " tubes")
	return nil
}

func (worker *CommentWorker) Connect() {
	worker.PapaBeanstalk.Connect()
	worker.watch()
}

func (worker *CommentWorker) handleError(job *gobeanstalk.Job, err error) {
	fmt.Println(err)
	priority := uint32(5)
	delay := 0 * time.Second
	worker.serverConnection.Release(job.ID ,priority, delay) // hey I can't handle this
}

func MakeNewWorker(serverAddress string, protocol CommentProtocol, processor CommentProcessor,
								tubeToListenOn string) *CommentWorker {
	worker := CommentWorker{protocol: protocol, processor: processor, tube: tubeToListenOn}
	worker.ServerAddress = serverAddress
	return &worker
}

We’ll now add the functionality to the Producer:
producer.go

func (producer *Producer) UseTube(tubeName string) error {
	return producer.serverConnection.Use(tubeName)
}

The only thing left is to modify the main functions:
main.go

package main

import (
	"time"
	"os"
	"fmt"
)

func ProducerMain(tubes []string) {
	var comments = []Comment{
		{UserName: "some_user", Text:"i love your cat", Date: time.Now()},
		{UserName: "some_other_user", Text:"i prefer dogs", Date: time.Now()},
		{UserName: "another_user", Text:"please close this thread", Date: time.Now()},
		{UserName: "admin", Text:"thread closed - not relevant", Date: time.Now()},
	}
	protocol := MakeJsonCommentProtocol()
	producer := MakeNewProducer("localhost:11300", protocol)
	producer.Connect()
	defer producer.Close()

	for _, tube := range tubes {
		producer.UseTube(tube)
		for _, comment := range comments {
			producer.PutComment(&comment)
		}
	}

}

func WorkerMain(commentsDir string, tube string) {
	protocol := MakeJsonCommentProtocol()
	os.Mkdir(commentsDir, 0777)
	processor := MakeNewCommentProcessor(commentsDir)
	worker := MakeNewWorker("localhost:11300", protocol, processor, tube)
	worker.Connect()
	defer worker.Close()
	for {
		worker.ProcessJob()
	}

}

func printUsage() {
	fmt.Println("Usage: example-app worker/producer")
	os.Exit(1)
}

func main() {
	if len(os.Args) < 2 {
		printUsage()
	}
	if os.Args[1] == "worker1" {
		WorkerMain("first_comments", "first")
	} else if os.Args[1] == "worker2" {
		WorkerMain("second_comments", "second")
	} else if os.Args[1] == "producer" {
		ProducerMain([]string{"first", "second"})
	} else {
		printUsage()
	}
}

We added a few stuff here.

  • ProducerMain accepts a list of tubes to use
  • ProducerMain iterates over the tubes and puts the dummy jobs in each one
  • WorkerMain accepts directory path and tube name, and makes the worker with them

Rebuild the app

$ go build

Open a terminal window and run it with the “producer” argument:

$ ./example-app producer

You should see 8 jobs inserted.

In another terminal window run it with the “worker1” argument:

$ ./example-app worker1

And in another terminal window run it with the “worker2” argument:

$ ./example-app worker2

You should see that the jobs were processed.
If it worked - the same number of comment files should appear in the “first_comments” and “second_comments” folders.

What this tutorial doesn’t cover

There’s more to Beanstalkd than the basic put-reserve-delete flow.
Here are the commands and subjects you should research for more advanced usage:

  • timeouts - “deadline_soon”, “timed_out”
  • touch command
  • bury command
  • ignore command
  • kick commands
  • list commands
  • stats commands
  • peek commands
  • pause tube

Summary

By now you should have a basic understanding of using Beanstalkd with Go.
The example app is on github so feel free to use it.
If you enjoyed this tutorial please comment, share or just tell your friends.
Thank you for reading.