mirror of
https://github.com/rclone/rclone.git
synced 2024-11-24 23:04:16 +08:00
rc/job: use mutex for adding listeners thread safety
Fix in extreme cases, when the job is executing finish(), the listener added by calling OnFinish() will never be executed. This change should not cause compatibility issues, as consumers should not make assumptions about whether listeners will be run in a new goroutine
This commit is contained in:
parent
b9207e5727
commit
42bfc66494
|
@ -74,12 +74,6 @@ func (job *Job) finish(out rc.Params, err error) {
|
|||
running.kickExpire() // make sure this job gets expired
|
||||
}
|
||||
|
||||
func (job *Job) addListener(fn *func()) {
|
||||
job.mu.Lock()
|
||||
defer job.mu.Unlock()
|
||||
job.listeners = append(job.listeners, fn)
|
||||
}
|
||||
|
||||
func (job *Job) removeListener(fn *func()) {
|
||||
job.mu.Lock()
|
||||
defer job.mu.Unlock()
|
||||
|
@ -94,10 +88,12 @@ func (job *Job) removeListener(fn *func()) {
|
|||
// OnFinish adds listener to job that will be triggered when job is finished.
|
||||
// It returns a function to cancel listening.
|
||||
func (job *Job) OnFinish(fn func()) func() {
|
||||
job.mu.Lock()
|
||||
defer job.mu.Unlock()
|
||||
if job.Finished {
|
||||
fn()
|
||||
go fn()
|
||||
} else {
|
||||
job.addListener(&fn)
|
||||
job.listeners = append(job.listeners, &fn)
|
||||
}
|
||||
return func() { job.removeListener(&fn) }
|
||||
}
|
||||
|
|
|
@ -554,3 +554,52 @@ func TestOnFinishAlreadyFinished(t *testing.T) {
|
|||
t.Fatal("Timeout waiting for OnFinish to fire")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnFinishDataRace(t *testing.T) {
|
||||
jobID.Store(0)
|
||||
job, _, err := NewJob(context.Background(), ctxFn, rc.Params{"_async": true})
|
||||
assert.NoError(t, err)
|
||||
var expect, got uint64
|
||||
finished := make(chan struct{})
|
||||
stop, stopped := make(chan struct{}), make(chan struct{})
|
||||
go func() {
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
break Loop
|
||||
default:
|
||||
_, err := OnFinish(job.ID, func() {
|
||||
finished <- struct{}{}
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
expect += 1
|
||||
}
|
||||
}
|
||||
close(stopped)
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
job.Stop()
|
||||
|
||||
// Wait for the first OnFinish to fire
|
||||
<-finished
|
||||
got += 1
|
||||
|
||||
// Stop the OnFinish producer
|
||||
close(stop)
|
||||
<-stopped
|
||||
|
||||
timeout := time.After(5 * time.Second)
|
||||
for {
|
||||
if got == expect {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-finished:
|
||||
got += 1
|
||||
case <-timeout:
|
||||
t.Fatal("Timeout waiting for all OnFinish calls to fire")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user