chore(actions): support cron schedule task ()

Replace  

1. only support the default branch in the repository setting.
2. autoload schedule data from the schedule table after starting the
service.
3. support specific syntax like `@yearly`, `@monthly`, `@weekly`,
`@daily`, `@hourly`

## How to use

See the [GitHub Actions
document](https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule)
for getting more detailed information.

```yaml
on:
  schedule:
    - cron: '30 5 * * 1,3'
    - cron: '30 5 * * 2,4'

jobs:
  test_schedule:
    runs-on: ubuntu-latest
    steps:
      - name: Not on Monday or Wednesday
        if: github.event.schedule != '30 5 * * 1,3'
        run: echo "This step will be skipped on Monday and Wednesday"
      - name: Every time
        run: echo "This step will always run"
```

Signed-off-by: Bo-Yi.Wu <appleboy.tw@gmail.com>

---------


Co-authored-by: Jason Song <i@wolfogre.com>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
This commit is contained in:
Lunny Xiao 2023-08-24 11:06:51 +08:00 committed by GitHub
parent b62c8e7765
commit 0d55f64e6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 693 additions and 9 deletions

2
go.mod

@ -90,6 +90,7 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/quasoft/websspi v1.1.2
github.com/redis/go-redis/v9 v9.0.5
github.com/robfig/cron/v3 v3.0.1
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/sassoftware/go-rpmutils v0.2.0
github.com/sergi/go-diff v1.3.1
@ -254,7 +255,6 @@ require (
github.com/rhysd/actionlint v1.6.25 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect

120
models/actions/schedule.go Normal file

@ -0,0 +1,120 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/timeutil"
webhook_module "code.gitea.io/gitea/modules/webhook"
"github.com/robfig/cron/v3"
)
// ActionSchedule represents a schedule of a workflow file
type ActionSchedule struct {
ID int64
Title string
Specs []string
RepoID int64 `xorm:"index"`
Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"`
WorkflowID string
TriggerUserID int64
TriggerUser *user_model.User `xorm:"-"`
Ref string
CommitSHA string
Event webhook_module.HookEventType
EventPayload string `xorm:"LONGTEXT"`
Content []byte
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
func init() {
db.RegisterModel(new(ActionSchedule))
}
// GetSchedulesMapByIDs returns the schedules by given id slice.
func GetSchedulesMapByIDs(ids []int64) (map[int64]*ActionSchedule, error) {
schedules := make(map[int64]*ActionSchedule, len(ids))
return schedules, db.GetEngine(db.DefaultContext).In("id", ids).Find(&schedules)
}
// GetReposMapByIDs returns the repos by given id slice.
func GetReposMapByIDs(ids []int64) (map[int64]*repo_model.Repository, error) {
repos := make(map[int64]*repo_model.Repository, len(ids))
return repos, db.GetEngine(db.DefaultContext).In("id", ids).Find(&repos)
}
var cronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
// CreateScheduleTask creates new schedule task.
func CreateScheduleTask(ctx context.Context, rows []*ActionSchedule) error {
// Return early if there are no rows to insert
if len(rows) == 0 {
return nil
}
// Begin transaction
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
}
defer committer.Close()
// Loop through each schedule row
for _, row := range rows {
// Create new schedule row
if err = db.Insert(ctx, row); err != nil {
return err
}
// Loop through each schedule spec and create a new spec row
now := time.Now()
for _, spec := range row.Specs {
// Parse the spec and check for errors
schedule, err := cronParser.Parse(spec)
if err != nil {
continue // skip to the next spec if there's an error
}
// Insert the new schedule spec row
if err = db.Insert(ctx, &ActionScheduleSpec{
RepoID: row.RepoID,
ScheduleID: row.ID,
Spec: spec,
Next: timeutil.TimeStamp(schedule.Next(now).Unix()),
}); err != nil {
return err
}
}
}
// Commit transaction
return committer.Commit()
}
func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
}
defer committer.Close()
if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil {
return err
}
if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil {
return err
}
return committer.Commit()
}

