Fixed race condition when deleting documents by repoId in ElasticSearch (#32185) (#32188)
Some checks are pending
release-nightly / disk-clean (push) Waiting to run
release-nightly / nightly-binary (push) Waiting to run
release-nightly / nightly-docker-rootful (push) Waiting to run
release-nightly / nightly-docker-rootless (push) Waiting to run

Backport #32185 by @bsofiato

Resolves #32184

Signed-off-by: Bruno Sofiato <bruno.sofiato@gmail.com>
Co-authored-by: Bruno Sofiato <bruno.sofiato@gmail.com>
This commit is contained in:
Giteabot 2024-10-04 00:33:26 +08:00 committed by GitHub
parent d86433cce2
commit 361221c531
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -20,6 +20,7 @@ import (
indexer_internal "code.gitea.io/gitea/modules/indexer/internal" indexer_internal "code.gitea.io/gitea/modules/indexer/internal"
inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch" inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch"
"code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/typesniffer" "code.gitea.io/gitea/modules/typesniffer"
@ -197,8 +198,33 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st
return nil return nil
} }
// Delete deletes indexes by ids // Delete entries by repoId
func (b *Indexer) Delete(ctx context.Context, repoID int64) error { func (b *Indexer) Delete(ctx context.Context, repoID int64) error {
if err := b.doDelete(ctx, repoID); err != nil {
// Maybe there is a conflict during the delete operation, so we should retry after a refresh
log.Warn("Deletion of entries of repo %v within index %v was erroneus. Trying to refresh index before trying again", repoID, b.inner.VersionedIndexName(), err)
if err := b.refreshIndex(ctx); err != nil {
return err
}
if err := b.doDelete(ctx, repoID); err != nil {
log.Error("Could not delete entries of repo %v within index %v", repoID, b.inner.VersionedIndexName())
return err
}
}
return nil
}
func (b *Indexer) refreshIndex(ctx context.Context) error {
if _, err := b.inner.Client.Refresh(b.inner.VersionedIndexName()).Do(ctx); err != nil {
log.Error("Error while trying to refresh index %v", b.inner.VersionedIndexName(), err)
return err
}
return nil
}
// Delete entries by repoId
func (b *Indexer) doDelete(ctx context.Context, repoID int64) error {
_, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()). _, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()).
Query(elastic.NewTermsQuery("repo_id", repoID)). Query(elastic.NewTermsQuery("repo_id", repoID)).
Do(ctx) Do(ctx)