////////////////////////////////////////////////////////////////////////// // Define data structures ////////////////////////////////////////////////////////////////////////// package main ////////////////////////////////////////////////////////////////////////// import ( "context" "encoding/json" log "github.com/sirupsen/logrus" "strings" "sync/atomic" "time" "unsafe" ) ////////////////////////////////////////////////////////////////////////// // data structures for route updates type RouteData struct { paths map[string]uint } type RouteInfo struct { Prefix string `json:"prefix"` Path string `json:"path"` Count uint `json:"count"` } type RouteSnapshot struct { cache []byte `json:"-"` Total uint `json:"total"` Unique uint `json:"unique"` List []*RouteInfo `json:"list"` } ////////////////////////////////////////////////////////////////////////// // data structures for ROA failures type ROAData struct { roas map[string]uint } type ROAInfo struct { Prefix string `json:"prefix"` Origin string `json:"origin"` } type ROASnapshot struct { cache []byte `json:"-"` Count uint `json:"count"` List []*ROAInfo `json:"list"` } ////////////////////////////////////////////////////////////////////////// // summary structures type ActiveData struct { route *RouteData roa *ROAData } type SnapshotData struct { route *RouteSnapshot roa *ROASnapshot } type DataStruct struct { shutdown context.CancelFunc // sets for currently cellecting and previous data active *ActiveData snapshot *SnapshotData } ////////////////////////////////////////////////////////////////////////// // initialise the data, and start snapshoting every minute func StartData() *DataStruct { data := &DataStruct{} ctx, cancelFunc := context.WithCancel(context.Background()) data.shutdown = cancelFunc data.active = data.newActiveData() data.snapshot = data.active.snapshot() log.Info("Starting data updates") go data.swap(ctx) return data } ////////////////////////////////////////////////////////////////////////// func (data *DataStruct) Shutdown() { data.shutdown() } ////////////////////////////////////////////////////////////////////////// // snapshot the live data every 60 seconds func (data *DataStruct) swap(ctx context.Context) { // update data at one minute intervals ticker := time.NewTicker(time.Second * 60) for { select { case <-ctx.Done(): // shutdown log.Info("Stopping data updates") return case <-ticker.C: // tiemr expired log.Debug("Data Update") var upp *unsafe.Pointer var up unsafe.Pointer var oldp unsafe.Pointer // swap in a new empty data set upp = (*unsafe.Pointer)(unsafe.Pointer(&data.active)) up = unsafe.Pointer(data.newActiveData()) oldp = atomic.SwapPointer(upp, up) // generate the reporting snapshot active := (*ActiveData)(oldp) snapshot := active.snapshot() // then swap in the new snapshot upp = (*unsafe.Pointer)(unsafe.Pointer(&data.snapshot)) up = unsafe.Pointer(snapshot) atomic.SwapPointer(upp, up) } } } ////////////////////////////////////////////////////////////////////////// // data initialisation func (data *DataStruct) newActiveData() *ActiveData { return &ActiveData{ route: data.newRouteData(), roa: data.newROAData(), } } func (data *DataStruct) newRouteData() *RouteData { return &RouteData{ paths: make(map[string]uint), } } func (data *DataStruct) newROAData() *ROAData { return &ROAData{ roas: make(map[string]uint), } } ////////////////////////////////////////////////////////////////////////// // for updates func (route *RouteData) add(path string) { route.paths[path] += 1 } func (roa *ROAData) add(path string) { roa.roas[path] += 1 } ////////////////////////////////////////////////////////////////////////// // snapshot the live data // all data func (active *ActiveData) snapshot() *SnapshotData { return &SnapshotData{ route: active.route.snapshot(), roa: active.roa.snapshot(), } } // route data func (route *RouteData) snapshot() *RouteSnapshot { count := uint(len(route.paths)) snapshot := &RouteSnapshot{ Unique: count, List: make([]*RouteInfo, 0, count), } var total uint // populate the ROA list for key, value := range route.paths { space := strings.IndexByte(key, ' ') if space == -1 { log.WithFields(log.Fields{ "data": key, }).Debug("Invalid route data") } else { info := &RouteInfo{ Prefix: key[:space], Path: key[space+1:], Count: value, } total += value snapshot.List = append(snapshot.List, info) } } snapshot.Total = total return snapshot } // roa data func (roa *ROAData) snapshot() *ROASnapshot { count := uint(len(roa.roas)) snapshot := &ROASnapshot{ Count: count, List: make([]*ROAInfo, 0, count), } // populate the ROA list for key, _ := range roa.roas { space := strings.IndexByte(key, ' ') if space == -1 { log.WithFields(log.Fields{ "data": key, }).Debug("Invalid roa data") } else { info := &ROAInfo{ Prefix: key[:space], Origin: key[space+1:], } snapshot.List = append(snapshot.List, info) } } return snapshot } ////////////////////////////////////////////////////////////////////////// // create and cache api responses func (r *RouteSnapshot) ToJSON() []byte { if r.cache == nil { data, err := json.Marshal(r) if err != nil { log.WithFields(log.Fields{ "error": err, }).Error("Failed to marshal route json") } else { r.cache = data } } return r.cache } func (r *ROASnapshot) ToJSON() []byte { if r.cache == nil { data, err := json.Marshal(r) if err != nil { log.WithFields(log.Fields{ "error": err, }).Error("Failed to marshal ROA json") } else { r.cache = data } } return r.cache } ////////////////////////////////////////////////////////////////////////// // end of file