perf(probe): publish delta reports to reduce data size by bboreham · Pull Request #3677 · weaveworks/scope · GitHub
[go: up one dir, main page]
Skip to content

perf(probe): publish delta reports to reduce data size #3677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions extras/generate_latest_map
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,19 @@ function generate_latest_map() {
return true
}

// EqualIgnoringTimestamps returns true if all keys and values are the same.
// (Unlike DeepEqual, entry timestamps are not compared.)
func (m ${latest_map_type}) EqualIgnoringTimestamps(n ${latest_map_type}) bool {
	if m.Size() != n.Size() {
		return false
	}
	for i := range m {
		// Positional comparison: entries are compared index-by-index, which
		// assumes both maps keep entries in the same (key-sorted) order —
		// NOTE(review): confirm this invariant holds for the generated type.
		if m[i].key != n[i].key || m[i].Value != n[i].Value {
			return false
		}
	}
	return true
}

// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this
Expand Down
50 changes: 36 additions & 14 deletions probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Probe struct {
spyInterval, publishInterval time.Duration
publisher ReportPublisher
rateLimiter *rate.Limiter
ticksPerFullReport int
noControls bool

tickers []Ticker
Expand Down Expand Up @@ -77,17 +78,19 @@ type Ticker interface {
func New(
	spyInterval, publishInterval time.Duration,
	publisher ReportPublisher,
	ticksPerFullReport int,
	noControls bool,
) *Probe {
	// One publish is allowed per 1/100th of the publish interval, burst of 1.
	limiter := rate.NewLimiter(rate.Every(publishInterval/100), 1)
	return &Probe{
		spyInterval:        spyInterval,
		publishInterval:    publishInterval,
		publisher:          publisher,
		rateLimiter:        limiter,
		ticksPerFullReport: ticksPerFullReport,
		noControls:         noControls,
		quit:               make(chan struct{}),
		spiedReports:       make(chan report.Report, spiedReportBufferSize),
		shortcutReports:    make(chan report.Report, shortcutReportBufferSize),
	}
}
Expand Down Expand Up @@ -208,7 +211,7 @@ func (p *Probe) tag(r report.Report) report.Report {
return r
}

func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) {
func (p *Probe) drainAndSanitise(rpt report.Report, rs chan report.Report) report.Report {
p.rateLimiter.Wait(context.Background())
ForLoop:
for {
Expand All @@ -225,25 +228,44 @@ ForLoop:
t.Controls = report.Controls{}
})
}
if err := p.publisher.Publish(rpt); err != nil {
log.Infof("Publish: %v", err)
}
return rpt
}

// publishLoop publishes a report every publishInterval until p.quit is
// closed. Every ticksPerFullReport-th publish sends a full report; the
// publishes in between send only the delta against the last full report,
// to reduce data size. Shortcut reports are published as they arrive.
// After a failed publish it falls back to sending a full report next time,
// since the receiver may have lost the baseline.
func (p *Probe) publishLoop() {
	defer p.done.Done()
	// Use an explicit Ticker rather than time.Tick so it is stopped when the
	// loop exits via p.quit; time.Tick's underlying ticker can never be
	// stopped and would leak.
	pubTick := time.NewTicker(p.publishInterval)
	defer pubTick.Stop()
	publishCount := 0
	var lastFullReport report.Report

	for {
		var err error
		select {
		case <-pubTick.C:
			rpt := p.drainAndSanitise(report.MakeReport(), p.spiedReports)
			fullReport := (publishCount % p.ticksPerFullReport) == 0
			if !fullReport {
				// Strip everything already present in the last full
				// report, leaving just the delta.
				rpt.UnsafeUnMerge(lastFullReport)
			}
			err = p.publisher.Publish(rpt)
			if err == nil {
				if fullReport {
					lastFullReport = rpt
				}
				publishCount++
			} else {
				// If we failed to send then drop back to full report next time
				publishCount = 0
			}

		case rpt := <-p.shortcutReports:
			rpt = p.drainAndSanitise(rpt, p.shortcutReports)
			err = p.publisher.Publish(rpt)

		case <-p.quit:
			return
		}
		if err != nil {
			log.Infof("Publish: %v", err)
		}
	}
}
4 changes: 2 additions & 2 deletions probe/probe_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestApply(t *testing.T) {
endpointNode = report.MakeNodeWith(endpointNodeID, map[string]string{"5": "6"})
)

p := New(0, 0, nil, false)
p := New(0, 0, nil, 1, false)
p.AddTagger(NewTopologyTagger())

