GitHub - lynaghk/zmq-async: Threadsafe Clojure core.async interface to ZeroMQ

lynaghk/zmq-async

ZeroMQ Async

ZeroMQ is a message-oriented socket system that supports many communication styles (request/reply, publish/subscribe, fan-out, &c.) on top of many transport layers, with bindings to many languages. However, ZeroMQ sockets are not thread safe: concurrent usage typically requires explicit locking or dedicated threads and queues. This library handles all of that for you, taking your ZeroMQ sockets and hiding them behind thread-safe core.async channels.

Quick start | Caveats | Architecture | Thanks! | Other Clojure ZMQ libs

Quick start

Add to your project.clj:

[com.keminglabs/zmq-async "0.1.0"]

Your system should have ZeroMQ 3.2 installed:

brew install zeromq

or

apt-get install libzmq3

This library provides one function, register-socket!, which associates a ZeroMQ socket with one or both of two core.async channels: in (into which you write strings or byte arrays to send) and out (from which you read received byte arrays). Writing a Clojure collection of strings and/or byte arrays sends a multipart message; received multipart messages are put on the out channel as a vector of byte arrays.
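Because received messages arrive as raw byte arrays (or as a vector of them for multipart messages), a small decoding helper is often convenient. The following is a minimal sketch and not part of zmq-async: the names frame->string and message->strings are hypothetical, and UTF-8 is an assumed encoding.

```clojure
;; Hypothetical helpers (not provided by zmq-async) for decoding received
;; messages. Assumes every frame holds UTF-8 encoded text.

(defn frame->string
  "Decode a single byte-array frame as a UTF-8 string."
  [^bytes frame]
  (String. frame "UTF-8"))

(defn message->strings
  "Return a vector of strings for a received message, which is either a
  single byte array or a vector of byte arrays (a multipart message)."
  [msg]
  (if (vector? msg)
    (mapv frame->string msg)
    [(frame->string msg)]))

;; Example with hand-built byte arrays standing in for received frames:
(message->strings [(.getBytes "header" "UTF-8") (.getBytes "body" "UTF-8")])
;; => ["header" "body"]
```

Normalizing both cases to a vector of strings lets the consuming code treat single-part and multipart messages the same way.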

The easiest way to get started is to have zmq-async create sockets and the backing message pumps automagically for you:

(require '[com.keminglabs.zmq-async.core :refer [register-socket!]]
         '[clojure.core.async :refer [>! <! go chan sliding-buffer close!]])

(let [n 3, addr "inproc://ping-pong"
      [s-in s-out c-in c-out] (repeatedly 4 #(chan (sliding-buffer 64)))]

  (register-socket! {:in s-in :out s-out :socket-type :rep
                     :configurator (fn [socket] (.bind socket addr))})
  (register-socket! {:in c-in :out c-out :socket-type :req
                     :configurator (fn [socket] (.connect socket addr))})

  (go (dotimes [_ n]
        (println (String. (<! s-out)))
        (>! s-in "pong"))
    (close! s-in))

  (go (dotimes [_ n]
        (>! c-in "ping")
        (println (String. (<! c-out))))
    (close! c-in)))

Note that you must provide a :configurator function to set up the newly instantiated socket, including binding/connecting it to addresses. Take a look at the jzmq javadocs for more info on configuring ZeroMQ sockets.
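As an illustration of a configurator that does more than bind or connect, the sketch below builds one that sets a socket option first. It assumes the jzmq ZMQ.Socket methods setLinger and connect; the function name connect-with-linger and the linger value of 0 in the usage comment are illustrative choices, not something zmq-async prescribes.

```clojure
;; Hypothetical helper: returns a function suitable for :configurator.
;; No type hints are used, so the method calls are resolved reflectively.
(defn connect-with-linger
  [addr linger-ms]
  (fn [socket]
    (doto socket
      (.setLinger linger-ms)   ; how long unsent messages are kept on close
      (.connect addr))))

;; Usage with register-socket!, where c-in and c-out are core.async
;; channels created by the caller:
;; (register-socket! {:in c-in :out c-out :socket-type :req
;;                    :configurator (connect-with-linger "inproc://ping-pong" 0)})
```

Building the configurator from a helper keeps the address and option values in one place when several sockets share the same setup.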

Closing the core.async in channel associated with a socket closes the socket.

If you already have ZeroMQ sockets in hand, you can give them directly to this library:

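(import '(org.zeromq ZMQ ZContext))
;; addr, in, and out are assumed to be defined already: an address string
;; and two core.async channels.
(let [my-sock (doto (.createSocket (ZContext.) ZMQ/PAIR)
                ;; twiddle ZeroMQ socket options here...
                (.bind addr))]

  (register-socket! {:socket my-sock :in in :out out}))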

(Of course, after you've created a ZeroMQ socket and handed it off to the library, you shouldn't read/write against it since the sockets aren't thread safe and doing so may crash your JVM.)

The implicit context supporting register-socket! can only handle one incoming/outgoing message at a time. If you need sockets to work in parallel (e.g., you don't want to miss a small control message just because you're slurping in a 10 GB message on another socket), then you'll need multiple zmq-async contexts. Contexts accept an optional name to aid in debugging/profiling:

(require '[com.keminglabs.zmq-async.core :refer [context initialize! register-socket!]])

(def my-context
  (doto (context "My Context")
    (initialize!)))

(register-socket! {:context my-context :in in :out out :socket my-sock})

Each context has an associated shutdown function, which will close all ZeroMQ sockets and core.async channels associated with the context and stop both threads:

((:shutdown my-context))

Caveats