gomaxscale

Go library for consuming events from a MaxScale CDC listener. It is useful for detecting database changes via the binlog.

This consumer follows the CDC connection protocol defined by MaxScale 6.
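
For orientation, the client side of that protocol can be sketched as three messages: an authentication token, a registration, and a data request. The snippet below is a minimal sketch based on the MaxScale CDC protocol documentation as we understand it, not gomaxscale's actual implementation. The helper names (`authToken`, `registerCommand`, `requestDataCommand`) are hypothetical, and the UUID is an arbitrary example value.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// authToken builds the authentication message as we understand the protocol:
// the hex encoding of "<user>:" followed by the raw 20-byte SHA-1 digest of
// the password. (Hypothetical helper, not part of gomaxscale.)
func authToken(user, password string) string {
	digest := sha1.Sum([]byte(password))
	payload := append([]byte(user+":"), digest[:]...)
	return hex.EncodeToString(payload)
}

// registerCommand builds the registration message. The UUID identifies the
// client, and TYPE selects the output format (JSON is assumed here).
func registerCommand(uuid string) string {
	return fmt.Sprintf("REGISTER UUID=%s, TYPE=JSON", uuid)
}

// requestDataCommand asks the listener to stream changes of one table.
func requestDataCommand(database, table string) string {
	return fmt.Sprintf("REQUEST-DATA %s.%s", database, table)
}

func main() {
	fmt.Println(authToken("maxuser", "maxpwd"))
	fmt.Println(registerCommand("11ec2300-2e23-11e6-8308-0002a5d5c51b"))
	fmt.Println(requestDataCommand("example", "users"))
}
```

In normal use none of this is needed, since the library performs the handshake itself; the sketch only shows what travels over the connection.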

Testing with Docker

The test environment uses the following file structure:

  • 📂 mariadb-config
    • 📄 mariadb.cnf
  • 📂 mariadb-init
    • 📄 00_schema.sql
  • 📂 maxscale-config
    • 📄 maxscale.cnf
  • 📄 docker-compose.yml
  • 📄 consumer.go

mariadb.cnf

We need to enable row-based binary logging on the MariaDB master database so that MaxScale can replicate from it:

[mysqld]
server_id=1
binlog_format=row
binlog_row_image=full
log-bin=/var/log/mysql/mariadb-bin

00_schema.sql

A basic schema that adds the MaxScale user and a test database to play with:

RESET MASTER;

-- https://mariadb.com/kb/en/mariadb-maxscale-6-setting-up-mariadb-maxscale/#creating-a-user-account-for-maxscale
CREATE USER 'maxuser'@'%' IDENTIFIED BY 'maxpwd';
GRANT REPLICATION SLAVE ON *.* TO 'maxuser'@'%';
GRANT REPLICATION CLIENT ON *.* TO 'maxuser'@'%';
GRANT SELECT ON mysql.user TO 'maxuser'@'%';
GRANT SELECT ON mysql.db TO 'maxuser'@'%';
GRANT SELECT ON mysql.tables_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.columns_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.procs_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.proxies_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.roles_mapping TO 'maxuser'@'%';
GRANT SHOW DATABASES ON *.* TO 'maxuser'@'%';
FLUSH PRIVILEGES;

DROP DATABASE IF EXISTS example;
CREATE DATABASE IF NOT EXISTS example;

-- Allows MaxScale user to run 'SHOW CREATE TABLE' for DDL events
-- https://mariadb.com/kb/en/mariadb-maxscale-6-avrorouter/#avro-schema-generator
GRANT SELECT ON example.* TO 'maxuser'@'%';
FLUSH PRIVILEGES;

USE example;

DROP TABLE IF EXISTS users;
CREATE TABLE users (
  `id` INT NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) NOT NULL,
  `email` VARCHAR(255) NOT NULL,
  PRIMARY KEY (id)
);

INSERT INTO `users` (`name`, `email`) VALUES ('John Doe', 'john.doe@example.com');
INSERT INTO `users` (`name`, `email`) VALUES ('Jane Doe', 'jane.doe@example.com');

maxscale.cnf

MaxScale configuration that sets up the Avro router (direct replication mode) and exposes a listener from which gomaxscale can retrieve the events.

[MaxScale]
admin_secure_gui=false
threads=auto
admin_host=0.0.0.0

[server1]
type=server
address=db
port=3306
protocol=MariaDBBackend

[cdc-service]
type=service
router=avrorouter
servers=server1
server_id=1
user=maxuser
password=maxpwd

[cdc-listener]
type=listener
service=cdc-service
protocol=CDC
port=4001

[MariaDB-Monitor]
type=monitor
module=mariadbmon
servers=server1
user=maxuser
password=maxpwd
monitor_interval=5000

docker-compose.yml

To set up a MariaDB database and a MaxScale server we will use docker-compose with the following configuration:

version: '2.4'
services:
  db:
    container_name: "lab-db"
    image: mariadb:10.3.8
    volumes:
      - ./mariadb-config:/etc/mysql/conf.d
      - ./mariadb-init:/docker-entrypoint-initdb.d
    environment:
      MYSQL_ROOT_PASSWORD: abc123
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "--silent"]

  dbproxy:
    container_name: "lab-maxscale"
    image: mariadb/maxscale:6.2
    volumes:
      - ./maxscale-config/maxscale.cnf:/etc/maxscale.cnf
    ports:
      - 4001:4001
    depends_on:
      - db

consumer.go

A local Go program consumes the change events and logs every modification made to the database:

package main

import (
  "fmt"
  "log"
  "os"
  "os/signal"
  "syscall"

  "github.com/rafaeljusto/gomaxscale/v2"
)

func main() {
  consumer := gomaxscale.NewConsumer("127.0.0.1:4001", "example", "users",
    gomaxscale.WithAuth("maxuser", "maxpwd"),
  )
  err := consumer.Start()
  if err != nil {
    log.Fatal(err)
  }
  defer consumer.Close()

  fmt.Println("start consuming events")

  done := make(chan bool)
  go func() {
    consumer.Process(func(event gomaxscale.CDCEvent) {
      switch e := event.(type) {
      case gomaxscale.DDLEvent:
        fmt.Printf("ddl event detected on database '%s' and table '%s'\n",
          e.Database, e.Table)
      case gomaxscale.DMLEvent:
        fmt.Printf("dml '%s' event detected\n", e.Type)
      }
    })
    done <- true
  }()

  signalChannel := make(chan os.Signal, 1)
  signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

  select {
  case <-signalChannel:
  case <-done:
  }

  fmt.Println("terminating")
}

Running

First, start all services:

% docker-compose up -d

Then we can start consuming the items:

% go run consumer.go

To see it in action, make some changes in the database:

% docker-compose exec db mysql -u root -pabc123 -D example \
  -e "INSERT INTO users (name, email) VALUES ('James Doe', 'james.doe@example.com')"
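
With the JSON output format, each change reaches the client as one JSON object that mixes replication metadata with the table columns. The sketch below shows how such a line could be split into the two parts. The metadata field names (`domain`, `server_id`, `sequence`, `event_number`, `timestamp`, `event_type`) are assumptions taken from the MaxScale avrorouter documentation, the sample line is invented for illustration, and `splitEvent` is a hypothetical helper rather than part of gomaxscale, which already decodes events into `DMLEvent` and `DDLEvent` values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metadataFields lists the keys assumed to carry replication metadata
// rather than table columns.
var metadataFields = map[string]bool{
	"domain":       true,
	"server_id":    true,
	"sequence":     true,
	"event_number": true,
	"timestamp":    true,
	"event_type":   true,
}

// splitEvent decodes one JSON line and separates the event type from the
// table columns. (Hypothetical helper, not part of gomaxscale.)
func splitEvent(line []byte) (string, map[string]any, error) {
	var raw map[string]any
	if err := json.Unmarshal(line, &raw); err != nil {
		return "", nil, err
	}
	eventType, _ := raw["event_type"].(string)
	columns := make(map[string]any)
	for key, value := range raw {
		if !metadataFields[key] {
			columns[key] = value
		}
	}
	return eventType, columns, nil
}

func main() {
	// Invented sample resembling what the insert above might produce.
	line := []byte(`{"domain":0,"server_id":1,"sequence":3,"event_number":1,` +
		`"timestamp":1650000000,"event_type":"insert",` +
		`"id":3,"name":"James Doe","email":"james.doe@example.com"}`)

	eventType, columns, err := splitEvent(line)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(eventType, columns["name"])
}
```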