How to implement VerneMQ plugins using a TCP sidecar
The VerneMQ Events sidecar plugin provides an easy and flexible way to relay events on VerneMQ using tcp hooks.
The idea of the VerneMQ Events sidecar is very simple: you can register a TCP endpoint with a VerneMQ plugin hook, and whenever the hook (such as auth_on_register) is called, the VerneMQ Events sidecar dispatches a TCP request to the registered endpoint. Note that, unlike the webhook plugin, this is an async plugin which only relays the events; no control flow is based on the responses returned.
Configuring the events sidecar
To enable the events sidecar plugin, make sure to set:
plugins.vmq_events_sidecar = on
And then each webhook can be configured like this:
We recommend placing the endpoint implementation locally on each VerneMQ node such that each request can go over localhost without being subject to network issues.
Connection pool configuration
By default the plugin uses a connection pool of at most 100 connections. This can be changed by setting vmq_events_sidecar.pool_size to a different value.
Below is a very simple example of a TCP sidecar implemented in Go. It receives and logs OnSubscribe events. Follow this guide to generate the Go code for the on_subscribe event: https://developers.google.com/protocol-buffers/docs/reference/go-generated
Note that this example code uses the compiled proto code for the on_subscribe event from the protos package.
It runs a TCP server on port 8890 (the default port for the events sidecar plugin) that receives events and writes them to a log file at /tmp/sidecar.log.
package main
import (
"encoding/binary"
"errors"
"fmt"
"log"
"net"
"os"
"sidecar/protos"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// LOGFILE is the path of the file that main opens (creating it if needed) and
// installs as the output of the standard logger.
const LOGFILE = "/tmp/sidecar.log"
func main() {
logFile, err := os.OpenFile(LOGFILE, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
log.Panic(err)
}
defer logFile.Close()
// Set log out put and enjoy :)
log.SetOutput(logFile)
// optional: log date-time, filename, and line number
log.SetFlags(log.Lshortfile | log.LstdFlags)
addr := net.TCPAddr{
Port: 8890,
IP: net.ParseIP("127.0.0.1"),
}
l, err := net.ListenTCP("tcp", &addr)
if err != nil {
log.Fatal(err)
}
d := Decoder{lengthFieldOffset: 4}
for {
conn, err := l.Accept()
if err != nil {
log.Println(err)
}
go func() {
for {
p := make([]byte, 2048)
_, err := conn.Read(p)
m, err := d.Decode(p)
log.Println(m, err)
}
}()
}
}
// Decoder turns length-prefixed byte frames into protobuf messages.
type Decoder struct {
// lengthFieldOffset is the number of leading bytes Decode treats as the length
// field; the payload starts at this offset.
lengthFieldOffset int
}
// Decode parses a single frame consisting of a length field followed by a
// serialized anypb.Any, and returns the on_subscribe message wrapped inside.
//
// b must hold the complete frame: lengthFieldOffset header bytes, whose first
// four bytes are the big-endian payload length, followed by exactly that many
// payload bytes.
//
// It returns an error when the frame is too short or its length field does not
// match the payload size, when the payload is not a valid Any, when the Any's
// type URL is not registered in newProtoFuncMap as an on_subscribe event, or
// when the wrapped value cannot be unmarshalled into the mapped message type.
func (d *Decoder) Decode(b []byte) (proto.Message, error) {
	// Uint32 needs at least four bytes and the slice expressions below need the
	// whole header, so reject short input instead of panicking.
	if d.lengthFieldOffset < 4 || len(b) < d.lengthFieldOffset {
		return nil, errors.New("length_field_error")
	}
	length := binary.BigEndian.Uint32(b[:d.lengthFieldOffset])
	payload := b[d.lengthFieldOffset:]
	// Compare in 64 bits so a large length cannot wrap on 32-bit platforms.
	if uint64(len(payload)) != uint64(length) {
		return nil, errors.New("length_field_error")
	}

	eventAny := new(anypb.Any)
	if err := proto.Unmarshal(payload, eventAny); err != nil {
		return nil, fmt.Errorf("unmarshal_error: %w", err)
	}

	// Check ok before touching tm: an unknown type URL yields a nil *TypeMapper.
	tm, ok := newProtoFuncMap[eventAny.GetTypeUrl()]
	if !ok || tm == nil || tm.name != "on_subscribe" {
		return nil, errors.New("type_mapper_not_defined")
	}

	onSubEvent := tm.new()
	if err := proto.Unmarshal(eventAny.GetValue(), onSubEvent); err != nil {
		return nil, fmt.Errorf("could_not_unmarshal_to_type_mapper: %w", err)
	}
	return onSubEvent, nil
}
// typePrefix is prepended to a message name to build the type URL keys of
// newProtoFuncMap.
const typePrefix = "type.googleapis.com/eventssidecar.v1."
// TypeMapper pairs an event name with a constructor for the protobuf message
// that carries that event.
type TypeMapper struct {
// name is the event name; Decode only accepts "on_subscribe".
name string
// new returns a fresh message into which Decode unmarshals the event value.
new func() proto.Message
}
// newProtoFuncMap is the lookup table from a protobuf type URL to the
// TypeMapper describing that event. Only OnSubscribe is registered.
var newProtoFuncMap = map[string]*TypeMapper{
	typePrefix + "OnSubscribe": {
		name: "on_subscribe",
		new: func() proto.Message {
			return &protos.OnSubscribe{}
		},
	},
}