Improved chunk tracking during clone by timsehn · Pull Request #9299 · dolthub/dolt · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Improved chunk tracking during clone #9299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 3, 2025
Merged
5 changes: 5 additions & 0 deletions go/libraries/doltcore/env/actions/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func clonePrint(eventCh <-chan pull.TableFileEvent) {
case pull.DownloadFailed:
// Ignore for now and output errors on the main thread
for _, tf := range tblFEvt.TableFiles {
chunksDownloading -= int64(tf.NumChunks())
delete(currStats, tf.FileID())
}
}
Expand All @@ -146,6 +147,10 @@ func clonePrint(eventCh <-chan pull.TableFileEvent) {
}
p.Display()
}

// Final status: ensure we show all chunks as complete when clone finishes
p.Printf("%s of %s chunks complete.\n",
strhelp.CommaIfy(chunksC), strhelp.CommaIfy(chunksC))
p.Display()
}

Expand Down
20 changes: 15 additions & 5 deletions go/libraries/utils/iohelp/read_with_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ type ReadStats struct {
}

type ReaderWithStats struct {
read uint64
size int64
rd io.Reader
start time.Time
closeCh chan struct{}
read uint64
size int64
rd io.Reader
start time.Time
closeCh chan struct{}
runningCh chan struct{}
}

func NewReaderWithStats(rd io.Reader, size int64) *ReaderWithStats {
Expand All @@ -45,6 +46,11 @@ func NewReaderWithStats(rd io.Reader, size int64) *ReaderWithStats {
}

func (rws *ReaderWithStats) Start(updateFunc func(ReadStats)) {
if rws.runningCh != nil {
panic("cannot start ReaderWithStats more than once.")
}
rws.runningCh = make(chan struct{})
defer close(rws.runningCh)
rws.start = time.Now()
go func() {
timer := time.NewTimer(updateFrequency)
Expand All @@ -67,6 +73,10 @@ func (rws *ReaderWithStats) Start(updateFunc func(ReadStats)) {
}

func (rws *ReaderWithStats) Close() error {
// Ensure that we never call |updateFunc| after Close() returns.
if rws.runningCh != nil {
<-rws.runningCh
}
close(rws.closeCh)

if closer, ok := rws.rd.(io.Closer); ok {
Expand Down
Loading
0