dn42grcd/ingest.go
Simon Marsh 8cde2ae854
All checks were successful
continuous-integration/drone/push Build is passing
re-work loadsa stuff
2020-10-18 13:55:13 +01:00

166 lines
3.7 KiB
Go

//////////////////////////////////////////////////////////////////////////
// Ingest updates from the bird socket to the internal data structures
//////////////////////////////////////////////////////////////////////////
package main
//////////////////////////////////////////////////////////////////////////
import (
"bufio"
"context"
log "github.com/sirupsen/logrus"
"os"
"strings"
"time"
)
//////////////////////////////////////////////////////////////////////////
type Ingest struct {
shutdown context.CancelFunc
socket *os.File
data *DataStruct
}
//////////////////////////////////////////////////////////////////////////
// initialise and start collecting data
func StartIngesting(sockPath string, data *DataStruct) *Ingest {
ingest := &Ingest{data: data}
// create cancellation context
ctx, cancelFunc := context.WithCancel(context.Background())
ingest.shutdown = cancelFunc
// start reading from the socket
go ingest.tail(ctx, sockPath)
return ingest
}
//////////////////////////////////////////////////////////////////////////
func (ingest *Ingest) Shutdown() {
// signal that the tail function should close
ingest.shutdown()
// and close the file to ensure it gets kicked out the read loop
ingest.socket.Close()
}
//////////////////////////////////////////////////////////////////////////
// continuously read lines from the socket until closed
// the function will re-open the socket on failures
func (ingest *Ingest) tail(ctx context.Context, sockPath string) {
for {
// have I been cancelled ?
select {
case <-ctx.Done():
log.Debug("Ingestion tail cancelled")
break
default:
}
// open the named pipe
sock, err := os.OpenFile(sockPath, os.O_RDONLY, os.ModeNamedPipe)
if err != nil {
// if an error occured, sleep for a bit and retry
log.WithFields(log.Fields{
"path": sockPath,
}).Fatal("Failed to open fifo, pausing")
time.Sleep(time.Second * 5)
} else {
ingest.socket = sock
log.WithFields(log.Fields{
"path": sockPath,
}).Info("Opened bird socket")
reader := bufio.NewReader(sock)
// read from the socket
for {
line, err := reader.ReadString('\n')
if err != nil {
// error occurred, break out the read loop
log.WithFields(log.Fields{
"error": err,
}).Warn("Error reading from socket")
break
}
// trim the trailing newline
line = strings.TrimSuffix(line, "\n")
ix := strings.Index(line, "<INFO> ")
if (ix != -1) && (len(line) > (ix + 10)) {
line = line[ix+7:]
log.WithFields(log.Fields{
"line": line,
}).Debug("Received line")
// action based on first two characters
action := line[:2]
// data is from char 3 because of the space
data := line[3:]
switch action {
case "!1":
ingest.route(data)
case "!2":
ingest.roaFail(data)
default:
// ignore anything else
}
}
}
// ensure socket is closed
ingest.socket.Close()
}
}
}
//////////////////////////////////////////////////////////////////////////
// ingest a route update
func (ingest *Ingest) route(data string) {
log.WithFields(log.Fields{
"data": data,
}).Debug("route update")
space := strings.IndexByte(data, ' ')
if space != -1 {
prefix := data[:space]
path := data[space+7 : len(data)-1]
ingest.data.active.route.add(prefix + " " + path)
}
}
//////////////////////////////////////////////////////////////////////////
// ingest an ROA Fail
func (ingest *Ingest) roaFail(data string) {
// update the datastructures
log.WithFields(log.Fields{
"data": data,
}).Debug("roa fail")
ingest.data.active.roa.add(data)
}
//////////////////////////////////////////////////////////////////////////
// end of file