@ -0,0 +1,94 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/container"
"xorm.io/builder"
)
type ScheduleList []*ActionSchedule
// GetUserIDs returns a slice of user's id
func (schedules ScheduleList) GetUserIDs() []int64 {
ids := make(container.Set[int64], len(schedules))
for _, schedule := range schedules {
ids.Add(schedule.TriggerUserID)
}
return ids.Values()
}
func (schedules ScheduleList) GetRepoIDs() []int64 {
ids := make(container.Set[int64], len(schedules))
for _, schedule := range schedules {
ids.Add(schedule.RepoID)
}
return ids.Values()
}
func (schedules ScheduleList) LoadTriggerUser(ctx context.Context) error {
userIDs := schedules.GetUserIDs()
users := make(map[int64]*user_model.User, len(userIDs))
if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil {
return err
}
for _, schedule := range schedules {
if schedule.TriggerUserID == user_model.ActionsUserID {
schedule.TriggerUser = user_model.NewActionsUser()
} else {
schedule.TriggerUser = users[schedule.TriggerUserID]
}
}
return nil
}
func (schedules ScheduleList) LoadRepos() error {
repoIDs := schedules.GetRepoIDs()
repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs)
if err != nil {
return err
}
for _, schedule := range schedules {
schedule.Repo = repos[schedule.RepoID]
}
return nil
}
type FindScheduleOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
}
func (opts FindScheduleOptions) toConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.OwnerID > 0 {
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
}
return cond
}
func FindSchedules(ctx context.Context, opts FindScheduleOptions) (ScheduleList, int64, error) {
e := db.GetEngine(ctx).Where(opts.toConds())
if !opts.ListAll && opts.PageSize > 0 && opts.Page >= 1 {
e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize)
}
var schedules ScheduleList
total, err := e.Desc("id").FindAndCount(&schedules)
return schedules, total, err
}
func CountSchedules(ctx context.Context, opts FindScheduleOptions) (int64, error) {
return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionSchedule))
}

@ -0,0 +1,50 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/timeutil"
"github.com/robfig/cron/v3"
)
// ActionScheduleSpec represents a schedule spec of a workflow file
type ActionScheduleSpec struct {
ID int64
RepoID int64 `xorm:"index"`
Repo *repo_model.Repository `xorm:"-"`
ScheduleID int64 `xorm:"index"`
Schedule *ActionSchedule `xorm:"-"`
// Next time the job will run, or the zero time if Cron has not been
// started or this entry's schedule is unsatisfiable
Next timeutil.TimeStamp `xorm:"index"`
// Prev is the last time this job was run, or the zero time if never.
Prev timeutil.TimeStamp
Spec string
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
func (s *ActionScheduleSpec) Parse() (cron.Schedule, error) {
return cronParser.Parse(s.Spec)
}
func init() {
db.RegisterModel(new(ActionScheduleSpec))
}
func UpdateScheduleSpec(ctx context.Context, spec *ActionScheduleSpec, cols ...string) error {
sess := db.GetEngine(ctx).ID(spec.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
_, err := sess.Update(spec)
return err
}

@ -0,0 +1,106 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/container"
"xorm.io/builder"
)
type SpecList []*ActionScheduleSpec
func (specs SpecList) GetScheduleIDs() []int64 {
ids := make(container.Set[int64], len(specs))
for _, spec := range specs {
ids.Add(spec.ScheduleID)
}
return ids.Values()
}
func (specs SpecList) LoadSchedules() error {
scheduleIDs := specs.GetScheduleIDs()
schedules, err := GetSchedulesMapByIDs(scheduleIDs)
if err != nil {
return err
}
for _, spec := range specs {
spec.Schedule = schedules[spec.ScheduleID]
}
repoIDs := specs.GetRepoIDs()
repos, err := GetReposMapByIDs(repoIDs)
if err != nil {
return err
}
for _, spec := range specs {
spec.Repo = repos[spec.RepoID]
}
return nil
}
func (specs SpecList) GetRepoIDs() []int64 {
ids := make(container.Set[int64], len(specs))
for _, spec := range specs {
ids.Add(spec.RepoID)
}
return ids.Values()
}
func (specs SpecList) LoadRepos() error {
repoIDs := specs.GetRepoIDs()
repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs)
if err != nil {
return err
}
for _, spec := range specs {
spec.Repo = repos[spec.RepoID]
}
return nil
}
type FindSpecOptions struct {
db.ListOptions
RepoID int64
Next int64
}
func (opts FindSpecOptions) toConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.Next > 0 {
cond = cond.And(builder.Lte{"next": opts.Next})
}
return cond
}
func FindSpecs(ctx context.Context, opts FindSpecOptions) (SpecList, int64, error) {
e := db.GetEngine(ctx).Where(opts.toConds())
if opts.PageSize > 0 && opts.Page >= 1 {
e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize)
}
var specs SpecList
total, err := e.Desc("id").FindAndCount(&specs)
if err != nil {
return nil, 0, err
}
if err := specs.LoadSchedules(); err != nil {
return nil, 0, err
}
return specs, total, nil
}
func CountSpecs(ctx context.Context, opts FindSpecOptions) (int64, error) {
return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionScheduleSpec))
}