r := report.MakeReport()
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestProbe(t *testing.T) {

pub := mockPublisher{make(chan report.Report, 10)}

p := New(10*time.Millisecond, 100*time.Millisecond, pub, false)
p := New(10*time.Millisecond, 100*time.Millisecond, pub, 1, false)
p.AddReporter(mockReporter{want})
p.Start()
defer p.Stop()
Expand Down
2 changes: 2 additions & 0 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type probeFlags struct {
token string
httpListen string
publishInterval time.Duration
ticksPerFullReport int
spyInterval time.Duration
pluginsRoot string
insecure bool
Expand Down Expand Up @@ -297,6 +298,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server")
flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval")
flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval")
flag.IntVar(&flags.probe.ticksPerFullReport, "probe.full-report-every", 3, "publish full report every N times, deltas in between. Make sure N < (app.window / probe.publish.interval)")
flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins")
flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (e.g. start/stop containers, terminals, logs ...)")
flag.BoolVar(&flags.probe.noCommandLineArguments, "probe.omit.cmd-args", false, "Disable collection of command-line arguments")
Expand Down
2 changes: 1 addition & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
clients = multiClients
}

p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls)
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.ticksPerFullReport, flags.noControls)
p.AddTagger(probe.NewTopologyTagger())
var processCache *process.CachingWalker

Expand Down
5 changes: 5 additions & 0 deletions report/id_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (a IDList) Merge(b IDList) IDList {
return IDList(merged)
}

// Equal returns true if a and b have the same contents.
// Comparison is delegated to StringSet.Equal.
func (a IDList) Equal(b IDList) bool {
	return StringSet(a).Equal(StringSet(b))
}

// Contains returns true if id is in the list.
func (a IDList) Contains(id string) bool {
return StringSet(a).Contains(id)
Expand Down
26 changes: 26 additions & 0 deletions report/latest_map_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ func (m StringLatestMap) DeepEqual(n StringLatestMap) bool {
return true
}

// EqualIgnoringTimestamps returns true if all keys and values are the same.
// (Unlike DeepEqual, entry timestamps are not compared.)
func (m StringLatestMap) EqualIgnoringTimestamps(n StringLatestMap) bool {
	if m.Size() != n.Size() {
		return false
	}
	for i := range m {
		// Positional comparison: assumes both maps keep entries in the same
		// (key-sorted) order — NOTE(review): confirm against the generator.
		if m[i].key != n[i].key || m[i].Value != n[i].Value {
			return false
		}
	}
	return true
}

// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this
Expand Down Expand Up @@ -450,6 +463,19 @@ func (m NodeControlDataLatestMap) DeepEqual(n NodeControlDataLatestMap) bool {
return true
}

// EqualIgnoringTimestamps returns true if all keys and values are the same.
// (Unlike DeepEqual, entry timestamps are not compared.)
func (m NodeControlDataLatestMap) EqualIgnoringTimestamps(n NodeControlDataLatestMap) bool {
	if m.Size() != n.Size() {
		return false
	}
	for i := range m {
		// Positional comparison: assumes both maps keep entries in the same
		// (key-sorted) order — NOTE(review): confirm against the generator.
		if m[i].key != n[i].key || m[i].Value != n[i].Value {
			return false
		}
	}
	return true
}

// CodecEncodeSelf implements codec.Selfer.
// Duplicates the output for a built-in map without generating an
// intermediate copy of the data structure, to save time. Note this
Expand Down
44 changes: 44 additions & 0 deletions report/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,47 @@ func (n Node) Merge(other Node) Node {
Children: n.Children.Merge(other.Children),
}
}

// UnsafeUnMerge removes data from n that would be added by merging other,
// modifying the original.
// returns true if n.Merge(other) is the same as n
func (n *Node) UnsafeUnMerge(other Node) bool {
// If it's not the same ID and topology then just bail out
if n.ID != other.ID || n.Topology != other.Topology {
return false
}
n.ID = ""
n.Topology = ""
remove := true
// We either keep a whole section or drop it if anything changed
// - a trade-off of some extra data size in favour of faster simpler code.
// (in practice, very few values reported by Scope probes do change over time)
if n.LatestControls.EqualIgnoringTimestamps(other.LatestControls) {
n.LatestControls = nil
} else {
remove = false
}
if n.Latest.EqualIgnoringTimestamps(other.Latest) {
n.Latest = nil
} else {
remove = false
}
if n.Sets.DeepEqual(other.Sets) {
n.Sets = MakeSets()
} else {
remove = false
}
if n.Parents.DeepEqual(other.Parents) {
n.Parents = MakeSets()
} else {
remove = false
}
if n.Adjacency.Equal(other.Adjacency) {
n.Adjacency = nil
} else {
remove = false
}
// counters and children are not created in the probe so we don't check those
// metrics don't overlap so just check if we have any
return remove && len(n.Metrics) == 0
}
12 changes: 11 additions & 1 deletion report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ type Report struct {
// Job represent all Kubernetes Job on hosts running probes.
Job Topology

DNS DNSRecords
DNS DNSRecords `json:"nodes,omitempty" deepequal:"nil==empty"`

// Sampling data for this report.
Sampling Sampling
Expand Down Expand Up @@ -351,6 +351,16 @@ func (r *Report) UnsafeMerge(other Report) {
})
}

