8000 observability: jobs to report (num-joggers, num-workers, channel-full) · NVIDIA/aistore@37c2434 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 37c2434

Browse files
committed
observability: jobs to report (num-joggers, num-workers, channel-full)
* TCB and TCO * part one Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 01e6eea commit 37c2434

File tree

14 files changed

+81
-52
lines changed

14 files changed

+81
-52
lines changed

ais/prxtxn.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,10 +1216,11 @@ func (r *_tcbfin) cb(nl nl.Listener) {
12161216
return
12171217
default:
12181218
nlog.Warningln("abort:", err)
1219-
if r.existed {
1219+
if r.existed || nlog.Stopping() || strings.Contains(err.Error(), apc.ActShutdownCluster) {
12201220
return
12211221
}
12221222
}
1223-
// when (tcb aborted) and (did not exist prior)
1223+
1224+
// NOTE: when (tcb aborted) && (destination bucket did not exist prior)
12241225
_ = r.p.destroyBucket(&apc.ActMsg{Action: apc.ActDestroyBck}, r.bck)
12251226
}

api/apc/lsmsg.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -107,23 +107,23 @@ const (
107107
LocIsCopyMissingObj // missing "main replica"
108108

109109
// LsoEntry Flags
110-
EntryIsCached = 1 << (EntryStatusBits + 1)
111-
EntryInArch = 1 << (EntryStatusBits + 2)
112-
EntryIsDir = 1 << (EntryStatusBits + 3)
113-
EntryIsArchive = 1 << (EntryStatusBits + 4)
114-
EntryVerChanged = 1 << (EntryStatusBits + 5) // see also: QparamLatestVer, et al.
115-
EntryVerRemoved = 1 << (EntryStatusBits + 6) // ditto
110+
EntryIsCached = 1 << (statusBits + 1)
111+
EntryInArch = 1 << (statusBits + 2)
112+
EntryIsDir = 1 << (statusBits + 3)
113+
EntryIsArchive = 1 << (statusBits + 4)
114+
EntryVerChanged = 1 << (statusBits + 5) // see also: QparamLatestVer, et al.
115+
EntryVerRemoved = 1 << (statusBits + 6) // ditto
116116
// added v3.26
117-
EntryHeadFail = 1 << (EntryStatusBits + 7)
117+
EntryHeadFail = 1 << (statusBits + 7)
118118
)
119119

120-
// ObjEntry.Flags field
120+
// cmn/objlist_utils
121121
const (
122-
EntryStatusBits = 5 // N bits
123-
EntryStatusMask = (1 << EntryStatusBits) - 1 // mask for N low bits
122+
statusBits = 5
123+
LsoStatusMask = (1 << statusBits) - 1
124124
)
125125

