Skip to content

Commit

Permalink
Merge pull request #305 from Dletta/master
Browse files Browse the repository at this point in the history
Small improvements for Connection error logic, Capitalization across …
  • Loading branch information
kYroL01 authored Oct 1, 2024
2 parents 9d102bb + cc080a1 commit 7914e85
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
35 changes: 18 additions & 17 deletions publish/hep.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) {
errCnt := 0
for n := range a {
if err := h.ConnectServer(n); err != nil {
logp.Err("%v", err)
logp.Err("Error connecting to HEP server (%s): %v", h.addr[n], err)
errCnt++
} else {
if config.Cfg.HEPBufferEnable {
Expand All @@ -52,7 +52,7 @@ func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) {
}
}
if errCnt == l {
return nil, fmt.Errorf("cannot establish a connection")
return nil, fmt.Errorf("Cannot establish a connection")
}

go h.Start()
Expand All @@ -61,12 +61,13 @@ func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) {

func (h *HEPOutputer) Close(n int) {
if err := h.client[n].conn.Close(); err != nil {
logp.Err("cannnot close connection to %s: %v", h.addr[n], err)
logp.Err("Cannot close connection to %s: %v", h.addr[n], err)
}
}

func (h *HEPOutputer) ReConnect(n int) (err error) {
if err = h.ConnectServer(n); err != nil {
logp.Err("Error reconnecting to HEP server (%s): %v", h.addr[n], err)
return err
}
h.client[n].writer.Reset(h.client[n].conn)
Expand Down Expand Up @@ -154,7 +155,7 @@ func (h *HEPOutputer) Send(msg []byte) {
var err error

if h.client[n].conn == nil || h.client[n].writer == nil {
logp.Debug("connection is not up", fmt.Sprintf("index: %d, Len: %d, once: %v", n, len(h.addr), onceSent))
logp.Debug("Connection is not up", fmt.Sprintf("index: %d, Len: %d, once: %v", n, len(h.addr), onceSent))
err = fmt.Errorf("connection is broken")
} else {
h.client[n].writer.Write(msg)
Expand Down Expand Up @@ -216,13 +217,13 @@ func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) {

defer func() {
if r := recover(); r != nil {
logp.Err("copy hep file out panic: %v, %v", r, debug.Stack())
logp.Err("Copy hep file out panic: %v, %v", r, debug.Stack())
return
}
}()

if _, err := os.Stat(config.Cfg.HEPBufferFile); err != nil {
logp.Debug("file doesn't exists: ", config.Cfg.HEPBufferFile)
logp.Debug("File doesn't exists: ", config.Cfg.HEPBufferFile)
return 1, nil
}

Expand All @@ -233,15 +234,15 @@ func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) {
}

if h.client[n].conn == nil {
logp.Err("connection is not up....")
return 0, fmt.Errorf("connection is broken")
logp.Err("Connection is not up....")
return 0, fmt.Errorf("Connection is broken")
}

//Send Logged HEP upon reconnect out to backend
hl, err := h.client[n].conn.Write(HEPFileData)
if err != nil {
promstats.HepFileFlushesError.Inc()
return 0, fmt.Errorf("bad write to socket")
return 0, fmt.Errorf("Bad write to socket")
}

err = h.client[n].writer.Flush()
Expand All @@ -268,40 +269,40 @@ func (h *HEPOutputer) copyHEPbufftoFile(inbytes []byte) (int64, error) {

defer func() {
if r := recover(); r != nil {
logp.Err("copy buffer to panic: %v,\n%s", r, debug.Stack())
logp.Err("Copy buffer to panic: %v,\n%s", r, debug.Stack())
return
}
}()

if config.Cfg.HEPBufferDebug {
logp.Err("adding packet to BUFFER: %s\n", string(inbytes))
logp.Err("Adding packet to BUFFER: %s\n", string(inbytes))
}

destination, err := os.OpenFile(config.Cfg.HEPBufferFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
logp.Err("open HEP file error: %v\n", err)
return 0, fmt.Errorf("open HEP file error: %v", err)
logp.Err("Open HEP file error: %v\n", err)
return 0, fmt.Errorf("Open HEP file error: %v", err)
}

defer destination.Close()

if config.Cfg.MaxBufferSizeBytes > 0 {
fi, err := destination.Stat()
if err != nil {
logp.Debug("collector", fmt.Sprintf("couldn't retrive stats from buffer file error: %v", err.Error()))
logp.Debug("collector", fmt.Sprintf("Couldn't retrive stats from buffer file error: %v", err.Error()))
return 0, err
} else {
if fi.Size() >= config.Cfg.MaxBufferSizeBytes {
logp.Debug("collector", fmt.Sprintln("Buffer size has been excited error: Maxsize: ", config.Cfg.MaxBufferSizeBytes, " vs CurrentSize: ", fi.Size()))
return 0, fmt.Errorf("buffer size has been excited: %d", fi.Size())
logp.Debug("collector", fmt.Sprintln("Buffer size has been exceeded error: Maxsize: ", config.Cfg.MaxBufferSizeBytes, " vs CurrentSize: ", fi.Size()))
return 0, fmt.Errorf("Buffer size has been exceeded: %d", fi.Size())
}
}
}

nBytes, err := destination.Write(inbytes)

if err != nil {
logp.Err("file Send HEP from buffer to file error: %v", err.Error())
logp.Err("File send HEP from buffer to file error: %v", err.Error())
return 0, fmt.Errorf("file Send HEP from buffer to file error: %v", err.Error())
} else {
logp.Debug("collector", " File Send HEP from buffer to file OK")
Expand Down
4 changes: 2 additions & 2 deletions sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func (sniffer *SnifferSetup) handleRequestExtended(conn net.Conn) {
// Read the incoming connection into the buffer.
n, err := conn.Read(message)
if err != nil {
logp.Err("closed tcp connection [1]: %s", err.Error())
logp.Err("Incoming tcp connection closed during read with error [1]: %s", err.Error())
break
}

Expand Down Expand Up @@ -776,7 +776,7 @@ func (sniffer *SnifferSetup) handleRequestExtended(conn net.Conn) {
// Read the incoming connection into the buffer.
n, err := conn.Read(message)
if err != nil {
logp.Err("closed tcp connection [2]: %s", err.Error())
logp.Err("Incoming tcp connection closed during direct read from buffer with error [2]: %s", err.Error())
bufferPool.Reset()
break
}
Expand Down

0 comments on commit 7914e85

Please sign in to comment.