PullService lock via pullID (#19520)

* lock pull on git&db actions ...

* add TODO notes

* rename prQueue 2 prPatchCheckerQueue

* fmt
This commit is contained in:
6543 2022-05-04 18:06:23 +02:00 committed by GitHub
parent e933f31426
commit f034ee6cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 21 additions and 2 deletions

View File

@ -317,6 +317,8 @@ func handle(data ...queue.Data) []queue.Data {
} }
func testPR(id int64) { func testPR(id int64) {
pullWorkingPool.CheckIn(fmt.Sprint(id))
defer pullWorkingPool.CheckOut(fmt.Sprint(id))
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id)) ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id))
defer finished() defer finished()

View File

@ -34,7 +34,6 @@ import (
// Merge merges pull request to base repository. // Merge merges pull request to base repository.
// Caller should check PR is ready to be merged (review and status checks) // Caller should check PR is ready to be merged (review and status checks)
// FIXME: add repoWorkingPull make sure two merges does not happen at same time.
func Merge(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, mergeStyle repo_model.MergeStyle, expectedHeadCommitID, message string) error { func Merge(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, mergeStyle repo_model.MergeStyle, expectedHeadCommitID, message string) error {
if err := pr.LoadHeadRepo(); err != nil { if err := pr.LoadHeadRepo(); err != nil {
log.Error("LoadHeadRepo: %v", err) log.Error("LoadHeadRepo: %v", err)
@ -44,6 +43,9 @@ func Merge(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repos
return fmt.Errorf("LoadBaseRepo: %v", err) return fmt.Errorf("LoadBaseRepo: %v", err)
} }
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
prUnit, err := pr.BaseRepo.GetUnit(unit.TypePullRequests) prUnit, err := pr.BaseRepo.GetUnit(unit.TypePullRequests)
if err != nil { if err != nil {
log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err) log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err)
@ -726,6 +728,9 @@ func CheckPullBranchProtections(ctx context.Context, pr *models.PullRequest, ski
// MergedManually mark pr as merged manually // MergedManually mark pr as merged manually
func MergedManually(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error { func MergedManually(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
if err := db.WithTx(func(ctx context.Context) error { if err := db.WithTx(func(ctx context.Context) error {
prUnit, err := pr.BaseRepo.GetUnitCtx(ctx, unit.TypePullRequests) prUnit, err := pr.BaseRepo.GetUnitCtx(ctx, unit.TypePullRequests)
if err != nil { if err != nil {

View File

@ -25,9 +25,13 @@ import (
"code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
issue_service "code.gitea.io/gitea/services/issue" issue_service "code.gitea.io/gitea/services/issue"
) )
// TODO: use clustered lock (unique queue? or *abuse* cache)
var pullWorkingPool = sync.NewExclusivePool()
// NewPullRequest creates new pull request with labels for repository. // NewPullRequest creates new pull request with labels for repository.
func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *models.Issue, labelIDs []int64, uuids []string, pr *models.PullRequest, assigneeIDs []int64) error { func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *models.Issue, labelIDs []int64, uuids []string, pr *models.PullRequest, assigneeIDs []int64) error {
if err := TestPatch(pr); err != nil { if err := TestPatch(pr); err != nil {
@ -124,6 +128,9 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *mode
// ChangeTargetBranch changes the target branch of this pull request, as the given user. // ChangeTargetBranch changes the target branch of this pull request, as the given user.
func ChangeTargetBranch(ctx context.Context, pr *models.PullRequest, doer *user_model.User, targetBranch string) (err error) { func ChangeTargetBranch(ctx context.Context, pr *models.PullRequest, doer *user_model.User, targetBranch string) (err error) {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
// Current target branch is already the same // Current target branch is already the same
if pr.BaseBranch == targetBranch { if pr.BaseBranch == targetBranch {
return nil return nil

View File

@ -23,6 +23,9 @@ func Update(ctx context.Context, pull *models.PullRequest, doer *user_model.User
style repo_model.MergeStyle style repo_model.MergeStyle
) )
pullWorkingPool.CheckIn(fmt.Sprint(pull.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pull.ID))
if rebase { if rebase {
pr = pull pr = pull
style = repo_model.MergeStyleRebaseUpdate style = repo_model.MergeStyleRebaseUpdate

View File

@ -19,6 +19,7 @@ import (
) )
// repoWorkingPool represents a working pool to order the parallel changes to the same repository // repoWorkingPool represents a working pool to order the parallel changes to the same repository
// TODO: use clustered lock (unique queue? or *abuse* cache)
var repoWorkingPool = sync.NewExclusivePool() var repoWorkingPool = sync.NewExclusivePool()
// TransferOwnership transfers all corresponding setting from old user to new one. // TransferOwnership transfers all corresponding setting from old user to new one.

View File

@ -27,7 +27,8 @@ import (
var ( var (
reservedWikiNames = []string{"_pages", "_new", "_edit", "raw"} reservedWikiNames = []string{"_pages", "_new", "_edit", "raw"}
wikiWorkingPool = sync.NewExclusivePool() // TODO: use clustered lock (unique queue? or *abuse* cache)
wikiWorkingPool = sync.NewExclusivePool()
) )
func nameAllowed(name string) error { func nameAllowed(name string) error {