PERF: backoff background requests when overloaded (#10888)
When the server gets overloaded and lots of requests start queuing, the server will attempt to shed load by returning 429 errors on background requests. The client can flag a request as background by setting the header `Discourse-Background` to `true`.

Out of the box we shed load when the queue time goes above 0.5 seconds.

The only request we shed at the moment is the request to load up a new post when someone posts to a topic. We can extend this as we go with a more general pattern on the client.

Prior to this change, rate limiting would "break" the post stream: suggested topics would vanish and users would have to scroll the page to see more posts in the topic.

The server needs this protection for cases where a large number of clients have a topic open and a new post is made. This can lead to a self-inflicted denial of service if enough clients are viewing the topic. Due to the internal security design of Discourse, it is hard for a large number of clients to share a channel over which we could pass the full post body via the message bus.

This change also renames (and deprecates) triggerNewPostInStream to triggerNewPostsInStream. This allows us to load a batch of new posts cleanly, so the controller can keep track of a backlog.

Co-authored-by: Joffrey JAFFEUX <j.jaffeux@gmail.com>
parent dc8c23b44c
commit 32393f72b1
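For context, the client/server contract described above is simple enough to sketch. The following is a minimal illustration, not Discourse's actual code (`backgroundFetch` is a hypothetical helper): a background request advertises itself via the `Discourse-Background` header, and a 429 reply supplies `extras.wait_seconds` telling the client how long to back off before retrying.

```js
// Minimal sketch of the client side of this contract (illustrative only).
async function backgroundFetch(url, retriesLeft = 4) {
  const response = await fetch(url, {
    headers: { "Discourse-Background": "true" },
  });

  if (response.status === 429 && retriesLeft > 0) {
    const body = await response.json();
    // The middleware suggests a wait via extras.wait_seconds (5-10s, jittered).
    const waitSeconds = (body.extras && body.extras.wait_seconds) || 5;
    await new Promise((resolve) => setTimeout(resolve, waitSeconds * 1000));
    return backgroundFetch(url, retriesLeft - 1);
  }

  return response;
}
```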
@@ -23,3 +23,4 @@ app/assets/javascripts/discourse/tests/fixtures
+app/assets/javascripts/discourse/tests/helpers/assertions.js
 node_modules/
 dist/
 **/*.rb

@@ -2,7 +2,7 @@ import I18n from "I18n";
 import { isPresent, isEmpty } from "@ember/utils";
 import { or, and, not, alias } from "@ember/object/computed";
 import EmberObject from "@ember/object";
-import { next, schedule } from "@ember/runloop";
+import { next, schedule, later } from "@ember/runloop";
 import Controller, { inject as controller } from "@ember/controller";
 import { bufferedProperty } from "discourse/mixins/buffered-content";
 import Composer from "discourse/models/composer";

@@ -30,6 +30,8 @@ import { deepMerge } from "discourse-common/lib/object";
 
 let customPostMessageCallbacks = {};
 
+const RETRIES_ON_RATE_LIMIT = 4;
+
 export function resetCustomPostMessageCallbacks() {
   customPostMessageCallbacks = {};
 }

@@ -1292,6 +1294,42 @@ export default Controller.extend(bufferedProperty("model"), {
     this.model.destroy(this.currentUser);
   },
 
+  retryOnRateLimit(times, promise, topicId) {
+    const currentTopicId = this.get("model.id");
+    topicId = topicId || currentTopicId;
+    if (topicId !== currentTopicId) {
+      // we navigated to another topic, so skip
+      return;
+    }
+
+    if (this.retryRateLimited || times <= 0) {
+      return;
+    }
+
+    promise().catch((e) => {
+      const xhr = e.jqXHR;
+      if (
+        xhr &&
+        xhr.status === 429 &&
+        xhr.responseJSON &&
+        xhr.responseJSON.extras &&
+        xhr.responseJSON.extras.wait_seconds
+      ) {
+        let waitSeconds = xhr.responseJSON.extras.wait_seconds;
+        if (waitSeconds < 5) {
+          waitSeconds = 5;
+        }
+
+        this.retryRateLimited = true;
+
+        later(() => {
+          this.retryRateLimited = false;
+          this.retryOnRateLimit(times - 1, promise, topicId);
+        }, waitSeconds * 1000);
+      }
+    });
+  },
+
   subscribe() {
     this.unsubscribe();
 

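Note that the second argument to `retryOnRateLimit` is a function returning a promise rather than a promise itself: a promise is already in flight and settles exactly once, so each retry has to re-create the request from scratch. The `retryRateLimited` flag keeps overlapping message-bus events from spawning parallel retry loops, and the wait is clamped to at least five seconds. A hypothetical call site would look like this (the real one appears in the next hunk):

```js
// Hypothetical usage: the factory re-issues the request on every attempt.
this.retryOnRateLimit(RETRIES_ON_RATE_LIMIT, () =>
  postStream.triggerNewPostsInStream([postId], { background: true })
);
```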
@@ -1363,7 +1401,22 @@ export default Controller.extend(bufferedProperty("model"), {
           break;
         }
         case "created": {
-          postStream.triggerNewPostInStream(data.id).then(() => refresh());
+          this.newPostsInStream = this.newPostsInStream || [];
+          this.newPostsInStream.push(data.id);
+
+          this.retryOnRateLimit(RETRIES_ON_RATE_LIMIT, () => {
+            const postIds = this.newPostsInStream;
+            this.newPostsInStream = [];
+
+            return postStream
+              .triggerNewPostsInStream(postIds, { background: true })
+              .then(() => refresh())
+              .catch((e) => {
+                this.newPostsInStream = postIds.concat(this.newPostsInStream);
+                throw e;
+              });
+          });
+
           if (this.get("currentUser.id") !== data.user_id) {
             this.documentTitle.incrementBackgroundContextCount();
           }

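The `newPostsInStream` array acts as a backlog, so post IDs that arrive while a retry is pending are batched rather than dropped: each flush empties the backlog, and on failure the `catch` handler puts the failed batch back in front of whatever arrived since. The same accumulate/flush/restore pattern in isolation (an illustrative sketch, not Discourse's code):

```js
// Illustrative sketch of the backlog pattern used above.
let backlog = [];
let suspended = false; // set while waiting out a rate limit (timer elided here)

function enqueue(id, flush) {
  backlog.push(id);
  if (suspended) {
    return; // ids keep accumulating; the pending retry will flush them
  }

  const batch = backlog;
  backlog = []; // new arrivals go into a fresh backlog while the batch is in flight

  flush(batch).catch((e) => {
    // Restore the failed batch ahead of anything queued since,
    // so nothing is lost and order is preserved for the retry.
    backlog = batch.concat(backlog);
    throw e;
  });
}
```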
@@ -11,6 +11,7 @@ import { loadTopicView } from "discourse/models/topic";
 import { Promise } from "rsvp";
 import User from "discourse/models/user";
 import { deepMerge } from "discourse-common/lib/object";
+import deprecated from "discourse-common/lib/deprecated";
 
 export default RestModel.extend({
   _identityMap: null,

@@ -599,15 +600,25 @@ export default RestModel.extend({
     });
   },
 
+  /* mainly for backwards compatability with plugins, used in quick messages plugin
+   * TODO: remove July 2021
+   * */
+  triggerNewPostInStream(postId, opts) {
+    deprecated(
+      "Please use triggerNewPostsInStream, this method will be removed July 2021"
+    );
+    return this.triggerNewPostsInStream([postId], opts);
+  },
+
   /**
-    Finds and adds a post to the stream by id. Typically this would happen if we receive a message
+    Finds and adds posts to the stream by id. Typically this would happen if we receive a message
     from the message bus indicating there's a new post. We'll only insert it if we currently
     have no filters.
   **/
-  triggerNewPostInStream(postId) {
+  triggerNewPostsInStream(postIds, opts) {
     const resolved = Promise.resolve();
 
-    if (!postId) {
+    if (!postIds || postIds.length === 0) {
       return resolved;
     }

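The old single-post entry point survives as a thin shim that logs a deprecation warning and delegates to the batched method, so existing plugin call sites keep working:

```js
// Both calls now resolve the same way; the shim wraps the id in an array.
postStream.triggerNewPostInStream(101); // works, but logs a deprecation warning
postStream.triggerNewPostsInStream([101]); // preferred going forward
```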
@@ -617,27 +628,46 @@ export default RestModel.extend({
     }
 
     const loadedAllPosts = this.loadedAllPosts;
+    this._loadingPostIds = this._loadingPostIds || [];
 
-    if (this.stream.indexOf(postId) === -1) {
-      this.stream.addObject(postId);
-      if (loadedAllPosts) {
-        this.set("loadingLastPost", true);
-        return this.findPostsByIds([postId])
-          .then((posts) => {
-            const ignoredUsers =
-              User.current() && User.current().get("ignored_users");
-            posts.forEach((p) => {
-              if (ignoredUsers && ignoredUsers.includes(p.username)) {
-                this.stream.removeObject(postId);
-                return;
-              }
-              this.appendPost(p);
-            });
-          })
-          .finally(() => {
-            this.set("loadingLastPost", false);
-          });
-      }
-    }
+    let missingIds = [];
+
+    postIds.forEach((postId) => {
+      if (postId && this.stream.indexOf(postId) === -1) {
+        missingIds.push(postId);
+      }
+    });
+
+    if (missingIds.length === 0) {
+      return resolved;
+    }
+
+    if (loadedAllPosts) {
+      missingIds.forEach((postId) => {
+        if (this._loadingPostIds.indexOf(postId) === -1) {
+          this._loadingPostIds.push(postId);
+        }
+      });
+      this.set("loadingLastPost", true);
+      return this.findPostsByIds(this._loadingPostIds, opts)
+        .then((posts) => {
+          this._loadingPostIds = null;
+          const ignoredUsers =
+            User.current() && User.current().get("ignored_users");
+          posts.forEach((p) => {
+            if (ignoredUsers && ignoredUsers.includes(p.username)) {
+              this.stream.removeObject(p.id);
+              return;
+            }
+            this.stream.addObject(p.id);
+            this.appendPost(p);
+          });
+        })
+        .finally(() => {
+          this.set("loadingLastPost", false);
+        });
+    } else {
+      missingIds.forEach((postId) => this.stream.addObject(postId));
+    }
 
     return resolved;

@@ -789,11 +819,11 @@ export default RestModel.extend({
   // Get the index in the stream of a post id. (Use this for the topic progress bar.)
   progressIndexOfPostId(post) {
     const postId = post.get("id");
-    const index = this.stream.indexOf(postId);
 
     if (this.isMegaTopic) {
       return post.get("post_number");
     } else {
+      const index = this.stream.indexOf(postId);
       return index + 1;
     }
   },

@@ -972,17 +1002,17 @@ export default RestModel.extend({
     });
   },
 
-  findPostsByIds(postIds) {
+  findPostsByIds(postIds, opts) {
     const identityMap = this._identityMap;
     const unloaded = postIds.filter((p) => !identityMap[p]);
 
     // Load our unloaded posts by id
-    return this.loadIntoIdentityMap(unloaded).then(() => {
+    return this.loadIntoIdentityMap(unloaded, opts).then(() => {
       return postIds.map((p) => identityMap[p]).compact();
     });
   },
 
-  loadIntoIdentityMap(postIds) {
+  loadIntoIdentityMap(postIds, opts) {
     if (isEmpty(postIds)) {
       return Promise.resolve([]);
     }

@@ -993,7 +1023,15 @@ export default RestModel.extend({
     const data = { post_ids: postIds, include_suggested: includeSuggested };
     const store = this.store;
 
-    return ajax(url, { data }).then((result) => {
+    let headers = {};
+    if (opts && opts.background) {
+      headers["Discourse-Background"] = "true";
+    }
+
+    return ajax(url, {
+      data,
+      headers,
+    }).then((result) => {
       if (result.suggested_topics) {
         this.set("topic.suggested_topics", result.suggested_topics);
       }

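`opts` threads all the way down: triggerNewPostsInStream passes it to findPostsByIds, which passes it to loadIntoIdentityMap, where `{ background: true }` becomes the `Discourse-Background: true` request header. Only message-bus-triggered loads opt in, so user-initiated post loads keep normal priority:

```js
// Hypothetical calls: the first batch load is flagged as background, so the
// resulting XHR carries "Discourse-Background: true" and may be shed with a
// 429 when the server's request queue is long.
postStream.triggerNewPostsInStream([42, 43], { background: true });

// A regular load sends no such header and is never shed by this mechanism.
postStream.triggerNewPostsInStream([44]);
```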
@@ -785,7 +785,7 @@ test("triggerRecoveredPost", async (assert) => {
   );
 });
 
-test("comitting and triggerNewPostInStream race condition", (assert) => {
+test("comitting and triggerNewPostsInStream race condition", (assert) => {
   const postStream = buildStream(4964);
   const store = postStream.store;
 

@@ -808,7 +808,7 @@ test("comitting and triggerNewPostInStream race condition", (assert) => {
   stagedPost.set("id", 123);
 
   sandbox.stub(postStream, "appendMore");
-  postStream.triggerNewPostInStream(123);
+  postStream.triggerNewPostsInStream([123]);
   assert.equal(postStream.get("filteredPostsCount"), 1, "it added the post");
 
   postStream.commitPost(stagedPost);

@@ -849,7 +849,7 @@ test("triggerNewPostInStream for ignored posts", async (assert) => {
     .stub(postStream, "findPostsByIds")
     .returns(Promise.resolve([post2]));
 
-  await postStream.triggerNewPostInStream(101);
+  await postStream.triggerNewPostsInStream([101]);
   assert.equal(
     postStream.posts.length,
     2,

@@ -864,7 +864,7 @@ test("triggerNewPostInStream for ignored posts", async (assert) => {
   stub.restore();
   sandbox.stub(postStream, "findPostsByIds").returns(Promise.resolve([post3]));
 
-  await postStream.triggerNewPostInStream(102);
+  await postStream.triggerNewPostsInStream([102]);
   assert.equal(
     postStream.posts.length,
     2,

@@ -233,6 +233,11 @@ force_anonymous_min_queue_seconds = 1
 # only trigger anon if we see more than N requests for this path in last 10 seconds
 force_anonymous_min_per_10_seconds = 3
 
+# Any requests with the headers Discourse-Background = true will not be allowed to queue
+# longer than this amount of time.
+# Discourse will rate limit and ask client to try again later.
+background_requests_max_queue_length = 0.5
+
 # if a message bus request queues for 100ms or longer, we will reject it and ask consumer
 # to back off
 reject_message_bus_queue_seconds = 0.1

@@ -329,6 +329,23 @@ module Middleware
       helper.force_anonymous!
     end
 
+    if (env["HTTP_DISCOURSE_BACKGROUND"] == "true") && (queue_time = env["REQUEST_QUEUE_SECONDS"])
+      if queue_time > GlobalSetting.background_requests_max_queue_length
+        return [
+          429,
+          {
+            "content-type" => "application/json; charset=utf-8"
+          },
+          [{
+            errors: I18n.t("rate_limiter.slow_down"),
+            extras: {
+              wait_seconds: 5 + (5 * rand).round(2)
+            }
+          }.to_json]
+        ]
+      end
+    end
+
     result =
       if helper.cacheable?
         helper.cached(env) || helper.cache(@app.call(env), env)

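The suggested wait is 5 seconds plus up to 5 seconds of random jitter, rounded to two decimals, so a crowd of rate-limited clients does not come back in lockstep. The client-side clamp in retryOnRateLimit (never wait less than 5 seconds) matches this floor. For reference, the Ruby expression translated to JavaScript:

```js
// Equivalent of Ruby's `5 + (5 * rand).round(2)`: a value in [5, 10)
// with two-decimal precision, randomized per rejected request.
const waitSeconds = 5 + Math.round(Math.random() * 5 * 100) / 100;
```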
@@ -127,6 +127,48 @@ describe Middleware::AnonymousCache do
     end
   end
 
+  context 'background request rate limit' do
+    it 'will rate limit background requests' do
+
+      app = Middleware::AnonymousCache.new(
+        lambda do |env|
+          [200, {}, ["ok"]]
+        end
+      )
+
+      global_setting :background_requests_max_queue_length, 1
+
+      env = {
+        "HTTP_COOKIE" => "_t=#{SecureRandom.hex}",
+        "HOST" => "site.com",
+        "REQUEST_METHOD" => "GET",
+        "REQUEST_URI" => "/somewhere/rainbow",
+        "REQUEST_QUEUE_SECONDS" => 2.1,
+        "rack.input" => StringIO.new
+      }
+
+      # non background ... long request
+      env["REQUEST_QUEUE_SECONDS"] = 2
+
+      status, _ = app.call(env.dup)
+      expect(status).to eq(200)
+
+      env["HTTP_DISCOURSE_BACKGROUND"] = "true"
+
+      status, headers, body = app.call(env.dup)
+      expect(status).to eq(429)
+      expect(headers["content-type"]).to eq("application/json; charset=utf-8")
+      json = JSON.parse(body.join)
+      expect(json["extras"]["wait_seconds"]).to be > 4.9
+
+      env["REQUEST_QUEUE_SECONDS"] = 0.5
+
+      status, _ = app.call(env.dup)
+      expect(status).to eq(200)
+
+    end
+  end
+
   context 'force_anonymous!' do
     before do
       RateLimiter.enable