Introduction to beanstalk-clj
Getting started
This tutorial assumes you have already installed beanstalkd.
To get started, simply launch it with:
beanstalkd -VV
(The -VV flag turns on verbose logging, so we can see what really happens):
~ % beanstalkd -VV
pid 6034
bind 4 0.0.0.0:11300
To use beanstalk-clj, we have to load the library and set up
a connection to an (already running) beanstalkd server:
(use 'beanstalk-clj)
; Use `#{host}:#{port}` format string to set up client:
(def client (beanstalkd-factory "localhost:11300"))
; Or 2 parameters: host, port
(def client (beanstalkd-factory "localhost" 11300))
; Or default host ("localhost"), default port (11300)
(def client (beanstalkd-factory))
Inspect beanstalkd verbose log:
accept 6
Basic Operation
Now that we have a connection set up, we can enqueue jobs:
=> (put client "message")
Inspect beanstalkd:
<6 command put
<6 job 1
>6 reply INSERTED 1
Or we can request jobs:
=> (def job (reserve client))
=> (.body job)
Inspect beanstalkd:
<6 command reserve
>6 reply RESERVED 1 7
>6 job 1
Once we are done processing a job, we have to mark it as
done; otherwise beanstalkd re-queues the job once its ttr
(time-to-run, 120 seconds by default) has elapsed. A job is marked
as done by calling delete:
=> (delete job)
Inspect beanstalkd:
<6 command delete
>6 reply DELETED
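The "process, then delete" rule above can be sketched as a small helper. This is only an illustration under stated assumptions: handle-job and its handler map are hypothetical names, not part of beanstalk-clj, and burying a failed job (bury is covered under "Advanced Operation") is just one possible failure policy. The handlers are passed in so the sketch runs without a server.

```clojure
(defn handle-job
  "Run process! on the job, then acknowledge it.
  process!, delete! and bury! are supplied by the caller (hypothetical wiring)."
  [job {:keys [process! delete! bury!]}]
  (let [ok? (try
              (process! job)
              true
              (catch Exception _ false))]
    (if ok?
      (do (delete! job) :deleted)   ; success: remove the job so it is not re-queued after ttr
      (do (bury! job) :buried))))   ; failure: set the job aside for later inspection

;; Stand-ins for a real job and the real client functions.
(def log (atom []))

(def handlers
  {:process! (fn [job]
               (when (= "bad" (:body job))
                 (throw (ex-info "processing failed" {}))))
   :delete!  (fn [job] (swap! log conj [:delete (:body job)]))
   :bury!    (fn [job] (swap! log conj [:bury (:body job)]))})

(handle-job {:body "good"} handlers)
(handle-job {:body "bad"} handlers)
```

In real use, the :delete! and :bury! entries would presumably be the library's own delete and bury functions, and process! would read the payload with (.body job).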
If you use a timeout of 0, reserve will immediately return either a job or nil:
=> (reserve client :with-timeout 0)
nil
Inspect beanstalkd:
<6 command reserve-with-timeout
>6 reply TIMED_OUT
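Because a zero timeout yields nil as soon as nothing is ready, it lends itself to draining whatever is currently queued. The following is a minimal sketch of that idea; drain and fake-reserve are hypothetical names, and the in-memory atom merely stands in for a tube so the example runs without a server.

```clojure
(defn drain
  "Call reserve-fn repeatedly until it returns nil; return the jobs collected."
  [reserve-fn]
  (doall (take-while some? (repeatedly reserve-fn))))

;; In-memory stand-in for a tube holding three ready jobs.
(def pending (atom ["a" "b" "c"]))

(defn fake-reserve
  "Pop the next pending item, or return nil when nothing is left."
  []
  (let [job (first @pending)]
    (swap! pending #(vec (rest %)))
    job))

(def drained (drain fake-reserve))
```

Against a real server, one would pass something like #(reserve client :with-timeout 0) as reserve-fn. Each job collected this way is still only reserved, so it has to be deleted afterwards or it returns to the queue once its ttr elapses.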
Note that beanstalk-clj requires job bodies to be strings; anything else throws an exception:
=> (put client 1234)
ExceptionInfo throw+: {:type :type-error, :message "Job body must be a String"} beanstalk-clj.core/put (core.clj:192)
There is no restriction on what characters you can put in a job body, so it can be used to hold arbitrary binary data:
- If you want to send an image, just put the image data as a string.
- If you want to send a dictionary, just put the JSON- or protobuf-encoded string.
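Since put accepts only strings, raw bytes need some text encoding first. The following is a minimal sketch of one option, Base64 via the JDK's java.util.Base64 (assuming Java 8 or later). The helper names bytes->body and body->bytes are hypothetical and not part of beanstalk-clj.

```clojure
(import '(java.util Base64))

(defn bytes->body
  "Encode raw bytes as a Base64 string that can be used as a job body."
  [^bytes data]
  (.encodeToString (Base64/getEncoder) data))

(defn body->bytes
  "Decode a Base64 job body back into the original bytes."
  [^String body]
  (.decode (Base64/getDecoder) body))

;; Round trip with a few non-printable bytes standing in for image data.
(def sample (byte-array [(byte 0) (byte -1) (byte 127) (byte 10)]))
(def body (bytes->body sample))
(def restored (body->bytes body))
```

With a connection in hand, (put client (bytes->body data)) would then enqueue the payload, and the consumer would call body->bytes on (.body job). Base64 grows the payload by about a third, which may matter given the server's maximum job size.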
A more idiomatic Clojure interface looks something like the following:
=> (with-beanstalkd (beanstalkd-factory)
#_=> (use-tube "my-tube")
#_=> (put "hello" :priority 0 :delay 0 :ttr 120))
nil
=> (with-beanstalkd (beanstalkd-factory)
#_=> (watch-tube "my-tube")
#_=> (let [job (reserve)]
#_=> (println (.body job))
#_=> (delete job)))
hello
nil
Inspect beanstalkd:
<7 command use
>7 reply USING my-tube
<7 command put
<7 job 4
>7 reply INSERTED 4
close 7
accept 7
<7 command watch
>7 reply WATCHING 2
<7 command reserve
>7 reply RESERVED 4 5
>7 job 4
<7 command delete
>7 reply DELETED
close 7
To save some typing, let's define a little macro:
=> (defmacro beanstalk [& body]
#_=> `(with-beanstalkd (beanstalkd-factory)
#_=> ~@body))
Tube Management
A single beanstalkd server can provide many different queues,
called "tubes" in beanstalkd. Use list-tubes
to see all available tubes:
=> (beanstalk (prn (list-tubes)))
("default")
nil
A beanstalkd client chooses one tube into which its jobs are put. This is the tube "used" by the client. To see which tube you are currently using:
=> (beanstalk (prn (list-tube-used)))
"default"
nil
Unless told otherwise, a client uses the default tube.
If you want to use a different tube:
=> (beanstalk
#_=> (use-tube "foo")
#_=> (prn (list-tube-used)))
"foo"
nil
If you decide to use a tube that does not yet exist, the tube is automatically created by beanstalkd:
=> (beanstalk
#_=> (use-tube "foo")
#_=> (prn (list-tubes)))
("default" "foo")
nil
Of course, you can always switch back to the default tube. Tubes that no client is using or watching vanish automatically:
=> (beanstalk (use-tube "foo"))
nil
=> (beanstalk
#_=> (prn (list-tubes))
#_=> (prn (list-tube-used)))
("default")
"default"
nil
Further, a beanstalkd client can choose many tubes to reserve
jobs from. These tubes are "watched" by the client. To see
which tubes you are currently watching:
=> (beanstalk (prn (watching)))
("default")
nil
To watch an additional tube:
=> (beanstalk
#_=> (watch-tube "bar")
#_=> (prn (watching)))
("default" "bar")
nil
As before, tubes that do not yet exist are created automatically once you start watching them.
=> (beanstalk
#_=> (watch-tube "bar")
#_=> (prn (list-tubes)))
("default" "bar")
nil
To stop watching a tube:
=> (beanstalk
#_=> (watch-tube "bar")
#_=> (ignore "bar")
#_=> (prn (watching)))
("default")
nil
You can’t watch zero tubes. So if you try to ignore the last tube you are watching, this is silently ignored.
To recap: each beanstalkd client manages two separate concerns: which tube newly created jobs are put into, and which tube(s) jobs are reserved from. Accordingly, there are two separate sets of functions for these concerns:
- use-tube and list-tube-used affect where jobs are put;
- watch-tube and watching control where jobs are reserved from.
Note that these concerns are fully orthogonal: for example, when you use a
tube, it is not automatically watched. Neither does watching a tube affect
the tube you are using.
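The separation between "using" and "watching" can be pictured as two independent fields of per-connection state. The following is a toy model only, written as pure Clojure data. The starred function names are hypothetical and deliberately distinct from the library's use-tube, watch-tube and ignore; the model says nothing about how beanstalk-clj stores this state internally.

```clojure
;; A fresh connection uses and watches the default tube.
(def fresh-client {:using "default" :watching ["default"]})

(defn use-tube*
  "Change where new jobs are put; the watch list is untouched."
  [client tube]
  (assoc client :using tube))

(defn watch-tube*
  "Add a tube to the watch list; the used tube is untouched."
  [client tube]
  (if (some #{tube} (:watching client))
    client
    (update-in client [:watching] conj tube)))

(defn ignore*
  "Remove a tube from the watch list, unless it is the last one."
  [client tube]
  (let [remaining (vec (remove #{tube} (:watching client)))]
    (if (empty? remaining)
      client
      (assoc client :watching remaining))))

(def example
  (-> fresh-client
      (use-tube* "foo")
      (watch-tube* "bar")))
```

Here example ends up using "foo" while watching "default" and "bar", which mirrors the point above: jobs put by this client go to one tube, while reserved jobs may come from others.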
Statistics
Beanstalkd accumulates various statistics at the server, tube, and job level. Statistical details for a job can only be retrieved during the job's lifecycle. If you try to access job stats after the job was deleted, you will get a command-failure exception:
=> (beanstalk
#_=> (put "yo")
#_=> (let [job (reserve)]
#_=> (prn "stats: " (stats job))
#_=> (delete job)
#_=> (stats job)))
"stats: " {:ttr 120, :age 6829, :file 0, :kicks 0, :state "reserved", :releases 0, :id 2, :buries 0, :time-left 119, :timeouts 2, :delay 0, :tube "default", :reserves 3, :pri 2147483648}
ExceptionInfo throw+: {:type :command-failure, :status "NOT_FOUND", :results nil} beanstalk-clj.core.Beanstalkd (core.clj:82)
Let's have a look at some numbers for the default tube:
=> (beanstalk (prn (stats-tube "default")))
{:current-jobs-ready 2, :current-jobs-buried 0, :current-jobs-urgent 0, :current-jobs-reserved 0, :name "default", :total-jobs 4, :cmd-pause-tube 0, :cmd-delete 2, :pause-time-left 0, :current-watching 3, :current-jobs-delayed 0, :pause 0, :current-waiting 0, :current-using 3}
nil
Finally, let’s go into server-level statistics:
=> (beanstalk (prn (stats-beanstalkd)))
{:binlog-records-written 0, :job-timeouts 0, :cmd-reserve 9, :current-producers 0, :current-jobs-ready 0, :current-jobs-buried 0, :total-connections 35, :cmd-reserve-with-timeout 7, :max-job-size 65535, :cmd-peek-ready 1, :cmd-ignore 2, :current-jobs-urgent 0, :cmd-stats-tube 2, :current-jobs-reserved 0, :binlog-current-index 0, :total-jobs 13, :uptime 122, :cmd-release 2, :rusage-utime 0.002703, :cmd-bury 1, :binlog-records-migrated 0, :hostname "Soasme-Retina-MacBook-Pro.local", :cmd-stats-job 10, :cmd-list-tubes 4, :cmd-pause-tube 0, :current-tubes 1, :cmd-peek-buried 0, :cmd-peek-delayed 0, :cmd-put 13, :cmd-peek 2, :pid 24244, :cmd-list-tube-used 6, :cmd-kick 0, :cmd-list-tubes-watched 3, :cmd-delete 13, :id "6fbc537ca40afe4a", :binlog-oldest-index 0, :cmd-stats 3, :rusage-stime 0.006188, :cmd-use 1, :current-jobs-delayed 0, :binlog-max-size 10485760, :version 1.9, :current-waiting 0, :cmd-touch 0, :cmd-watch 3, :current-workers 0, :current-connections 1}
nil
Advanced Operation
In “Basic Operation” above, we discussed the typical lifecycle of a job:
   put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*
(This picture was taken from beanstalkd's protocol documentation. It is
originally contained in `protocol.txt`, part of the beanstalkd
distribution.)
But besides ready and reserved, a job can also be delayed or buried.
Along with those states come a few transitions, so the full picture looks like
the following:
   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*
(This picture was taken from beanstalkd's protocol documentation. It is
originally contained in `protocol.txt`, part of the beanstalkd
distribution.)
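One way to keep the diagram in mind is to write its arrows down as a lookup table. The sketch below is a toy model that encodes only the transitions drawn above; the names transitions, step and run, the :new pseudo-state, and the keyword spellings are all hypothetical, and none of this is part of beanstalk-clj or the beanstalkd protocol itself.

```clojure
;; state -> command -> next state, one entry per arrow in the diagram.
(def transitions
  {:new      {:put :ready
              :put-with-delay :delayed}
   :delayed  {:time-passes :ready}
   :ready    {:reserve :reserved}
   :reserved {:delete :deleted
              :release :ready
              :release-with-delay :delayed
              :bury :buried}
   :buried   {:kick :ready
              :delete :deleted}})

(defn step
  "Follow one arrow; throw if the diagram has no such arrow."
  [state command]
  (or (get-in transitions [state command])
      (throw (ex-info "No such arrow in the diagram"
                      {:state state :command command}))))

(defn run
  "Apply a sequence of commands starting from the given state."
  [state commands]
  (reduce step state commands))

;; A job that is buried once, kicked, and finally deleted.
(def final-state
  (run :new [:put :reserve :bury :kick :reserve :delete]))
```

Here final-state is :deleted, the *poof* at the right edge of the picture.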
Now let’s have a practical look at those new possibilities. For a start, we can create a job with a delay. Such a job will only be available for reservation once this delay passes:
=> (beanstalk
#_=> (put "yo" :delay 1)
#_=> (prn (reserve :with-timeout 0))
#_=> (let [job (reserve :with-timeout 1)]
#_=> (prn (.body job))
#_=> (delete job)))
nil
"yo"
nil
Releasing a job puts it back into the tube it came from.
Burying a job sets it aside; it is not available for reservation until it is kicked.
Kicking takes an upper bound and moves at most that many jobs back into the ready queue:
=> (beanstalk
#_=> (let [jid (put "yo") job (reserve)]
#_=> (release job)
#_=> (prn (:state (stats-job jid)))
#_=> (reserve)
#_=> (bury job)
#_=> (prn (:state (stats-job jid)))
#_=> (kick 1)
#_=> (prn (:state (stats-job jid)))
#_=> (reserve)
#_=> (delete jid)))
"ready"
"buried"
"ready"
nil
Inspecting jobs
The peek command lets us inspect jobs without reserving them or modifying their state. Note that in the following example, peek does not reserve the job:
=> (beanstalk
#_=> (let [jid (put "yo")]
#_=> (prn (peek jid))
#_=> (prn (:state (stats-job jid)))))
#<Job beanstalk_clj.core.Job@53273445>
"ready"
If you try to peek at a non-existing job, you’ll get an exception:
=> (peek client 1234)
ExceptionInfo throw+: {:type :command-failure, :status "NOT_FOUND", :results nil} beanstalk-clj.core.Beanstalkd (core.clj:82)
You can also use peek-delayed and peek-buried to inspect delayed jobs
and buried jobs.
Job Priorities
By default, jobs with equal priority are reserved in the order they were put (FIFO). If the need arises, you can override this behaviour by giving different jobs different priorities. There are three hard facts to know about job priorities:
- Jobs with lower priority numbers are reserved before jobs with higher priority numbers.
- beanstalkd priorities are 32-bit unsigned integers (they range from 0 to 2**32 - 1).
- beanstalk-clj uses 2**31 as the default job priority (see :pri 2147483648 in the job stats earlier).
=> (beanstalk
#_=> (let [jid42 (put "42" :priority 42)
#_=> jid21 (put "21" :priority 21)
#_=> jid21x2 (put "21x2" :priority 21)]
#_=> (prn (. (reserve) body))
#_=> (delete jid21)
#_=> (prn (. (reserve) body))
#_=> (delete jid21x2)
#_=> (prn (. (reserve) body))
#_=> (delete jid42)))
"21"
"21x2"
"42"
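The ordering seen above can be reproduced without a server. The following sketch is a pure-Clojure model, not library code: reservation-order is a hypothetical name, and the job maps are plain data standing in for queued jobs. It sorts by priority number first and by insertion order second.

```clojure
(defn reservation-order
  "Return job bodies in the order a single tube would hand them out:
  lowest priority number first, ties broken by insertion order."
  [jobs]
  (->> jobs
       (map-indexed (fn [i job] (assoc job :seq i)))  ; remember insertion order
       (sort-by (juxt :priority :seq))
       (map :body)))

(def order
  (reservation-order [{:body "42"   :priority 42}
                      {:body "21"   :priority 21}
                      {:body "21x2" :priority 21}]))
;; order => ("21" "21x2" "42")
```

The two jobs with priority 21 keep their relative order, which is the same FIFO tie-breaking visible in the session above.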
Fin!
=> (.close client)
That’s it, for now. We’ve left a few capabilities untouched (touch and time-to-run). But if you’ve really read through all of the above, send me a message and tell me what you think of it. And then go get yourself a treat. You certainly deserve it.