tscache: switch to skiplist implementation by default, remove Request interface by nvanbenschoten · Pull Request #20232 · cockroachdb/cockroach · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

tscache: switch to skiplist implementation by default, remove Request interface #20232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,21 +680,11 @@ func EnsureSafeSplitKey(key roachpb.Key) (roachpb.Key, error) {
// Range returns a key range encompassing the key ranges of all requests in the
// Batch.
func Range(ba roachpb.BatchRequest) (roachpb.RSpan, error) {
return RangeMatchingPred(ba, nil)
}

// RangeMatchingPred returns a key range encompassing the key ranges of all
// requests in the Batch that match the provided predicate. If no predicate
// is provided, no filtering will be performed on the requests in the Batch.
func RangeMatchingPred(
ba roachpb.BatchRequest, pred func(roachpb.Request) bool,
) (roachpb.RSpan, error) {
from := roachpb.RKeyMax
to := roachpb.RKeyMin
for _, arg := range ba.Requests {
req := arg.GetInner()
_, noop := req.(*roachpb.NoopRequest)
if noop || (pred != nil && !pred(req)) {
if _, ok := req.(*roachpb.NoopRequest); ok {
continue
}
h := req.Header()
Expand Down
5 changes: 0 additions & 5 deletions pkg/roachpb/data.go
10000
Original file line number Diff line number Diff line change
Expand Up @@ -1373,11 +1373,6 @@ func (rs RSpan) AsRawSpanWithNoLocals() Span {
}
}

// Overlaps returns whether the two spans overlap.
func (rs RSpan) Overlaps(other RSpan) bool {
return rs.AsRawSpanWithNoLocals().Overlaps(other.AsRawSpanWithNoLocals())
}

// KeyValueByKey implements sorting of a slice of KeyValues by key.
type KeyValueByKey []KeyValue

Expand Down
9 changes: 2 additions & 7 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,15 +324,10 @@ func (r *Replica) CommandSizesLen() int {
// GetTSCacheHighWater returns the high water mark of the replica's timestamp
// cache.
func (r *Replica) GetTSCacheHighWater() hlc.Timestamp {
if !r.store.tsCacheMu.cache.ThreadSafe() {
r.store.tsCacheMu.Lock()
defer r.store.tsCacheMu.Unlock()
}

start := roachpb.Key(r.Desc().StartKey)
end := roachpb.Key(r.Desc().EndKey)
t, _ := r.store.tsCacheMu.cache.GetMaxRead(start, end)
if w, _ := r.store.tsCacheMu.cache.GetMaxWrite(start, end); t.Less(w) {
t, _ := r.store.tsCache.GetMaxRead(start, end)
if w, _ := r.store.tsCache.GetMaxWrite(start, end); t.Less(w) {
t = w
}
return t
Expand Down
165 changes: 30 additions & 135 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/tscache"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
Expand Down Expand Up @@ -1980,19 +1978,7 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry pr
// marked as affecting the cache are processed. Inconsistent reads
// are excluded.
if pErr == nil && retry == proposalNoRetry && ec.ba.ReadConsistency != roachpb.INCONSISTENT {
creq := makeTSCacheRequest(&ec.ba, br)

if ec.repl.store.Clock().MaxOffset() == timeutil.ClocklessMaxOffset {
// Clockless mode: all reads count as writes.
creq.Writes, creq.Reads = append(creq.Writes, creq.Reads...), nil
}
if !ec.repl.store.tsCacheMu.cache.ThreadSafe() {
ec.repl.store.tsCacheMu.Lock()
}
ec.repl.store.tsCacheMu.cache.AddRequest(creq)
if !ec.repl.store.tsCacheMu.cache.ThreadSafe() {
ec.repl.store.tsCacheMu.Unlock()
}
ec.repl.updateTimestampCache(&ec.ba, br)
}

if fn := ec.repl.store.cfg.TestingKnobs.OnCommandQueueAction; fn != nil {
Expand All @@ -2001,106 +1987,38 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry pr
ec.repl.removeCmdsFromCommandQueue(ec.cmds)
}

func makeTSCacheRequest(ba *roachpb.BatchRequest, br *roachpb.BatchResponse) *tscache.Request {
cr := tscache.NewRequest()
cr.Timestamp = ba.Timestamp
if ba.Txn != nil {
cr.TxnID = ba.Txn.ID
}

span, err := keys.RangeMatchingPred(*ba, roachpb.UpdatesTimestampCache)
if err != nil {
// This can't happen because we've already called keys.Range before
// evaluating the request.
log.Fatal(context.Background(), err)
}

// Copy all keys necessary for the TimestampCache Request into a single byte
// slice. This has been shown to dramatically reduce the GC time spent
// scanning TimestampCache-related memory. Before doing so, add up the
// length of all keys we'll need to copy so we can allocate exactly the
// buffer size we'll need. While doing so, we can also count how large the
// read and write span slices need to be.
var readCount, writeCount int
keysSize := bufalloc.RSpanSize(span)
for i, union := range ba.Requests {
args := union.GetInner()
if roachpb.UpdatesTimestampCache(args) {
header := args.Header()

// Count key sizes and read or write requests. This should be kept
// in-sync with the type switch below.
switch args.(type) {
case *roachpb.DeleteRangeRequest:
writeCount++
keysSize += bufalloc.SpanSize(header)
case *roachpb.EndTransactionRequest:
keysSize += bufalloc.SpanSize(header)
case *roachpb.ScanRequest:
readCount++
keysSize += len(header.Key)
resp := br.Responses[i].GetInner().(*roachpb.ScanResponse)
if ba.Header.MaxSpanRequestKeys != 0 &&
ba.Header.MaxSpanRequestKeys == int64(len(resp.Rows)) {
// See below. +1 because we call Next on this key.
keysSize += len(resp.Rows[len(resp.Rows)-1].Key) + 1
} else {
keysSize += len(header.EndKey)
}
case *roachpb.ReverseScanRequest:
readCount++
keysSize += len(header.EndKey)
resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse)
if ba.Header.MaxSpanRequestKeys != 0 &&
ba.Header.MaxSpanRequestKeys == int64(len(resp.Rows)) {
// See below.
keysSize += len(resp.Rows[len(resp.Rows)-1].Key)
} else {
keysSize += len(header.Key)
}
default:
readCount++
keysSize += bufalloc.SpanSize(header)
}
}
// updateTimestampCache updates the timestamp cache in order to set a low water
// mark for the timestamp at which mutations to keys overlapping the provided
// request can write, such that they don't re-write history.
func (r *Replica) updateTimestampCache(ba *roachpb.BatchRequest, br *roachpb.BatchResponse) {
readOnlyUseReadCache := true
if r.store.Clock().MaxOffset() == timeutil.ClocklessMaxOffset {
// Clockless mode: all reads count as writes.
readOnlyUseReadCache = false
}

// Allocate slices for the read and write spans. NewRequest will return a
// Read slice with some space already allocated, so we try to use this
// wherever possible.
if cap(cr.Reads) < readCount {
// Maybe we can use the pre-allocated span slice for Writes?
cr.Writes = cr.Reads
cr.Reads = make([]roachpb.Span, 0, readCount)
}
if cap(cr.Writes) < writeCount {
cr.Writes = make([]roachpb.Span, 0, writeCount)
tc := r.store.tsCache
ts := ba.Timestamp
var txnID uuid.UUID
if ba.Txn != nil {
txnID = ba.Txn.ID
}

// Create a ByteAllocator with exactly the number of bytes we'll need for
// all keys in the CacheRequest. It should never need to re-alloc.
alloc := bufalloc.ByteAllocator(make([]byte, 0, keysSize))
alloc, cr.Span = alloc.CopyRSpan(span)

for i, union := range ba.Requests {
args := union.GetInner()
if roachpb.UpdatesTimestampCache(args) {
header := args.Header()
var headerCopy roachpb.Span
start, end := header.Key, header.EndKey
switch args.(type) {
case *roachpb.DeleteRangeRequest:
// DeleteRange adds to the write timestamp cache to prevent
// subsequent writes from rewriting history.
alloc, headerCopy = alloc.CopySpan(header)
cr.Writes = append(cr.Writes, headerCopy)
tc.Add(start, end, ts, txnID, false /* readCache */)
case *roachpb.EndTransactionRequest:
// EndTransaction adds to the write timestamp cache to ensure replays
// create a transaction record with WriteTooOld set.
//
// Note that TimestampCache.ExpandRequests lazily creates the
// transaction key from the request key.
alloc, headerCopy = alloc.CopySpan(header)
cr.Txn = headerCopy
// EndTransaction adds the transaction key to the write
// timestamp cache to ensure replays create a transaction
// record with WriteTooOld set.
key := keys.TransactionKey(start, txnID)
tc.Add(key, nil, ts, txnID, false /* readCache */)
case *roachpb.ScanRequest:
resp := br.Responses[i].GetInner().(*roachpb.ScanResponse)
if ba.Header.MaxSpanRequestKeys != 0 &&
Expand All @@ -2114,29 +2032,24 @@ func makeTSCacheRequest(ba *roachpb.BatchRequest, br *roachpb.BatchResponse) *ts
// to prevent phantom read anomalies. That means we can only
// perform this truncate if the scan requested a limited number of
// results and we hit that limit.
header.EndKey = resp.Rows[len(resp.Rows)-1].Key.Next()
end = resp.Rows[len(resp.Rows)-1].Key.Next()
}
alloc, headerCopy = alloc.CopySpan(header)
cr.Reads = append(cr.Reads, headerCopy)
tc.Add(start, end, ts, txnID, readOnlyUseReadCache)
case *roachpb.ReverseScanRequest:
resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse)
if ba.Header.MaxSpanRequestKeys != 0 &&
ba.Header.MaxSpanRequestKeys == int64(len(resp.Rows)) {
// See comment in the ScanRequest case. For reverse scans, results
// are returned in reverse order and we truncate the start key of
// the span.
header.Key = resp.Rows[len(resp.Rows)-1].Key
start = resp.Rows[len(resp.Rows)-1].Key
}
alloc, headerCopy = alloc.CopySpan(header)
cr.Reads = append(cr.Reads, headerCopy)
tc.Add(start, end, ts, txnID, readOnlyUseReadCache)
default:
alloc, headerCopy = alloc.CopySpan(header)
cr.Reads = append(cr.Reads, headerCopy)
tc.Add(start, end, ts, txnID, readOnlyUseReadCache)
}
}
}

return cr
}

func collectSpans(
Expand Down Expand Up @@ -2365,24 +2278,6 @@ func (r *Replica) removeCmdsFromCommandQueue(cmds batchCmdSet) {
func (r *Replica) applyTimestampCache(
ctx context.Context, ba *roachpb.BatchRequest,
) (bool, *roachpb.Error) {
span, err := keys.RangeMatchingPred(*ba, roachpb.ConsultsTimestampCache)
if err != nil {
return false, roachpb.NewError(err)
}

// TODO(peter): We only need to hold a write lock during the ExpandRequests
// calls. Investigate whether using a RWMutex here reduces lock contention.
if !r.store.tsCacheMu.cache.ThreadSafe() {
r.store.tsCacheMu.Lock()
defer r.store.tsCacheMu.Unlock()
}

if ba.Txn != nil {
r.store.tsCacheMu.cache.ExpandRequests(span, ba.Txn.Timestamp)
} else {
r.store.tsCacheMu.cache.ExpandRequests(span, ba.Timestamp)
}

var bumped bool
for _, union := range ba.Requests {
args := union.GetInner()
Expand All @@ -2393,7 +2288,7 @@ func (r *Replica) applyTimestampCache(
// has already been finalized, in which case this is a replay.
if _, ok := args.(*roachpb.BeginTransactionRequest); ok {
key := keys.TransactionKey(header.Key, ba.Txn.ID)
wTS, wTxnID := r.store.tsCacheMu.cache.GetMaxWrite(key, nil)
wTS, wTxnID := r.store.tsCache.GetMaxWrite(key, nil)
// GetMaxWrite will only find a timestamp interval with an
// associated txnID on the TransactionKey if an EndTxnReq has
// been processed. All other timestamp intervals will have no
Expand All @@ -2411,7 +2306,7 @@ func (r *Replica) applyTimestampCache(
// retry. If it's really a replay, it won't retry.
txn := ba.Txn.Clone()
bumped = txn.Timestamp.Forward(wTS.Next()) || bumped
err = roachpb.NewTransactionRetryError(roachpb.RETRY_POSSIBLE_REPLAY)
err := roachpb.NewTransactionRetryError(roachpb.RETRY_POSSIBLE_REPLAY)
return bumped, roachpb.NewErrorWithTxn(err, &txn)
}
default:
Expand All @@ -2422,7 +2317,7 @@ func (r *Replica) applyTimestampCache(
}

// Forward the timestamp if there's been a more recent read (by someone else).
rTS, rTxnID := r.store.tsCacheMu.cache.GetMaxRead(header.Key, header.EndKey)
rTS, rTxnID := r.store.tsCache.GetMaxRead(header.Key, header.EndKey)
if ba.Txn != nil {
if ba.Txn.ID != rTxnID {
nextTS := rTS.Next()
Expand All @@ -2440,7 +2335,7 @@ func (r *Replica) applyTimestampCache(
// write too old boolean for transactions. Note that currently
// only EndTransaction and DeleteRange requests update the
// write timestamp cache.
wTS, wTxnID := r.store.tsCacheMu.cache.GetMaxWrite(header.Key, header.EndKey)
wTS, wTxnID := r.store.tsCache.GetMaxWrite(header.Key, header.EndKey)
if ba.Txn != nil {
if ba.Txn.ID != wTxnID {
if !wTS.Less(ba.Txn.Timestamp) {
Expand Down
8 changes: 1 addition & 7 deletions 8 pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
// lease's expiration but instead use the new lease's start to initialize
// the timestamp cache low water.
desc := r.Desc()
if !r.store.tsCacheMu.cache.ThreadSafe() {
r.store.tsCacheMu.Lock()
}
for _, keyRange := range makeReplicatedKeyRanges(desc) {
r.store.tsCacheMu.cache.SetLowWater(keyRange.start.Key, keyRange.end.Key, newLease.Start)
}
if !r.store.tsCacheMu.cache.ThreadSafe() {
r.store.tsCacheMu.Unlock()
r.store.tsCache.SetLowWater(keyRange.start.Key, keyRange.end.Key, newLease.Start)
}

// Reset the request counts used to make lease placement decisions whenever
Expand Down
Loading
0