From 01b9f37dd0fd453225e042d6f3a27cd14f252cd0 Mon Sep 17 00:00:00 2001 From: CamiloGarciaLaRotta Date: Sun, 13 Jun 2021 10:40:36 -0400 Subject: [PATCH 1/8] doc: rely on cache.Wait() instead of time.Sleep() (#280) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f4bb28cd..4224cca4 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ func main() { cache.Set("key", "value", 1) // wait for value to pass through buffers - time.Sleep(10 * time.Millisecond) + cache.Wait() value, found := cache.Get("key") if !found { From 777d8d2f019bdb32c4c26efb3b6eef07c67238ce Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 17 Aug 2021 07:06:27 -0700 Subject: [PATCH 2/8] feat(SuperFlag): Allow a way to get error while merging --- z/flags.go | 4 ++-- z/flags_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/z/flags.go b/z/flags.go index a55c474a..fb6dffb6 100644 --- a/z/flags.go +++ b/z/flags.go @@ -134,14 +134,14 @@ func (sf *SuperFlag) String() string { } func (sf *SuperFlag) MergeAndCheckDefault(flag string) *SuperFlag { - sf, err := sf.mergeAndCheckDefaultImpl(flag) + sf, err := sf.MergeWithDefault(flag) if err != nil { glog.Fatal(err) } return sf } -func (sf *SuperFlag) mergeAndCheckDefaultImpl(flag string) (*SuperFlag, error) { +func (sf *SuperFlag) MergeWithDefault(flag string) (*SuperFlag, error) { if sf == nil { m, err := parseFlag(flag) if err != nil { diff --git a/z/flags_test.go b/z/flags_test.go index 2cadb805..fe61d473 100644 --- a/z/flags_test.go +++ b/z/flags_test.go @@ -16,7 +16,7 @@ func TestFlag(t *testing.T) { const def = `bool_key=false; int-key=0; float-key=1.0; string-key=; other-key=5; duration-minutes=15m; duration-hours=12h; duration-days=30d;` - _, err := NewSuperFlag("boolo-key=true").mergeAndCheckDefaultImpl(def) + _, err := NewSuperFlag("boolo-key=true").MergeWithDefault(def) require.Error(t, err) _, err = newSuperFlagImpl("key-without-value") require.Error(t, 
err) From 89e99415887abf0cd5fcaa0c48d25a72548b8de4 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 24 Aug 2021 07:51:21 -0400 Subject: [PATCH 3/8] feat(Ristretto): Introduce SetIfPresent and refactor codebase (#282) * Introduce SetIfPresent and move metrics out to new file. * Introduce ShouldUpdate, to allow Dgraph to only update the cache with a higher version entry. * Remove interfaces. * Rename structs for clarity. --- cache.go | 258 ++++++------------------------------------------- metrics.go | 249 +++++++++++++++++++++++++++++++++++++++++++++++ policy.go | 135 ++++++++++---------------- policy_test.go | 32 +++--- store.go | 73 ++++++-------- store_test.go | 18 ++-- ttl.go | 12 ++- 7 files changed, 391 insertions(+), 386 deletions(-) create mode 100644 metrics.go diff --git a/cache.go b/cache.go index 7226245b..1b3b1a8d 100644 --- a/cache.go +++ b/cache.go @@ -20,11 +20,8 @@ package ristretto import ( - "bytes" "errors" - "fmt" "sync" - "sync/atomic" "time" "unsafe" @@ -45,9 +42,9 @@ const itemSize = int64(unsafe.Sizeof(storeItem{})) // from as many goroutines as you want. type Cache struct { // store is the central concurrent hashmap where key-value items are stored. - store store + store *shardedMap // policy determines what gets let in to the cache and what gets kicked out. - policy policy + policy *lfuPolicy // getBuf is a custom ring buffer implementation that gets pushed to when // keys are read. getBuf *ringBuffer @@ -126,6 +123,8 @@ type Config struct { // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. KeyToHash func(key interface{}) (uint64, uint64) + // ShouldUpdate is called before an existing value is updated; returning false keeps the old value. + ShouldUpdate func(prev, cur interface{}) bool // Cost evaluates a value and outputs a corresponding cost. This function // is ran after Set is called for a new item or an item update with a cost // param of 0. 
@@ -168,7 +167,7 @@ func NewCache(config *Config) (*Cache, error) { } policy := newPolicy(config.NumCounters, config.MaxCost) cache := &Cache{ - store: newStore(), + store: newShardedMap(config.ShouldUpdate), policy: policy, getBuf: newRingBuffer(policy, config.BufferItems), setBuf: make(chan *Item, setBufSize), @@ -195,6 +194,12 @@ func NewCache(config *Config) (*Cache, error) { } cache.onExit(item.Value) } + cache.store.shouldUpdate = func(prev, cur interface{}) bool { + if config.ShouldUpdate != nil { + return config.ShouldUpdate(prev, cur) + } + return true + } if cache.keyToHash == nil { cache.keyToHash = z.KeyToHash } @@ -254,6 +259,17 @@ func (c *Cache) Set(key, value interface{}, cost int64) bool { // expires, which is identical to calling Set. A negative value is a no-op and the value // is discarded. func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool { + return c.setInternal(key, value, cost, ttl, false) +} + +// SetIfPresent is like Set, but only updates the value of an existing key. It +// does NOT add the key to cache if it's absent. +func (c *Cache) SetIfPresent(key, value interface{}, cost int64) bool { + return c.setInternal(key, value, cost, 0*time.Second, true) +} + +func (c *Cache) setInternal(key, value interface{}, + cost int64, ttl time.Duration, onlyUpdate bool) bool { if c == nil || c.isClosed || key == nil { return false } @@ -279,11 +295,18 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration Cost: cost, Expiration: expiration, } + if onlyUpdate { + i.flag = itemUpdate + } // cost is eventually updated. The expiration must also be immediately updated // to prevent items from being prematurely removed from the map. if prev, ok := c.store.Update(i); ok { c.onExit(prev) i.flag = itemUpdate + } else if onlyUpdate { + // The instruction was to update the key, but store.Update failed. So, + // this is a NOOP. + return false } // Attempt to send item to policy. 
select { @@ -494,226 +517,3 @@ func (c *Cache) processItems() { } } } - -// collectMetrics just creates a new *Metrics instance and adds the pointers -// to the cache and policy instances. -func (c *Cache) collectMetrics() { - c.Metrics = newMetrics() - c.policy.CollectMetrics(c.Metrics) -} - -type metricType int - -const ( - // The following 2 keep track of hits and misses. - hit = iota - miss - // The following 3 keep track of number of keys added, updated and evicted. - keyAdd - keyUpdate - keyEvict - // The following 2 keep track of cost of keys added and evicted. - costAdd - costEvict - // The following keep track of how many sets were dropped or rejected later. - dropSets - rejectSets - // The following 2 keep track of how many gets were kept and dropped on the - // floor. - dropGets - keepGets - // This should be the final enum. Other enums should be set before this. - doNotUse -) - -func stringFor(t metricType) string { - switch t { - case hit: - return "hit" - case miss: - return "miss" - case keyAdd: - return "keys-added" - case keyUpdate: - return "keys-updated" - case keyEvict: - return "keys-evicted" - case costAdd: - return "cost-added" - case costEvict: - return "cost-evicted" - case dropSets: - return "sets-dropped" - case rejectSets: - return "sets-rejected" // by policy. - case dropGets: - return "gets-dropped" - case keepGets: - return "gets-kept" - default: - return "unidentified" - } -} - -// Metrics is a snapshot of performance statistics for the lifetime of a cache instance. -type Metrics struct { - all [doNotUse][]*uint64 - - mu sync.RWMutex - life *z.HistogramData // Tracks the life expectancy of a key. 
-} - -func newMetrics() *Metrics { - s := &Metrics{ - life: z.NewHistogramData(z.HistogramBounds(1, 16)), - } - for i := 0; i < doNotUse; i++ { - s.all[i] = make([]*uint64, 256) - slice := s.all[i] - for j := range slice { - slice[j] = new(uint64) - } - } - return s -} - -func (p *Metrics) add(t metricType, hash, delta uint64) { - if p == nil { - return - } - valp := p.all[t] - // Avoid false sharing by padding at least 64 bytes of space between two - // atomic counters which would be incremented. - idx := (hash % 25) * 10 - atomic.AddUint64(valp[idx], delta) -} - -func (p *Metrics) get(t metricType) uint64 { - if p == nil { - return 0 - } - valp := p.all[t] - var total uint64 - for i := range valp { - total += atomic.LoadUint64(valp[i]) - } - return total -} - -// Hits is the number of Get calls where a value was found for the corresponding key. -func (p *Metrics) Hits() uint64 { - return p.get(hit) -} - -// Misses is the number of Get calls where a value was not found for the corresponding key. -func (p *Metrics) Misses() uint64 { - return p.get(miss) -} - -// KeysAdded is the total number of Set calls where a new key-value item was added. -func (p *Metrics) KeysAdded() uint64 { - return p.get(keyAdd) -} - -// KeysUpdated is the total number of Set calls where the value was updated. -func (p *Metrics) KeysUpdated() uint64 { - return p.get(keyUpdate) -} - -// KeysEvicted is the total number of keys evicted. -func (p *Metrics) KeysEvicted() uint64 { - return p.get(keyEvict) -} - -// CostAdded is the sum of costs that have been added (successful Set calls). -func (p *Metrics) CostAdded() uint64 { - return p.get(costAdd) -} - -// CostEvicted is the sum of all costs that have been evicted. -func (p *Metrics) CostEvicted() uint64 { - return p.get(costEvict) -} - -// SetsDropped is the number of Set calls that don't make it into internal -// buffers (due to contention or some other reason). 
-func (p *Metrics) SetsDropped() uint64 { - return p.get(dropSets) -} - -// SetsRejected is the number of Set calls rejected by the policy (TinyLFU). -func (p *Metrics) SetsRejected() uint64 { - return p.get(rejectSets) -} - -// GetsDropped is the number of Get counter increments that are dropped -// internally. -func (p *Metrics) GetsDropped() uint64 { - return p.get(dropGets) -} - -// GetsKept is the number of Get counter increments that are kept. -func (p *Metrics) GetsKept() uint64 { - return p.get(keepGets) -} - -// Ratio is the number of Hits over all accesses (Hits + Misses). This is the -// percentage of successful Get calls. -func (p *Metrics) Ratio() float64 { - if p == nil { - return 0.0 - } - hits, misses := p.get(hit), p.get(miss) - if hits == 0 && misses == 0 { - return 0.0 - } - return float64(hits) / float64(hits+misses) -} - -func (p *Metrics) trackEviction(numSeconds int64) { - if p == nil { - return - } - p.mu.Lock() - defer p.mu.Unlock() - p.life.Update(numSeconds) -} - -func (p *Metrics) LifeExpectancySeconds() *z.HistogramData { - if p == nil { - return nil - } - p.mu.RLock() - defer p.mu.RUnlock() - return p.life.Copy() -} - -// Clear resets all the metrics. -func (p *Metrics) Clear() { - if p == nil { - return - } - for i := 0; i < doNotUse; i++ { - for j := range p.all[i] { - atomic.StoreUint64(p.all[i][j], 0) - } - } - p.mu.Lock() - p.life = z.NewHistogramData(z.HistogramBounds(1, 16)) - p.mu.Unlock() -} - -// String returns a string representation of the metrics. 
-func (p *Metrics) String() string { - if p == nil { - return "" - } - var buf bytes.Buffer - for i := 0; i < doNotUse; i++ { - t := metricType(i) - fmt.Fprintf(&buf, "%s: %d ", stringFor(t), p.get(t)) - } - fmt.Fprintf(&buf, "gets-total: %d ", p.get(hit)+p.get(miss)) - fmt.Fprintf(&buf, "hit-ratio: %.2f", p.Ratio()) - return buf.String() -} diff --git a/metrics.go b/metrics.go new file mode 100644 index 00000000..a75496ed --- /dev/null +++ b/metrics.go @@ -0,0 +1,249 @@ +/* + * Copyright 2021 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ristretto + +import ( + "bytes" + "fmt" + "sync" + "sync/atomic" + + "github.com/dgraph-io/ristretto/z" +) + +type metricType int + +const ( + // The following 2 keep track of hits and misses. + hit = iota + miss + // The following 3 keep track of number of keys added, updated and evicted. + keyAdd + keyUpdate + keyEvict + // The following 2 keep track of cost of keys added and evicted. + costAdd + costEvict + // The following keep track of how many sets were dropped or rejected later. + dropSets + rejectSets + // The following 2 keep track of how many gets were kept and dropped on the + // floor. + dropGets + keepGets + // This should be the final enum. Other enums should be set before this. 
+ doNotUse +) + +func stringFor(t metricType) string { + switch t { + case hit: + return "hit" + case miss: + return "miss" + case keyAdd: + return "keys-added" + case keyUpdate: + return "keys-updated" + case keyEvict: + return "keys-evicted" + case costAdd: + return "cost-added" + case costEvict: + return "cost-evicted" + case dropSets: + return "sets-dropped" + case rejectSets: + return "sets-rejected" // by policy. + case dropGets: + return "gets-dropped" + case keepGets: + return "gets-kept" + default: + return "unidentified" + } +} + +// Metrics is a snapshot of performance statistics for the lifetime of a cache instance. +type Metrics struct { + all [doNotUse][]*uint64 + + mu sync.RWMutex + life *z.HistogramData // Tracks the life expectancy of a key. +} + +// collectMetrics just creates a new *Metrics instance and adds the pointers +// to the cache and policy instances. +func (c *Cache) collectMetrics() { + c.Metrics = newMetrics() + c.policy.CollectMetrics(c.Metrics) +} + +func newMetrics() *Metrics { + s := &Metrics{ + life: z.NewHistogramData(z.HistogramBounds(1, 16)), + } + for i := 0; i < doNotUse; i++ { + s.all[i] = make([]*uint64, 256) + slice := s.all[i] + for j := range slice { + slice[j] = new(uint64) + } + } + return s +} + +func (p *Metrics) add(t metricType, hash, delta uint64) { + if p == nil { + return + } + valp := p.all[t] + // Avoid false sharing by padding at least 64 bytes of space between two + // atomic counters which would be incremented. + idx := (hash % 25) * 10 + atomic.AddUint64(valp[idx], delta) +} + +func (p *Metrics) get(t metricType) uint64 { + if p == nil { + return 0 + } + valp := p.all[t] + var total uint64 + for i := range valp { + total += atomic.LoadUint64(valp[i]) + } + return total +} + +// Hits is the number of Get calls where a value was found for the corresponding key. 
+func (p *Metrics) Hits() uint64 { + return p.get(hit) +} + +// Misses is the number of Get calls where a value was not found for the corresponding key. +func (p *Metrics) Misses() uint64 { + return p.get(miss) +} + +// KeysAdded is the total number of Set calls where a new key-value item was added. +func (p *Metrics) KeysAdded() uint64 { + return p.get(keyAdd) +} + +// KeysUpdated is the total number of Set calls where the value was updated. +func (p *Metrics) KeysUpdated() uint64 { + return p.get(keyUpdate) +} + +// KeysEvicted is the total number of keys evicted. +func (p *Metrics) KeysEvicted() uint64 { + return p.get(keyEvict) +} + +// CostAdded is the sum of costs that have been added (successful Set calls). +func (p *Metrics) CostAdded() uint64 { + return p.get(costAdd) +} + +// CostEvicted is the sum of all costs that have been evicted. +func (p *Metrics) CostEvicted() uint64 { + return p.get(costEvict) +} + +// SetsDropped is the number of Set calls that don't make it into internal +// buffers (due to contention or some other reason). +func (p *Metrics) SetsDropped() uint64 { + return p.get(dropSets) +} + +// SetsRejected is the number of Set calls rejected by the policy (TinyLFU). +func (p *Metrics) SetsRejected() uint64 { + return p.get(rejectSets) +} + +// GetsDropped is the number of Get counter increments that are dropped +// internally. +func (p *Metrics) GetsDropped() uint64 { + return p.get(dropGets) +} + +// GetsKept is the number of Get counter increments that are kept. +func (p *Metrics) GetsKept() uint64 { + return p.get(keepGets) +} + +// Ratio is the number of Hits over all accesses (Hits + Misses). This is the +// percentage of successful Get calls. 
+func (p *Metrics) Ratio() float64 { + if p == nil { + return 0.0 + } + hits, misses := p.get(hit), p.get(miss) + if hits == 0 && misses == 0 { + return 0.0 + } + return float64(hits) / float64(hits+misses) +} + +func (p *Metrics) trackEviction(numSeconds int64) { + if p == nil { + return + } + p.mu.Lock() + defer p.mu.Unlock() + p.life.Update(numSeconds) +} + +func (p *Metrics) LifeExpectancySeconds() *z.HistogramData { + if p == nil { + return nil + } + p.mu.RLock() + defer p.mu.RUnlock() + return p.life.Copy() +} + +// Clear resets all the metrics. +func (p *Metrics) Clear() { + if p == nil { + return + } + for i := 0; i < doNotUse; i++ { + for j := range p.all[i] { + atomic.StoreUint64(p.all[i][j], 0) + } + } + p.mu.Lock() + p.life = z.NewHistogramData(z.HistogramBounds(1, 16)) + p.mu.Unlock() +} + +// String returns a string representation of the metrics. +func (p *Metrics) String() string { + if p == nil { + return "" + } + var buf bytes.Buffer + for i := 0; i < doNotUse; i++ { + t := metricType(i) + fmt.Fprintf(&buf, "%s: %d ", stringFor(t), p.get(t)) + } + fmt.Fprintf(&buf, "gets-total: %d ", p.get(hit)+p.get(miss)) + fmt.Fprintf(&buf, "hit-ratio: %.2f", p.Ratio()) + return buf.String() +} diff --git a/policy.go b/policy.go index bf23f91f..af6228a8 100644 --- a/policy.go +++ b/policy.go @@ -30,56 +30,21 @@ const ( lfuSample = 5 ) -// policy is the interface encapsulating eviction/admission behavior. -// -// TODO: remove this interface and just rename defaultPolicy to policy, as we -// are probably only going to use/implement/maintain one policy. -type policy interface { - ringConsumer - // Add attempts to Add the key-cost pair to the Policy. It returns a slice - // of evicted keys and a bool denoting whether or not the key-cost pair - // was added. If it returns true, the key should be stored in cache. - Add(uint64, int64) ([]*Item, bool) - // Has returns true if the key exists in the Policy. - Has(uint64) bool - // Del deletes the key from the Policy. 
- Del(uint64) - // Cap returns the available capacity. - Cap() int64 - // Close stops all goroutines and closes all channels. - Close() - // Update updates the cost value for the key. - Update(uint64, int64) - // Cost returns the cost value of a key or -1 if missing. - Cost(uint64) int64 - // Optionally, set stats object to track how policy is performing. - CollectMetrics(*Metrics) - // Clear zeroes out all counters and clears hashmaps. - Clear() - // MaxCost returns the current max cost of the cache policy. - MaxCost() int64 - // UpdateMaxCost updates the max cost of the cache policy. - UpdateMaxCost(int64) -} - -func newPolicy(numCounters, maxCost int64) policy { - return newDefaultPolicy(numCounters, maxCost) -} - -type defaultPolicy struct { +// lfuPolicy encapsulates eviction/admission behavior. +type lfuPolicy struct { sync.Mutex admit *tinyLFU - evict *sampledLFU + costs *keyCosts itemsCh chan []uint64 stop chan struct{} isClosed bool metrics *Metrics } -func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy { - p := &defaultPolicy{ +func newPolicy(numCounters, maxCost int64) *lfuPolicy { + p := &lfuPolicy{ admit: newTinyLFU(numCounters), - evict: newSampledLFU(maxCost), + costs: newSampledLFU(maxCost), itemsCh: make(chan []uint64, 3), stop: make(chan struct{}), } @@ -87,9 +52,9 @@ func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy { return p } -func (p *defaultPolicy) CollectMetrics(metrics *Metrics) { +func (p *lfuPolicy) CollectMetrics(metrics *Metrics) { p.metrics = metrics - p.evict.metrics = metrics + p.costs.metrics = metrics } type policyPair struct { @@ -97,7 +62,7 @@ type policyPair struct { cost int64 } -func (p *defaultPolicy) processItems() { +func (p *lfuPolicy) processItems() { for { select { case items := <-p.itemsCh: @@ -110,7 +75,7 @@ func (p *defaultPolicy) processItems() { } } -func (p *defaultPolicy) Push(keys []uint64) bool { +func (p *lfuPolicy) Push(keys []uint64) bool { if p.isClosed { return false } @@ 
-132,28 +97,28 @@ func (p *defaultPolicy) Push(keys []uint64) bool { // Add decides whether the item with the given key and cost should be accepted by // the policy. It returns the list of victims that have been evicted and a boolean // indicating whether the incoming item should be accepted. -func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) { +func (p *lfuPolicy) Add(key uint64, cost int64) ([]*Item, bool) { p.Lock() defer p.Unlock() // Cannot add an item bigger than entire cache. - if cost > p.evict.getMaxCost() { + if cost > p.costs.getMaxCost() { return nil, false } // No need to go any further if the item is already in the cache. - if has := p.evict.updateIfHas(key, cost); has { + if has := p.costs.updateIfHas(key, cost); has { // An update does not count as an addition, so return false. return nil, false } // If the execution reaches this point, the key doesn't exist in the cache. // Calculate the remaining room in the cache (usually bytes). - room := p.evict.roomLeft(cost) + room := p.costs.roomLeft(cost) if room >= 0 { // There's enough room in the cache to store the new item without // overflowing. Do that now and stop here. - p.evict.add(key, cost) + p.costs.add(key, cost) p.metrics.add(costAdd, key, uint64(cost)) return nil, true } @@ -170,9 +135,9 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) { // Delete victims until there's enough space or a minKey is found that has // more hits than incoming item. - for ; room < 0; room = p.evict.roomLeft(cost) { + for ; room < 0; room = p.costs.roomLeft(cost) { // Fill up empty slots in sample. - sample = p.evict.fillSample(sample) + sample = p.costs.fillSample(sample) // Find minimally used item in sample. minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0) @@ -190,7 +155,7 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) { } // Delete the victim from metadata. 
- p.evict.del(minKey) + p.costs.del(minKey) // Delete the victim from sample. sample[minId] = sample[len(sample)-1] @@ -203,40 +168,40 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) { }) } - p.evict.add(key, cost) + p.costs.add(key, cost) p.metrics.add(costAdd, key, uint64(cost)) return victims, true } -func (p *defaultPolicy) Has(key uint64) bool { +func (p *lfuPolicy) Has(key uint64) bool { p.Lock() - _, exists := p.evict.keyCosts[key] + _, exists := p.costs.keyCosts[key] p.Unlock() return exists } -func (p *defaultPolicy) Del(key uint64) { +func (p *lfuPolicy) Del(key uint64) { p.Lock() - p.evict.del(key) + p.costs.del(key) p.Unlock() } -func (p *defaultPolicy) Cap() int64 { +func (p *lfuPolicy) Cap() int64 { p.Lock() - capacity := int64(p.evict.getMaxCost() - p.evict.used) + capacity := int64(p.costs.getMaxCost() - p.costs.used) p.Unlock() return capacity } -func (p *defaultPolicy) Update(key uint64, cost int64) { +func (p *lfuPolicy) Update(key uint64, cost int64) { p.Lock() - p.evict.updateIfHas(key, cost) + p.costs.updateIfHas(key, cost) p.Unlock() } -func (p *defaultPolicy) Cost(key uint64) int64 { +func (p *lfuPolicy) Cost(key uint64) int64 { p.Lock() - if cost, found := p.evict.keyCosts[key]; found { + if cost, found := p.costs.keyCosts[key]; found { p.Unlock() return cost } @@ -244,14 +209,14 @@ func (p *defaultPolicy) Cost(key uint64) int64 { return -1 } -func (p *defaultPolicy) Clear() { +func (p *lfuPolicy) Clear() { p.Lock() p.admit.clear() - p.evict.clear() + p.costs.clear() p.Unlock() } -func (p *defaultPolicy) Close() { +func (p *lfuPolicy) Close() { if p.isClosed { return } @@ -263,22 +228,22 @@ func (p *defaultPolicy) Close() { p.isClosed = true } -func (p *defaultPolicy) MaxCost() int64 { - if p == nil || p.evict == nil { +func (p *lfuPolicy) MaxCost() int64 { + if p == nil || p.costs == nil { return 0 } - return p.evict.getMaxCost() + return p.costs.getMaxCost() } -func (p *defaultPolicy) UpdateMaxCost(maxCost int64) { 
- if p == nil || p.evict == nil { +func (p *lfuPolicy) UpdateMaxCost(maxCost int64) { + if p == nil || p.costs == nil { return } - p.evict.updateMaxCost(maxCost) + p.costs.updateMaxCost(maxCost) } -// sampledLFU is an eviction helper storing key-cost pairs. -type sampledLFU struct { +// keyCosts stores key-cost pairs. +type keyCosts struct { // NOTE: align maxCost to 64-bit boundary for use with atomic. // As per https://golang.org/pkg/sync/atomic/: "On ARM, x86-32, // and 32-bit MIPS, it is the caller’s responsibility to arrange @@ -291,26 +256,26 @@ type sampledLFU struct { keyCosts map[uint64]int64 } -func newSampledLFU(maxCost int64) *sampledLFU { - return &sampledLFU{ +func newSampledLFU(maxCost int64) *keyCosts { + return &keyCosts{ keyCosts: make(map[uint64]int64), maxCost: maxCost, } } -func (p *sampledLFU) getMaxCost() int64 { +func (p *keyCosts) getMaxCost() int64 { return atomic.LoadInt64(&p.maxCost) } -func (p *sampledLFU) updateMaxCost(maxCost int64) { +func (p *keyCosts) updateMaxCost(maxCost int64) { atomic.StoreInt64(&p.maxCost, maxCost) } -func (p *sampledLFU) roomLeft(cost int64) int64 { +func (p *keyCosts) roomLeft(cost int64) int64 { return p.getMaxCost() - (p.used + cost) } -func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair { +func (p *keyCosts) fillSample(in []*policyPair) []*policyPair { if len(in) >= lfuSample { return in } @@ -323,7 +288,7 @@ func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair { return in } -func (p *sampledLFU) del(key uint64) { +func (p *keyCosts) del(key uint64) { cost, ok := p.keyCosts[key] if !ok { return @@ -334,12 +299,12 @@ func (p *sampledLFU) del(key uint64) { p.metrics.add(keyEvict, key, 1) } -func (p *sampledLFU) add(key uint64, cost int64) { +func (p *keyCosts) add(key uint64, cost int64) { p.keyCosts[key] = cost p.used += cost } -func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool { +func (p *keyCosts) updateIfHas(key uint64, cost int64) bool { if prev, found := 
p.keyCosts[key]; found { // Update the cost of an existing key, but don't worry about evicting. // Evictions will be handled the next time a new item is added. @@ -358,7 +323,7 @@ func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool { return false } -func (p *sampledLFU) clear() { +func (p *keyCosts) clear() { p.used = 0 p.keyCosts = make(map[uint64]int64) } diff --git a/policy_test.go b/policy_test.go index de58b8cd..49d33e2d 100644 --- a/policy_test.go +++ b/policy_test.go @@ -15,14 +15,14 @@ func TestPolicy(t *testing.T) { } func TestPolicyMetrics(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.CollectMetrics(newMetrics()) require.NotNil(t, p.metrics) - require.NotNil(t, p.evict.metrics) + require.NotNil(t, p.costs.metrics) } func TestPolicyProcessItems(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.itemsCh <- []uint64{1, 2, 2} time.Sleep(wait) p.Lock() @@ -39,7 +39,7 @@ func TestPolicyProcessItems(t *testing.T) { } func TestPolicyPush(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) require.True(t, p.Push([]uint64{})) keepCount := 0 @@ -52,12 +52,12 @@ func TestPolicyPush(t *testing.T) { } func TestPolicyAdd(t *testing.T) { - p := newDefaultPolicy(1000, 100) + p := newPolicy(1000, 100) if victims, added := p.Add(1, 101); victims != nil || added { t.Fatal("can't add an item bigger than entire cache") } p.Lock() - p.evict.add(1, 1) + p.costs.add(1, 1) p.admit.Increment(1) p.admit.Increment(2) p.admit.Increment(3) @@ -81,14 +81,14 @@ func TestPolicyAdd(t *testing.T) { } func TestPolicyHas(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Add(1, 1) require.True(t, p.Has(1)) require.False(t, p.Has(2)) } func TestPolicyDel(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Add(1, 1) p.Del(1) p.Del(2) @@ -97,29 +97,29 @@ func TestPolicyDel(t *testing.T) { } func TestPolicyCap(t *testing.T) { - p := newDefaultPolicy(100, 10) 
+ p := newPolicy(100, 10) p.Add(1, 1) require.Equal(t, int64(9), p.Cap()) } func TestPolicyUpdate(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Add(1, 1) p.Update(1, 2) p.Lock() - require.Equal(t, int64(2), p.evict.keyCosts[1]) + require.Equal(t, int64(2), p.costs.keyCosts[1]) p.Unlock() } func TestPolicyCost(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Add(1, 2) require.Equal(t, int64(2), p.Cost(1)) require.Equal(t, int64(-1), p.Cost(2)) } func TestPolicyClear(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Add(1, 1) p.Add(2, 2) p.Add(3, 3) @@ -135,20 +135,20 @@ func TestPolicyClose(t *testing.T) { require.NotNil(t, recover()) }() - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Add(1, 1) p.Close() p.itemsCh <- []uint64{1} } func TestPushAfterClose(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Close() require.False(t, p.Push([]uint64{1, 2})) } func TestAddAfterClose(t *testing.T) { - p := newDefaultPolicy(100, 10) + p := newPolicy(100, 10) p.Close() p.Add(1, 1) } diff --git a/store.go b/store.go index e42a98b7..5d5395c8 100644 --- a/store.go +++ b/store.go @@ -29,51 +29,28 @@ type storeItem struct { expiration time.Time } -// store is the interface fulfilled by all hash map implementations in this -// file. Some hash map implementations are better suited for certain data -// distributions than others, so this allows us to abstract that out for use -// in Ristretto. -// -// Every store is safe for concurrent usage. -type store interface { - // Get returns the value associated with the key parameter. - Get(uint64, uint64) (interface{}, bool) - // Expiration returns the expiration time for this key. - Expiration(uint64) time.Time - // Set adds the key-value pair to the Map or updates the value if it's - // already present. The key-value pair is passed as a pointer to an - // item object. 
- Set(*Item) - // Del deletes the key-value pair from the Map. - Del(uint64, uint64) (uint64, interface{}) - // Update attempts to update the key with a new value and returns true if - // successful. - Update(*Item) (interface{}, bool) - // Cleanup removes items that have an expired TTL. - Cleanup(policy policy, onEvict itemCallback) - // Clear clears all contents of the store. - Clear(onEvict itemCallback) -} - -// newStore returns the default store implementation. -func newStore() store { - return newShardedMap() -} - const numShards uint64 = 256 +type updateFn func(prev, cur interface{}) bool type shardedMap struct { - shards []*lockedMap - expiryMap *expirationMap + shards []*lockedMap + expiryMap *expirationMap + shouldUpdate func(prev, cur interface{}) bool } -func newShardedMap() *shardedMap { +// newShardedMap returns a shardedMap that is safe for concurrent usage. +func newShardedMap(fn updateFn) *shardedMap { sm := &shardedMap{ shards: make([]*lockedMap, int(numShards)), expiryMap: newExpirationMap(), } + if fn == nil { + fn = func(prev, cur interface{}) bool { + return true + } + } for i := range sm.shards { - sm.shards[i] = newLockedMap(sm.expiryMap) + sm.shards[i] = newLockedMap(fn, sm.expiryMap) } return sm } @@ -103,7 +80,7 @@ func (sm *shardedMap) Update(newItem *Item) (interface{}, bool) { return sm.shards[newItem.Key%numShards].Update(newItem) } -func (sm *shardedMap) Cleanup(policy policy, onEvict itemCallback) { +func (sm *shardedMap) Cleanup(policy *lfuPolicy, onEvict itemCallback) { sm.expiryMap.cleanup(sm, policy, onEvict) } @@ -115,14 +92,16 @@ func (sm *shardedMap) Clear(onEvict itemCallback) { type lockedMap struct { sync.RWMutex - data map[uint64]storeItem - em *expirationMap + data map[uint64]storeItem + em *expirationMap + shouldUpdate updateFn } -func newLockedMap(em *expirationMap) *lockedMap { +func newLockedMap(fn updateFn, em *expirationMap) *lockedMap { return &lockedMap{ - data: make(map[uint64]storeItem), - em: em, + data: make(map[uint64]storeItem), + 
em: em, + shouldUpdate: fn, } } @@ -166,6 +145,9 @@ func (m *lockedMap) Set(i *Item) { if i.Conflict != 0 && (i.Conflict != item.conflict) { return } + if !m.shouldUpdate(item.value, i.Value) { + return + } m.em.update(i.Key, i.Conflict, item.expiration, i.Expiration) } else { // The value is not in the map already. There's no need to return anything. @@ -204,15 +186,18 @@ func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) { func (m *lockedMap) Update(newItem *Item) (interface{}, bool) { m.Lock() + defer m.Unlock() + item, ok := m.data[newItem.Key] if !ok { - m.Unlock() return nil, false } if newItem.Conflict != 0 && (newItem.Conflict != item.conflict) { - m.Unlock() return nil, false } + if !m.shouldUpdate(item.value, newItem.Value) { + return item.value, false + } m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration) m.data[newItem.Key] = storeItem{ @@ -221,8 +206,6 @@ func (m *lockedMap) Update(newItem *Item) (interface{}, bool) { value: newItem.Value, expiration: newItem.Expiration, } - - m.Unlock() return item.value, true } diff --git a/store_test.go b/store_test.go index eea9440d..0cec53aa 100644 --- a/store_test.go +++ b/store_test.go @@ -9,7 +9,7 @@ import ( ) func TestStoreSetGet(t *testing.T) { - s := newStore() + s := newShardedMap(nil) key, conflict := z.KeyToHash(1) i := Item{ Key: key, @@ -40,7 +40,7 @@ func TestStoreSetGet(t *testing.T) { } func TestStoreDel(t *testing.T) { - s := newStore() + s := newShardedMap(nil) key, conflict := z.KeyToHash(1) i := Item{ Key: key, @@ -57,7 +57,7 @@ func TestStoreDel(t *testing.T) { } func TestStoreClear(t *testing.T) { - s := newStore() + s := newShardedMap(nil) for i := uint64(0); i < 1000; i++ { key, conflict := z.KeyToHash(i) it := Item{ @@ -77,7 +77,7 @@ func TestStoreClear(t *testing.T) { } func TestStoreUpdate(t *testing.T) { - s := newStore() + s := newShardedMap(nil) key, conflict := z.KeyToHash(1) i := Item{ Key: key, @@ -119,7 +119,7 @@ func TestStoreUpdate(t 
*testing.T) { } func TestStoreCollision(t *testing.T) { - s := newShardedMap() + s := newShardedMap(nil) s.shards[1].Lock() s.shards[1].data[1] = storeItem{ key: 1, @@ -154,7 +154,7 @@ func TestStoreCollision(t *testing.T) { } func TestStoreExpiration(t *testing.T) { - s := newStore() + s := newShardedMap(nil) key, conflict := z.KeyToHash(1) expiration := time.Now().Add(time.Second) i := Item{ @@ -184,7 +184,7 @@ func TestStoreExpiration(t *testing.T) { } func BenchmarkStoreGet(b *testing.B) { - s := newStore() + s := newShardedMap(nil) key, conflict := z.KeyToHash(1) i := Item{ Key: key, @@ -201,7 +201,7 @@ func BenchmarkStoreGet(b *testing.B) { } func BenchmarkStoreSet(b *testing.B) { - s := newStore() + s := newShardedMap(nil) key, conflict := z.KeyToHash(1) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { @@ -217,7 +217,7 @@ func BenchmarkStoreSet(b *testing.B) { } func BenchmarkStoreUpdate(b *testing.B) { - s := newStore() + s := newShardedMap(nil) key, conflict := z.KeyToHash(1) i := Item{ Key: key, diff --git a/ttl.go b/ttl.go index 337976ad..6e4bf38b 100644 --- a/ttl.go +++ b/ttl.go @@ -77,17 +77,25 @@ func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime time if m == nil { return } + if oldExpTime.IsZero() && newExpTime.IsZero() { + return + } m.Lock() defer m.Unlock() oldBucketNum := storageBucket(oldExpTime) + newBucketNum := storageBucket(newExpTime) + if oldBucketNum == newBucketNum { + // No change. + return + } + oldBucket, ok := m.buckets[oldBucketNum] if ok { delete(oldBucket, key) } - newBucketNum := storageBucket(newExpTime) newBucket, ok := m.buckets[newBucketNum] if !ok { newBucket = make(bucket) @@ -114,7 +122,7 @@ func (m *expirationMap) del(key uint64, expiration time.Time) { // cleanup removes all the items in the bucket that was just completed. It deletes // those items from the store, and calls the onEvict function on those items. // This function is meant to be called periodically. 
-func (m *expirationMap) cleanup(store store, policy policy, onEvict itemCallback) { +func (m *expirationMap) cleanup(store *shardedMap, policy *lfuPolicy, onEvict itemCallback) { if m == nil { return } From c2901dc76889a90201a15b9cfc3c04bbfe0c298c Mon Sep 17 00:00:00 2001 From: Andrey Yaysh Date: Thu, 2 Sep 2021 00:40:26 +0200 Subject: [PATCH 4/8] Remove mmapSize check, it has wrong value on ARM64 (#281) --- z/mmap_linux.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/z/mmap_linux.go b/z/mmap_linux.go index 9cc3497a..8843e424 100644 --- a/z/mmap_linux.go +++ b/z/mmap_linux.go @@ -17,7 +17,6 @@ package z import ( - "fmt" "os" "reflect" "unsafe" @@ -41,7 +40,7 @@ func mremap(data []byte, size int) ([]byte, error) { const MREMAP_MAYMOVE = 0x1 header := (*reflect.SliceHeader)(unsafe.Pointer(&data)) - mmapAddr, mmapSize, errno := unix.Syscall6( + mmapAddr, _, errno := unix.Syscall6( unix.SYS_MREMAP, header.Data, uintptr(header.Len), @@ -53,9 +52,6 @@ func mremap(data []byte, size int) ([]byte, error) { if errno != 0 { return nil, errno } - if mmapSize != uintptr(size) { - return nil, fmt.Errorf("mremap size mismatch: requested: %d got: %d", size, mmapSize) - } header.Data = mmapAddr header.Cap = size From 91c6a4080750eed6af6b26870fbe90c4236ac81e Mon Sep 17 00:00:00 2001 From: Yongsheng Xu Date: Tue, 28 Sep 2021 23:26:03 +0800 Subject: [PATCH 5/8] feat(mmap): support msync for windows (#283) See: https://docs.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-flushviewoffile --- z/mmap_windows.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/z/mmap_windows.go b/z/mmap_windows.go index 171176e9..0ea6e944 100644 --- a/z/mmap_windows.go +++ b/z/mmap_windows.go @@ -91,6 +91,5 @@ func madvise(b []byte, readahead bool) error { } func msync(b []byte) error { - // TODO: Figure out how to do msync on Windows. 
- return nil + return syscall.FlushViewOfFile(uintptr(unsafe.Pointer(&b[0])), uintptr(len(b))) } From 55e7615b73e57b2762a402aceb64088dc99f7cc0 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Mon, 4 Oct 2021 14:46:27 -0400 Subject: [PATCH 6/8] readme: Include SpiceDB in the list of projects using Ristretto (#285) Include SpiceDB in the list of projects using Ristretto. Signed-off-by: Jimmy Zelinskie --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4224cca4..7a67b182 100644 --- a/README.md +++ b/README.md @@ -205,7 +205,8 @@ Below is a list of known projects that use Ristretto: - [Badger](https://github.com/dgraph-io/badger) - Embeddable key-value DB in Go - [Dgraph](https://github.com/dgraph-io/dgraph) - Horizontally scalable and distributed GraphQL database with a graph backend -- [Vitess](https://github.com/vitessio/vitess) - database clustering system for horizontal scaling of MySQL +- [Vitess](https://github.com/vitessio/vitess) - Database clustering system for horizontal scaling of MySQL +- [SpiceDB](https://github.com/authzed/spicedb) - Horizontally scalable permissions database ## FAQ From efb105d0ca5ed9ceec285b838c0bf7fabf8d3bf2 Mon Sep 17 00:00:00 2001 From: "brain.duan" Date: Sat, 23 Oct 2021 01:04:58 +0800 Subject: [PATCH 7/8] fix(z): runtime error: index out of range for !amd64 env #287 fix `runtime error: index out of range` in non-amd64 (arm64) env --- z/simd/search.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/z/simd/search.go b/z/simd/search.go index 0d001ee0..b1e63922 100644 --- a/z/simd/search.go +++ b/z/simd/search.go @@ -20,7 +20,7 @@ package simd // Search uses the Clever search to find the correct key. 
func Search(xs []uint64, k uint64) int16 { - if len(xs) < 8 { + if len(xs) < 8 || (len(xs) % 8 != 0) { return Naive(xs, k) } var twos, pk [4]uint64 From 297c39e6640f84b171ff0c32ff1f1e7c9e346ce4 Mon Sep 17 00:00:00 2001 From: Naman Jain Date: Mon, 8 Nov 2021 11:05:08 +0530 Subject: [PATCH 8/8] chore: update go version in test.sh (#290) --- test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.sh b/test.sh index d53b32d4..0028a24f 100755 --- a/test.sh +++ b/test.sh @@ -12,7 +12,7 @@ if [ -z "${TEAMCITY_VERSION}" ]; then -v `pwd`:/go/src/github.com/dgraph-io/ristretto \ --workdir /go/src/github.com/dgraph-io/ristretto \ --env TEAMCITY_VERSION=local \ - golang:1.13 \ + golang:1.16 \ sh test.sh else # running in teamcity, since teamcity itself run this in container, let's simply run this