dn42grcd/data.go
Simon Marsh 8cde2ae854
All checks were successful
continuous-integration/drone/push Build is passing
re-work loadsa stuff
2020-10-18 13:55:13 +01:00

310 lines
6.4 KiB
Go

//////////////////////////////////////////////////////////////////////////
// Define data structures
//////////////////////////////////////////////////////////////////////////
package main
//////////////////////////////////////////////////////////////////////////
import (
"context"
"encoding/json"
log "github.com/sirupsen/logrus"
"strings"
"sync/atomic"
"time"
"unsafe"
)
//////////////////////////////////////////////////////////////////////////
// data structures for route updates
// RouteData accumulates route updates for the collection period that is
// currently in progress.
type RouteData struct {
	// paths maps an update key to the number of times it has been added;
	// snapshot splits each key at its first space into a prefix and a path
	paths map[string]uint
}
// RouteInfo is a single entry in a route snapshot, as serialised to JSON.
type RouteInfo struct {
	// Prefix is the part of the update key before the first space
	Prefix string `json:"prefix"`
	// Path is the remainder of the update key after the first space
	Path string `json:"path"`
	// Count is the number of times this prefix/path pair was added
	Count uint `json:"count"`
}
// RouteSnapshot is an immutable view of one completed route collection
// period, suitable for serving as a JSON API response.
type RouteSnapshot struct {
	// cache holds the marshalled JSON once ToJSON has produced it;
	// it is never serialised itself
	cache []byte `json:"-"`

	// Total is the sum of the counts of all listed entries
	Total uint `json:"total"`
	// Unique is the number of distinct prefix/path pairs
	Unique uint `json:"unique"`
	// List contains one entry per distinct prefix/path pair
	List []*RouteInfo `json:"list"`
}
//////////////////////////////////////////////////////////////////////////
// data structures for ROA failures
// ROAData accumulates ROA failures for the collection period that is
// currently in progress.
type ROAData struct {
	// roas maps a failure key to the number of times it has been added;
	// snapshot splits each key at its first space into a prefix and an origin
	roas map[string]uint
}
// ROAInfo is a single entry in a ROA failure snapshot, as serialised to JSON.
type ROAInfo struct {
	// Prefix is the part of the failure key before the first space
	Prefix string `json:"prefix"`
	// Origin is the remainder of the failure key after the first space
	Origin string `json:"origin"`
}
// ROASnapshot is an immutable view of one completed ROA failure collection
// period, suitable for serving as a JSON API response.
type ROASnapshot struct {
	// cache holds the marshalled JSON once ToJSON has produced it;
	// it is never serialised itself
	cache []byte `json:"-"`

	// Count is the number of distinct prefix/origin pairs
	Count uint `json:"count"`
	// List contains one entry per distinct prefix/origin pair
	List []*ROAInfo `json:"list"`
}
//////////////////////////////////////////////////////////////////////////
// summary structures
// ActiveData groups the data sets that are currently being collected.
type ActiveData struct {
	// route is the live route update data
	route *RouteData
	// roa is the live ROA failure data
	roa *ROAData
}
// SnapshotData groups the most recent snapshots of the collected data.
type SnapshotData struct {
	// route is the latest route snapshot
	route *RouteSnapshot
	// roa is the latest ROA failure snapshot
	roa *ROASnapshot
}
// DataStruct is the top level container tying together the live data,
// the latest snapshots and the background update goroutine.
type DataStruct struct {
	// shutdown cancels the context that the background swap loop runs under
	shutdown context.CancelFunc

	// roaCounter counts down the ticks remaining until the next ROA snapshot
	roaCounter uint

	// sets for currently collecting and previous data
	active   *ActiveData
	snapshot *SnapshotData
}
//////////////////////////////////////////////////////////////////////////
// initialise the data, and start snapshotting every minute
// StartData builds a new DataStruct with empty live data and an initial
// (empty) snapshot, then launches the background goroutine that
// periodically swaps live data into snapshots.
//
// The returned value's Shutdown method stops the background goroutine.
func StartData() *DataStruct {
	ctx, cancel := context.WithCancel(context.Background())

	store := &DataStruct{shutdown: cancel}
	store.active = store.newActiveData()
	store.snapshot = store.active.snapshot()

	log.Info("Starting data updates")
	go store.swap(ctx)

	return store
}
//////////////////////////////////////////////////////////////////////////
// Shutdown cancels the context handed to the background swap goroutine,
// signalling it to stop updating snapshots.
func (data *DataStruct) Shutdown() {
	cancel := data.shutdown
	cancel()
}
//////////////////////////////////////////////////////////////////////////
// snapshot the live data every 60 seconds
// swap runs until ctx is cancelled, turning the live data into snapshots
// on a 60 second tick.
//
// On every tick the live route data is atomically replaced with an empty
// set, the old set is converted to a snapshot, and that snapshot is
// atomically published. ROA data goes through the same steps whenever
// roaCounter has reached zero, after which the counter is reset to 59;
// otherwise the counter is decremented. ROA data is therefore rotated on
// one tick in every 60.
func (data *DataStruct) swap(ctx context.Context) {

	// update data at one minute intervals
	ticker := time.NewTicker(time.Second * 60)
	// release the ticker when the loop exits, it is not needed after shutdown
	defer ticker.Stop()

	for {
		select {

		case <-ctx.Done():
			// shutdown
			log.Info("Stopping data updates")
			return

		case <-ticker.C:
			// timer expired
			log.Debug("Route Data Update")

			var upp *unsafe.Pointer
			var up unsafe.Pointer
			var oldp unsafe.Pointer

			// swap in new route data
			upp = (*unsafe.Pointer)(unsafe.Pointer(&data.active.route))
			up = unsafe.Pointer(data.newRouteData())
			oldp = atomic.SwapPointer(upp, up)

			// create a snapshot of the old data
			routed := (*RouteData)(oldp)
			snapshot := routed.snapshot()

			// then swap in the new snapshot
			upp = (*unsafe.Pointer)(unsafe.Pointer(&data.snapshot.route))
			up = unsafe.Pointer(snapshot)
			atomic.SwapPointer(upp, up)

			if data.roaCounter == 0 {
				// wait another 59 ticks before the next ROA update
				data.roaCounter = 59
				log.Debug("ROA Data Update")

				// swap in new roa data
				upp = (*unsafe.Pointer)(unsafe.Pointer(&data.active.roa))
				up = unsafe.Pointer(data.newROAData())
				oldp = atomic.SwapPointer(upp, up)

				// create a snapshot of the old data
				road := (*ROAData)(oldp)
				snapshot := road.snapshot()

				// then swap in the new snapshot
				upp = (*unsafe.Pointer)(unsafe.Pointer(&data.snapshot.roa))
				up = unsafe.Pointer(snapshot)
				atomic.SwapPointer(upp, up)

			} else {
				data.roaCounter -= 1
			}

		}
	}
}
//////////////////////////////////////////////////////////////////////////
// data initialisation
// newActiveData returns an ActiveData holding freshly initialised, empty
// route and ROA data sets.
func (data *DataStruct) newActiveData() *ActiveData {
	active := new(ActiveData)
	active.route = data.newRouteData()
	active.roa = data.newROAData()
	return active
}
// newRouteData returns a RouteData with an empty, ready to use paths map.
func (data *DataStruct) newRouteData() *RouteData {
	fresh := new(RouteData)
	fresh.paths = map[string]uint{}
	return fresh
}
// newROAData returns a ROAData with an empty, ready to use roas map.
func (data *DataStruct) newROAData() *ROAData {
	fresh := new(ROAData)
	fresh.roas = map[string]uint{}
	return fresh
}
//////////////////////////////////////////////////////////////////////////
// for updates
// add records one more occurrence of the given route update key.
//
// NOTE(review): the map is updated without any locking here; presumably
// callers serialise updates - confirm against the caller.
func (route *RouteData) add(path string) {
	route.paths[path]++
}
// add records one more occurrence of the given ROA failure key.
//
// NOTE(review): the map is updated without any locking here; presumably
// callers serialise updates - confirm against the caller.
func (roa *ROAData) add(prefix string) {
	roa.roas[prefix]++
}
//////////////////////////////////////////////////////////////////////////
// snapshot the live data
// all data
// snapshot builds a SnapshotData from the current route and ROA data sets.
func (active *ActiveData) snapshot() *SnapshotData {
	frozen := new(SnapshotData)
	frozen.route = active.route.snapshot()
	frozen.roa = active.roa.snapshot()
	return frozen
}
// route data
// snapshot converts the collected route data into a RouteSnapshot.
//
// Each key in paths is split at its first space into a prefix and a path.
// Keys without a space are logged at debug level and left out of the
// snapshot entirely, so Total and Unique always describe exactly the
// entries present in List. List is always non-nil, even when empty.
func (route *RouteData) snapshot() *RouteSnapshot {

	snapshot := &RouteSnapshot{
		List: make([]*RouteInfo, 0, len(route.paths)),
	}

	// populate the route list
	for key, value := range route.paths {

		space := strings.IndexByte(key, ' ')
		if space == -1 {
			log.WithFields(log.Fields{
				"data": key,
			}).Debug("Invalid route data")
			continue
		}

		snapshot.List = append(snapshot.List, &RouteInfo{
			Prefix: key[:space],
			Path:   key[space+1:],
			Count:  value,
		})
		snapshot.Total += value
	}

	// count only the entries that made it in to the list, so that invalid
	// keys are excluded from Unique in the same way they are from Total
	snapshot.Unique = uint(len(snapshot.List))

	return snapshot
}
// roa data
// snapshot converts the collected ROA failure data into a ROASnapshot.
//
// Each key in roas is split at its first space into a prefix and an
// origin. Keys without a space are logged at debug level and left out of
// the snapshot entirely, so Count always equals the number of entries in
// List. List is always non-nil, even when empty.
func (roa *ROAData) snapshot() *ROASnapshot {

	snapshot := &ROASnapshot{
		List: make([]*ROAInfo, 0, len(roa.roas)),
	}

	// populate the ROA list
	for key := range roa.roas {

		space := strings.IndexByte(key, ' ')
		if space == -1 {
			log.WithFields(log.Fields{
				"data": key,
			}).Debug("Invalid roa data")
			continue
		}

		snapshot.List = append(snapshot.List, &ROAInfo{
			Prefix: key[:space],
			Origin: key[space+1:],
		})
	}

	// count only the entries that made it in to the list, so that invalid
	// keys are not reported
	snapshot.Count = uint(len(snapshot.List))

	return snapshot
}
//////////////////////////////////////////////////////////////////////////
// create and cache api responses
// ToJSON returns the snapshot marshalled as JSON.
//
// The encoded bytes are stored in the snapshot on first success and
// returned directly on later calls. If marshalling fails the error is
// logged, nothing is cached and nil is returned.
//
// NOTE(review): the cache field is written without synchronisation;
// confirm whether callers can invoke this concurrently.
func (r *RouteSnapshot) ToJSON() []byte {

	// already encoded on a previous call
	if r.cache != nil {
		return r.cache
	}

	encoded, err := json.Marshal(r)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
		}).Error("Failed to marshal route json")
		return nil
	}

	r.cache = encoded
	return r.cache
}
// ToJSON returns the snapshot marshalled as JSON.
//
// The encoded bytes are stored in the snapshot on first success and
// returned directly on later calls. If marshalling fails the error is
// logged, nothing is cached and nil is returned.
//
// NOTE(review): the cache field is written without synchronisation;
// confirm whether callers can invoke this concurrently.
func (r *ROASnapshot) ToJSON() []byte {

	// already encoded on a previous call
	if r.cache != nil {
		return r.cache
	}

	encoded, err := json.Marshal(r)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
		}).Error("Failed to marshal ROA json")
		return nil
	}

	r.cache = encoded
	return r.cache
}
//////////////////////////////////////////////////////////////////////////
// end of file