Nick Craig-Wood 2023-06-11 15:23:39 +01:00
parent 45255bccb3
commit 1f5a29209e
2 changed files with 45 additions and 5 deletions

View File

@ -83,6 +83,17 @@ func (job *Job) removeListener(fn *func()) {
}
}
// OnFinish adds listener to job that will be triggered when job is finished.
// It returns a function to cancel listening.
func (job *Job) OnFinish(fn func()) func() {
if job.Finished {
fn()
} else {
job.addListener(&fn)
}
return func() { job.removeListener(&fn) }
}
// run the job until completion writing the return status
func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) {
defer func() {
@ -237,6 +248,11 @@ func getFilter(ctx context.Context, in rc.Params) (context.Context, error) {
return ctx, nil
}
type jobKeyType struct{}
// Key for adding jobs to ctx
var jobKey = jobKeyType{}
// NewJob creates a Job and executes it, possibly in the background if _async is set
func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) {
id := atomic.AddInt64(&jobID, 1)
@ -274,9 +290,14 @@ func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Jo
StartTime: time.Now(),
Stop: stop,
}
jobs.mu.Lock()
jobs.jobs[job.ID] = job
jobs.mu.Unlock()
// Add the job to the context
ctx = context.WithValue(ctx, jobKey, job)
if isAsync {
go job.run(ctx, fn, in)
out = make(rc.Params)
@ -303,12 +324,22 @@ func OnFinish(jobID int64, fn func()) (func(), error) {
if job == nil {
return func() {}, errors.New("job not found")
}
if job.Finished {
fn()
} else {
job.addListener(&fn)
return job.OnFinish(fn), nil
}
// GetJob gets the Job from the context if possible
func GetJob(ctx context.Context) (job *Job, ok bool) {
job, ok = ctx.Value(jobKey).(*Job)
return job, ok
}
// GetJobID gets the Job from the context if possible
func GetJobID(ctx context.Context) (jobID int64, ok bool) {
job, ok := GetJob(ctx)
if !ok {
return -1, ok
}
return func() { job.removeListener(&fn) }, nil
return job.ID, true
}
func init() {

View File

@ -44,13 +44,22 @@ func TestJobsExpire(t *testing.T) {
jobs := newJobs()
jobs.opt.JobExpireInterval = time.Millisecond
assert.Equal(t, false, jobs.expireRunning)
var gotJobID int64
var gotJob *Job
job, out, err := jobs.NewJob(ctx, func(ctx context.Context, in rc.Params) (rc.Params, error) {
defer close(wait)
var ok bool
gotJobID, ok = GetJobID(ctx)
assert.True(t, ok)
gotJob, ok = GetJob(ctx)
assert.True(t, ok)
return in, nil
}, rc.Params{"_async": true})
require.NoError(t, err)
assert.Equal(t, 1, len(out))
<-wait
assert.Equal(t, job.ID, gotJobID, "check can get JobID from ctx")
assert.Equal(t, job, gotJob, "check can get Job from ctx")
assert.Equal(t, 1, len(jobs.jobs))
jobs.Expire()
assert.Equal(t, 1, len(jobs.jobs))