dn42grcd/ingest.go
2020-10-10 18:51:45 +01:00

158 lines
3.5 KiB
Go

//////////////////////////////////////////////////////////////////////////
// Ingest updates from the bird socket to the internal data structures
//////////////////////////////////////////////////////////////////////////
package main
//////////////////////////////////////////////////////////////////////////
import (
"bufio"
"context"
log "github.com/sirupsen/logrus"
"os"
"strings"
"time"
)
//////////////////////////////////////////////////////////////////////////
// Ingest holds the state of one ingestion run: the means to stop it, the
// pipe currently being read and the data structures that receive updates.
type Ingest struct {
// cancel function for the context handed to the tail goroutine
shutdown context.CancelFunc
// most recently opened pipe; assigned by tail and closed by Shutdown
// NOTE(review): written and read from different goroutines with no
// synchronisation visible in this file
socket *os.File
// destination for the route and ROA fail updates read from the pipe
data *DataStruct
}
//////////////////////////////////////////////////////////////////////////
// StartIngesting returns a new Ingest that stores updates in data and starts
// a goroutine reading lines from the named pipe at sockPath.
//
// The goroutine runs under a cancellable context; the matching cancel
// function is kept on the returned Ingest so that Shutdown can stop it.
func StartIngesting(sockPath string, data *DataStruct) *Ingest {
	// the background reader is stopped through this context
	ctx, cancel := context.WithCancel(context.Background())

	ingest := &Ingest{
		data:     data,
		shutdown: cancel,
	}

	// reading happens asynchronously; the caller gets the handle immediately
	go ingest.tail(ctx, sockPath)

	return ingest
}
//////////////////////////////////////////////////////////////////////////
// Shutdown stops ingestion: it cancels the context used by the tail
// goroutine and then closes the socket that goroutine last opened, so that
// a read in progress is interrupted.
//
// NOTE(review): ingest.socket is assigned by the tail goroutine and read
// here without synchronisation.
func (ingest *Ingest) Shutdown() {
	// tell the tail loop it should stop
	cancel := ingest.shutdown
	cancel()

	// closing the file kicks the reader out of its blocking read; the
	// result is deliberately discarded, as the file may never have been
	// opened or may already be closed
	sock := ingest.socket
	_ = sock.Close()
}
//////////////////////////////////////////////////////////////////////////
// tail continuously reads newline-terminated lines from the named pipe at
// sockPath and dispatches them, until ctx is cancelled.
//
// If the pipe cannot be opened, the error is logged and the open is retried
// after a five second pause. If a read fails (including EOF), the pipe is
// closed and re-opened.
//
// NOTE(review): opening a FIFO read-only may block until a writer appears;
// cancellation is not observed while blocked in the open call.
func (ingest *Ingest) tail(ctx context.Context, sockPath string) {
	for {
		// have I been cancelled ?
		select {
		case <-ctx.Done():
			log.Debug("Ingestion tail cancelled")
			// return rather than break: a bare break here would only
			// leave the select and the loop would carry on
			return
		default:
		}

		// open the named pipe
		sock, err := os.OpenFile(sockPath, os.O_RDONLY, os.ModeNamedPipe)
		if err != nil {
			// log at Error, not Fatal: Fatal terminates the process and
			// the retry below would never run
			log.WithFields(log.Fields{
				"path":  sockPath,
				"error": err,
			}).Error("Failed to open fifo, pausing")

			// pause before retrying, but wake early on cancellation
			select {
			case <-ctx.Done():
			case <-time.After(time.Second * 5):
			}
			continue
		}

		ingest.socket = sock
		log.WithFields(log.Fields{
			"path": sockPath,
		}).Info("Opened bird socket")

		// read from the socket until a read fails
		ingest.readLines(bufio.NewReader(sock))

		// ensure socket is closed; the error is ignored because Shutdown
		// may already have closed it
		_ = sock.Close()
	}
}

// readLines reads lines from reader and dispatches each one, returning
// after the first read error has been logged.
func (ingest *Ingest) readLines(reader *bufio.Reader) {
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			// error occurred, stop reading so the caller can re-open
			log.WithFields(log.Fields{
				"error": err,
			}).Warn("Error reading from socket")
			return
		}

		// trim the trailing newline
		ingest.dispatch(strings.TrimSuffix(line, "\n"))
	}
}

// dispatch hands a single line to the handler selected by its two-character
// action prefix; lines of three characters or fewer are ignored.
func (ingest *Ingest) dispatch(line string) {
	// need the action, the separating space and at least one data byte
	if len(line) <= 3 {
		return
	}

	log.WithFields(log.Fields{
		"line": line,
	}).Debug("Received line")

	// action is the first two characters,
	// data is from char 3 because of the space
	switch action, data := line[:2], line[3:]; action {
	case "!1":
		ingest.route(data)
	case "!2":
		ingest.roaFail(data)
	default:
		// ignore anything else
	}
}
//////////////////////////////////////////////////////////////////////////
// route handles the payload of a "!1" line: it logs the payload at debug
// level and passes it to the add method of the active route data.
func (ingest *Ingest) route(data string) {
	entry := log.WithFields(log.Fields{"data": data})
	entry.Debug("route update")

	// update the data structures
	active := ingest.data.active
	active.route.add(data)
}
//////////////////////////////////////////////////////////////////////////
// roaFail handles the payload of a "!2" line: it logs the payload at debug
// level and passes it to the add method of the active ROA data.
func (ingest *Ingest) roaFail(data string) {
	entry := log.WithFields(log.Fields{"data": data})
	entry.Debug("roa fail")

	// update the data structures
	active := ingest.data.active
	active.roa.add(data)
}
//////////////////////////////////////////////////////////////////////////
// end of file