2018-02-03 10:15:28 +08:00
// Copyright 2015 Light Code Labs, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2018-02-09 10:55:10 +08:00
// Package diagnostics implements the client for server-side diagnostics
// of the network. Functions in this package are synchronous and blocking
// unless otherwise specified. For convenience, most functions here do
// not return errors, but errors are logged to the standard logger.
//
// To use this package, first call Init(). You can then call any of the
// collection/aggregation functions. Call StartEmitting() when you are
// ready to begin sending diagnostic updates.
//
2018-02-11 03:59:23 +08:00
// When collecting metrics (functions like Set, AppendUnique, or Increment),
// it may be desirable and even recommended to invoke them in a new
2018-02-09 10:55:10 +08:00
// goroutine (use the go keyword) in case there is lock contention;
// they are thread-safe (unless noted), and you may not want them to
// block the main thread of execution. However, sometimes blocking
// may be necessary too; for example, adding startup metrics to the
// buffer before the call to StartEmitting().
2018-02-11 03:59:23 +08:00
//
// This package is designed to be as fast and space-efficient as reasonably
// possible, so that it does not disrupt the flow of execution.
2018-02-03 10:15:28 +08:00
package diagnostics
import (
2018-02-09 10:55:10 +08:00
"bytes"
"encoding/json"
"fmt"
2018-03-22 07:51:07 +08:00
"io/ioutil"
2018-02-09 10:55:10 +08:00
"log"
"net/http"
2018-03-19 05:49:17 +08:00
"strconv"
2018-02-09 10:55:10 +08:00
"strings"
"sync"
"time"
2018-02-03 10:15:28 +08:00
"github.com/google/uuid"
)
2018-02-09 10:55:10 +08:00
// logEmit calls emit and then logs the error, if any.
2018-03-22 07:01:14 +08:00
// See docs for emit.
2018-02-09 10:55:10 +08:00
func logEmit ( final bool ) {
err := emit ( final )
if err != nil {
2018-03-22 07:01:14 +08:00
log . Printf ( "[ERROR] Sending diagnostics: %v" , err )
2018-02-09 10:55:10 +08:00
}
}
// emit sends an update to the diagnostics server.
2018-03-22 07:01:14 +08:00
// Set final to true if this is the last call to emit.
2018-02-09 10:55:10 +08:00
// If final is true, no future updates will be scheduled.
// Otherwise, the next update will be scheduled.
func emit ( final bool ) error {
if ! enabled {
return fmt . Errorf ( "diagnostics not enabled" )
}
// ensure only one update happens at a time;
// skip update if previous one still in progress
updateMu . Lock ( )
if updating {
updateMu . Unlock ( )
log . Println ( "[NOTICE] Skipping this diagnostics update because previous one is still working" )
return nil
}
updating = true
updateMu . Unlock ( )
defer func ( ) {
updateMu . Lock ( )
updating = false
updateMu . Unlock ( )
} ( )
// terminate any pending update if this is the last one
if final {
2018-03-22 07:51:07 +08:00
stopUpdateTimer ( )
2018-02-09 10:55:10 +08:00
}
payloadBytes , err := makePayloadAndResetBuffer ( )
if err != nil {
return err
}
// this will hold the server's reply
var reply Response
// transmit the payload - use a loop to retry in case of failure
for i := 0 ; i < 4 ; i ++ {
if i > 0 && err != nil {
// don't hammer the server; first failure might have been
// a fluke, but back off more after that
2018-03-19 05:49:17 +08:00
log . Printf ( "[WARNING] Sending diagnostics (attempt %d): %v - backing off and retrying" , i , err )
time . Sleep ( time . Duration ( ( i + 1 ) * ( i + 1 ) * ( i + 1 ) ) * time . Second )
2018-02-09 10:55:10 +08:00
}
// send it
var resp * http . Response
resp , err = httpClient . Post ( endpoint + instanceUUID . String ( ) , "application/json" , bytes . NewReader ( payloadBytes ) )
if err != nil {
continue
}
2018-03-22 07:51:07 +08:00
// check for any special-case response codes
if resp . StatusCode == http . StatusGone {
// the endpoint has been deprecated and is no longer servicing clients
err = fmt . Errorf ( "diagnostics server replied with HTTP %d; upgrade required" , resp . StatusCode )
if clen := resp . Header . Get ( "Content-Length" ) ; clen != "0" && clen != "" {
bodyBytes , readErr := ioutil . ReadAll ( resp . Body )
if readErr != nil {
log . Printf ( "[ERROR] Reading response body from server: %v" , readErr )
}
err = fmt . Errorf ( "%v - %s" , err , bodyBytes )
}
resp . Body . Close ( )
reply . Stop = true
break
}
if resp . StatusCode == http . StatusUnavailableForLegalReasons {
// the endpoint is unavailable, at least to this client, for legal reasons (!)
err = fmt . Errorf ( "diagnostics server replied with HTTP %d %s: please consult the project website and developers for guidance" , resp . StatusCode , resp . Status )
if clen := resp . Header . Get ( "Content-Length" ) ; clen != "0" && clen != "" {
bodyBytes , readErr := ioutil . ReadAll ( resp . Body )
if readErr != nil {
log . Printf ( "[ERROR] Reading response body from server: %v" , readErr )
}
err = fmt . Errorf ( "%v - %s" , err , bodyBytes )
}
resp . Body . Close ( )
reply . Stop = true
break
}
// okay, ensure we can interpret the response
2018-02-09 10:55:10 +08:00
if ct := resp . Header . Get ( "Content-Type" ) ; ( resp . StatusCode < 300 || resp . StatusCode >= 400 ) &&
! strings . Contains ( ct , "json" ) {
2018-03-19 05:49:17 +08:00
err = fmt . Errorf ( "diagnostics server replied with unknown content-type: '%s' and HTTP %s" , ct , resp . Status )
2018-02-09 10:55:10 +08:00
resp . Body . Close ( )
continue
}
// read the response body
err = json . NewDecoder ( resp . Body ) . Decode ( & reply )
resp . Body . Close ( ) // close response body as soon as we're done with it
if err != nil {
continue
}
// make sure we didn't send the update too soon; if so,
// just wait and try again -- this is a special case of
// error that we handle differently, as you can see
if resp . StatusCode == http . StatusTooManyRequests {
2018-03-19 05:49:17 +08:00
if reply . NextUpdate <= 0 {
raStr := resp . Header . Get ( "Retry-After" )
if ra , err := strconv . Atoi ( raStr ) ; err == nil {
reply . NextUpdate = time . Duration ( ra ) * time . Second
}
}
2018-03-22 07:01:14 +08:00
if ! final {
log . Printf ( "[NOTICE] Sending diagnostics: we were too early; waiting %s before trying again" , reply . NextUpdate )
time . Sleep ( reply . NextUpdate )
continue
}
2018-02-09 10:55:10 +08:00
} else if resp . StatusCode >= 400 {
err = fmt . Errorf ( "diagnostics server returned status code %d" , resp . StatusCode )
continue
}
break
}
2018-03-22 07:01:14 +08:00
if err == nil && ! final {
2018-02-09 10:55:10 +08:00
// (remember, if there was an error, we return it
2018-03-19 05:49:17 +08:00
// below, so it WILL get logged if it's supposed to)
2018-02-09 10:55:10 +08:00
log . Println ( "[INFO] Sending diagnostics: success" )
}
2018-03-19 05:49:17 +08:00
// even if there was an error after all retries, we should
2018-02-09 10:55:10 +08:00
// schedule the next update using our default update
// interval because the server might be healthy later
2018-02-11 03:59:23 +08:00
// ensure we won't slam the diagnostics server
if reply . NextUpdate < 1 * time . Second {
reply . NextUpdate = defaultUpdateInterval
}
2018-02-09 10:55:10 +08:00
// schedule the next update (if this wasn't the last one and
// if the remote server didn't tell us to stop sending)
if ! final && ! reply . Stop {
updateTimerMu . Lock ( )
updateTimer = time . AfterFunc ( reply . NextUpdate , func ( ) {
logEmit ( false )
} )
updateTimerMu . Unlock ( )
}
return err
2018-02-03 10:15:28 +08:00
}
2018-03-22 07:51:07 +08:00
func stopUpdateTimer ( ) {
updateTimerMu . Lock ( )
updateTimer . Stop ( )
updateTimer = nil
updateTimerMu . Unlock ( )
}
2018-02-09 10:55:10 +08:00
// makePayloadAndResetBuffer prepares a payload
// by emptying the collection buffer. It returns
// the bytes of the payload to send to the server.
// Since the buffer is reset by this, if the
// resulting byte slice is lost, the payload is
// gone with it.
func makePayloadAndResetBuffer ( ) ( [ ] byte , error ) {
2018-03-22 07:01:14 +08:00
bufCopy := resetBuffer ( )
2018-02-09 10:55:10 +08:00
// encode payload in preparation for transmission
payload := Payload {
InstanceID : instanceUUID . String ( ) ,
Timestamp : time . Now ( ) . UTC ( ) ,
Data : bufCopy ,
}
return json . Marshal ( payload )
}
2018-03-22 07:01:14 +08:00
// resetBuffer makes a local pointer to the buffer,
// then resets the buffer by assigning to be a newly-
// made value to clear it out, then sets the buffer
// item count to 0. It returns the copied pointer to
// the original map so the old buffer value can be
// used locally.
func resetBuffer ( ) map [ string ] interface { } {
bufferMu . Lock ( )
bufCopy := buffer
buffer = make ( map [ string ] interface { } )
bufferItemCount = 0
bufferMu . Unlock ( )
return bufCopy
}
2018-02-09 10:55:10 +08:00
// Response contains the body of a response from the
// diagnostics server.
type Response struct {
// NextUpdate is how long to wait before the next update.
NextUpdate time . Duration ` json:"next_update" `
// Stop instructs the diagnostics server to stop sending
// diagnostics. This would only be done under extenuating
// circumstances, but we are prepared for it nonetheless.
Stop bool ` json:"stop,omitempty" `
// Error will be populated with an error message, if any.
// This field should be empty if the status code is < 400.
Error string ` json:"error,omitempty" `
}
// Payload is the data that gets sent to the diagnostics server.
type Payload struct {
// The universally unique ID of the instance
InstanceID string ` json:"instance_id" `
// The UTC timestamp of the transmission
Timestamp time . Time ` json:"timestamp" `
2018-03-22 07:01:14 +08:00
// The timestamp before which the next update is expected
// (NOT populated by client - the server fills this in
// before it stores the data)
ExpectNext time . Time ` json:"expect_next,omitempty" `
2018-02-09 10:55:10 +08:00
// The metrics
Data map [ string ] interface { } ` json:"data,omitempty" `
}
2018-03-22 07:01:14 +08:00
// Int returns the value of the data keyed by key
// if it is an integer; otherwise it returns 0.
func ( p Payload ) Int ( key string ) int {
val , _ := p . Data [ key ]
switch p . Data [ key ] . ( type ) {
case int :
return val . ( int )
case float64 : // after JSON-decoding, int becomes float64...
return int ( val . ( float64 ) )
}
return 0
}
2018-02-11 03:59:23 +08:00
// countingSet implements a set that counts how many
// times a key is inserted. It marshals to JSON in a
// way such that keys are converted to values next
// to their associated counts.
type countingSet map [ interface { } ] int
// MarshalJSON implements the json.Marshaler interface.
// It converts the set to an array so that the values
// are JSON object values instead of keys, since keys
// are difficult to query in databases.
func ( s countingSet ) MarshalJSON ( ) ( [ ] byte , error ) {
type Item struct {
Value interface { } ` json:"value" `
Count int ` json:"count" `
}
var list [ ] Item
for k , v := range s {
list = append ( list , Item { Value : k , Count : v } )
}
return json . Marshal ( list )
}
2018-02-09 12:15:28 +08:00
var (
// httpClient should be used for HTTP requests. It
// is configured with a timeout for reliability.
httpClient = http . Client { Timeout : 1 * time . Minute }
2018-02-09 10:55:10 +08:00
2018-02-09 12:15:28 +08:00
// buffer holds the data that we are building up to send.
buffer = make ( map [ string ] interface { } )
bufferItemCount = 0
bufferMu sync . RWMutex // protects both the buffer and its count
// updating is used to ensure only one
// update happens at a time.
updating bool
updateMu sync . Mutex
2018-02-09 10:55:10 +08:00
2018-02-09 12:15:28 +08:00
// updateTimer fires off the next update.
// If no update is scheduled, this is nil.
updateTimer * time . Timer
updateTimerMu sync . Mutex
2018-02-09 10:55:10 +08:00
2018-02-09 12:15:28 +08:00
// instanceUUID is the ID of the current instance.
// This MUST be set to emit diagnostics.
2018-03-22 07:01:14 +08:00
// This MUST NOT be openly exposed to clients, for privacy.
2018-02-09 12:15:28 +08:00
instanceUUID uuid . UUID
2018-02-09 10:55:10 +08:00
2018-02-09 12:15:28 +08:00
// enabled indicates whether the package has
// been initialized and can be actively used.
enabled bool
2018-02-09 10:55:10 +08:00
2018-02-09 12:15:28 +08:00
// maxBufferItems is the maximum number of items we'll allow
// in the buffer before we start dropping new ones, in a
// rough (simple) attempt to keep memory use under control.
maxBufferItems = 100000
)
2018-02-09 10:55:10 +08:00
const (
// endpoint is the base URL to remote diagnostics server;
// the instance ID will be appended to it.
2018-02-11 03:59:23 +08:00
endpoint = "https://diagnostics-staging.caddyserver.com/update/" // TODO: make configurable, "http://localhost:8085/update/"
2018-02-09 10:55:10 +08:00
// defaultUpdateInterval is how long to wait before emitting
2018-03-19 05:49:17 +08:00
// more diagnostic data if all retires fail. This value is
// only used if the client receives a nonsensical value, or
// doesn't send one at all, or if a connection can't be made,
// likely indicating a problem with the server. Thus, this
// value should be a long duration to help alleviate extra
// load on the server.
2018-02-09 10:55:10 +08:00
defaultUpdateInterval = 1 * time . Hour
)