TaskStats: Updates.

Signed-off-by: Shishir Mahajan <smahajan@roblox.com>
This commit is contained in:
Shishir Mahajan 2020-06-12 18:01:00 -07:00
parent 68af55ea5c
commit 3ce882e1be
No known key found for this signature in database
GPG Key ID: D41782E7688DEC4A

View File

@ -11,7 +11,6 @@ import (
v1 "github.com/containerd/cgroups/stats/v1"
v2 "github.com/containerd/cgroups/v2/stats"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/cio"
"github.com/containerd/typeurl"
"github.com/hashicorp/go-hclog"
@ -189,14 +188,6 @@ func (h *taskHandle) stats(ctx, ctxContainerd context.Context, interval time.Dur
}
func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxContainerd context.Context, interval time.Duration) {
statsInterval, err := time.ParseDuration("30s")
if err != nil {
h.logger.Info("Error in parsing time interval.")
return
}
interval = statsInterval
defer close(ch)
timer := time.NewTimer(0)
for {
@ -204,7 +195,6 @@ func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxCon
case <-ctx.Done():
return
case <-timer.C:
h.logger.Info("HELLO: timer.C")
timer.Reset(interval)
}
@ -215,11 +205,10 @@ func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxCon
return
}
h.logger.Info("HELLO: STEP 1")
anydata, err := typeurl.UnmarshalAny(metric.Data)
if err != nil {
h.logger.Error("Failed to unmarshal metric data:", "error", err)
return
}
var (
@ -227,34 +216,55 @@ func (h *taskHandle) handleStats(ch chan *drivers.TaskResourceUsage, ctx, ctxCon
data2 *v2.Metrics
)
var taskResourceUsage *drivers.TaskResourceUsage
switch v := anydata.(type) {
case *v1.Metrics:
h.logger.Info("HELLO: v1.Metrics")
data = v
taskResourceUsage = h.getV1TaskResourceUsage(data)
case *v2.Metrics:
h.logger.Info("HELLO: v2.Metrics")
data2 = v
taskResourceUsage = h.getV2TaskResourceUsage(data2)
default:
h.logger.Error("Cannot convert metric data to cgroups.Metrics")
return
}
h.logger.Info(fmt.Sprintf("V1 Metrics: %+v\n", data))
h.logger.Info(fmt.Sprintf("V2 Metrics: %+v\n", data2))
select {
case <-ctx.Done():
return
case ch <- h.getTaskResourceUsage(metric):
case ch <- taskResourceUsage:
}
}
}
// Convert containerd task metric to TaskResourceUsage.
func (h *taskHandle) getTaskResourceUsage(metric *types.Metric) *drivers.TaskResourceUsage {
return nil
// Convert containerd V1 task metrics to TaskResourceUsage.
func (h *taskHandle) getV1TaskResourceUsage(metrics *v1.Metrics) *drivers.TaskResourceUsage {
cs := &drivers.CpuStats{
SystemMode: float64(metrics.CPU.Usage.Kernel),
UserMode: float64(metrics.CPU.Usage.User),
TotalTicks: float64(metrics.CPU.Usage.Total),
}
ms := &drivers.MemoryStats{
RSS: metrics.Memory.RSS,
Cache: metrics.Memory.Cache,
}
ts := time.Now().UTC().UnixNano()
return &drivers.TaskResourceUsage{
ResourceUsage: &drivers.ResourceUsage{
CpuStats: cs,
MemoryStats: ms,
},
Timestamp: ts,
}
}
// Convert containerd V2 task metrics to TaskResourceUsage.
func (h *taskHandle) getV2TaskResourceUsage(metrics *v2.Metrics) *drivers.TaskResourceUsage {
return nil
}
func (h *taskHandle) signal(ctxContainerd context.Context, sig os.Signal) error {
return h.task.Kill(ctxContainerd, sig.(syscall.Signal))
}