VerneMQ is a young but great MQTT broker. To install it on Pine64 with Ubuntu Xenial, follow the steps below.
Install the following packages
1 |
apt-get install libssl-dev libncurses5-dev openjdk-8-jre-headless openjdk-8-jdk-headless git libsnappy-dev erlang erlang-dev |
Start with installation
1 2 3 |
git clone git://github.com/erlio/vernemq.git cd vernemq make rpi32 |
The first compilation will give some errors, but we need it to download all the dependencies. Now we need to make some changes to the source files to change some details and to disable the snappy library (I cannot compile it with libsnappy enabled)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
vi _build/rpi32/lib/eleveldb/c_src/build_deps.sh comment from line 63 to line 73 # if [ ! -d snappy-$SNAPPY_VSN ]; then # tar -xzf snappy-$SNAPPY_VSN.tar.gz # (cd snappy-$SNAPPY_VSN && ./configure --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic) # fi # (cd snappy-$SNAPPY_VSN && $MAKE && $MAKE install) # export CFLAGS="$CFLAGS -I $BASEDIR/system/include" # export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include" # export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" # export LD_LIBRARY_PATH="$BASEDIR/system/lib:$LD_LIBRARY_PATH" |
Another compilation that will fail, it’s fine 🙂
1 |
make rpi32 |
Another change to disable libsnappy
1 2 3 4 5 6 7 8 9 |
vi _build/rpi32/lib/eleveldb/c_src/leveldb/build_detect_platform line 175 changed to if [ "$?" = 355 ]; then cd _build/rpi32/lib/eleveldb/c_src/leveldb/ make clean cd - |
Make some other changes
1 2 3 4 5 6 7 8 9 |
vi _build/rpi32/lib/eleveldb/c_src/leveldb/port/atomic_pointer.h lin 96 add inline void MemoryBarrier() { // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. __asm__ __volatile__("" : : : "memory"); } #define LEVELDB_HAVE_MEMORY_BARRIER |
1 2 3 4 5 6 |
vi _build/rpi32/lib/eleveldb/c_src/Makefile line 38 # LDFLAGS += -shared leveldb/libleveldb.a system/lib/libsnappy.a -lstdc++ and add LDFLAGS += -shared leveldb/libleveldb.a -lstdc++ |
1 2 3 4 5 6 |
vi _build/rpi32/lib/mzmetrics/c_src/mzmetrics_nif.c line 67 // unsigned int align_num_resources(unsigned int num) and add static unsigned int align_num_resources(unsigned int num) |
Compile all again
1 |
make rpi32 |
Now it’s time to install it
1 2 |
cd _build/rpi32/rel cp -R vernemq/ /opt/ |
Create /etc/init.d/vernemq file with the following content
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
#! /bin/bash ### BEGIN INIT INFO # Provides: vernemq # Required-Start: $remote_fs $syslog # Required-Stop: $remote_fs $syslog # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # Short-Description: VerneMQ is a MQTT message broker # Description: VerneMQ is a MQTT message broker ### END INIT INFO NAME=vernemq DAEMON=/opt/vernemq/bin/$NAME SCRIPTNAME=/etc/init.d/$NAME export PATH=/opt/vernemq/bin:$PATH # Read configuration variable file if it is present [ -r /etc/default/$NAME ] && . /etc/default/$NAME # Load the VERBOSE setting and other rcS variables . /lib/init/vars.sh . /lib/lsb/init-functions # `service` strips all environmental VARS so # if no HOME was set in /etc/default/$NAME then set one here # to the data directory for erlexec's sake if [ -z "$HOME" ]; then export HOME=/opt/vernemq/data fi # # Function that starts the daemon/service # do_start() { # Return # 0 if daemon has been started # 1 if daemon was already running # 2 if daemon could not be started # Startup with the appropriate user start-stop-daemon --start \ --name vernemq \ --user vernemq \ --exec $DAEMON -- start \ || return 2 } # # Function that stops the daemon/service # do_stop() { # Identify the erts directory ERTS_PATH=`$DAEMON ertspath` # Attempt a clean shutdown. $DAEMON stop # Return # 0 if daemon has been stopped # 1 if daemon was already stopped # 2 if daemon could not be stopped # other if a failure occurred # Make sure it's down by using a more direct approach start-stop-daemon --stop \ --quiet \ --retry=TERM/30/KILL/5 \ --user vernemq \ --exec $ERTS_PATH/run_erl return $? } # # Function that graceful reload the daemon/service # do_reload() { # Restart the VM without exiting the process $DAEMON restart && return $? || return 2 } # Checks the status of a node do_status() { $DAEMON ping && echo $"$NAME is running" && return 0 echo $"$NAME is stopped" && return 2 } case "$1" in start) [ "$VERBOSE" != no ] && log_daemon_msg "Starting $NAME" $DAEMON ping >/dev/null 2>&1 && echo $"$NAME is already running" && exit 0 do_start case "$?" in 0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;; 2) [ "$VERBOSE" != no ] && log_end_msg 1 exit 1 ;; esac ;; stop) [ "$VERBOSE" != no ] && log_daemon_msg "Stopping $NAME" do_stop case "$?" in 0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;; 2) [ "$VERBOSE" != no ] && log_end_msg 1 exit 1 ;; esac ;; ping) # See if the VM is alive $DAEMON ping || exit $? ;; reload|force-reload) log_daemon_msg "Reloading $NAME" do_reload ES=$? log_end_msg $ES exit $ES ;; restart) log_daemon_msg "Restarting $NAME" do_stop case "$?" in 0|1) do_start case "$?" in 0) log_end_msg 0 ;; 1) log_end_msg 1 && exit 1 ;; # Old process is still running *) log_end_msg 1 && exit 1 ;; # Failed to start esac ;; *) # Failed to stop log_end_msg 1 && exit 1 ;; esac ;; status) do_status && exit 0 || exit $? ;; *) echo "Usage: $SCRIPTNAME {start|stop|ping|restart|force-reload|status}" >&2 exit 3 ;; esac : |
Setup the environment
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
update-rc.d vernemq defaults addgroup --system vernemq adduser --ingroup vernemq --home /opt/vernemq --disabled-password --system --shell /bin/bash --no-create-home --gecos "VerneMQ user" vernemq chown -R vernemq:vernemq /opt/vernemq/ vi /etc/default/vernemq # add the following line ulimit -n 65536 systemctl daemon-reload vi /etc/profile.d/vernemq.sh # add the following line export PATH=/opt/vernemq/bin:$PATH chmod 755 /etc/profile.d/vernemq.sh . /etc/profile.d/vernemq.sh |
And finally start the daemon
1 |
service vernemq start |
Add a test user to vernemq to run the test scripts
1 2 |
# use "test" as password vmq-passwd -c /opt/vernemq/etc/vmq.passwd test01 |
Create the consumer
1 2 3 |
cd pip install paho-mqtt vi consumer.py |
With the following content
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
import paho.mqtt.client as mqtt # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, rc): print("Connected with result code " + str(rc)) # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe("$SYS/#") client.subscribe('test/temperature') # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): print(msg.topic + " " + str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.username_pw_set('test01', password='test') client.connect("192.168.1.150", 1883, 60) # Blocking call that processes network traffic, dispatches callbacks and # handles reconnecting. # Other loop*() functions are available that give a threaded interface and a # manual interface. client.loop_forever() |
Create the publisher
1 |
vi publisher.py |
With the following content
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import paho.mqtt.client as paho import time def on_publish(client, userdata, mid): print("mid: " + str(mid)) client = paho.Client() client.on_publish = on_publish client.username_pw_set('test01', password='test') client.connect('192.168.1.150', 1883) client.loop_start() while True: temperature = 24 (rc, mid) = client.publish('test/temperature', str(temperature), qos=1) time.sleep(5) |
Run them in two different console
1 2 |
python consumer.py python publisher.py |
If all is fine, this will be a sample output. The first lines are the system queue. At the end you can see our messages published
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
$SYS/VerneMQ@127.0.0.1/cluster/bytes/dropped b'0' $SYS/VerneMQ@127.0.0.1/cluster/bytes/sent b'0' $SYS/VerneMQ@127.0.0.1/cluster/bytes/received b'0' $SYS/VerneMQ@127.0.0.1/client/expired b'0' $SYS/VerneMQ@127.0.0.1/queue/message/out b'1302' $SYS/VerneMQ@127.0.0.1/queue/message/in b'1302' $SYS/VerneMQ@127.0.0.1/queue/message/unhandled b'0' $SYS/VerneMQ@127.0.0.1/queue/message/drop b'0' $SYS/VerneMQ@127.0.0.1/queue/teardown b'2' $SYS/VerneMQ@127.0.0.1/queue/setup b'4' $SYS/VerneMQ@127.0.0.1/mqtt/unsubscribe/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/subscribe/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/publish/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/connect/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pubcomp/invalid/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pubrec/invalid/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/puback/invalid/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/publish/invalid/msg/size/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/subscribe/auth/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/publish/auth/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/connect/auth/error b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pingresp/sent b'5' $SYS/VerneMQ@127.0.0.1/mqtt/unsuback/sent b'0' $SYS/VerneMQ@127.0.0.1/mqtt/suback/sent b'3' $SYS/VerneMQ@127.0.0.1/mqtt/pubcomp/sent b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pubrel/sent b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pubrec/sent b'0' $SYS/VerneMQ@127.0.0.1/mqtt/puback/sent b'6' $SYS/VerneMQ@127.0.0.1/mqtt/publish/sent b'1302' $SYS/VerneMQ@127.0.0.1/mqtt/disconnect/received b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pingreq/received b'5' $SYS/VerneMQ@127.0.0.1/mqtt/unsubscribe/received b'0' $SYS/VerneMQ@127.0.0.1/mqtt/subscribe/received b'3' $SYS/VerneMQ@127.0.0.1/mqtt/pubcomp/received b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pubrel/received b'0' $SYS/VerneMQ@127.0.0.1/mqtt/pubrec/received b'0' $SYS/VerneMQ@127.0.0.1/mqtt/puback/received b'0' $SYS/VerneMQ@127.0.0.1/mqtt/publish/received b'6' $SYS/VerneMQ@127.0.0.1/mqtt/connect/received b'4' $SYS/VerneMQ@127.0.0.1/bytes/sent b'62792' $SYS/VerneMQ@127.0.0.1/bytes/received b'407' $SYS/VerneMQ@127.0.0.1/socket/error b'0' $SYS/VerneMQ@127.0.0.1/socket/close b'2' $SYS/VerneMQ@127.0.0.1/socket/open b'4' $SYS/VerneMQ@127.0.0.1/system/context/switches b'79917' $SYS/VerneMQ@127.0.0.1/system/exact/reductions b'14965199' $SYS/VerneMQ@127.0.0.1/system/gc/count b'25673' $SYS/VerneMQ@127.0.0.1/system/words/reclaimed/by/gc b'26453074' $SYS/VerneMQ@127.0.0.1/system/io/in b'7010841' $SYS/VerneMQ@127.0.0.1/system/io/out b'409965' $SYS/VerneMQ@127.0.0.1/system/reductions b'14965294' $SYS/VerneMQ@127.0.0.1/system/run/queue b'0' $SYS/VerneMQ@127.0.0.1/system/runtime b'24220' $SYS/VerneMQ@127.0.0.1/system/wallclock b'3311242' $SYS/VerneMQ@127.0.0.1/system/utilization b'0' $SYS/VerneMQ@127.0.0.1/system/utilization/scheduler/1 b'0' $SYS/VerneMQ@127.0.0.1/system/utilization/scheduler/2 b'0' $SYS/VerneMQ@127.0.0.1/system/utilization/scheduler/3 b'0' $SYS/VerneMQ@127.0.0.1/system/utilization/scheduler/4 b'0' $SYS/VerneMQ@127.0.0.1/router/subscriptions b'2' $SYS/VerneMQ@127.0.0.1/router/topics b'2' $SYS/VerneMQ@127.0.0.1/router/memory b'1618' $SYS/VerneMQ@127.0.0.1/retain/messages b'0' $SYS/VerneMQ@127.0.0.1/retain/memory b'1324' $SYS/VerneMQ@127.0.0.1/queue/processes b'2' test/temperature b'24' test/temperature b'24' test/temperature b'24' |