126-
// LsoMsg and HEAD(object) enum (NOTE: compare with `cmn.ObjectProps`)
126+
// LsoMsg and HEAD(object) enum
127127
const (
128128
GetPropsName = "name"
129129
GetPropsSize = "size"

cmd/cli/cli/show_hdlr.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ func xlistByKindID(c *cli.Context, xargs *xact.ArgsMsg, caption bool, xs xact.Mu
551551
// do nothing
552552
case ctlmsg == "":
553553
ctlmsg = nmsg
554+
555+
// TODO -- FIXME: draft
556+
// a, b, c := snaps[0].Unpack()
557+
// ctlmsg += fmt.Sprintf("; parallelism: (joggers: %d, workers: %d, chan-full: %d)", a, b, c)
558+
554559
case strings.HasSuffix(ctlmsg, "..."):
555560
// do nothing
556561
case strings.Contains(ctlmsg, nmsg):

cmd/cli/teb/utils.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ var (
9696
"FormatACL": fmtACL,
9797
"FormatNameDirArch": fmtNameDirArch,
9898
"FormatXactRunFinAbrt": FmtXactRunFinAbrt,
99-
"FormatCtlMsg": fmtCtlMsg,
10099
// misc. helpers
101100
"IsUnsetTime": isUnsetTime,
102101
"IsEqS": func(a, b string) bool { return a == b },
@@ -538,13 +537,6 @@ func FmtXactRunFinAbrt(snap *core.Snap) (s string) {
538537
return
539538
}
540539

541-
func fmtCtlMsg(ctlmsg string) string {
542-
if ctlmsg == "" {
543-
return NotSetVal
544-
}
545-
return "'" + ctlmsg + "'"
546-
}
547-
548540
func extECGetStats(base *core.Snap) *ec.ExtECGetStats {
549541
ecGet := &ec.ExtECGetStats{}
550542
if err := cos.MorphMarshal(base.Ext, ecGet); err != nil {

cmn/objlist_utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (entries LsoEntries) cmp(i, j int) bool {
2525
}
2626

2727
////////////
28-
// LsoEnt //
28+
// LsoEnt (for apc.* constants, see api/apc/lsmsg)
2929
////////////
3030

3131
// flags:
@@ -39,7 +39,7 @@ func (be *LsoEnt) IsAnyFlagSet(fl uint16) bool { return be.Flags&fl != 0 }
3939

4040
// location _status_
4141
func (be *LsoEnt) IsStatusOK() bool { return be.Status() == 0 }
42-
func (be *LsoEnt) Status() uint16 { return be.Flags & apc.EntryStatusMask }
42+
func (be *LsoEnt) Status() uint16 { return be.Flags & apc.LsoStatusMask }
4343

4444
// sorting
4545
func (be *LsoEnt) less(oe *LsoEnt) bool {

core/xaction.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
// Package core provides core metadata and in-cluster API
22
/*
3-
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
3+
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
44
*/
55
package core
66

77
import (
8+
"math"
89
"sync"
910
"time"
1011

@@ -91,14 +92,14 @@ type (
9192
DstBck cmn.Bck `json:"dst-bck"`
9293
ID string `json:"id"`
9394
Kind string `json:"kind"`
94-
CtlMsg string `json:"ctlmsg,omitempty"` // initiating control msg (added v3.26)
95+
CtlMsg string `json:"ctlmsg,omitempty"` // initiating control msg (a.k.a. "run options"; added v3.26)
9596

9697
// extended error info
9798
AbortErr string `json:"abort-err"`
9899
Err string `json:"err"`
99100

100-
// rebalance-only
101-
RebID int64 `json:"glob.id,string"`
101+
// packed field: number of joggers, number of workers, and channel-full count (see Pack/Unpack); reuses the legacy "glob.id" JSON key
102+
Packed int64 `json:"glob.id,string"`
102103

103104
// common runtime: stats counters (above) and state
104105
Stats Stats `json:"stats"`
@@ -113,11 +114,33 @@ type (
113114
)
114115

115116
//////////
116-
// Snap //
117+
// Snap (see related MultiSnap in xact/api)
117118
//////////
118119

119-
func (snp *Snap) IsAborted() bool { return snp.AbortedX }
120-
func (snp *Snap) IsIdle() bool { return snp.IdleX }
121-
func (snp *Snap) Started() bool { return !snp.StartTime.IsZero() }
122-
func (snp *Snap) Running() bool { return snp.Started() && !snp.IsAborted() && snp.EndTime.IsZero() }
123-
func (snp *Snap) Finished() bool { return snp.Started() && !snp.EndTime.IsZero() }
120+
func (xsnap *Snap) IsAborted() bool { return xsnap.AbortedX }
121+
func (xsnap *Snap) IsIdle() bool { return xsnap.IdleX }
122+
func (xsnap *Snap) Started() bool { return !xsnap.StartTime.IsZero() }
123+
124+
func (xsnap *Snap) Running() bool {
125+
return xsnap.Started() && !xsnap.IsAborted() && xsnap.EndTime.IsZero()
126+
}
127+
128+
func (xsnap *Snap) Finished() bool { return xsnap.Started() && !xsnap.EndTime.IsZero() }
129+
130+
const (
131+
gorBits = 10
132+
chShift = 2 * gorBits
133+
gorMask = (1 << gorBits) - 1
134+
)
135+
136+
func (xsnap *Snap) Pack(njoggers, nworkers int, chanFull int64) {
137+
chfull := int(min(chanFull, math.MaxInt))
138+
xsnap.Packed = int64((njoggers & gorMask) | (nworkers&gorMask)<<gorBits | chfull<<chShift)
139+
}
140+
141+
// Unpack decodes the Packed field into its three components: njoggers from the
// low gorBits bits, nworkers from the next gorBits bits, and chanFull from the
// remaining high bits starting at chShift (the inverse of Pack's layout).
func (xsnap *Snap) Unpack() (njoggers, nworkers, chanFull int) {
142+
njoggers = int(xsnap.Packed & gorMask)
143+
nworkers = int(xsnap.Packed>>gorBits) & gorMask
144+
chanFull = int(xsnap.Packed >> chShift)
145+
return njoggers, nworkers, chanFull
146+
}

python/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ We structure this changelog in accordance with [Keep a Changelog](https://keepac
1515
- Adjust error handling logic to accommodate updated object naming (see [here](https://github.com/NVIDIA/aistore/blob/main/docs/unicode.md)).
1616
- URL-encode object names when constructing the object paths for the request URLs (see [here](https://github.com/NVIDIA/aistore/blob/main/docs/unicode.md)).
1717
- Add `direct put` support in the websocket endpoint of ETL fastapi web server.
18+
- Remove `rebalance_id` from `JobSnapshot` (unused; deprecated).
1819

1920
### Removed
2021

python/aistore/sdk/types.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,6 @@ class JobSnapshot(BaseModel):
443443
bucket: Optional[BucketModel] = Field(default=None, alias="bck")
444444
source_bck: Optional[BucketModel] = Field(default=None, alias="src-bck")
445445
destination_bck: Optional[BucketModel] = Field(default=None, alias="dst-bck")
446-
rebalance_id: str = Field(default="", alias="glob.id")
447446
stats: Optional[JobStats] = Field(default=None)
448447
aborted: bool = Field(default=False)
449448
is_idle: bool = Field(default=False)

volume/vinit.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ func Init(t core.Target, config *cmn.Config, ctx IniCtx) bool /*created*/ {
9696
// MPI => VMD
9797
func NewFromMPI(tid string) (vmd *VMD, err error) {
9898
var (
99-
curVersion uint64
100-
available, disabled = fs.Get()
99+
curVersion uint64
100+
avail, disabled = fs.Get()
101101
)
102102
vmd, err = loadVMD(tid, nil)
103103
if err != nil {
@@ -106,10 +106,10 @@ func NewFromMPI(tid string) (vmd *VMD, err error) {
106106
if vmd != nil {
107107
curVersion = vmd.Version
108108
}
109-
vmd = newVMD(len(available))
109+
vmd = newVMD(len(avail))
110110
vmd.DaemonID = tid
111111
vmd.Version = curVersion + 1 // Bump the version.
112-
for _, mi := range available {
112+
for _, mi := range avail {
113113
vmd.addMountpath(mi, true /*enabled*/)
114114
}
115115
for _, mi := range disabled {

xact/xs/archive.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@ func (p *archFactory) Start() (err error) {
124124
)
125125
r.pending.m = make(map[string]*archwi, burst)
126126

127-
avail := fs.GetAvail()
128-
r.joggers.m = make(map[string]*jogger, len(avail))
127+
r.joggers.m = make(map[string]*jogger, fs.NumAvail())
129128
r.smap = core.T.Sowner().Get()
130129
p.xctn = r
131130

xact/xs/lrit.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/NVIDIA/aistore/api/apc"
1212
"github.com/NVIDIA/aistore/cmn"
13-
"github.com/NVIDIA/aistore/cmn/atomic"
1413
"github.com/NVIDIA/aistore/cmn/cos"
1514
"github.com/NVIDIA/aistore/cmn/nlog"
1615
"github.com/NVIDIA/aistore/core"
@@ -77,8 +76,7 @@ type (
7776
workers []*lrworker
7877
wg sync.WaitGroup
7978
}
80-
numvis atomic.Int64 // counter: num visited objects
81-
lrp int // enum { lrpList, ... }
79+
lrp int // enum { lrpList, ... }
8280
}
8381
)
8482

@@ -92,7 +90,7 @@ type (
9290
//////////
9391

9492
func (r *lrit) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWorkers, confBurst int) error {
95-
l := len(fs.GetAvail())
93+
l := fs.NumAvail()
9694
if l == 0 {
9795
xctn.Abort(cmn.ErrNoMountpaths)
9896
return cmn.ErrNoMountpaths
@@ -350,11 +348,12 @@ func (r *lrit) do(lom *core.LOM, wi lrwi, smap *meta.Smap) (bool /*this lom done
350348

351349
if r.nwp.workers == nil {
352350
wi.do(lom, r, r.buf)
353-
r.numvis.Inc()
354351
return true, nil
355352
}
356353

357-
r.nwp.workCh <- lrpair{lom, wi} // lom eventually freed below // TODO -- FIXME: consider core.LIF
354+
// lom eventually freed below
355+
// TODO: consider core.LIF with subsequent lif.LOM() conversion
356+
r.nwp.workCh <- lrpair{lom, wi}
358357
return false, nil
359358
}
360359

@@ -374,7 +373,6 @@ outer:
374373
}
375374
lrpair.wi.do(lrpair.lom, worker.lrit, buf)
376375
core.FreeLOM(lrpair.lom)
377-
worker.lrit.numvis.Inc()
378376
case <-stopCh:
379377
break outer
380378
}

xact/xs/rebres.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ func (xreb *Rebalance) RebID() int64 {
127127
func (xreb *Rebalance) Snap() (snap *core.Snap) {
128128
snap = &core.Snap{}
129129
xreb.ToSnap(snap)
130-
snap.RebID = xreb.RebID()
131130

132131
snap.IdleX = xreb.IsIdle()
133132

xact/xs/tcb.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ import (
3333
"github.com/NVIDIA/aistore/xact/xreg"
3434
)
3535

36+
// NOTE the limitation:
37+
// - sentinels and workers require DM
38+
// - DM is not available when explicitly disabled (below) and for a single-node cluster
39+
3640
type (
3741
tcbFactory struct {
3842
xreg.RenewBase
@@ -139,12 +143,12 @@ func (p *tcbFactory) Start() error {
139143

140144
if msg.NumWorkers > 0 {
141145
// tune-up the specified number of workers
142-
avail := fs.GetAvail()
143-
numWorkers, err := throttleNwp(r.Name(), max(msg.NumWorkers, len(avail)))
146+
l := fs.NumAvail()
147+
numWorkers, err := throttleNwp(r.Name(), max(msg.NumWorkers, l))
144148
if err != nil {
145149
return err
146150
}
147-
if numWorkers >= len(avail) {
151+
if numWorkers >= l {
148152
// delegate intra-cluster copying/transforming to additional workers;
149153
// run them in parallel with traversing joggers;
150154
r._iniNwp(numWorkers)
@@ -388,7 +392,7 @@ func (r *XactTCB) recv(hdr *transport.ObjHdr, objReader io.Reader, err error) er
388392
return err
389393
}
390394

391-
// control; // TODO -- FIXME: must become a shared code w/ tco
395+
// control
392396
if hdr.Opcode != 0 {
393397
switch hdr.Opcode {
394398
case opDone:
@@ -487,6 +491,8 @@ func (r *XactTCB) Snap() (snap *core.Snap) {
487491
snap = &core.Snap{}
488492
r.ToSnap(snap)
489493

494+
snap.Pack(fs.NumAvail(), len(r.numwp.workers), r.chanFull.Load())
495+
490496
snap.IdleX = r.IsIdle()
491497
f, t := r.FromTo()
492498
snap.SrcBck, snap.DstBck = f.Clone(), t.Clone()

xact/xs/tcobjs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type (
5353
}
5454
streamingX
5555
chanFull atomic.Int64
56+
nworkers atomic.Int64 // total across all pending
5657
owt cmn.OWT
5758
}
5859
tcowi struct {
@@ -188,6 +189,8 @@ func (r *XactTCO) Snap() (snap *core.Snap) {
188189
snap = &core.Snap{}
189190
r.ToSnap(snap)
190191

192+
snap.Pack(0, int(r.nworkers.Load()), r.chanFull.Load())
193+
191194
snap.IdleX = r.IsIdle()
192195
f, t := r.FromTo()
193196
snap.SrcBck, snap.DstBck = f.Clone(), t.Clone()
@@ -250,6 +253,8 @@ func (r *XactTCO) doMsg(msg *cmn.TCOMsg) (stop bool) {
250253
r.AddErr(err)
251254
return !msg.ContinueOnError // stop?
252255
}
256+
nworkers := int64(len(lrit.nwp.workers))
257+
r.nworkers.Add(nworkers)
253258

254259
// run
255260
var wg *sync.WaitGroup
@@ -274,6 +279,7 @@ func (r *XactTCO) doMsg(msg *cmn.TCOMsg) (stop bool) {
274279
err := lrit.run(wi, smap, true /*prealloc buf*/)
275280

276281
lrit.wait()
282+
r.nworkers.Sub(nworkers)
277283
if wg != nil {
278284
wg.Wait()
279285
}

0 commit comments

Comments
 (0)
0