mirror of
https://github.com/discourse/discourse.git
synced 2024-11-26 22:53:43 +08:00
REFACTOR: Quick refactor of the webhook event emitter job (#7385)
* REFACTOR: Quick refactor of the webhook event emitter job
This commit is contained in:
parent
16bfc29164
commit
02a9429c38
|
@ -7,62 +7,39 @@ module Jobs
|
|||
RETRY_BACKOFF = 5
|
||||
|
||||
def execute(args)
|
||||
%i{
|
||||
web_hook_id
|
||||
event_type
|
||||
}.each do |key|
|
||||
raise Discourse::InvalidParameters.new(key) unless args[key].present?
|
||||
memoize_arguments(args)
|
||||
validate_arguments!
|
||||
|
||||
unless ping_event?(arguments[:event_type])
|
||||
validate_argument!(:payload)
|
||||
|
||||
return if webhook_inactive?
|
||||
return if group_webhook_invalid?
|
||||
return if category_webhook_invalid?
|
||||
return if tag_webhook_invalid?
|
||||
end
|
||||
|
||||
@orig_args = args.dup
|
||||
|
||||
web_hook = WebHook.find_by(id: args[:web_hook_id])
|
||||
raise Discourse::InvalidParameters.new(:web_hook_id) if web_hook.blank?
|
||||
|
||||
unless ping_event?(args[:event_type])
|
||||
return unless web_hook.active?
|
||||
|
||||
return if web_hook.group_ids.present? && (args[:group_id].present? ||
|
||||
!web_hook.group_ids.include?(args[:group_id]))
|
||||
|
||||
return if web_hook.category_ids.present? && (!args[:category_id].present? ||
|
||||
!web_hook.category_ids.include?(args[:category_id]))
|
||||
|
||||
return if web_hook.tag_ids.present? && (args[:tag_ids].blank? ||
|
||||
(web_hook.tag_ids & args[:tag_ids]).blank?)
|
||||
|
||||
raise Discourse::InvalidParameters.new(:payload) unless args[:payload].present?
|
||||
args[:payload] = JSON.parse(args[:payload])
|
||||
end
|
||||
|
||||
web_hook_request(args, web_hook)
|
||||
send_webhook!
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def guardian
|
||||
Guardian.new(Discourse.system_user)
|
||||
def validate_arguments!
|
||||
validate_argument!(:web_hook_id)
|
||||
validate_argument!(:event_type)
|
||||
raise Discourse::InvalidParameters.new(:web_hook_id) if web_hook.blank?
|
||||
end
|
||||
|
||||
def ping_event?(event_type)
|
||||
PING_EVENT == event_type.to_s
|
||||
def validate_argument!(key)
|
||||
raise Discourse::InvalidParameters.new(key) unless arguments[key].present?
|
||||
end
|
||||
|
||||
def build_web_hook_body(args, web_hook)
|
||||
body = {}
|
||||
event_type = args[:event_type].to_s
|
||||
|
||||
if ping_event?(event_type)
|
||||
body[:ping] = 'OK'
|
||||
else
|
||||
body[event_type] = args[:payload]
|
||||
end
|
||||
|
||||
new_body = Plugin::Filter.apply(:after_build_web_hook_body, self, body)
|
||||
MultiJson.dump(new_body)
|
||||
def memoize_arguments(args)
|
||||
@arguments = args
|
||||
@retry_count = @arguments[:retry_count] || 0
|
||||
end
|
||||
|
||||
def web_hook_request(args, web_hook)
|
||||
def send_webhook!
|
||||
uri = URI(web_hook.payload_url.strip)
|
||||
|
||||
conn = Excon.new(
|
||||
|
@ -71,42 +48,17 @@ module Jobs
|
|||
retry_limit: 0
|
||||
)
|
||||
|
||||
body = build_web_hook_body(args, web_hook)
|
||||
web_hook_event = WebHookEvent.create!(web_hook_id: web_hook.id, payload: body)
|
||||
web_hook_body = build_webhook_body
|
||||
web_hook_event = create_webhook_event(web_hook_body)
|
||||
web_hook_headers = build_webhook_headers(uri, web_hook_body, web_hook_event)
|
||||
|
||||
response = nil
|
||||
|
||||
begin
|
||||
content_type =
|
||||
case web_hook.content_type
|
||||
when WebHook.content_types['application/x-www-form-urlencoded']
|
||||
'application/x-www-form-urlencoded'
|
||||
else
|
||||
'application/json'
|
||||
end
|
||||
|
||||
headers = {
|
||||
'Accept' => '*/*',
|
||||
'Connection' => 'close',
|
||||
'Content-Length' => body.bytesize,
|
||||
'Content-Type' => content_type,
|
||||
'Host' => uri.host,
|
||||
'User-Agent' => "Discourse/#{Discourse::VERSION::STRING}",
|
||||
'X-Discourse-Instance' => Discourse.base_url,
|
||||
'X-Discourse-Event-Id' => web_hook_event.id,
|
||||
'X-Discourse-Event-Type' => args[:event_type]
|
||||
}
|
||||
|
||||
headers['X-Discourse-Event'] = args[:event_name].to_s if args[:event_name].present?
|
||||
|
||||
if web_hook.secret.present?
|
||||
headers['X-Discourse-Event-Signature'] = "sha256=#{OpenSSL::HMAC.hexdigest("sha256", web_hook.secret, body)}"
|
||||
end
|
||||
|
||||
now = Time.zone.now
|
||||
response = conn.post(headers: headers, body: body)
|
||||
|
||||
response = conn.post(headers: web_hook_headers, body: web_hook_body)
|
||||
web_hook_event.update!(
|
||||
headers: MultiJson.dump(headers),
|
||||
headers: MultiJson.dump(web_hook_headers),
|
||||
status: response.status,
|
||||
response_headers: MultiJson.dump(response.headers),
|
||||
response_body: response.body,
|
||||
|
@ -114,28 +66,114 @@ module Jobs
|
|||
)
|
||||
rescue => e
|
||||
web_hook_event.update!(
|
||||
headers: MultiJson.dump(headers),
|
||||
headers: MultiJson.dump(web_hook_headers),
|
||||
status: -1,
|
||||
response_headers: MultiJson.dump(error: e),
|
||||
duration: ((Time.zone.now - now) * 1000).to_i
|
||||
)
|
||||
end
|
||||
|
||||
MessageBus.publish("/web_hook_events/#{web_hook.id}", {
|
||||
web_hook_event_id: web_hook_event.id,
|
||||
event_type: args[:event_type]
|
||||
}, user_ids: User.human_users.staff.pluck(:id))
|
||||
|
||||
publish_webhook_event(web_hook_event)
|
||||
retry_web_hook if response&.status != 200
|
||||
end
|
||||
|
||||
def retry_web_hook
|
||||
if SiteSetting.retry_web_hook_events?
|
||||
@orig_args[:retry_count] = (@orig_args[:retry_count] || 0) + 1
|
||||
return if @orig_args[:retry_count] > MAX_RETRY_COUNT
|
||||
delay = RETRY_BACKOFF**(@orig_args[:retry_count] - 1)
|
||||
Jobs.enqueue_in(delay.minutes, :emit_web_hook_event, @orig_args)
|
||||
@retry_count += 1
|
||||
return if @retry_count > MAX_RETRY_COUNT
|
||||
delay = RETRY_BACKOFF**(@retry_count - 1)
|
||||
Jobs.enqueue_in(delay.minutes, :emit_web_hook_event, arguments)
|
||||
end
|
||||
end
|
||||
|
||||
def publish_webhook_event(web_hook_event)
|
||||
MessageBus.publish("/web_hook_events/#{web_hook.id}", {
|
||||
web_hook_event_id: web_hook_event.id,
|
||||
event_type: arguments[:event_type]
|
||||
}, user_ids: User.human_users.staff.pluck(:id))
|
||||
end
|
||||
|
||||
def ping_event?(event_type)
|
||||
PING_EVENT == event_type
|
||||
end
|
||||
|
||||
def webhook_inactive?
|
||||
!web_hook.active?
|
||||
end
|
||||
|
||||
def group_webhook_invalid?
|
||||
web_hook.group_ids.present? && (arguments[:group_id].present? ||
|
||||
!web_hook.group_ids.include?(arguments[:group_id]))
|
||||
end
|
||||
|
||||
def category_webhook_invalid?
|
||||
web_hook.category_ids.present? && (!arguments[:category_id].present? ||
|
||||
!web_hook.category_ids.include?(arguments[:category_id]))
|
||||
end
|
||||
|
||||
def tag_webhook_invalid?
|
||||
web_hook.tag_ids.present? && (arguments[:tag_ids].blank? ||
|
||||
(web_hook.tag_ids & arguments[:tag_ids]).blank?)
|
||||
end
|
||||
|
||||
def arguments
|
||||
@arguments
|
||||
end
|
||||
|
||||
def parsed_payload
|
||||
@parsed_payload ||= JSON.parse(arguments[:payload])
|
||||
end
|
||||
|
||||
def web_hook
|
||||
@web_hook ||= WebHook.find_by(id: arguments[:web_hook_id])
|
||||
end
|
||||
|
||||
def build_webhook_headers(uri, web_hook_body, web_hook_event)
|
||||
content_type =
|
||||
case web_hook.content_type
|
||||
when WebHook.content_types['application/x-www-form-urlencoded']
|
||||
'application/x-www-form-urlencoded'
|
||||
else
|
||||
'application/json'
|
||||
end
|
||||
|
||||
headers = {
|
||||
'Accept' => '*/*',
|
||||
'Connection' => 'close',
|
||||
'Content-Length' => web_hook_body.bytesize,
|
||||
'Content-Type' => content_type,
|
||||
'Host' => uri.host,
|
||||
'User-Agent' => "Discourse/#{Discourse::VERSION::STRING}",
|
||||
'X-Discourse-Instance' => Discourse.base_url,
|
||||
'X-Discourse-Event-Id' => web_hook_event.id,
|
||||
'X-Discourse-Event-Type' => arguments[:event_type]
|
||||
}
|
||||
|
||||
headers['X-Discourse-Event'] = arguments[:event_name] if arguments[:event_name].present?
|
||||
|
||||
if web_hook.secret.present?
|
||||
headers['X-Discourse-Event-Signature'] = "sha256=#{OpenSSL::HMAC.hexdigest("sha256", web_hook.secret, web_hook_body)}"
|
||||
end
|
||||
|
||||
headers
|
||||
end
|
||||
|
||||
def build_webhook_body
|
||||
body = {}
|
||||
|
||||
if ping_event?(arguments[:event_type])
|
||||
body['ping'] = "OK"
|
||||
else
|
||||
body[arguments[:event_type]] = parsed_payload
|
||||
end
|
||||
|
||||
new_body = Plugin::Filter.apply(:after_build_web_hook_body, self, body)
|
||||
MultiJson.dump(new_body)
|
||||
end
|
||||
|
||||
def create_webhook_event(web_hook_body)
|
||||
WebHookEvent.create!(web_hook_id: web_hook.id, payload: web_hook_body)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -52,10 +52,6 @@ describe Jobs::EmitWebHookEvent do
|
|||
event_type: described_class::PING_EVENT
|
||||
)
|
||||
end.to change { Jobs::EmitWebHookEvent.jobs.size }.by(1)
|
||||
|
||||
job = Jobs::EmitWebHookEvent.jobs.first
|
||||
args = job["args"].first
|
||||
expect(args["retry_count"]).to eq(1)
|
||||
end
|
||||
|
||||
it 'does not retry for more than maximum allowed times' do
|
||||
|
@ -189,7 +185,7 @@ describe Jobs::EmitWebHookEvent do
|
|||
end
|
||||
end
|
||||
|
||||
describe '#web_hook_request' do
|
||||
describe '#send_webhook!' do
|
||||
it 'creates delivery event record' do
|
||||
stub_request(:post, post_hook.payload_url)
|
||||
.to_return(body: 'OK', status: 200)
|
||||
|
|
|
@ -291,7 +291,7 @@ describe WebHook do
|
|||
PostDestroyer.new(user, post).destroy
|
||||
|
||||
job = Jobs::EmitWebHookEvent.new
|
||||
job.expects(:web_hook_request).times(2)
|
||||
job.expects(:send_webhook!).times(2)
|
||||
|
||||
args = Jobs::EmitWebHookEvent.jobs[1]["args"].first
|
||||
job.execute(args.with_indifferent_access)
|
||||
|
|
Loading…
Reference in New Issue
Block a user