@ -526,6 +526,8 @@ var migrations = []Migration{
NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable),
// v272 -> v273
NewMigration("Add Version to ActionRun table", v1_21.AddVersionToActionRunTable),
// v273 -> v274
NewMigration("Add Action Schedule Table", v1_21.AddActionScheduleTable),
}
// GetCurrentDBVersion returns the current db version

@ -0,0 +1,45 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package v1_21 //nolint
import (
"code.gitea.io/gitea/modules/timeutil"
"xorm.io/xorm"
)
func AddActionScheduleTable(x *xorm.Engine) error {
type ActionSchedule struct {
ID int64
Title string
Specs []string
RepoID int64 `xorm:"index"`
OwnerID int64 `xorm:"index"`
WorkflowID string
TriggerUserID int64
Ref string
CommitSHA string
Event string
EventPayload string `xorm:"LONGTEXT"`
Content []byte
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
type ActionScheduleSpec struct {
ID int64
RepoID int64 `xorm:"index"`
ScheduleID int64 `xorm:"index"`
Spec string
Next timeutil.TimeStamp `xorm:"index"`
Prev timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
return x.Sync(
new(ActionSchedule),
new(ActionScheduleSpec),
)
}

@ -170,6 +170,8 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error {
&actions_model.ActionRunJob{RepoID: repoID},
&actions_model.ActionRun{RepoID: repoID},
&actions_model.ActionRunner{RepoID: repoID},
&actions_model.ActionScheduleSpec{RepoID: repoID},
&actions_model.ActionSchedule{RepoID: repoID},
&actions_model.ActionArtifact{RepoID: repoID},
); err != nil {
return fmt.Errorf("deleteBeans: %w", err)

@ -95,18 +95,25 @@ func GetEventsFromContent(content []byte) ([]*jobparser.Event, error) {
return events, nil
}
func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader) ([]*DetectedWorkflow, error) {
func DetectWorkflows(
gitRepo *git.Repository,
commit *git.Commit,
triggedEvent webhook_module.HookEventType,
payload api.Payloader,
) ([]*DetectedWorkflow, []*DetectedWorkflow, error) {
entries, err := ListWorkflows(commit)
if err != nil {
return nil, err
return nil, nil, err
}
workflows := make([]*DetectedWorkflow, 0, len(entries))
schedules := make([]*DetectedWorkflow, 0, len(entries))
for _, entry := range entries {
content, err := GetContentFromEntry(entry)
if err != nil {
return nil, err
return nil, nil, err
}
events, err := GetEventsFromContent(content)
if err != nil {
log.Warn("ignore invalid workflow %q: %v", entry.Name(), err)
@ -114,6 +121,14 @@ func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent w
}
for _, evt := range events {
log.Trace("detect workflow %q for event %#v matching %q", entry.Name(), evt, triggedEvent)
if evt.IsSchedule() {
dwf := &DetectedWorkflow{
EntryName: entry.Name(),
TriggerEvent: evt.Name,
Content: content,
}
schedules = append(schedules, dwf)
}
if detectMatched(gitRepo, commit, triggedEvent, payload, evt) {
dwf := &DetectedWorkflow{
EntryName: entry.Name(),
@ -125,7 +140,7 @@ func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent w
}
}
return workflows, nil
return workflows, schedules, nil
}
func detectMatched(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader, evt *jobparser.Event) bool {

@ -2756,6 +2756,7 @@ dashboard.gc_lfs = Garbage collect LFS meta objects
dashboard.stop_zombie_tasks = Stop zombie tasks
dashboard.stop_endless_tasks = Stop endless tasks
dashboard.cancel_abandoned_jobs = Cancel abandoned jobs
dashboard.start_schedule_tasks = Start schedule tasks
dashboard.sync_branch.started = Branches Sync started
dashboard.rebuild_issue_indexer = Rebuild issue indexer

@ -4,6 +4,7 @@
package actions
import (
"bytes"
"context"
"fmt"
"strings"
@ -24,6 +25,7 @@ import (
"code.gitea.io/gitea/services/convert"
"github.com/nektos/act/pkg/jobparser"
"github.com/nektos/act/pkg/model"
)
var methodCtxKey struct{}
@ -143,15 +145,15 @@ func notify(ctx context.Context, input *notifyInput) error {
}
var detectedWorkflows []*actions_module.DetectedWorkflow
workflows, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
if len(workflows) == 0 {
log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
} else {
actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
for _, wf := range workflows {
if actionsConfig.IsWorkflowDisabled(wf.EntryName) {
log.Trace("repo %s has disable workflows %s", input.Repo.RepoPath(), wf.EntryName)
@ -171,7 +173,7 @@ func notify(ctx context.Context, input *notifyInput) error {
if err != nil {
return fmt.Errorf("gitRepo.GetCommit: %w", err)
}
baseWorkflows, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
@ -186,7 +188,22 @@ func notify(ctx context.Context, input *notifyInput) error {
}
}
if err := handleSchedules(ctx, schedules, commit, input); err != nil {
return err
}
return handleWorkflows(ctx, detectedWorkflows, commit, input, ref)
}
func handleWorkflows(
ctx context.Context,
detectedWorkflows []*actions_module.DetectedWorkflow,
commit *git.Commit,
input *notifyInput,
ref string,
) error {
if len(detectedWorkflows) == 0 {
log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
return nil
}
@ -350,3 +367,86 @@ func ifNeedApproval(ctx context.Context, run *actions_model.ActionRun, repo *rep
log.Trace("need approval because it's the first time user %d triggered actions", user.ID)
return true, nil
}
func handleSchedules(
ctx context.Context,
detectedWorkflows []*actions_module.DetectedWorkflow,
commit *git.Commit,
input *notifyInput,
) error {
if len(detectedWorkflows) == 0 {
log.Trace("repo %s with commit %s couldn't find schedules", input.Repo.RepoPath(), commit.ID)
return nil
}
branch, err := commit.GetBranchName()
if err != nil {
return err
}
if branch != input.Repo.DefaultBranch {
log.Trace("commit branch is not default branch in repo")
return nil
}
rows, _, err := actions_model.FindSchedules(ctx, actions_model.FindScheduleOptions{RepoID: input.Repo.ID})
if err != nil {
log.Error("FindCrons: %v", err)
return err
}
if len(rows) > 0 {
if err := actions_model.DeleteScheduleTaskByRepo(ctx, input.Repo.ID); err != nil {
log.Error("DeleteCronTaskByRepo: %v", err)
}
}
p, err := json.Marshal(input.Payload)
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}
crons := make([]*actions_model.ActionSchedule, 0, len(detectedWorkflows))
for _, dwf := range detectedWorkflows {
// Check cron job condition. Only working in default branch
workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content))
if err != nil {
log.Error("ReadWorkflow: %v", err)
continue
}
schedules := workflow.OnSchedule()
if len(schedules) == 0 {
log.Warn("no schedule event")
continue
}
run := &actions_model.ActionSchedule{
Title: strings.SplitN(commit.CommitMessage, "\n", 2)[0],
RepoID: input.Repo.ID,
OwnerID: input.Repo.OwnerID,
WorkflowID: dwf.EntryName,
TriggerUserID: input.Doer.ID,
Ref: input.Ref,
CommitSHA: commit.ID.String(),
Event: input.Event,
EventPayload: string(p),
Specs: schedules,
Content: dwf.Content,
}
// cancel running jobs if the event is push
if run.Event == webhook_module.HookEventPush {
// cancel running jobs of the same workflow
if err := actions_model.CancelRunningJobs(
ctx,
run.RepoID,
run.Ref,
run.WorkflowID,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
}
crons = append(crons, run)
}
return actions_model.CreateScheduleTask(ctx, crons)
}

@ -0,0 +1,135 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"fmt"
"time"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
webhook_module "code.gitea.io/gitea/modules/webhook"
"github.com/nektos/act/pkg/jobparser"
)
// StartScheduleTasks start the task
func StartScheduleTasks(ctx context.Context) error {
return startTasks(ctx)
}
// startTasks retrieves specifications in pages, creates a schedule task for each specification,
// and updates the specification's next run time and previous run time.
// The function returns an error if there's an issue with finding or updating the specifications.
func startTasks(ctx context.Context) error {
// Set the page size
pageSize := 50
// Retrieve specs in pages until all specs have been retrieved
now := time.Now()
for page := 1; ; page++ {
// Retrieve the specs for the current page
specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
ListOptions: db.ListOptions{
Page: page,
PageSize: pageSize,
},
Next: now.Unix(),
})
if err != nil {
return fmt.Errorf("find specs: %w", err)
}
// Loop through each spec and create a schedule task for it
for _, row := range specs {
// cancel running jobs if the event is push
if row.Schedule.Event == webhook_module.HookEventPush {
// cancel running jobs of the same workflow
if err := actions_model.CancelRunningJobs(
ctx,
row.RepoID,
row.Schedule.Ref,
row.Schedule.WorkflowID,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
}
if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
log.Error("CreateScheduleTask: %v", err)
return err
}
// Parse the spec
schedule, err := row.Parse()
if err != nil {
log.Error("Parse: %v", err)
return err
}
// Update the spec's next run time and previous run time
row.Prev = row.Next
row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
log.Error("UpdateScheduleSpec: %v", err)
return err
}
}
// Stop if all specs have been retrieved
if len(specs) < pageSize {
break
}
}
return nil
}
// CreateScheduleTask creates a scheduled task from a cron action schedule.
// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
// Create a new action run based on the schedule
run := &actions_model.ActionRun{
Title: cron.Title,
RepoID: cron.RepoID,
OwnerID: cron.OwnerID,
WorkflowID: cron.WorkflowID,
TriggerUserID: cron.TriggerUserID,
Ref: cron.Ref,
CommitSHA: cron.CommitSHA,
Event: cron.Event,
EventPayload: cron.EventPayload,
Status: actions_model.StatusWaiting,
}
// Parse the workflow specification from the cron schedule
workflows, err := jobparser.Parse(cron.Content)
if err != nil {
return err
}
// Insert the action run and its associated jobs into the database
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
return err
}
// Retrieve the jobs for the newly created action run
jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID})
if err != nil {
return err
}
// Create commit statuses for each job
for _, job := range jobs {
if err := createCommitStatus(ctx, job); err != nil {
return err
}
}
// Return nil if no errors occurred
return nil
}

@ -18,6 +18,7 @@ func initActionsTasks() {
registerStopZombieTasks()
registerStopEndlessTasks()
registerCancelAbandonedJobs()
registerScheduleTasks()
}
func registerStopZombieTasks() {
@ -49,3 +50,16 @@ func registerCancelAbandonedJobs() {
return actions_service.CancelAbandonedJobs(ctx)
})
}
// registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks.
func registerScheduleTasks() {
// Register the task with a unique name, enabled status, and schedule for every minute.
RegisterTaskFatal("start_schedule_tasks", &BaseConfig{
Enabled: true,
RunAtStart: false,
Schedule: "@every 1m",
}, func(ctx context.Context, _ *user_model.User, cfg Config) error {
// Call the function to start schedule tasks and pass the context.
return actions_service.StartScheduleTasks(ctx)
})
}