Why Beanstalkd?
There are a lot of job queues out there, and choosing one is usually tough. I am guessing many of you have been there: spending hours figuring out which technology to use for your next project or feature, listing the pros and cons of each system and putting the emphasis on the ones you really need.
Beanstalkd offers a lightweight, fast solution to the queuing problem: it’s transactional, durable and quite simple. On the other hand, it has its own protocol (so it’s hard to replace), it is scaled “at the client level”, which means no support for redundancy, and it has no security at all. Oh, and it’s open source, which is always good.
Note that you can use something like ghostunnel as a security layer.
So, if simplicity and speed appeal to you and you are running in a closed network (so you don’t need security), Beanstalkd might just be for you!
Take it for a spin
I am going to use Docker for running the server, so if you don’t have Docker you can go here. You can also install Beanstalkd locally with the instructions here.
Run Beanstalkd with Docker:
$ docker run -d -p 11300:11300 schickling/beanstalkd
Verify that it’s running:
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1b67ae4804ca schickling/beanstalkd "beanstalkd -p 11300" 3 seconds ago Up 1 second 0.0.0.0:11300->11300/tcp zealous_noether
Great! It’s running.
One of the cooler things about Beanstalkd is that its protocol is in clear text, so we can kick the tires with telnet.
If you want, you can read the protocol documentation here.
$ telnet 127.0.0.1 11300
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
The stats command prints out statistical information on the system:
stats
OK 899
---
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
cmd-put: 0
cmd-peek: 0
cmd-peek-ready: 0
cmd-peek-delayed: 0
cmd-peek-buried: 0
cmd-reserve: 0
cmd-reserve-with-timeout: 0
cmd-delete: 0
cmd-release: 0
cmd-use: 0
cmd-watch: 0
cmd-ignore: 0
cmd-bury: 0
cmd-kick: 0
cmd-touch: 0
cmd-stats: 1
cmd-stats-job: 0
cmd-stats-tube: 0
cmd-list-tubes: 0
cmd-list-tube-used: 0
cmd-list-tubes-watched: 0
cmd-pause-tube: 0
job-timeouts: 0
total-jobs: 0
max-job-size: 65535
current-tubes: 1
current-connections: 1
current-producers: 0
current-workers: 0
current-waiting: 0
total-connections: 1
pid: 1
version: 1.10
rusage-utime: 0.020000
rusage-stime: 0.003000
uptime: 491
binlog-oldest-index: 0
binlog-current-index: 0
binlog-records-migrated: 0
binlog-records-written: 0
binlog-max-size: 10485760
id: fd7af7e8d3eb1a86
hostname: 1b67ae4804ca
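Since the stats reply is just a list of "key: value" lines, reading it from code is easy. Here is a minimal sketch, assuming the reply body has already been read from the socket. parseStats is a hypothetical helper of mine, not something a client library provides.

```go
package main

import (
	"fmt"
	"strings"
)

// parseStats turns the body of a stats reply into a map.
// It skips the leading "---" marker and any line that is not "key: value".
// Hypothetical helper, for illustration only.
func parseStats(body string) map[string]string {
	stats := make(map[string]string)
	for _, line := range strings.Split(body, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || line == "---" {
			continue
		}
		parts := strings.SplitN(line, ": ", 2)
		if len(parts) != 2 {
			continue
		}
		stats[parts[0]] = parts[1]
	}
	return stats
}

func main() {
	// A shortened copy of the output shown above.
	body := "---\ncurrent-jobs-ready: 0\ncmd-stats: 1\nversion: 1.10\n"
	stats := parseStats(body)
	fmt.Println("version:", stats["version"])
	fmt.Println("stats calls:", stats["cmd-stats"])
}
```

The values stay as strings here; converting the counters to integers is left to the caller.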
The Queue Flow
There are producers, workers and tubes.
The tubes are named job queues that you can use from both sides.
In general, the producers put jobs in the tubes and the workers process the jobs and mark them as complete. Sounds like a job queue, right?
A job has four states:
- ready - the job is ready to be consumed by a worker
- reserved - one of the workers has reserved the job and it’s in processing
- delayed - the job is waiting to be ready
- buried - the job is stored in a FIFO list and waits until one of the workers issues a kick command. More on that below.
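The four states and the moves between them can be written down as a small lookup table. This is only a sketch of the lifecycle as I understand it from the protocol documentation. The event names (like "ttr-expired") are labels I made up for the example, not protocol commands.

```go
package main

import "fmt"

// State is one of the four job states listed above.
type State string

const (
	Ready    State = "ready"
	Reserved State = "reserved"
	Delayed  State = "delayed"
	Buried   State = "buried"
)

// transitions maps a state and an event to the next state.
// "delete" is missing on purpose: it removes the job altogether.
// The event names are illustrative labels, not protocol commands.
var transitions = map[State]map[string]State{
	Ready:    {"reserve": Reserved},
	Reserved: {"release": Ready, "release-with-delay": Delayed, "bury": Buried, "ttr-expired": Ready},
	Delayed:  {"delay-elapsed": Ready},
	Buried:   {"kick": Ready},
}

// next returns the state a job moves to, or false if the move is not allowed.
func next(from State, event string) (State, bool) {
	to, ok := transitions[from][event]
	return to, ok
}

func main() {
	state := Ready
	for _, event := range []string{"reserve", "bury", "kick"} {
		to, ok := next(state, event)
		if !ok {
			fmt.Println("cannot", event, "a", state, "job")
			return
		}
		fmt.Println(state, "->", to, "via", event)
		state = to
	}
}
```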
The Producer
The producer picks a tube and puts a job in it.
When that happens, the producer can decide on a few options:
- priority - integer, smaller number means more urgent.
- delay - integer, the number of seconds to wait before the job becomes ready to run.
- time to run - integer, the number of seconds to allow a worker to run the job. If the work is not done within this period, the job is released and another worker can take it.
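On the wire, those three options travel in a single put line, followed by the job body. A small sketch of building that line by hand, based on the put format in the protocol documentation. formatPut is a hypothetical helper. In the real app the client library does this for us.

```go
package main

import "fmt"

// formatPut builds the text of a put command:
//   put <pri> <delay> <ttr> <bytes>\r\n<data>\r\n
// Delay and time to run are given in whole seconds.
// Hypothetical helper, for illustration only.
func formatPut(body []byte, priority uint32, delaySeconds, ttrSeconds int) string {
	return fmt.Sprintf("put %d %d %d %d\r\n%s\r\n",
		priority, delaySeconds, ttrSeconds, len(body), body)
}

func main() {
	// Priority 10, no delay, 20 seconds to run.
	cmd := formatPut([]byte("hello"), 10, 0, 20)
	fmt.Printf("%q\n", cmd)
}
```

You could paste the same two lines into the telnet session from before and the server should answer with INSERTED and the new job id.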
The Worker
The worker decides which tubes to watch, then issues a reserve command. This returns a freshly reserved job; reserved means that only that particular worker gets the job.
After the job is done, the worker issues a delete command that removes the job from the queue. If the worker can’t complete the job, it can release it, which puts the job back on the queue. The worker can also “bury” a job, which puts it in the “buried” state. A buried job stays buried until one of the workers issues a “kick” command that makes it “ready” again.
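To make the worker side concrete, here is a sketch that maps the result of processing a job to the command the worker would send. The command formats follow the protocol documentation. The Outcome type and the rule "broken job means bury" are my own assumptions for the example.

```go
package main

import "fmt"

// Outcome is a hypothetical label for how processing a job ended.
type Outcome int

const (
	Done   Outcome = iota // the job finished, remove it
	Retry                 // temporary failure, give the job back to the queue
	Poison                // the job itself looks broken, park it for inspection
)

// commandFor returns the protocol line a worker would send for a reserved job.
func commandFor(id uint64, outcome Outcome, priority uint32, delaySeconds int) string {
	switch outcome {
	case Done:
		return fmt.Sprintf("delete %d\r\n", id)
	case Retry:
		return fmt.Sprintf("release %d %d %d\r\n", id, priority, delaySeconds)
	default:
		return fmt.Sprintf("bury %d %d\r\n", id, priority)
	}
}

// kickCmd asks the server to move up to bound buried jobs back to ready.
func kickCmd(bound int) string {
	return fmt.Sprintf("kick %d\r\n", bound)
}

func main() {
	fmt.Printf("%q\n", commandFor(1, Done, 10, 0))
	fmt.Printf("%q\n", commandFor(2, Retry, 5, 30))
	fmt.Printf("%q\n", commandFor(3, Poison, 10, 0))
	fmt.Printf("%q\n", kickCmd(10))
}
```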
Go Client
Beanstalkd’s protocol is pretty simple, which makes it very tempting to just write a client of your own. That’s not such a bad idea, but wait… we shouldn’t reinvent the wheel, right? Well, there’s a bunch of client libraries you can use for various programming languages. Since I have chosen to use Go, I’ll pick one of the Go client libraries. gobeanstalk looks nice enough, so let’s go with that one.
Example App
In your Go application you might use Beanstalkd for sending emails, crunching some data, resizing cat pictures or whatever your application needs. Since our purpose is learning, we’ll implement an app that saves comments. The producer will push the comments to the queue and the workers will just save the comments to disk.
Note that for the sake of simplicity I’ll put all our files in the main package; you might want to divide your codebase differently. There’s a blog post you can read about how to name and package the elements of your Go program.
Let’s start by defining our data structures and interfaces. The first one will be the comment.
common.go
package main
import "time"
type Comment struct {
UserName string
Date time.Time
Text string
}
We need to be able to pass our comments through Beanstalkd, which treats the job body as a sequence of bytes. We’ll do that with an interface so we can have several implementations (we’ll have only one, though) for our protocol:
common.go
type CommentProtocol interface {
Decode(encodedComment []byte) (*Comment, error)
Encode(comment *Comment) ([]byte, error)
}
Our implementation of the CommentProtocol interface uses JSON:
json_protocol.go
package main
import "encoding/json"
type JsonCommentProtocol struct {
}
func (protocol *JsonCommentProtocol) Decode(encodedComment []byte) (*Comment, error) {
unCodedComment := Comment{}
err := json.Unmarshal(encodedComment, &unCodedComment)
if err != nil {
return nil, err
}
return &unCodedComment, nil
}
func (protocol *JsonCommentProtocol) Encode(comment *Comment) ([]byte, error) {
encodedComment, err := json.Marshal(comment)
if err != nil {
return nil, err
}
return encodedComment, nil
}
func MakeJsonCommentProtocol() *JsonCommentProtocol {
return &JsonCommentProtocol{}
}
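If you want to see what actually ends up in the job body, a quick round trip helps. This sketch repeats the Comment struct so it compiles on its own, and uses encoding/json directly instead of our protocol type. roundTrip is a hypothetical helper, just for the demo.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Comment is repeated from common.go so the sketch is self-contained.
type Comment struct {
	UserName string
	Date     time.Time
	Text     string
}

// roundTrip encodes a comment to JSON and decodes it back,
// the same trip a comment takes through the queue.
func roundTrip(in Comment) (Comment, error) {
	var out Comment
	encoded, err := json.Marshal(in)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(encoded, &out)
	return out, err
}

func main() {
	in := Comment{
		UserName: "some_user",
		Date:     time.Date(2017, 1, 2, 3, 4, 5, 0, time.UTC),
		Text:     "i love your cat",
	}
	encoded, err := json.Marshal(in)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(encoded)) // this is the job body

	out, err := roundTrip(in)
	fmt.Println(out.UserName, out.Date.Equal(in.Date), err)
}
```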
We also need to define what to do with the data once we get it on the worker side. We’ll use an interface for that:
common.go
type CommentProcessor interface {
DoProcess(comment *Comment) error
}
And our dummy implementation:
comment_processor.go
package main
import (
"os"
"fmt"
"time"
)
type DiskCommentProcessor struct {
dir string
}
func (processor *DiskCommentProcessor) DoProcess(comment *Comment) error {
filePath := fmt.Sprintf("%s/%s_%s.txt", processor.dir, comment.Date.Format(time.RFC3339), comment.UserName)
commentFile, err := os.Create(filePath)
if err != nil {
return err
}
defer commentFile.Close()
// Text is a string, so use WriteString and report a failed write to the caller
_, err = commentFile.WriteString(comment.Text)
return err
}
func MakeNewCommentProcessor(dir string) *DiskCommentProcessor {
return &DiskCommentProcessor{dir: dir}
}
As you can see it doesn’t do much, but it’s enough for us.
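Because the worker only depends on the CommentProcessor interface, swapping the disk writer for something else is cheap. As an illustration, here is a sketch of an in-memory implementation that could be handy in tests. MemoryCommentProcessor is a hypothetical type, it is not part of the example app.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Comment and CommentProcessor are repeated from common.go
// so the sketch compiles on its own.
type Comment struct {
	UserName string
	Date     time.Time
	Text     string
}

type CommentProcessor interface {
	DoProcess(comment *Comment) error
}

// MemoryCommentProcessor keeps processed comments in a slice.
// Hypothetical type, for illustration only.
type MemoryCommentProcessor struct {
	mu       sync.Mutex
	comments []Comment
}

func (p *MemoryCommentProcessor) DoProcess(comment *Comment) error {
	if comment == nil {
		return errors.New("nil comment")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.comments = append(p.comments, *comment)
	return nil
}

// Count reports how many comments were processed so far.
func (p *MemoryCommentProcessor) Count() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.comments)
}

func main() {
	memory := &MemoryCommentProcessor{}
	var processor CommentProcessor = memory
	err := processor.DoProcess(&Comment{UserName: "some_user", Date: time.Now(), Text: "i love your cat"})
	fmt.Println("error:", err, "stored:", memory.Count())
}
```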
Now that we have defined our Comment data structure, how to pass it as text and what to do with it, we can start using Beanstalkd. Of course, we need to get the library first.
$ go get github.com/iwanbk/gobeanstalk
Since we want to stay DRY we’ll implement a struct to handle the common functions that both the Producer and Worker will have.
beanstalk_common.go
package main
import (
"github.com/iwanbk/gobeanstalk"
"fmt"
"os"
)
type PapaBeanstalk struct {
ServerAddress string
serverConnection *gobeanstalk.Conn
}
func (papa *PapaBeanstalk) Connect() {
beanstalkConnection, err := gobeanstalk.Dial(papa.ServerAddress)
if err != nil {
// do retries or whatever you need
fmt.Println(err)
os.Exit(1)
}
fmt.Println("connected!")
papa.serverConnection = beanstalkConnection
}
func (papa *PapaBeanstalk) Close() {
if papa.serverConnection != nil {
papa.serverConnection.Quit()
}
}
Let’s implement the Producer:
producer.go
package main
import (
"time"
"fmt"
)
type Producer struct {
PapaBeanstalk
protocol CommentProtocol
}
func (producer *Producer) PutComment(comment *Comment) error {
body, err := producer.protocol.Encode(comment)
if err != nil {
return err
}
priority := uint32(10)
delay := 0 * time.Second
timeToRun := 20 * time.Second
jobId, err := producer.serverConnection.Put(body, priority, delay, timeToRun)
if err != nil {
return err
}
fmt.Println("inserted Job id: ", jobId)
return nil
}
func MakeNewProducer(serverAddress string, protocol CommentProtocol) *Producer {
producer := Producer{protocol: protocol}
producer.ServerAddress = serverAddress
return &producer
}
As you can see, we embed PapaBeanstalk into the Producer struct, making the Connect() and Close() methods available to it.
If you are new to Go and more accustomed to inheritance, this may seem a bit odd. But Go just doesn’t have inheritance, and embedding works just as well here.
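If embedding is new to you, this tiny sketch shows the two things worth knowing: the embedded type’s methods are promoted to the outer struct, and there is no virtual dispatch like in classic inheritance. Base and Wrapper are made-up names for the demo.

```go
package main

import "fmt"

// Base plays the role of PapaBeanstalk in this sketch.
type Base struct{}

func (b *Base) Name() string { return "base" }

// Greet calls b.Name(), which is always Base's own Name.
func (b *Base) Greet() string { return "hello from " + b.Name() }

// Wrapper embeds Base, so Greet is promoted and callable on a Wrapper.
type Wrapper struct {
	Base
}

// Name shadows the promoted Base.Name when called on a Wrapper.
func (w *Wrapper) Name() string { return "wrapper" }

func main() {
	w := &Wrapper{}
	fmt.Println(w.Name())      // Wrapper's own method
	fmt.Println(w.Greet())     // promoted from Base, still uses Base.Name
	fmt.Println(w.Base.Name()) // the embedded value is reachable explicitly
}
```

The second call is the part that surprises people coming from inheritance: Greet runs on the embedded Base value, so it never sees Wrapper’s Name.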
You probably noticed that we are not using any tube! Beanstalkd defaults to using the tube “default”, so we’re good for the time being.
Our next step will be implementing the worker:
worker.go
package main
import (
"fmt"
"time"
"github.com/iwanbk/gobeanstalk"
)
type CommentWorker struct {
PapaBeanstalk
protocol CommentProtocol
processor CommentProcessor
}
func (worker *CommentWorker) ProcessJob() {
fmt.Println("reserving job")
job, err := worker.serverConnection.Reserve()
if err != nil {
fmt.Println(err)
return
}
fmt.Println("got job Id: ", job.ID)
comment, err := worker.protocol.Decode(job.Body)
if err != nil {
worker.handleError(job, err)
return
}
processError := worker.processor.DoProcess(comment)
if processError != nil {
worker.handleError(job, processError) // err is nil at this point, report the processing error
return
}
fmt.Println("processed job id: ", job.ID)
worker.serverConnection.Delete(job.ID)
}
func (worker *CommentWorker) handleError(job *gobeanstalk.Job, err error) {
fmt.Println(err)
priority := uint32(5)
delay := 0 * time.Second
worker.serverConnection.Release(job.ID, priority, delay) // hey I can't handle this
}
func MakeNewWorker(serverAddress string, protocol CommentProtocol, processor CommentProcessor) *CommentWorker {
worker := CommentWorker{protocol: protocol, processor: processor}
worker.ServerAddress = serverAddress
return &worker
}
Here we are using the reserve command, which blocks until it has a job to return. Once we get a job, we try to decode it and process it. In case of an error we release the job back to the queue; if we succeeded, we delete the job.
When sending the release command, you can decide whether to change the job’s priority or give it a delay.
If you want or need to poll, as opposed to blocking until a job comes, you can use the “reserve-with-timeout” command, which gives up with a TIMED_OUT response once the timeout passes.
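For the polling variant, this is roughly what goes over the wire and how the first line of the reply could be told apart. It is a protocol-level sketch with hypothetical helper names. I am not showing the client library’s own API here, so check its documentation for the matching call.

```go
package main

import (
	"fmt"
	"strings"
)

// reserveWithTimeoutCmd builds the command line; 0 seconds means "don't wait".
func reserveWithTimeoutCmd(seconds int) string {
	return fmt.Sprintf("reserve-with-timeout %d\r\n", seconds)
}

// parseReserveReply looks at the first line of the server's answer.
// status is "reserved", "timed-out", "deadline-soon" or "unexpected";
// id and size are only set when a job was reserved.
// Hypothetical helper, for illustration only.
func parseReserveReply(line string) (id uint64, size int, status string) {
	line = strings.TrimRight(line, "\r\n")
	switch {
	case line == "TIMED_OUT":
		return 0, 0, "timed-out"
	case line == "DEADLINE_SOON":
		return 0, 0, "deadline-soon"
	case strings.HasPrefix(line, "RESERVED "):
		if _, err := fmt.Sscanf(line, "RESERVED %d %d", &id, &size); err != nil {
			return 0, 0, "unexpected"
		}
		return id, size, "reserved"
	default:
		return 0, 0, "unexpected"
	}
}

func main() {
	fmt.Printf("%q\n", reserveWithTimeoutCmd(5))
	for _, reply := range []string{"RESERVED 7 42\r\n", "TIMED_OUT\r\n"} {
		id, size, status := parseReserveReply(reply)
		fmt.Println(status, id, size)
	}
}
```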
Now that we have both the Worker and the Producer, we only need the main function to make it work.
main.go
package main
import (
"time"
"os"
"fmt"
)
func ProducerMain() {
var comments = []Comment{
{UserName: "some_user", Text:"i love your cat", Date: time.Now()},
{UserName: "some_other_user", Text:"i prefer dogs", Date: time.Now()},
{UserName: "another_user", Text:"please close this thread", Date: time.Now()},
{UserName: "admin", Text:"thread closed - not relevant", Date: time.Now()},
}
protocol := MakeJsonCommentProtocol()
producer := MakeNewProducer("localhost:11300", protocol)
producer.Connect()
defer producer.Close()
for _, comment := range comments {
producer.PutComment(&comment)
}
}
func WorkerMain() {
protocol := MakeJsonCommentProtocol()
commentsDir := "./comments"
os.Mkdir(commentsDir, 0777)
processor := MakeNewCommentProcessor(commentsDir)
worker := MakeNewWorker("localhost:11300", protocol, processor)
worker.Connect()
defer worker.Close()
for {
worker.ProcessJob()
}
}
func printUsage() {
fmt.Println("Usage: example-app worker/producer")
os.Exit(1)
}
func main() {
if len(os.Args) < 2 {
printUsage()
}
if os.Args[1] == "worker" {
WorkerMain()
} else if os.Args[1] == "producer" {
ProducerMain()
} else {
printUsage()
}
}
What do we have here?
- The ProducerMain function with some dummy data
- The WorkerMain function with an endless loop
- The main function that enables us to choose between the ProducerMain and the WorkerMain
Let’s build our new amazing app:
$ go build
The command builds an executable named after the directory, in my case example-app.
We can use the same executable for the Producer and the Worker.
Open a terminal window and run it with the “producer” argument:
$ ./example-app producer
You should see something like this:
connected!
inserted Job id: 1
inserted Job id: 2
inserted Job id: 3
inserted Job id: 4
connection closed
In another terminal window run it with the “worker” argument:
$ ./example-app worker
You should see something like this:
connected!
reserving job
got job Id: 1
processed job id: 1
reserving job
got job Id: 2
processed job id: 2
reserving job
got job Id: 3
processed job id: 3
reserving job
got job Id: 4
processed job id: 4
reserving job
It’s running an endless loop, so press Ctrl-C to stop it.
Great! We now have a very simple Beanstalkd application.
But wait… what about those tubes? Let’s modify our app to use tubes.
We’ll add a tube variable to the worker struct:
worker.go
type CommentWorker struct {
PapaBeanstalk
protocol CommentProtocol
processor CommentProcessor
tube string
}
And also add it to the function that makes our Worker structs:
worker.go
func MakeNewWorker(serverAddress string, protocol CommentProtocol, processor CommentProcessor,
tubeToListenOn string) *CommentWorker {
worker := CommentWorker{protocol: protocol, processor: processor, tube: tubeToListenOn}
worker.ServerAddress = serverAddress
return &worker
}
Now we’ll implement the watch command and use it:
worker.go
func (worker *CommentWorker) watch() error {
watching, err := worker.serverConnection.Watch(worker.tube)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println("watching ", watching, " tubes")
return nil
}
func (worker *CommentWorker) Connect() {
worker.PapaBeanstalk.Connect()
worker.watch()
}
We are now automatically watching the configured tube whenever the worker connects to the server. The worker is still listening on the default tube as well, which is fine for us for now. If you don’t want the worker to listen on the default tube, you need to use the ignore command.
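The watch list rules are easy to get wrong, so here is a little in-memory model of them. It is only a sketch of the behaviour described in the protocol documentation (every connection starts out watching “default”, and the server refuses to ignore the last watched tube). The watchList type is made up for the demo and talks to no server.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotIgnored mirrors the server's NOT_IGNORED reply.
var errNotIgnored = errors.New("NOT_IGNORED")

// watchList models the set of tubes one connection is watching.
// Hypothetical type, for illustration only.
type watchList map[string]bool

// newWatchList starts with the "default" tube, like a fresh connection.
func newWatchList() watchList {
	return watchList{"default": true}
}

// watch adds a tube and returns how many tubes are watched now.
func (w watchList) watch(tube string) int {
	w[tube] = true
	return len(w)
}

// ignore removes a tube, unless it is the only one left.
func (w watchList) ignore(tube string) (int, error) {
	if w[tube] && len(w) == 1 {
		return len(w), errNotIgnored
	}
	delete(w, tube)
	return len(w), nil
}

func main() {
	tubes := newWatchList()
	fmt.Println("watching", tubes.watch("first"), "tubes")

	count, err := tubes.ignore("default")
	fmt.Println("watching", count, "tubes, error:", err)

	_, err = tubes.ignore("first")
	fmt.Println("ignoring the last tube:", err)
}
```

So a worker that should only see its own tube would watch that tube first and ignore “default” afterwards.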
The new updated worker now looks like this:
worker.go
package main
import (
"fmt"
"time"
"github.com/iwanbk/gobeanstalk"
"os"
)
type CommentWorker struct {
PapaBeanstalk
protocol CommentProtocol
processor CommentProcessor
tube string
}
func (worker *CommentWorker) ProcessJob() {
fmt.Println("reserving job")
job, err := worker.serverConnection.Reserve()
if err != nil {
fmt.Println(err)
return
}
fmt.Println("got job Id: ", job.ID)
comment, err := worker.protocol.Decode(job.Body)
if err != nil {
worker.handleError(job, err)
return
}
processError := worker.processor.DoProcess(comment)
if processError != nil {
worker.handleError(job, processError) // err is nil at this point, report the processing error
return
}
fmt.Println("processed job id: ", job.ID)
worker.serverConnection.Delete(job.ID)
}
func (worker *CommentWorker) watch() error {
watching, err := worker.serverConnection.Watch(worker.tube)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println("watching ", watching, " tubes")
return nil
}
func (worker *CommentWorker) Connect() {
worker.PapaBeanstalk.Connect()
worker.watch()
}
func (worker *CommentWorker) handleError(job *gobeanstalk.Job, err error) {
fmt.Println(err)
priority := uint32(5)
delay := 0 * time.Second
worker.serverConnection.Release(job.ID, priority, delay) // hey I can't handle this
}
func MakeNewWorker(serverAddress string, protocol CommentProtocol, processor CommentProcessor,
tubeToListenOn string) *CommentWorker {
worker := CommentWorker{protocol: protocol, processor: processor, tube: tubeToListenOn}
worker.ServerAddress = serverAddress
return &worker
}
We’ll now add the functionality to the Producer:
producer.go
func (producer *Producer) UseTube(tubeName string) error {
return producer.serverConnection.Use(tubeName)
}
The only thing left is to modify the main functions:
main.go
package main
import (
"time"
"os"
"fmt"
)
func ProducerMain(tubes []string) {
var comments = []Comment{
{UserName: "some_user", Text:"i love your cat", Date: time.Now()},
{UserName: "some_other_user", Text:"i prefer dogs", Date: time.Now()},
{UserName: "another_user", Text:"please close this thread", Date: time.Now()},
{UserName: "admin", Text:"thread closed - not relevant", Date: time.Now()},
}
protocol := MakeJsonCommentProtocol()
producer := MakeNewProducer("localhost:11300", protocol)
producer.Connect()
defer producer.Close()
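for _, tube := range tubes {
// skip the tube if we can't switch to it, otherwise the jobs would land in the previously used tube
if err := producer.UseTube(tube); err != nil {
fmt.Println(err)
continue
}
for _, comment := range comments {
producer.PutComment(&comment)
}
}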
}
func WorkerMain(commentsDir string, tube string) {
protocol := MakeJsonCommentProtocol()
os.Mkdir(commentsDir, 0777)
processor := MakeNewCommentProcessor(commentsDir)
worker := MakeNewWorker("localhost:11300", protocol, processor, tube)
worker.Connect()
defer worker.Close()
for {
worker.ProcessJob()
}
}
func printUsage() {
fmt.Println("Usage: example-app worker1/worker2/producer")
os.Exit(1)
}
func main() {
if len(os.Args) < 2 {
printUsage()
}
if os.Args[1] == "worker1" {
WorkerMain("first_comments", "first")
} else if os.Args[1] == "worker2" {
WorkerMain("second_comments", "second")
} else if os.Args[1] == "producer" {
ProducerMain([]string{"first", "second"})
} else {
printUsage()
}
}
We added a few things here:
- ProducerMain accepts a list of tubes to use
- ProducerMain iterates over the tubes and puts the dummy jobs in each one
- WorkerMain accepts a directory path and a tube name, and creates the worker with them
Rebuild the app:
$ go build
Open a terminal window and run it with the “producer” argument:
$ ./example-app producer
You should see 8 jobs inserted.
In another terminal window run it with the “worker1” argument:
$ ./example-app worker1
And in another terminal window run it with the “worker2” argument:
$ ./example-app worker2
You should see that the jobs were processed.
If it worked, the same number of comment files should appear in the “first_comments” and “second_comments” folders.
What this tutorial doesn’t cover
There’s more to Beanstalkd than the basic put-reserve-delete flow.
Here are the commands and subjects you should research for more advanced usage:
- timeouts - “DEADLINE_SOON”, “TIMED_OUT”
- touch command
- bury command
- ignore command
- kick commands
- list commands
- stats commands
- peek commands
- pause tube
Summary
By now you should have a basic understanding of using Beanstalkd with Go.
The example app is on GitHub, so feel free to use it.
If you enjoyed this tutorial please comment, share or just tell your friends.
Thank you for reading.