micro-optimize prefetch; simplify cold-get; add stats-updater i/f; al… · NVIDIA/aistore@a250bfd · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit a250bfd

Browse files
committed
micro-optimize prefetch; simplify cold-get; add stats-updater i/f; align fields
* with refactoring Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 3206a6b commit a250bfd

25 files changed

+155
-124
lines changed

ais/htrun.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ func (h *htrun) String() string { return h.si.String() }
9999
func (h *htrun) Bowner() meta.Bowner { return h.owner.bmd }
100100
func (h *htrun) Sowner() meta.Sowner { return h.owner.smap }
101101

102+
func (h *htrun) StatsUpdater() cos.StatsUpdater { return h.statsT }
103+
102104
func (h *htrun) PageMM() *memsys.MMSA { return h.gmm }
103105
func (h *htrun) ByteMM() *memsys.MMSA { return h.smm }
104106

ais/target.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,7 @@ func (t *target) Run() error {
322322
t.htrun.init(config)
323323
t.setusr1()
324324

325-
tstats := t.statsT.(*stats.Trunner)
326-
327-
core.Tinit(t, tstats, config, true /*run hk*/)
325+
core.Tinit(t, config, true /*run hk*/)
328326

329327
fatalErr, writeErr := t.checkRestarted(config)
330328
if fatalErr != nil {
@@ -355,6 +353,8 @@ func (t *target) Run() error {
355353
}
356354
t.owner.smap.put(smap)
357355

356+
tstats := t.statsT.(*stats.Trunner)
357+
358358
if daemon.cli.target.standby {
359359
tstats.Standby(true)
360360
t.regstate.disabled.Store(true)
@@ -411,8 +411,8 @@ func (t *target) Run() error {
411411
go t.goresilver(marked.Interrupted)
412412
}
413413

414-
dsort.Tinit(t.statsT, db, config)
415-
dload.Init(t.statsT, db, &config.Client)
414+
dsort.Tinit(db, config)
415+
dload.Init(db, &config.Client)
416416

417417
err = t.htrun.run(config)
418418

ais/tgtimpl.go

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -142,39 +142,33 @@ func (t *target) CopyObject(lom *core.LOM, dm *bundle.DataMover, params *xs.CoiP
142142
return size, err
143143
}
144144

145-
// use `backend.GetObj`
146-
// (compare w/ `backend.GetObjReader` via ldp and blob download)
147145
func (t *target) GetCold(ctx context.Context, lom *core.LOM, xkind string, owt cmn.OWT) (ecode int, err error) {
148-
// 1. lock
146+
// lock
149147
switch owt {
150-
case cmn.OwtGetPrefetchLock:
151-
// do nothing
152-
case cmn.OwtGetTryLock, cmn.OwtGetLock:
153-
if owt == cmn.OwtGetTryLock {
154-
if !lom.TryLock(true) {
155-
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
156-
nlog.Warningln(t.String(), lom.String(), owt.String(), "is busy")
157-
}
158-
return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going
148+
case cmn.OwtGetTryLock: // e.g., downloader
149+
if !lom.TryLock(true) {
150+
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
151+
nlog.Warningln(t.String(), lom.String(), owt.String(), "is busy")
159152
}
160-
} else {
161-
lom.Lock(true)
153+
return 0, cmn.ErrSkip // TODO: must be cmn.ErrBusy
162154
}
155+
case cmn.OwtGetLock: // regular usage
156+
lom.Lock(true)
163157
default:
164-
// for cmn.OwtGet, see goi.getCold
158+
// other cold-get use cases include:
159+
// - cmn.OwtGet, goi.getCold
160+
// - cmn.OwtGetPrefetchLock, xs/prefetch
165161
debug.Assert(false, owt.String())
166-
return http.StatusInternalServerError, errors.New("invalid " + owt.String())
162+
return http.StatusInternalServerError, errors.New("cold-get: invalid " + owt.String())
167163
}
168164

169-
// 2. GET remote object and store it
165+
// cold GET
170166
var (
171167
started = mono.NanoTime()
172168
backend = t.Backend(lom.Bck())
173169
)
174170
if ecode, err = backend.GetObj(ctx, lom, owt, nil /*origReq*/); err != nil {
175-
if owt != cmn.OwtGetPrefetchLock {
176-
lom.Unlock(true)
177-
}
171+
lom.Unlock(true)
178172
if cmn.IsErrFailedTo(err) {
179173
nlog.Warningln(err)
180174
} else {
@@ -183,17 +177,11 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, xkind string, owt c
183177
return ecode, err
184178
}
185179

186-
// 3. unlock
187-
switch owt {
188-
case cmn.OwtGetPrefetchLock:
189-
// do nothing
190-
case cmn.OwtGetTryLock, cmn.OwtGetLock:
191-
lom.Unlock(true)
192-
}
193-
194-
// 4. stats
180+
// unlock and stats
181+
lom.Unlock(true)
195182
lat := mono.SinceNano(started)
196183
t.coldstats(backend, lom.Bck().Cname(""), xkind, lom.Lsize(), lat)
184+
197185
return 0, nil
198186
}
199187

ais/tgtobj.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ func (poi *putOI) fini() (ecode int, err error) {
379379
case cmn.OwtGetPrefetchLock:
380380
if !lom.TryLock(true) {
381381
nlog.Warningln(poi.loghdr(), "is busy")
382-
return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going
382+
return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going // TODO: must be cmn.ErrBusy
383383
}
384384
defer lom.Unlock(true)
385385
default:

ais/tgtobj_internal_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestMain(m *testing.M) {
7575
t.htrun.init(config)
7676

7777
t.statsT = mock.NewStatsTracker()
78-
core.Tinit(t, t.statsT, config, false)
78+
core.Tinit(t, config, false)
7979

8080
bck := meta.NewBck(testBucket, apc.AIS, cmn.NsGlobal)
8181
bmd := newBucketMD()

api/etl.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ import (
1818
)
1919

2020
type ETLObjArgs struct {
21-
// ETLName specifies the running ETL instance to be used in inline transform.
22-
ETLName string
23-
2421
// TransformArgs holds the arguments to be used in ETL inline transform,
2522
// which will be sent as `apc.QparamETLArgs` query parameter in the request.
23+
// Optional, can be omitted (nil).
2624
TransformArgs any
25+
26+
// ETLName specifies the running ETL instance to be used in inline transform.
27+
ETLName string
2728
}
2829

2930
// Initiate custom ETL workload by executing one of the documented `etl.InitMsg`

cmn/err.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func (e *ErrBusy) Error() string {
447447
return fmt.Sprintf("%s %q is currently busy%s, please try again", e.whereOrType, e.what, s)
448448
}
449449

450-
func isErrBusy(err error) bool {
450+
func IsErrBusy(err error) bool {
451451
_, ok := err.(*ErrBusy)
452452
return ok
453453
}
@@ -1286,7 +1286,7 @@ func WriteErr(w http.ResponseWriter, r *http.Request, err error, opts ...int /*[
12861286
status = http.StatusRequestedRangeNotSatisfiable
12871287
case isErrUnsupp(err), isErrNotImpl(err):
12881288
status = http.StatusNotImplemented
1289-
case isErrBusy(err):
1289+
case IsErrBusy(err):
12901290
status = http.StatusConflict
12911291
}
12921292
}

core/lcache.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (lchk *lchk) evict(timeout time.Duration, now time.Time, pct int) {
307307
var (
308308
avail = fs.GetAvail()
309309
wg = &sync.WaitGroup{}
310-
evicted = g.tstats.Get(LcacheEvictedCount)
310+
evicted = T.StatsUpdater().Get(LcacheEvictedCount)
311311
)
312312
lchk.total.Store(0)
313313
for _, mi := range avail {
@@ -323,7 +323,7 @@ func (lchk *lchk) evict(timeout time.Duration, now time.Time, pct int) {
323323
go evct.do()
324324
}
325325
wg.Wait()
326-
evicted = g.tstats.Get(LcacheEvictedCount) - evicted
326+
evicted = T.StatsUpdater().Get(LcacheEvictedCount) - evicted
327327
nlog.Infoln("hk done:", lchk.total.Load(), evicted)
328328
}
329329

@@ -368,7 +368,7 @@ func (evct *evct) f(hkey, value any) bool {
368368
if lmd2 == md {
369369
*md = lom0.md // zero out
370370
}
371-
g.tstats.Inc(LcacheEvictedCount)
371+
T.StatsUpdater().Inc(LcacheEvictedCount)
372372

373373
// throttle
374374
evct.evicted++
@@ -392,13 +392,13 @@ func _flushAtime(md *lmeta, atime time.Time, mdTime int64) {
392392
return
393393
}
394394
if err = lom.flushAtime(atime); err != nil {
395-
g.tstats.Inc(LcacheErrCount)
395+
T.StatsUpdater().Inc(LcacheErrCount)
396396
T.FSHC(err, lom.Mountpath(), lom.FQN)
397397
return
398398
}
399399

400400
// stats
401-
g.tstats.Inc(LcacheFlushColdCount)
401+
T.StatsUpdater().Inc(LcacheFlushColdCount)
402402

403403
if !md.isDirty() {
404404
return
@@ -418,7 +418,7 @@ func _flushAtime(md *lmeta, atime time.Time, mdTime int64) {
418418
continue
419419
}
420420
if err = fs.SetXattr(copyFQN, XattrLOM, buf); err != nil {
421-
g.tstats.Inc(LcacheErrCount)
421+
T.StatsUpdater().Inc(LcacheErrCount)
422422
nlog.Errorln("set-xattr [", copyFQN, err, "]")
423423
break
424424
}

core/ldp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (lom *LOM) CheckRemoteMD(locked, sync bool, origReq *http.Request) (res CRM
178178
ecode, err = 0, errDel
179179
} else {
180180
vlabs := map[string]string{"bucket": lom.Bck().Cname("")} // TODO -- FIXME: cannot import stats
181-
g.tstats.IncWith(RemoteDeletedDelCount, vlabs)
181+
T.StatsUpdater().IncWith(RemoteDeletedDelCount, vlabs)
182182
}
183183
debug.Assert(err != nil)
184184
return CRMD{ErrCode: ecode, Err: err}

core/lom.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ type (
6868

6969
type (
7070
global struct {
71-
tstats cos.StatsUpdater // (stats.Trunner)
7271
pmm *memsys.MMSA
7372
smm *memsys.MMSA
7473
locker nameLocker
@@ -97,13 +96,12 @@ var (
9796

9897
func Pinit() { bckLocker = newNameLocker() }
9998

100-
func Tinit(t Target, tstats cos.StatsUpdater, config *cmn.Config, runHK bool) {
99+
func Tinit(t Target, config *cmn.Config, runHK bool) {
101100
bckLocker = newNameLocker()
102101
T = t
103102
{
104103
g.maxLmeta.Store(xattrMaxSize)
105104
g.locker = newNameLocker()
106-
g.tstats = tstats
107105
g.pmm = t.PageMM()
108106
g.smm = t.ByteMM()
109107
}
@@ -516,7 +514,7 @@ func (lom *LOM) _collide(lmd *lmeta) {
516514
if cmn.Rom.FastV(4, cos.SmoduleCore) || lom.digest&0xf == 5 {
517515
nlog.InfoDepth(1, LcacheCollisionCount, lom.digest, "[", *lmd.uname, "]", *lom.md.uname, lom.Cname())
518516
}
519-
g.tstats.Inc(LcacheCollisionCount)
517+
T.StatsUpdater().Inc(LcacheCollisionCount)
520518
}
521519

522520
func (lom *LOM) Uncache() {

core/mock/target_mock.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/NVIDIA/aistore/cmn"
14+
"github.com/NVIDIA/aistore/cmn/cos"
1415
"github.com/NVIDIA/aistore/core"
1516
"github.com/NVIDIA/aistore/core/meta"
1617
"github.com/NVIDIA/aistore/fs"
@@ -31,21 +32,22 @@ var _ core.Target = (*TargetMock)(nil)
3132

3233
func NewTarget(bo meta.Bowner) *TargetMock {
3334
t := &TargetMock{BO: bo}
34-
core.Tinit(t, NewStatsTracker(), nil /*config*/, false /*run HK*/)
35+
core.Tinit(t, nil /*config*/, false /*run HK*/)
3536
return t
3637
}
3738

3839
func (t *TargetMock) Bowner() meta.Bowner { return t.BO }
3940
func (t *TargetMock) Sowner() meta.Sowner { return t.SO }
4041

41-
func (*TargetMock) SID() string { return mockID }
42-
func (*TargetMock) String() string { return "tmock" }
43-
func (*TargetMock) Snode() *meta.Snode { return &meta.Snode{DaeID: mockID} }
44-
func (*TargetMock) ClusterStarted() bool { return true }
45-
func (*TargetMock) NodeStarted() bool { return true }
46-
func (*TargetMock) DataClient() *http.Client { return http.DefaultClient }
47-
func (*TargetMock) PageMM() *memsys.MMSA { return memsys.PageMM() }
48-
func (*TargetMock) ByteMM() *memsys.MMSA { return memsys.ByteMM() }
42+
func (*TargetMock) SID() string { return mockID }
43+
func (*TargetMock) String() string { return "tmock" }
44+
func (*TargetMock) Snode() *meta.Snode { return &meta.Snode{DaeID: mockID} }
45+
func (*TargetMock) ClusterStarted() bool { return true }
46+
func (*TargetMock) NodeStarted() bool { return true }
47+
func (*TargetMock) DataClient() *http.Client { return http.DefaultClient }
48+
func (*TargetMock) StatsUpdater() cos.StatsUpdater { return NewStatsTracker() }
49+
func (*TargetMock) PageMM() *memsys.MMSA { return memsys.PageMM() }
50+
func (*TargetMock) ByteMM() *memsys.MMSA { return memsys.ByteMM() }
4951

5052
func (*TargetMock) MaxUtilLoad() (int64, float64) { return 0, 0 }
5153

core/node.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Package core provides core metadata and in-cluster API
22
/*
3-
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
3+
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
44
*/
55
package core
66

77
import (
88
"github.com/NVIDIA/aistore/cmn"
9+
"github.com/NVIDIA/aistore/cmn/cos"
910
"github.com/NVIDIA/aistore/core/meta"
1011
"github.com/NVIDIA/aistore/memsys"
1112
)
@@ -30,6 +31,8 @@ type (
3031
ClusterStarted() bool
3132
NodeStarted() bool
3233

34+
StatsUpdater() cos.StatsUpdater
35+
3336
// Memory allocators
3437
PageMM() *memsys.MMSA
3538
ByteMM() *memsys.MMSA

ext/dload/dispatcher.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/NVIDIA/aistore/core"
2222
"github.com/NVIDIA/aistore/core/meta"
2323
"github.com/NVIDIA/aistore/fs"
24-
"github.com/NVIDIA/aistore/stats"
2524
"github.com/NVIDIA/aistore/xact/xreg"
2625
"golang.org/x/sync/errgroup"
2726
)
@@ -46,9 +45,8 @@ type (
4645
}
4746

4847
global struct {
49-
tstats stats.Tracker
50-
db kvdb.Driver
51-
store *infoStore
48+
db kvdb.Driver
49+
store *infoStore
5250

5351
// Downloader selects one of the two clients (below) by the destination URL.
5452
// Certification check is disabled for now and does not depend on cluster settings.
@@ -61,16 +59,13 @@ type (
6159

6260
var g global
6361

64-
func Init(tstats stats.Tracker, db kvdb.Driver, clientConf *cmn.ClientConf) {
62+
func Init(db kvdb.Driver, clientConf *cmn.ClientConf) {
6563
g.clientH, g.clientTLS = cmn.NewDefaultClients(clientConf.TimeoutLong.D())
6664

6765
if db == nil { // unit tests only
6866
return
6967
}
70-
{
71-
g.tstats = tstats
72-
g.db = db
73-
}
68+
g.db = db
7469
xreg.RegNonBckXact(&factory{})
7570
}
7671


ext/dload/task.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (task *singleTask) download(lom *core.LOM) {
9898

9999
vlabs := map[string]string{stats.VarlabBucket: lom.Bck().Cname("")}
100100
lsize := task.currentSize.Load()
101-
g.tstats.AddWith(
101+
core.T.StatsUpdater().AddWith(
102102
cos.NamedVal64{Name: stats.DloadSize, Value: lsize, VarLabs: vlabs},
103103
cos.NamedVal64{Name: stats.DloadLatencyTotal, Value: int64(task.ended.Load().Sub(task.started.Load())), VarLabs: vlabs},
104104
)
@@ -251,7 +251,7 @@ func (task *singleTask) wrapReader(r io.ReadCloser) io.ReadCloser {
251251
// Probably we need to extend the persistent database (db.go) so that it will contain
252252
// also information about specific tasks.
253253
func (task *singleTask) markFailed(statusMsg string) {
254-
g.tstats.Inc(stats.ErrDloadCount)
254+
core.T.StatsUpdater().Inc(stats.ErrDloadCount)
255255
g.store.persistError(task.jobID(), task.obj.objName, statusMsg)
256256
g.store.incErrorCnt(task.jobID())
257257
}

ext/dload/utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestCompareObject(t *testing.T) {
5656
// initialize http clients
5757
clientConf.Timeout = 5 * cos.Duration(time.Second)
5858
clientConf.TimeoutLong = 15 * cos.Duration(time.Second)
59-
dload.Init(nil, nil, &clientConf)
59+
dload.Init(nil, &clientConf)
6060

6161
// Modify local object to contain invalid (meta)data.
6262
customMD := cos.StrKVs{

0 commit comments

Comments
 (0)
0