////////////////////////////////////////////////////////////////////////// // Ingest updates from the bird socket to the internal data structures ////////////////////////////////////////////////////////////////////////// package main ////////////////////////////////////////////////////////////////////////// import ( "bufio" "context" log "github.com/sirupsen/logrus" "os" "strings" "time" ) ////////////////////////////////////////////////////////////////////////// type Ingest struct { shutdown context.CancelFunc socket *os.File data *DataStruct } ////////////////////////////////////////////////////////////////////////// // initialise and start collecting data func StartIngesting(sockPath string, data *DataStruct) *Ingest { ingest := &Ingest{data: data} // create cancellation context ctx, cancelFunc := context.WithCancel(context.Background()) ingest.shutdown = cancelFunc // start reading from the socket go ingest.tail(ctx, sockPath) return ingest } ////////////////////////////////////////////////////////////////////////// func (ingest *Ingest) Shutdown() { // signal that the tail function should close ingest.shutdown() // and close the file to ensure it gets kicked out the read loop ingest.socket.Close() } ////////////////////////////////////////////////////////////////////////// // continuously read lines from the socket until closed // the function will re-open the socket on failures func (ingest *Ingest) tail(ctx context.Context, sockPath string) { for { // have I been cancelled ? select { case <-ctx.Done(): log.Debug("Ingestion tail cancelled") break default: } // open the named pipe sock, err := os.OpenFile(sockPath, os.O_RDONLY, os.ModeNamedPipe) if err != nil { // if an error occured, sleep for a bit and retry log.WithFields(log.Fields{ "path": sockPath, }).Fatal("Failed to open fifo, pausing") time.Sleep(time.Second * 5) } else { ingest.socket = sock log.WithFields(log.Fields{ "path": sockPath, }).Info("Opened bird socket") reader := bufio.NewReader(sock) // read from the socket for { line, err := reader.ReadString('\n') if err != nil { // error occurred, break out the read loop log.WithFields(log.Fields{ "error": err, }).Warn("Error reading from socket") break } // trim the trailing newline line = strings.TrimSuffix(line, "\n") ix := strings.Index(line, " ") if (ix != -1) && (len(line) > (ix + 10)) { line = line[ix+7:] log.WithFields(log.Fields{ "line": line, }).Debug("Received line") // action based on first two characters action := line[:2] // data is from char 3 because of the space data := line[3:] switch action { case "!1": ingest.route(data) case "!2": ingest.roaFail(data) default: // ignore anything else } } } // ensure socket is closed ingest.socket.Close() } } } ////////////////////////////////////////////////////////////////////////// // ingest a route update func (ingest *Ingest) route(data string) { log.WithFields(log.Fields{ "data": data, }).Debug("route update") space := strings.IndexByte(data, ' ') if space != -1 { prefix := data[:space] path := data[space+7 : len(data)-1] ingest.data.active.route.add(prefix + " " + path) } } ////////////////////////////////////////////////////////////////////////// // ingest an ROA Fail func (ingest *Ingest) roaFail(data string) { // update the datastructures log.WithFields(log.Fields{ "data": data, }).Debug("roa fail") ingest.data.active.roa.add(data) } ////////////////////////////////////////////////////////////////////////// // end of file