Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup mqtt+config+bootstrap #50

Merged
merged 8 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 75 additions & 60 deletions apihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ func APIbootstrap(conf *Config) func(w http.ResponseWriter, r *http.Request) {
log.Printf("API: received /bootstrap request (cmd: %s) from %s.\n", bp.Command, r.RemoteAddr)

switch bp.Command {
case "greylist-status":
me := conf.TemData.MqttEngine
stats := me.Stats()
resp.MsgCounters = stats.MsgCounters
resp.MsgTimeStamps = stats.MsgTimeStamps
log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName])

case "export-greylist":
td := conf.TemData
td.mu.RLock()
Expand Down Expand Up @@ -501,13 +508,13 @@ func APIdispatcher(conf *Config, done <-chan struct{}) {
walkRoutes(router, viper.GetString("apiserver.address"))
log.Println("")

address := viper.GetString("apiserver.address")
tlsaddress := viper.GetString("apiserver.tlsaddress")
addresses := viper.GetStringSlice("apiserver.addresses")
tlsaddresses := viper.GetStringSlice("apiserver.tlsaddresses")
certfile := viper.GetString("certs.tem.cert")
keyfile := viper.GetString("certs.tem.key")

bootstrapaddress := viper.GetString("bootstrapserver.address")
bootstraptlsaddress := viper.GetString("bootstrapserver.tlsaddress")
bootstrapaddresses := viper.GetStringSlice("bootstrapserver.addresses")
bootstraptlsaddresses := viper.GetStringSlice("bootstrapserver.tlsaddresses")
bootstraprouter := SetupBootstrapRouter(conf)

tlspossible := true
Expand All @@ -529,79 +536,87 @@ func APIdispatcher(conf *Config, done <-chan struct{}) {
TEMExiter("Error creating API server tls config: %v\n", err)
}

tlsServer := &http.Server{
Addr: tlsaddress,
Handler: router,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
bootstrapTlsServer := &http.Server{
Addr: bootstraptlsaddress,
Handler: bootstraprouter,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

var wg sync.WaitGroup

log.Println("*** API: Starting API dispatcher #1. Listening on", address)

if address != "" {
wg.Add(1)
go func(wg *sync.WaitGroup) {
apiServer := &http.Server{
Addr: address,
Handler: router,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

log.Println("*** API: Starting API dispatcher #1. Listening on", address)
wg.Done()
TEMExiter(apiServer.ListenAndServe())
}(&wg)
}
// log.Println("*** API: Starting API dispatcher #1. Listening on", address)

if tlsaddress != "" {
if tlspossible {
if len(addresses) > 0 {
for idx, address := range addresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
log.Println("*** API: Starting TLS API dispatcher #1. Listening on", tlsaddress)
apiServer := &http.Server{
Addr: address,
Handler: router,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

log.Printf("*** API: Starting API dispatcher #%d. Listening on %s", idx+1, address)
wg.Done()
TEMExiter(tlsServer.ListenAndServeTLS(certfile, keyfile))
TEMExiter(apiServer.ListenAndServe())
}(&wg)
} else {
log.Printf("*** API: APIdispatcher: Error: Cannot provide TLS service without cert and key files.\n")
}
}

if bootstrapaddress != "" {
wg.Add(1)
go func(wg *sync.WaitGroup) {
apiServer := &http.Server{
Addr: bootstrapaddress,
Handler: bootstraprouter,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
if len(tlsaddresses) > 0 {
if tlspossible {
for idx, tlsaddress := range tlsaddresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
tlsServer := &http.Server{
Addr: tlsaddress,
Handler: router,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
log.Printf("*** API: Starting TLS API dispatcher #%d. Listening on %s", idx+1, tlsaddress)
wg.Done()
TEMExiter(tlsServer.ListenAndServeTLS(certfile, keyfile))
}(&wg)
}
log.Println("*** API: Starting Bootstrap API dispatcher #1. Listening on", bootstrapaddress)
wg.Done()
TEMExiter(apiServer.ListenAndServe())
}(&wg)
} else {
log.Println("*** API: No bootstrap address specified")
} else {
log.Printf("*** API: APIdispatcher: Error: Cannot provide TLS service without cert and key files.\n")
}
}

if bootstraptlsaddress != "" {
if tlspossible {
if len(bootstrapaddresses) > 0 {
for idx, address := range bootstrapaddresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
log.Println("*** API: Starting Bootstrap TLS API dispatcher #1. Listening on", bootstraptlsaddress)
apiServer := &http.Server{
Addr: address,
Handler: bootstraprouter,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
log.Printf("*** API: Starting Bootstrap API dispatcher #%d. Listening on %s", idx+1, address)
wg.Done()
TEMExiter(bootstrapTlsServer.ListenAndServeTLS(certfile, keyfile))
TEMExiter(apiServer.ListenAndServe())
}(&wg)
}
} else {
log.Println("*** API: No bootstrap address specified")
}

if len(bootstraptlsaddresses) > 0 {
if tlspossible {
for idx, address := range bootstraptlsaddresses {
wg.Add(1)
go func(wg *sync.WaitGroup) {
bootstrapTlsServer := &http.Server{
Addr: address,
Handler: bootstraprouter,
TLSConfig: tlsConfig,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

log.Printf("*** API: Starting Bootstrap TLS API dispatcher #%d. Listening on %s", idx+1, address)
wg.Done()
TEMExiter(bootstrapTlsServer.ListenAndServeTLS(certfile, keyfile))
}(&wg)
}
} else {
log.Printf("*** API: APIdispatcher: Error: Cannot provide Bootstrap TLS service without cert and key files.\n")
}
Expand Down
51 changes: 47 additions & 4 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,20 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
return nil, fmt.Errorf("error setting up TLS for the API client: %v", err)
}

bootstrapaddrs := viper.GetStringSlice("bootstrapserver.addresses")
tlsbootstrapaddrs := viper.GetStringSlice("bootstrapserver.tlsaddresses")
bootstrapaddrs = append(bootstrapaddrs, tlsbootstrapaddrs...)

// Iterate over the bootstrap servers
for _, server := range src.Bootstrap {
// Is this myself?
for _, bs := range bootstrapaddrs {
if bs == server {
td.Logger.Printf("MQTT bootstrap server %s is myself, skipping", server)
continue
}
}

api.BaseUrl = fmt.Sprintf(src.BootstrapUrl, server)

// Send an API ping command
Expand All @@ -57,6 +69,37 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17)

status, buf, err := api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "greylist-status",
ListName: src.Name,
Encoding: "json", // XXX: This is our default, but we'll test other encodings later
}, true)

if err != nil {
fmt.Printf("Error from RequestNG: %v\n", err)
continue
}

if status != http.StatusOK {
td.Logger.Printf("HTTP Error: %s\n", buf)
continue
}

var br tapir.BootstrapResponse
err = json.Unmarshal(buf, &br)
if err != nil {
td.Logger.Printf("Error decoding greylist-status response from %s: %v. Giving up.\n", server, err)
continue
}
if br.Error {
td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg)
}
if len(br.Msg) != 0 {
td.Logger.Printf("Bootstrap server %s responded: %s", server, br.Msg)
}

td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last msg arrived at %s), ", server, uptime, br.MsgCounters["greylist"], src.Name, br.MsgTimeStamps["greylist"].Format(time.RFC3339))

status, buf, err = api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "export-greylist",
ListName: src.Name,
Encoding: "gob", // XXX: This is our default, but we'll test other encodings later
Expand All @@ -67,7 +110,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
}

if status != http.StatusOK {
fmt.Printf("HTTP Error: %s\n", buf)
td.Logger.Printf("HTTP Error: %s\n", buf)
continue
}

Expand All @@ -94,13 +137,13 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
}

if td.Debug {
fmt.Printf("%v\n", greylist)
fmt.Printf("Names present in greylist %s:\n", src.Name)
td.Logger.Printf("%v", greylist)
td.Logger.Printf("Names present in greylist %s:", src.Name)
out := []string{"Name|Time added|TTL|Tags"}
for _, n := range greylist.Names {
out = append(out, fmt.Sprintf("%s|%v|%v|%v", n.Name, n.TimeAdded.Format(tapir.TimeLayout), n.TTL, n.TagMask))
}
fmt.Printf("%s\n", columnize.SimpleFormat(out))
td.Logger.Printf("%s", columnize.SimpleFormat(out))
}

// Successfully received and decoded bootstrap data
Expand Down
82 changes: 53 additions & 29 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
)

type Config struct {
Service ServiceConf
Server ServerConf
Apiserver ApiserverConf
Dnsengine DnsengineConf
Sources map[string]SourceConf
Policy PolicyConf
Log struct {
Services ServicesConf
ApiServer ApiserverConf
DnsEngine DnsengineConf
BootstrapServer BootstrapServerConf
Sources map[string]SourceConf
Policy PolicyConf
Log struct {
File string `validate:"required"`
Verbose *bool `validate:"required"`
Debug *bool `validate:"required"`
Expand All @@ -36,12 +36,40 @@ type Config struct {
BootTime time.Time
}

type ServiceConf struct {
Name string `validate:"required"`
// Filter string `validate:"required"`
Reset_Soa_Serial *bool `validate:"required"`
Debug *bool
Verbose *bool
type ServicesConf struct {
Rpz struct {
ZoneName string `validate:"required"`
Primary string `validate:"required"` // XXX: must be an address that DnsEngine listens to
SerialCache string `validate:"required"`
}

Reaper struct {
Interval int `validate:"required"`
}
}

type ApiserverConf struct {
Active *bool `validate:"required"`
Name string `validate:"required"`
Key string `validate:"required"`
Addresses []string `validate:"required"`
TlsAddresses []string `validate:"required"`
}

type DnsengineConf struct {
Active *bool `validate:"required"`
Name string `validate:"required"`
Addresses []string `validate:"required"`
Logfile string `validate:"required"`
// Logger *log.Logger
}

type BootstrapServerConf struct {
Active *bool `validate:"required"`
Name string `validate:"required"`
Addresses []string `validate:"required"`
TlsAddresses []string `validate:"required"`
Logfile string
}

type ServerConf struct {
Expand Down Expand Up @@ -96,16 +124,6 @@ type GreylistConf struct {
}
}

type ApiserverConf struct {
Address string `validate:"required"`
Key string `validate:"required"`
}
type DnsengineConf struct {
Address string `validate:"required"`
Logfile string `validate:"required"`
// Logger *log.Logger
}

type InternalConf struct {
// RefreshZoneCh chan RpzRefresher
// RpzCmdCh chan RpzCmdData
Expand All @@ -128,9 +146,11 @@ func ValidateConfig(v *viper.Viper, cfgfile string) error {
var configsections = make(map[string]interface{}, 5)

configsections["log"] = config.Log
configsections["service"] = config.Service
configsections["server"] = config.Server
configsections["apiserver"] = config.Apiserver
configsections["services"] = config.Services
// configsections["server"] = config.Server
configsections["apiserver"] = config.ApiServer
configsections["dnsengine"] = config.DnsEngine
configsections["bootstrapserver"] = config.BootstrapServer
configsections["policy"] = config.Policy

// Cannot validate a map[string]foobar, must validate the individual foobars:
Expand All @@ -149,11 +169,15 @@ func ValidateBySection(config *Config, configsections map[string]interface{}, cf
validate := validator.New()

for k, data := range configsections {
log.Printf("%s: Validating config for %s section\n", config.Service.Name, k)
switch data := data.(type) {
case *SourceConf:
log.Printf("%s: Validating config for source %s", data.Name, k)
case *DnsengineConf, *ApiserverConf, *BootstrapServerConf:
// log.Printf("%s: Validating config for service %s", data.Name, k)
}
if err := validate.Struct(data); err != nil {
log.Printf("ValidateBySection: data that caused validation to fail:\n%v\n", data)
TEMExiter("ValidateBySection: Config %s, section %s: missing required attributes:\n%v\n",
cfgfile, k, err)
TEMExiter("ValidateBySection: Config %s, section %s: missing required attributes:\n%v\n", cfgfile, k, err)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func SetupLogging(conf *Config) {
conf.Loggers.Dnsengine = log.Default()
}

logfile = viper.GetString("mqtt.logfile")
logfile = viper.GetString("tapir.mqtt.logfile")
if logfile != "" {
logfile = filepath.Clean(logfile)
f, err := os.OpenFile(logfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) // #nosec G302
Expand Down
Loading