// UnsafeUnMerge removes any information from r that would be added by merging other.
// The original is modified.
func (r *Report) UnsafeUnMerge(other Report) {
// TODO: DNS, Sampling, Plugins
r.Window = r.Window - other.Window
r.WalkPairedTopologies(&other, func(ourTopology, theirTopology *Topology) {
ourTopology.UnsafeUnMerge(*theirTopology)
})
}

// WalkTopologies iterates through the Topologies of the report,
// potentially modifying them
func (r *Report) WalkTopologies(f func(*Topology)) {
Expand Down
38 changes: 38 additions & 0 deletions report/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,41 @@ func TestReportUpgrade(t *testing.T) {
t.Error(test.Diff(expected, got))
}
}

// TestReportUnMerge checks that UnsafeUnMerge leaves exactly the nodes that
// were added on top of the baseline report, for both a fresh report and a
// report that already contains a node.
func TestReportUnMerge(t *testing.T) {
	n1 := report.MakeNodeWith("foo", map[string]string{"foo": "bar"})
	r1 := makeTestReport()
	r2 := r1.Copy()
	r2.Container.AddNode(n1)
	// r2 should be the same as r1 with just the foo-bar node added
	r2.UnsafeUnMerge(r1)
	// Now r2 should have everything removed except that one node, and its ID
	expected := report.Report{
		ID: r2.ID,
		Container: report.Topology{
			Nodes: report.Nodes{
				"foo": n1,
			},
		},
	}
	// Assert on the first scenario before the fixtures are reused; previously
	// this expected value was built but never compared.
	if !reflect.DeepEqual(expected, r2) {
		t.Error(test.Diff(expected, r2))
	}

	// Now test report with two nodes unmerged on report with one
	r1.Container.AddNode(n1)
	r2 = r1.Copy()
	n2 := report.MakeNodeWith("foo2", map[string]string{"ping": "pong"})
	r2.Container.AddNode(n2)
	// r2 should be the same as r1 with one extra node
	r2.UnsafeUnMerge(r1)
	expected = report.Report{
		ID: r2.ID,
		Container: report.Topology{
			Nodes: report.Nodes{
				"foo2": n2,
			},
		},
	}

	if !reflect.DeepEqual(expected, r2) {
		t.Error(test.Diff(expected, r2))
	}
}
36 changes: 36 additions & 0 deletions report/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,27 @@ func (t *Topology) UnsafeMerge(other Topology) {
t.TableTemplates = t.TableTemplates.Merge(other.TableTemplates)
}

// UnsafeUnMerge removes any information from t that would be added by merging other,
// modifying the original.
func (t *Topology) UnsafeUnMerge(other Topology) {
if t.Shape == other.Shape {
t.Shape = ""
}
if t.Label == other.Label && t.LabelPlural == other.LabelPlural {
t.Label, t.LabelPlural = "", ""
}
if t.Tag == other.Tag {
t.Tag = ""
}
t.Nodes.UnsafeUnMerge(other.Nodes)
// TODO Controls
// NOTE: taking a shortcut and assuming templates are static, which they have always been in Scope
// If you break that assumption please change this.
t.MetadataTemplates = nil
t.MetricTemplates = nil
t.TableTemplates = nil
}

// Nodes is a collection of nodes in a topology. Keys are node IDs.
// TODO(pb): type Topology map[string]Node
type Nodes map[string]Node
Expand Down Expand Up @@ -249,6 +270,21 @@ func (n *Nodes) UnsafeMerge(other Nodes) {
}
}

// UnsafeUnMerge removes nodes from n that would be added by merging other,
// modifying the original.
func (n *Nodes) UnsafeUnMerge(other Nodes) {
for k, node := range *n {
if otherNode, ok := (other)[k]; ok {
remove := node.UnsafeUnMerge(otherNode)
if remove {
delete(*n, k)
} else {
(*n)[k] = node
}
}
}
}

// Validate checks the topology for various inconsistencies.
func (t Topology) Validate() error {
errs := []string{}
Expand Down
0