From 32393f72b1e31aa8cd2b63a0a55e05230164478e Mon Sep 17 00:00:00 2001 From: Sam Date: Tue, 13 Oct 2020 16:56:03 +1100 Subject: [PATCH] PERF: backoff background requests when overloaded (#10888) When the server gets overloaded and lots of requests start queuing server will attempt to shed load by returning 429 errors on background requests. The client can flag a request as background by setting the header: `Discourse-Background` to `true` Out-of-the-box we shed load when the queue time goes above 0.5 seconds. The only request we shed at the moment is the request to load up a new post when someone posts to a topic. We can extend this as we go with a more general pattern on the client. Previous to this change, rate limiting would "break" the post stream which would make suggested topics vanish and users would have to scroll the page to see more posts in the topic. Server needs this protection for cases where tons of clients are navigated to a topic and a new post is made. This can lead to a self inflicted denial of service if enough clients are viewing the topic. Due to the internal security design of Discourse it is hard for a large number of clients to share a channel where we would pass the full post body via the message bus. It also renames (and deprecates) triggerNewPostInStream to triggerNewPostsInStream This allows us to load a batch of new posts cleanly, so the controller can keep track of a backlog Co-authored-by: Joffrey JAFFEUX --- .prettierignore | 1 + .../discourse/app/controllers/topic.js | 57 +++++++++++- .../discourse/app/models/post-stream.js | 92 +++++++++++++------ .../tests/unit/models/post-stream-test.js | 8 +- config/discourse_defaults.conf | 5 + lib/middleware/anonymous_cache.rb | 17 ++++ .../middleware/anonymous_cache_spec.rb | 42 +++++++++ 7 files changed, 189 insertions(+), 33 deletions(-) diff --git a/.prettierignore b/.prettierignore index 0d02324d6fe..83294653c99 100644 --- a/.prettierignore +++ b/.prettierignore @@ -23,3 +23,4 @@ app/assets/javascripts/discourse/tests/fixtures app/assets/javascripts/discourse/tests/helpers/assertions.js node_modules/ dist/ +**/*.rb diff --git a/app/assets/javascripts/discourse/app/controllers/topic.js b/app/assets/javascripts/discourse/app/controllers/topic.js index 03f90ce7cf3..c7b91cdf238 100644 --- a/app/assets/javascripts/discourse/app/controllers/topic.js +++ b/app/assets/javascripts/discourse/app/controllers/topic.js @@ -2,7 +2,7 @@ import I18n from "I18n"; import { isPresent, isEmpty } from "@ember/utils"; import { or, and, not, alias } from "@ember/object/computed"; import EmberObject from "@ember/object"; -import { next, schedule } from "@ember/runloop"; +import { next, schedule, later } from "@ember/runloop"; import Controller, { inject as controller } from "@ember/controller"; import { bufferedProperty } from "discourse/mixins/buffered-content"; import Composer from "discourse/models/composer"; @@ -30,6 +30,8 @@ import { deepMerge } from "discourse-common/lib/object"; let customPostMessageCallbacks = {}; +const RETRIES_ON_RATE_LIMIT = 4; + export function resetCustomPostMessageCallbacks() { customPostMessageCallbacks = {}; } @@ -1292,6 +1294,42 @@ export default Controller.extend(bufferedProperty("model"), { this.model.destroy(this.currentUser); }, + retryOnRateLimit(times, promise, topicId) { + const currentTopicId = this.get("model.id"); + topicId = topicId || currentTopicId; + if (topicId !== currentTopicId) { + // we navigated to another topic, so skip + return; + } + + if (this.retryRateLimited || times <= 0) { + return; + } + + promise().catch((e) => { + const xhr = e.jqXHR; + if ( + xhr && + xhr.status === 429 && + xhr.responseJSON && + xhr.responseJSON.extras && + xhr.responseJSON.extras.wait_seconds + ) { + let waitSeconds = xhr.responseJSON.extras.wait_seconds; + if (waitSeconds < 5) { + waitSeconds = 5; + } + + this.retryRateLimited = true; + + later(() => { + this.retryRateLimited = false; + this.retryOnRateLimit(times - 1, promise, topicId); + }, waitSeconds * 1000); + } + }); + }, + subscribe() { this.unsubscribe(); @@ -1363,7 +1401,22 @@ export default Controller.extend(bufferedProperty("model"), { break; } case "created": { - postStream.triggerNewPostInStream(data.id).then(() => refresh()); + this.newPostsInStream = this.newPostsInStream || []; + this.newPostsInStream.push(data.id); + + this.retryOnRateLimit(RETRIES_ON_RATE_LIMIT, () => { + const postIds = this.newPostsInStream; + this.newPostsInStream = []; + + return postStream + .triggerNewPostsInStream(postIds, { background: true }) + .then(() => refresh()) + .catch((e) => { + this.newPostsInStream = postIds.concat(this.newPostsInStream); + throw e; + }); + }); + if (this.get("currentUser.id") !== data.user_id) { this.documentTitle.incrementBackgroundContextCount(); } diff --git a/app/assets/javascripts/discourse/app/models/post-stream.js b/app/assets/javascripts/discourse/app/models/post-stream.js index 340b1b2c6f8..d68c2cac413 100644 --- a/app/assets/javascripts/discourse/app/models/post-stream.js +++ b/app/assets/javascripts/discourse/app/models/post-stream.js @@ -11,6 +11,7 @@ import { loadTopicView } from "discourse/models/topic"; import { Promise } from "rsvp"; import User from "discourse/models/user"; import { deepMerge } from "discourse-common/lib/object"; +import deprecated from "discourse-common/lib/deprecated"; export default RestModel.extend({ _identityMap: null, @@ -599,15 +600,25 @@ export default RestModel.extend({ }); }, + /* mainly for backwards compatability with plugins, used in quick messages plugin + * TODO: remove July 2021 + * */ + triggerNewPostInStream(postId, opts) { + deprecated( + "Please use triggerNewPostsInStream, this method will be removed July 2021" + ); + return this.triggerNewPostsInStream([postId], opts); + }, + /** - Finds and adds a post to the stream by id. Typically this would happen if we receive a message + Finds and adds posts to the stream by id. Typically this would happen if we receive a message from the message bus indicating there's a new post. We'll only insert it if we currently have no filters. **/ - triggerNewPostInStream(postId) { + triggerNewPostsInStream(postIds, opts) { const resolved = Promise.resolve(); - if (!postId) { + if (!postIds || postIds.length === 0) { return resolved; } @@ -617,27 +628,46 @@ export default RestModel.extend({ } const loadedAllPosts = this.loadedAllPosts; + this._loadingPostIds = this._loadingPostIds || []; - if (this.stream.indexOf(postId) === -1) { - this.stream.addObject(postId); - if (loadedAllPosts) { - this.set("loadingLastPost", true); - return this.findPostsByIds([postId]) - .then((posts) => { - const ignoredUsers = - User.current() && User.current().get("ignored_users"); - posts.forEach((p) => { - if (ignoredUsers && ignoredUsers.includes(p.username)) { - this.stream.removeObject(postId); - return; - } - this.appendPost(p); - }); - }) - .finally(() => { - this.set("loadingLastPost", false); - }); + let missingIds = []; + + postIds.forEach((postId) => { + if (postId && this.stream.indexOf(postId) === -1) { + missingIds.push(postId); } + }); + + if (missingIds.length === 0) { + return resolved; + } + + if (loadedAllPosts) { + missingIds.forEach((postId) => { + if (this._loadingPostIds.indexOf(postId) === -1) { + this._loadingPostIds.push(postId); + } + }); + this.set("loadingLastPost", true); + return this.findPostsByIds(this._loadingPostIds, opts) + .then((posts) => { + this._loadingPostIds = null; + const ignoredUsers = + User.current() && User.current().get("ignored_users"); + posts.forEach((p) => { + if (ignoredUsers && ignoredUsers.includes(p.username)) { + this.stream.removeObject(p.id); + return; + } + this.stream.addObject(p.id); + this.appendPost(p); + }); + }) + .finally(() => { + this.set("loadingLastPost", false); + }); + } else { + missingIds.forEach((postId) => this.stream.addObject(postId)); } return resolved; @@ -789,11 +819,11 @@ export default RestModel.extend({ // Get the index in the stream of a post id. (Use this for the topic progress bar.) progressIndexOfPostId(post) { const postId = post.get("id"); - const index = this.stream.indexOf(postId); if (this.isMegaTopic) { return post.get("post_number"); } else { + const index = this.stream.indexOf(postId); return index + 1; } }, @@ -972,17 +1002,17 @@ export default RestModel.extend({ }); }, - findPostsByIds(postIds) { + findPostsByIds(postIds, opts) { const identityMap = this._identityMap; const unloaded = postIds.filter((p) => !identityMap[p]); // Load our unloaded posts by id - return this.loadIntoIdentityMap(unloaded).then(() => { + return this.loadIntoIdentityMap(unloaded, opts).then(() => { return postIds.map((p) => identityMap[p]).compact(); }); }, - loadIntoIdentityMap(postIds) { + loadIntoIdentityMap(postIds, opts) { if (isEmpty(postIds)) { return Promise.resolve([]); } @@ -993,7 +1023,15 @@ export default RestModel.extend({ const data = { post_ids: postIds, include_suggested: includeSuggested }; const store = this.store; - return ajax(url, { data }).then((result) => { + let headers = {}; + if (opts && opts.background) { + headers["Discourse-Background"] = "true"; + } + + return ajax(url, { + data, + headers, + }).then((result) => { if (result.suggested_topics) { this.set("topic.suggested_topics", result.suggested_topics); } diff --git a/app/assets/javascripts/discourse/tests/unit/models/post-stream-test.js b/app/assets/javascripts/discourse/tests/unit/models/post-stream-test.js index 56472cc6a3f..27fda7baf32 100644 --- a/app/assets/javascripts/discourse/tests/unit/models/post-stream-test.js +++ b/app/assets/javascripts/discourse/tests/unit/models/post-stream-test.js @@ -785,7 +785,7 @@ test("triggerRecoveredPost", async (assert) => { ); }); -test("comitting and triggerNewPostInStream race condition", (assert) => { +test("comitting and triggerNewPostsInStream race condition", (assert) => { const postStream = buildStream(4964); const store = postStream.store; @@ -808,7 +808,7 @@ test("comitting and triggerNewPostInStream race condition", (assert) => { stagedPost.set("id", 123); sandbox.stub(postStream, "appendMore"); - postStream.triggerNewPostInStream(123); + postStream.triggerNewPostsInStream([123]); assert.equal(postStream.get("filteredPostsCount"), 1, "it added the post"); postStream.commitPost(stagedPost); @@ -849,7 +849,7 @@ test("triggerNewPostInStream for ignored posts", async (assert) => { .stub(postStream, "findPostsByIds") .returns(Promise.resolve([post2])); - await postStream.triggerNewPostInStream(101); + await postStream.triggerNewPostsInStream([101]); assert.equal( postStream.posts.length, 2, @@ -864,7 +864,7 @@ test("triggerNewPostInStream for ignored posts", async (assert) => { stub.restore(); sandbox.stub(postStream, "findPostsByIds").returns(Promise.resolve([post3])); - await postStream.triggerNewPostInStream(102); + await postStream.triggerNewPostsInStream([102]); assert.equal( postStream.posts.length, 2, diff --git a/config/discourse_defaults.conf b/config/discourse_defaults.conf index 66fe1727cc4..84d9c84e345 100644 --- a/config/discourse_defaults.conf +++ b/config/discourse_defaults.conf @@ -233,6 +233,11 @@ force_anonymous_min_queue_seconds = 1 # only trigger anon if we see more than N requests for this path in last 10 seconds force_anonymous_min_per_10_seconds = 3 +# Any requests with the headers Discourse-Background = true will not be allowed to queue +# longer than this amount of time. +# Discourse will rate limit and ask client to try again later. +background_requests_max_queue_length = 0.5 + # if a message bus request queues for 100ms or longer, we will reject it and ask consumer # to back off reject_message_bus_queue_seconds = 0.1 diff --git a/lib/middleware/anonymous_cache.rb b/lib/middleware/anonymous_cache.rb index 819723f23b6..c57bafbc5af 100644 --- a/lib/middleware/anonymous_cache.rb +++ b/lib/middleware/anonymous_cache.rb @@ -329,6 +329,23 @@ module Middleware helper.force_anonymous! end + if (env["HTTP_DISCOURSE_BACKGROUND"] == "true") && (queue_time = env["REQUEST_QUEUE_SECONDS"]) + if queue_time > GlobalSetting.background_requests_max_queue_length + return [ + 429, + { + "content-type" => "application/json; charset=utf-8" + }, + [{ + errors: I18n.t("rate_limiter.slow_down"), + extras: { + wait_seconds: 5 + (5 * rand).round(2) + } + }.to_json] + ] + end + end + result = if helper.cacheable? helper.cached(env) || helper.cache(@app.call(env), env) diff --git a/spec/components/middleware/anonymous_cache_spec.rb b/spec/components/middleware/anonymous_cache_spec.rb index 2d27f992181..dbe344a3c38 100644 --- a/spec/components/middleware/anonymous_cache_spec.rb +++ b/spec/components/middleware/anonymous_cache_spec.rb @@ -127,6 +127,48 @@ describe Middleware::AnonymousCache do end end + context 'background request rate limit' do + it 'will rate limit background requests' do + + app = Middleware::AnonymousCache.new( + lambda do |env| + [200, {}, ["ok"]] + end + ) + + global_setting :background_requests_max_queue_length, 1 + + env = { + "HTTP_COOKIE" => "_t=#{SecureRandom.hex}", + "HOST" => "site.com", + "REQUEST_METHOD" => "GET", + "REQUEST_URI" => "/somewhere/rainbow", + "REQUEST_QUEUE_SECONDS" => 2.1, + "rack.input" => StringIO.new + } + + # non background ... long request + env["REQUEST_QUEUE_SECONDS"] = 2 + + status, _ = app.call(env.dup) + expect(status).to eq(200) + + env["HTTP_DISCOURSE_BACKGROUND"] = "true" + + status, headers, body = app.call(env.dup) + expect(status).to eq(429) + expect(headers["content-type"]).to eq("application/json; charset=utf-8") + json = JSON.parse(body.join) + expect(json["extras"]["wait_seconds"]).to be > 4.9 + + env["REQUEST_QUEUE_SECONDS"] = 0.5 + + status, _ = app.call(env.dup) + expect(status).to eq(200) + + end + end + context 'force_anonymous!' do before do RateLimiter.enable