2017-05-11 22:39:54 +08:00
/*
Copyright 2017 Google Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"container/heap"
"container/list"
"fmt"
"math/rand"
"strings"
"sync"
"time"
log "github.com/golang/glog"
"golang.org/x/net/context"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
)
// sessionHandle is the handle through which transactions access a Cloud Spanner session safely.
// It is a struct (not an interface) wrapping a *session; handles are produced by
// sessionPool.take() and sessionPool.takeWriteSession().
type sessionHandle struct {
// mu guards the session field so that the inner session object is returned / destroyed only once.
mu sync . Mutex
// session is a pointer to the wrapped session object. It is set to nil once the handle has been
// recycled or destroyed. Transactions never need to access it directly.
session * session
}
// recycle hands the wrapped session back to its home session pool and then clears the handle.
// Calling it repeatedly is safe: once the handle has been emptied, later calls do nothing.
func (sh *sessionHandle) recycle() {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if inner := sh.session; inner != nil {
		inner.recycle()
		// Drop the reference so a second recycle/destroy becomes a no-op.
		sh.session = nil
	}
}
// getID gets the Cloud Spanner session ID from the internal session object. It returns the empty
// string if the sessionHandle itself is nil or if the inner session object has already been
// released by recycle / destroy.
func (sh *sessionHandle) getID() string {
	if sh == nil {
		// A nil handle wraps no session. Return "" as documented instead of panicking on sh.mu.
		return ""
	}
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.session == nil {
		// sessionHandle has already been recycled/destroyed.
		return ""
	}
	return sh.session.getID()
}
// getClient returns the Cloud Spanner RPC client of the wrapped session, or nil when the handle
// no longer holds a session.
func (sh *sessionHandle) getClient() sppb.SpannerClient {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if inner := sh.session; inner != nil {
		return inner.client
	}
	return nil
}
// getMetadata returns the metadata stored on the wrapped session, or nil when the handle no
// longer holds a session.
func (sh *sessionHandle) getMetadata() metadata.MD {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if inner := sh.session; inner != nil {
		return inner.md
	}
	return nil
}
// getTransactionID returns the transaction id recorded on the wrapped session, or nil when the
// handle no longer holds a session.
func (sh *sessionHandle) getTransactionID() transactionID {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if inner := sh.session; inner != nil {
		return inner.tx
	}
	return nil
}
// destroy detaches the wrapped session from the handle and destroys it. Only the first call finds
// a session to destroy; subsequent calls are no-ops.
func (sh *sessionHandle) destroy() {
	// Take ownership of the inner session under the lock; the destruction itself runs unlocked.
	sh.mu.Lock()
	inner := sh.session
	sh.session = nil
	sh.mu.Unlock()

	if inner != nil {
		inner.destroy(false)
	}
}
// session wraps a Cloud Spanner session ID through which transactions are created and executed.
type session struct {
// client is the RPC channel to Cloud Spanner. It is set only once during session's creation.
client sppb . SpannerClient
// id is the unique id of the session in Cloud Spanner. It is set during session's creation and is
// replaced (under mu) by refreshIdle when an idle session is refreshed.
id string
// pool is the session's home session pool where it was created. It is set only once during session's creation.
pool * sessionPool
// createTime is the timestamp of the session's creation. It is set during session's creation and is
// reset by refreshIdle when the session is given a new id.
createTime time . Time
// mu protects the following fields from concurrent access: both healthcheck workers and transactions can modify them.
mu sync . Mutex
// valid marks the validity of a session.
valid bool
// hcIndex is the index of the session inside the global healthcheck queue. If hcIndex < 0, session has been unregistered from the queue.
hcIndex int
// idleList is the linkedlist node which links the session to its home session pool's idle list. If idleList == nil, the
// session is not in idle list.
idleList * list . Element
// nextCheck is the timestamp of next scheduled healthcheck of the session. It is maintained by the global health checker.
nextCheck time . Time
// checkingHealth is true if currently this session is being processed by health checker. Must be modified under health checker lock.
checkingHealth bool
// md is the Metadata to be sent with each request.
md metadata . MD
// tx contains the transaction id if the session has been prepared for write; nil otherwise.
tx transactionID
}
// isValid reports whether the session is still marked valid.
func (s *session) isValid() bool {
	s.mu.Lock()
	ok := s.valid
	s.mu.Unlock()
	return ok
}
// isWritePrepared reports whether the session carries a transaction id, i.e. it has been
// prepared for write.
func (s *session) isWritePrepared() bool {
	s.mu.Lock()
	prepared := s.tx != nil
	s.mu.Unlock()
	return prepared
}
// String implements fmt.Stringer for session. It renders the id, healthcheck index, idle-list
// link, validity, creation time and next healthcheck time while holding s.mu.
func (s *session) String() string {
	const layout = "<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>"
	s.mu.Lock()
	defer s.mu.Unlock()
	return fmt.Sprintf(layout, s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck)
}
// ping verifies if the session is still alive in Cloud Spanner.
func ( s * session ) ping ( ) error {
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Second )
defer cancel ( )
return runRetryable ( ctx , func ( ctx context . Context ) error {
2017-07-23 15:51:42 +08:00
_ , err := s . client . GetSession ( contextWithOutgoingMetadata ( ctx , s . pool . md ) , & sppb . GetSessionRequest { Name : s . getID ( ) } ) // s.getID is safe even when s is invalid.
2017-05-11 22:39:54 +08:00
return err
} )
}
// refreshIdle refreshes the session's session ID if it is in its home session pool's idle list
// and returns true if successful.
func ( s * session ) refreshIdle ( ) bool {
s . mu . Lock ( )
validAndIdle := s . valid && s . idleList != nil
s . mu . Unlock ( )
if ! validAndIdle {
// Optimization: return early if s is not valid or if s is not in idle list.
return false
}
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
var sid string
err := runRetryable ( ctx , func ( ctx context . Context ) error {
2017-07-23 15:51:42 +08:00
session , e := s . client . CreateSession ( contextWithOutgoingMetadata ( ctx , s . pool . md ) , & sppb . CreateSessionRequest { Database : s . pool . db } )
2017-05-11 22:39:54 +08:00
if e != nil {
return e
}
sid = session . Name
return nil
} )
if err != nil {
return false
}
s . pool . mu . Lock ( )
s . mu . Lock ( )
var recycle bool
if s . valid && s . idleList != nil {
// session is in idle list, refresh its session id.
sid , s . id = s . id , sid
2017-09-30 22:27:27 +08:00
s . createTime = time . Now ( )
2017-05-11 22:39:54 +08:00
if s . tx != nil {
s . tx = nil
s . pool . idleWriteList . Remove ( s . idleList )
// We need to put this session back into the pool.
recycle = true
}
}
s . mu . Unlock ( )
s . pool . mu . Unlock ( )
if recycle {
s . pool . recycle ( s )
}
// If we fail to explicitly destroy the session, it will be eventually garbage collected by
// Cloud Spanner.
if err = runRetryable ( ctx , func ( ctx context . Context ) error {
2017-07-23 15:51:42 +08:00
_ , e := s . client . DeleteSession ( contextWithOutgoingMetadata ( ctx , s . pool . md ) , & sppb . DeleteSessionRequest { Name : sid } )
2017-05-11 22:39:54 +08:00
return e
2017-07-23 15:51:42 +08:00
} ) ; err != nil && log . V ( 2 ) {
log . Warningf ( "Failed to delete session %v. Error: %v" , sid , err )
2017-05-11 22:39:54 +08:00
}
return true
}
// setHcIndex stores i as the session's index in the healthcheck queue under s.mu and returns the
// index that was stored before.
func (s *session) setHcIndex(i int) int {
	s.mu.Lock()
	i, s.hcIndex = s.hcIndex, i
	s.mu.Unlock()
	return i
}
// setIdleList stores le as the session's idle list link under s.mu and returns the link that was
// stored before.
func (s *session) setIdleList(le *list.Element) *list.Element {
	s.mu.Lock()
	le, s.idleList = s.idleList, le
	s.mu.Unlock()
	return le
}
// invalidate marks the session as invalid and reports whether it was valid before the call.
func (s *session) invalidate() bool {
	s.mu.Lock()
	wasValid := s.valid
	s.valid = false
	s.mu.Unlock()
	return wasValid
}
// setNextCheck records t as the time of the session's next healthcheck.
func (s *session) setNextCheck(t time.Time) {
	s.mu.Lock()
	s.nextCheck = t
	s.mu.Unlock()
}
// setTransactionID records tx as the session's transaction id; passing nil clears it.
func (s *session) setTransactionID(tx transactionID) {
	s.mu.Lock()
	s.tx = tx
	s.mu.Unlock()
}
// getID returns the session ID which uniquely identifies the session in Cloud Spanner.
func (s *session) getID() string {
	s.mu.Lock()
	id := s.id
	s.mu.Unlock()
	return id
}
// getHcIndex returns the session's current index in the healthcheck priority queue.
func (s *session) getHcIndex() int {
	s.mu.Lock()
	idx := s.hcIndex
	s.mu.Unlock()
	return idx
}
// getIdleList returns the session's link in its home session pool's idle list; nil means the
// session is not in an idle list.
func (s *session) getIdleList() *list.Element {
	s.mu.Lock()
	link := s.idleList
	s.mu.Unlock()
	return link
}
// getNextCheck returns the time at which the session's next healthcheck is scheduled.
func (s *session) getNextCheck() time.Time {
	s.mu.Lock()
	due := s.nextCheck
	s.mu.Unlock()
	return due
}
// recycle clears the session's transaction id and offers the session back to its home session
// pool. If the pool rejects it, the session is destroyed instead.
func (s *session) recycle() {
	s.setTransactionID(nil)
	if s.pool.recycle(s) {
		return
	}
	// The home pool refused the session (see sessionPool.recycle for the reasons), so drop it.
	s.destroy(false)
}
// destroy removes the session from its home session pool, healthcheck queue and Cloud Spanner service.
func ( s * session ) destroy ( isExpire bool ) bool {
// Remove s from session pool.
if ! s . pool . remove ( s , isExpire ) {
return false
}
// Unregister s from healthcheck queue.
s . pool . hc . unregister ( s )
// Remove s from Cloud Spanner service.
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
// Ignore the error returned by runRetryable because even if we fail to explicitly destroy the session,
// it will be eventually garbage collected by Cloud Spanner.
2017-07-23 15:51:42 +08:00
err := runRetryable ( ctx , func ( ctx context . Context ) error {
2017-05-11 22:39:54 +08:00
_ , e := s . client . DeleteSession ( ctx , & sppb . DeleteSessionRequest { Name : s . getID ( ) } )
return e
} )
2017-07-23 15:51:42 +08:00
if err != nil && log . V ( 2 ) {
log . Warningf ( "Failed to delete session %v. Error: %v" , s . getID ( ) , err )
}
2017-05-11 22:39:54 +08:00
return true
}
// prepareForWrite begins a transaction on the session and stores its id, unless the session is
// already write-prepared. It returns the error from beginTransaction, if any.
func (s *session) prepareForWrite(ctx context.Context) error {
	if !s.isWritePrepared() {
		tx, err := beginTransaction(ctx, s.getID(), s.client)
		if err != nil {
			return err
		}
		s.setTransactionID(tx)
	}
	return nil
}
// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
// getRPCClient is the caller supplied method for getting a gRPC client to Cloud Spanner, this makes session pool able to use client pooling.
// validate rejects a config whose getRPCClient is nil.
getRPCClient func ( ) ( sppb . SpannerClient , error )
// MaxOpened is the maximum number of opened sessions that is allowed by the
2017-07-23 15:51:42 +08:00
// session pool. Default to NumChannels * 100. A value of zero disables the limit in take/takeWriteSession.
2017-05-11 22:39:54 +08:00
MaxOpened uint64
// MinOpened is the minimum number of opened sessions that the session pool
// tries to maintain. Session pool won't continue to expire sessions if number
// of opened connections drops below MinOpened. However, if session is found
// to be broken, it will still be evicted from session pool, therefore it is
// possible that the number of opened sessions drops below MinOpened.
MinOpened uint64
2017-09-30 22:27:27 +08:00
// maxSessionAge is the maximum duration that a session can be reused, zero
2017-05-11 22:39:54 +08:00
// means session pool will never expire sessions.
2017-09-30 22:27:27 +08:00
maxSessionAge time . Duration
// MaxIdle is the maximum number of idle sessions, pool is allowed to keep. Defaults to 0.
MaxIdle uint64
2017-07-23 15:51:42 +08:00
// MaxBurst is the maximum number of concurrent session creation requests. Defaults to 10.
// NOTE(review): newSessionPool does not apply this default and take treats zero as "no limit" - presumably the caller sets it; confirm.
2017-05-11 22:39:54 +08:00
MaxBurst uint64
// WriteSessions is the fraction of sessions we try to keep prepared for write.
WriteSessions float64
// HealthCheckWorkers is number of workers used by health checker for this pool. newSessionPool uses 10 when it is zero.
HealthCheckWorkers int
2017-09-30 22:27:27 +08:00
// HealthCheckInterval is how often the health checker pings a session. Defaults to 5 min.
2017-05-11 22:39:54 +08:00
HealthCheckInterval time . Duration
2017-09-30 22:27:27 +08:00
// healthCheckMaintainerEnabled enables the session pool maintainer.
healthCheckMaintainerEnabled bool
// healthCheckSampleInterval is how often the health checker samples live session (for use in maintaining session pool size). Defaults to 1 min.
healthCheckSampleInterval time . Duration
2017-05-11 22:39:54 +08:00
}
// errNoRPCGetter builds the InvalidArgument error reported for a SessionPoolConfig whose
// getRPCClient method is missing.
func errNoRPCGetter() error {
	const msg = "require SessionPoolConfig.getRPCClient != nil, got nil"
	return spannerErrorf(codes.InvalidArgument, msg)
}
// errMinOpenedGTMaxOpened builds the InvalidArgument error reported when
// SessionPoolConfig.MaxOpened is set and is smaller than SessionPoolConfig.MinOpened.
func errMinOpenedGTMaxOpened(spc *SessionPoolConfig) error {
	maxOpened, minOpened := spc.MaxOpened, spc.MinOpened
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %v and %v", maxOpened, minOpened)
}
// validate checks that the SessionPoolConfig is usable: getRPCClient must be set, and when
// MaxOpened is non-zero it must not be smaller than MinOpened. It returns nil for a good config.
func (spc *SessionPoolConfig) validate() error {
	switch {
	case spc.getRPCClient == nil:
		return errNoRPCGetter()
	case spc.MaxOpened > 0 && spc.MinOpened > spc.MaxOpened:
		return errMinOpenedGTMaxOpened(spc)
	default:
		return nil
	}
}
// sessionPool creates and caches Cloud Spanner sessions.
type sessionPool struct {
// mu protects sessionPool from concurrent access.
mu sync . Mutex
// valid marks the validity of the session pool. It is set to false by close.
valid bool
// db is the database name that all sessions in the pool are associated with.
db string
// idleList caches idle sessions that are not prepared for write. Sessions in this list can be allocated for use.
idleList list . List
// idleWriteList caches idle sessions which have been prepared for write.
idleWriteList list . List
// mayGetSession is for broadcasting that session retrieval/creation may proceed. It is closed and
// replaced with a fresh channel each time such an event happens.
mayGetSession chan struct { }
// numOpened is the total number of open sessions from the session pool.
numOpened uint64
// createReqs is the number of ongoing session creation requests.
createReqs uint64
// prepareReqs is the number of ongoing session preparation request.
prepareReqs uint64
// configuration of the session pool.
SessionPoolConfig
// Metadata to be sent with each request
md metadata . MD
// hc is the health checker
hc * healthChecker
}
// newSessionPool creates a new session pool.
func newSessionPool ( db string , config SessionPoolConfig , md metadata . MD ) ( * sessionPool , error ) {
if err := config . validate ( ) ; err != nil {
return nil , err
}
pool := & sessionPool {
db : db ,
valid : true ,
mayGetSession : make ( chan struct { } ) ,
SessionPoolConfig : config ,
md : md ,
}
if config . HealthCheckWorkers == 0 {
// With 10 workers and assuming average latency of 5 ms for BeginTransaction, we will be able to
// prepare 2000 tx/sec in advance. If the rate of takeWriteSession is more than that, it will
// degrade to doing BeginTransaction inline.
// TODO: consider resizing the worker pool dynamically according to the load.
config . HealthCheckWorkers = 10
}
if config . HealthCheckInterval == 0 {
config . HealthCheckInterval = 5 * time . Minute
}
2017-09-30 22:27:27 +08:00
if config . healthCheckSampleInterval == 0 {
config . healthCheckSampleInterval = time . Minute
}
2017-05-11 22:39:54 +08:00
// On GCE VM, within the same region an healthcheck ping takes on average 10ms to finish, given a 5 minutes interval and
// 10 healthcheck workers, a healthChecker can effectively mantain 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
2017-09-30 22:27:27 +08:00
pool . hc = newHealthChecker ( config . HealthCheckInterval , config . HealthCheckWorkers , config . healthCheckSampleInterval , pool )
close ( pool . hc . ready )
2017-05-11 22:39:54 +08:00
return pool , nil
}
// isValid reports whether the session pool is still valid. A nil pool is never valid.
func (p *sessionPool) isValid() bool {
	if p == nil {
		return false
	}
	p.mu.Lock()
	ok := p.valid
	p.mu.Unlock()
	return ok
}
// close marks the session pool as closed, shuts down its health checker and destroys every
// session still registered in the healthcheck queue. It is a no-op on a nil or already closed pool.
func (p *sessionPool) close() {
	if p == nil {
		return
	}
	p.mu.Lock()
	wasValid := p.valid
	p.valid = false
	p.mu.Unlock()
	if !wasValid {
		// Somebody else already closed the pool.
		return
	}

	p.hc.close()

	// Snapshot the registered sessions under the health checker lock, then destroy them without
	// holding it (destroy unregisters each session from the same queue).
	p.hc.mu.Lock()
	snapshot := append([]*session(nil), p.hc.queue.sessions...)
	p.hc.mu.Unlock()
	for i := range snapshot {
		snapshot[i].destroy(false)
	}
}
// errInvalidSessionPool builds the InvalidArgument error reported when an invalid session pool
// is used.
func errInvalidSessionPool() error {
	const msg = "invalid session pool"
	return spannerErrorf(codes.InvalidArgument, msg)
}
// errGetSessionTimeout builds the Canceled error reported when the context is done while
// sessionPool.take() / takeWriteSession() is waiting for a session.
func errGetSessionTimeout() error {
	const msg = "timeout / context canceled during getting session"
	return spannerErrorf(codes.Canceled, msg)
}
// shouldPrepareWrite reports whether more sessions should be prepared for write: the desired
// number (numOpened * WriteSessions) exceeds those already prepared plus those being prepared.
// It does not take p.mu itself.
func (p *sessionPool) shouldPrepareWrite() bool {
	wanted := float64(p.numOpened) * p.WriteSessions
	preparedOrPending := p.idleWriteList.Len() + int(p.prepareReqs)
	return wanted > float64(preparedOrPending)
}
// createSession creates a new Cloud Spanner session through an RPC client obtained from
// getRPCClient and registers it with the health checker.
//
// The caller is expected to have already incremented numOpened and createReqs. createSession
// always decrements createReqs, gives the numOpened budget back on failure, and wakes up waiters
// blocked on mayGetSession. It returns the new session, or a nil session and the error.
func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	// finish settles the bookkeeping for this creation attempt and notifies waiters.
	finish := func(ok bool) {
		p.mu.Lock()
		defer p.mu.Unlock()
		if !ok {
			// Session creation failed, give budget back.
			p.numOpened--
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
	}

	client, err := p.getRPCClient()
	if err != nil {
		finish(false)
		return nil, err
	}

	var created *session
	err = runRetryable(ctx, func(ctx context.Context) error {
		resp, e := client.CreateSession(ctx, &sppb.CreateSessionRequest{Database: p.db})
		if e != nil {
			return e
		}
		// If no error, construct the new session.
		created = &session{valid: true, client: client, id: resp.Name, pool: p, createTime: time.Now(), md: p.md}
		p.hc.register(created)
		return nil
	})
	finish(err == nil)
	if err != nil {
		// Return the error directly: CreateSession has already been retried by runRetryable.
		return nil, err
	}
	return created, nil
}
// isHealthy reports whether session s can be handed out. When the session's scheduled healthcheck
// is overdue by more than two healthcheck intervals, it pings the session inline: a session that
// should be dropped is destroyed and false is returned; otherwise its next healthcheck is
// rescheduled. Sessions that are not overdue are reported healthy without a ping.
func (p *sessionPool) isHealthy(s *session) bool {
	overdue := s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now())
	if !overdue {
		return true
	}
	// TODO: figure out if we need to schedule a new healthcheck worker here.
	if err := s.ping(); shouldDropSession(err) {
		// The session is already bad; the caller continues to fetch/create a new one.
		s.destroy(false)
		return false
	}
	p.hc.scheduledHC(s)
	return true
}
// take returns a cached session if there are available ones; if there isn't any, it tries to allocate a new one.
// Session returned by take should be used for read operations.
func ( p * sessionPool ) take ( ctx context . Context ) ( * sessionHandle , error ) {
2017-07-23 15:51:42 +08:00
ctx = contextWithOutgoingMetadata ( ctx , p . md )
2017-05-11 22:39:54 +08:00
for {
var (
s * session
err error
)
p . mu . Lock ( )
if ! p . valid {
p . mu . Unlock ( )
return nil , errInvalidSessionPool ( )
}
if p . idleList . Len ( ) > 0 {
// Idle sessions are available, get one from the top of the idle list.
s = p . idleList . Remove ( p . idleList . Front ( ) ) . ( * session )
} else if p . idleWriteList . Len ( ) > 0 {
s = p . idleWriteList . Remove ( p . idleWriteList . Front ( ) ) . ( * session )
}
if s != nil {
s . setIdleList ( nil )
p . mu . Unlock ( )
// From here, session is no longer in idle list, so healthcheck workers won't destroy it.
// If healthcheck workers failed to schedule healthcheck for the session timely, do the check here.
// Because session check is still much cheaper than session creation, they should be reused as much as possible.
if ! p . isHealthy ( s ) {
continue
}
return & sessionHandle { session : s } , nil
}
// Idle list is empty, block if session pool has reached max session creation concurrency or max number of open sessions.
if ( p . MaxOpened > 0 && p . numOpened >= p . MaxOpened ) || ( p . MaxBurst > 0 && p . createReqs >= p . MaxBurst ) {
mayGetSession := p . mayGetSession
p . mu . Unlock ( )
select {
case <- ctx . Done ( ) :
return nil , errGetSessionTimeout ( )
case <- mayGetSession :
}
continue
}
// Take budget before the actual session creation.
p . numOpened ++
p . createReqs ++
p . mu . Unlock ( )
if s , err = p . createSession ( ctx ) ; err != nil {
return nil , toSpannerError ( err )
}
return & sessionHandle { session : s } , nil
}
}
// takeWriteSession returns a write prepared cached session if there are available ones; if there isn't any, it tries to allocate a new one.
// Session returned should be used for read write transactions.
func ( p * sessionPool ) takeWriteSession ( ctx context . Context ) ( * sessionHandle , error ) {
2017-07-23 15:51:42 +08:00
ctx = contextWithOutgoingMetadata ( ctx , p . md )
2017-05-11 22:39:54 +08:00
for {
var (
s * session
err error
)
p . mu . Lock ( )
if ! p . valid {
p . mu . Unlock ( )
return nil , errInvalidSessionPool ( )
}
if p . idleWriteList . Len ( ) > 0 {
// Idle sessions are available, get one from the top of the idle list.
s = p . idleWriteList . Remove ( p . idleWriteList . Front ( ) ) . ( * session )
} else if p . idleList . Len ( ) > 0 {
s = p . idleList . Remove ( p . idleList . Front ( ) ) . ( * session )
}
if s != nil {
s . setIdleList ( nil )
p . mu . Unlock ( )
// From here, session is no longer in idle list, so healthcheck workers won't destroy it.
// If healthcheck workers failed to schedule healthcheck for the session timely, do the check here.
// Because session check is still much cheaper than session creation, they should be reused as much as possible.
if ! p . isHealthy ( s ) {
continue
}
if ! s . isWritePrepared ( ) {
if err = s . prepareForWrite ( ctx ) ; err != nil {
return nil , toSpannerError ( err )
}
}
return & sessionHandle { session : s } , nil
}
// Idle list is empty, block if session pool has reached max session creation concurrency or max number of open sessions.
if ( p . MaxOpened > 0 && p . numOpened >= p . MaxOpened ) || ( p . MaxBurst > 0 && p . createReqs >= p . MaxBurst ) {
mayGetSession := p . mayGetSession
p . mu . Unlock ( )
select {
case <- ctx . Done ( ) :
return nil , errGetSessionTimeout ( )
case <- mayGetSession :
}
continue
}
// Take budget before the actual session creation.
p . numOpened ++
p . createReqs ++
p . mu . Unlock ( )
if s , err = p . createSession ( ctx ) ; err != nil {
return nil , toSpannerError ( err )
}
if err = s . prepareForWrite ( ctx ) ; err != nil {
return nil , toSpannerError ( err )
}
return & sessionHandle { session : s } , nil
}
}
// recycle puts session s back to the session pool's idle list, it returns true if the session pool successfully recycles session s.
func ( p * sessionPool ) recycle ( s * session ) bool {
p . mu . Lock ( )
defer p . mu . Unlock ( )
if ! s . isValid ( ) || ! p . valid {
// Reject the session if session is invalid or pool itself is invalid.
return false
}
2017-09-30 22:27:27 +08:00
if p . maxSessionAge != 0 && s . createTime . Add ( p . maxSessionAge ) . Before ( time . Now ( ) ) && p . numOpened > p . MinOpened {
// session expires and number of opened sessions exceeds MinOpened, let the session itself.
2017-05-11 22:39:54 +08:00
return false
}
2017-09-30 22:27:27 +08:00
// Put session at the back of the list to round robin for load balancing across channels.
2017-05-11 22:39:54 +08:00
if s . isWritePrepared ( ) {
2017-09-30 22:27:27 +08:00
s . setIdleList ( p . idleWriteList . PushBack ( s ) )
2017-05-11 22:39:54 +08:00
} else {
2017-09-30 22:27:27 +08:00
s . setIdleList ( p . idleList . PushBack ( s ) )
2017-05-11 22:39:54 +08:00
}
// Broadcast that a session has been returned to idle list.
close ( p . mayGetSession )
p . mayGetSession = make ( chan struct { } )
return true
}
// remove takes session s out of the session pool's idle lists and invalidates it, all under p.mu.
// If isExpire == true, the removal is triggered by session expiration and is refused (false) when
// the session is not idle or when the pool holds no more than MinOpened sessions.
//
// It returns true only if this call is the one that invalidated the session; in that case
// numOpened is decremented and waiters are woken up.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if isExpire {
		// Don't expire the session if the number of open sessions would go below p.MinOpened, or
		// if the session is not in an idle list (in use).
		if p.numOpened <= p.MinOpened || s.getIdleList() == nil {
			return false
		}
	}
	if link := s.setIdleList(nil); link != nil {
		// The link belongs to exactly one list; Remove is a no-op on the other.
		p.idleList.Remove(link)
		p.idleWriteList.Remove(link)
	}
	if !s.invalidate() {
		// Session had already been invalidated by someone else.
		return false
	}
	// Decrease the number of opened sessions.
	p.numOpened--
	// Broadcast that a session has been destroyed.
	close(p.mayGetSession)
	p.mayGetSession = make(chan struct{})
	return true
}
// hcHeap implements heap.Interface. It is used to create the priority queue for session healthchecks,
// ordered by each session's next healthcheck time (earliest first).
type hcHeap struct {
// sessions is the heap's backing slice; each session's hcIndex mirrors its position here.
sessions [ ] * session
}
// Len implements heap.Interface.Len: it returns the number of sessions in the heap.
func (h hcHeap) Len() int {
	n := len(h.sessions)
	return n
}
// Less implements heap.Interface.Less: session i ranks before session j when its next healthcheck
// is scheduled earlier.
func (h hcHeap) Less(i, j int) bool {
	ti, tj := h.sessions[i].getNextCheck(), h.sessions[j].getNextCheck()
	return ti.Before(tj)
}
// Swap implements heap.Interface.Swap: it exchanges the sessions at positions i and j and updates
// each session's healthcheck index to its new position.
func (h hcHeap) Swap(i, j int) {
	si, sj := h.sessions[i], h.sessions[j]
	h.sessions[i], h.sessions[j] = sj, si
	sj.setHcIndex(i)
	si.setHcIndex(j)
}
// Push implements heap.Interface.Push: it appends the *session s to the backing slice after
// recording the position it will occupy as the session's healthcheck index.
func (h *hcHeap) Push(s interface{}) {
	added := s.(*session)
	added.setHcIndex(h.Len())
	h.sessions = append(h.sessions, added)
}
// Pop implements heap.Interface.Pop: it removes and returns the last session of the backing
// slice and marks it as no longer tracked by setting its healthcheck index to -1.
func (h *hcHeap) Pop() interface{} {
	last := len(h.sessions) - 1
	s := h.sessions[last]
	// Clear the vacated slot so the backing array does not keep the popped session reachable.
	h.sessions[last] = nil
	h.sessions = h.sessions[:last]
	s.setHcIndex(-1)
	return s
}
// healthChecker performs periodical healthchecks on registered sessions.
type healthChecker struct {
// mu protects concurrent access to queue (and to the checkingHealth flag of sessions).
mu sync . Mutex
// queue is the priority queue for session healthchecks. Sessions with lower nextCheck rank higher in the queue.
queue hcHeap
// interval is the average interval between two healthchecks on a session.
interval time . Duration
// workers is the number of concurrent healthcheck workers.
workers int
// waitWorkers waits for all healthcheck workers (and the maintainer, when enabled) to exit
waitWorkers sync . WaitGroup
// pool is the underlying session pool.
pool * sessionPool
2017-09-30 22:27:27 +08:00
// sampleInterval is the interval of sampling by the maintainer.
sampleInterval time . Duration
// ready is used to signal that maintainer can start running.
ready chan struct { }
// done is used to signal that health checker should be closed.
done chan struct { }
// once is used for closing channel done only once.
once sync . Once
2017-05-11 22:39:54 +08:00
}
// newHealthChecker initializes new instance of healthChecker.
2017-09-30 22:27:27 +08:00
func newHealthChecker ( interval time . Duration , workers int , sampleInterval time . Duration , pool * sessionPool ) * healthChecker {
2017-05-11 22:39:54 +08:00
if workers <= 0 {
workers = 1
}
hc := & healthChecker {
2017-09-30 22:27:27 +08:00
interval : interval ,
workers : workers ,
pool : pool ,
sampleInterval : sampleInterval ,
ready : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
}
if hc . pool . healthCheckMaintainerEnabled {
hc . waitWorkers . Add ( 1 )
go hc . maintainer ( )
2017-05-11 22:39:54 +08:00
}
2017-09-30 22:27:27 +08:00
for i := 1 ; i <= hc . workers ; i ++ {
2017-05-11 22:39:54 +08:00
hc . waitWorkers . Add ( 1 )
go hc . worker ( i )
}
return hc
}
// close closes the healthChecker and waits for all healthcheck workers to exit.
func ( hc * healthChecker ) close ( ) {
2017-09-30 22:27:27 +08:00
hc . once . Do ( func ( ) { close ( hc . done ) } )
2017-05-11 22:39:54 +08:00
hc . waitWorkers . Wait ( )
}
// isClosing checks if a healthChecker is already closing.
func ( hc * healthChecker ) isClosing ( ) bool {
2017-09-30 22:27:27 +08:00
select {
case <- hc . done :
return true
default :
return false
}
2017-05-11 22:39:54 +08:00
}
// getInterval returns the healthcheck interval, read under hc.mu.
func (hc *healthChecker) getInterval() time.Duration {
	hc.mu.Lock()
	d := hc.interval
	hc.mu.Unlock()
	return d
}
// scheduledHCLocked schedules the next healthcheck on session s; the caller must hold hc.mu.
// The new check time is a random point in [interval*0.5, interval*1.5) from now. If the session
// is still tracked by the queue (its index is not -1), its heap position is repaired.
func (hc *healthChecker) scheduledHCLocked(s *session) {
	delay := time.Duration(rand.Int63n(int64(hc.interval)) + int64(hc.interval)/2)
	s.setNextCheck(time.Now().Add(delay))
	idx := s.getHcIndex()
	if idx == -1 {
		// Session is no longer tracked by healthcheck workers; nothing to reorder.
		return
	}
	heap.Fix(&hc.queue, idx)
}
// scheduledHC schedules the next healthcheck on session s. It takes hc.mu itself, so it is safe
// to call concurrently.
func (hc *healthChecker) scheduledHC(s *session) {
	hc.mu.Lock()
	hc.scheduledHCLocked(s)
	hc.mu.Unlock()
}
// register schedules the first healthcheck for session s and then adds it to the healthcheck
// queue, both under hc.mu.
func (hc *healthChecker) register(s *session) {
	hc.mu.Lock()
	hc.scheduledHCLocked(s)
	heap.Push(&hc.queue, s)
	hc.mu.Unlock()
}
// unregister marks session s as untracked (index -1) and, if it was in the healthcheck queue,
// removes it from the queue.
func (hc *healthChecker) unregister(s *session) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	if prev := s.setHcIndex(-1); prev >= 0 {
		heap.Remove(&hc.queue, prev)
	}
}
// markDone records, under hc.mu, that the health checker has finished processing session s.
func (hc *healthChecker) markDone(s *session) {
	hc.mu.Lock()
	s.checkingHealth = false
	hc.mu.Unlock()
}
// healthCheck checks the health of the session and pings it if needed.
func ( hc * healthChecker ) healthCheck ( s * session ) {
defer hc . markDone ( s )
if ! s . pool . isValid ( ) {
// Session pool is closed, perform a garbage collection.
s . destroy ( false )
return
}
2017-09-30 22:27:27 +08:00
if s . pool . maxSessionAge != 0 && s . createTime . Add ( s . pool . maxSessionAge ) . Before ( time . Now ( ) ) {
2017-05-11 22:39:54 +08:00
// Session reaches its maximum age, retire it. Failing that try to refresh it.
if s . destroy ( true ) || ! s . refreshIdle ( ) {
return
}
}
if err := s . ping ( ) ; shouldDropSession ( err ) {
// Ping failed, destroy the session.
s . destroy ( false )
}
}
// worker performs the healthcheck on sessions in healthChecker's priority queue.
func ( hc * healthChecker ) worker ( i int ) {
// Returns a session which we should ping to keep it alive.
getNextForPing := func ( ) * session {
hc . pool . mu . Lock ( )
defer hc . pool . mu . Unlock ( )
hc . mu . Lock ( )
defer hc . mu . Unlock ( )
if hc . queue . Len ( ) <= 0 {
// Queue is empty.
return nil
}
s := hc . queue . sessions [ 0 ]
if s . getNextCheck ( ) . After ( time . Now ( ) ) && hc . pool . valid {
// All sessions have been checked recently.
return nil
}
hc . scheduledHCLocked ( s )
if ! s . checkingHealth {
s . checkingHealth = true
return s
}
return nil
}
// Returns a session which we should prepare for write.
getNextForTx := func ( ) * session {
hc . pool . mu . Lock ( )
defer hc . pool . mu . Unlock ( )
if hc . pool . shouldPrepareWrite ( ) {
if hc . pool . idleList . Len ( ) > 0 && hc . pool . valid {
hc . mu . Lock ( )
defer hc . mu . Unlock ( )
if hc . pool . idleList . Front ( ) . Value . ( * session ) . checkingHealth {
return nil
}
session := hc . pool . idleList . Remove ( hc . pool . idleList . Front ( ) ) . ( * session )
session . checkingHealth = true
hc . pool . prepareReqs ++
return session
}
}
return nil
}
for {
if hc . isClosing ( ) {
// Exit when the pool has been closed and all sessions have been destroyed
// or when health checker has been closed.
hc . waitWorkers . Done ( )
return
}
ws := getNextForTx ( )
if ws != nil {
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Second )
2017-09-30 22:27:27 +08:00
err := ws . prepareForWrite ( contextWithOutgoingMetadata ( ctx , hc . pool . md ) )
cancel ( )
if err != nil {
// TODO(dixiao): handle error properly
log . Errorf ( "prepareForWrite failed: %v" , err )
}
2017-05-11 22:39:54 +08:00
hc . pool . recycle ( ws )
hc . pool . mu . Lock ( )
hc . pool . prepareReqs --
hc . pool . mu . Unlock ( )
hc . markDone ( ws )
}
rs := getNextForPing ( )
if rs == nil {
if ws == nil {
// No work to be done so sleep to avoid burning cpu
pause := int64 ( 100 * time . Millisecond )
if pause > int64 ( hc . interval ) {
pause = int64 ( hc . interval )
}
2017-09-30 22:27:27 +08:00
select {
case <- time . After ( time . Duration ( rand . Int63n ( pause ) + pause / 2 ) ) :
break
case <- hc . done :
break
}
2017-05-11 22:39:54 +08:00
}
continue
}
hc . healthCheck ( rs )
}
}
2017-09-30 22:27:27 +08:00
// maintainer keeps the number of opened sessions close to
//
//	max(MinOpened, min(numOpened, MaxIdle+maxSessionsInUse))
//
// where maxSessionsInUse is the largest number of sessions observed in use
// over a window of windowSize sampling rounds, each lasting hc.sampleInterval.
// Every round it samples the pool, then either creates sessions
// (replenishPool) or retires idle ones (shrinkPool) until the target is met or
// the round's time budget is used up.
//
// It blocks until hc.ready is signalled, and calls hc.waitWorkers.Done()
// before returning once hc.isClosing() reports true.
func (hc *healthChecker) maintainer() {
	// Wait so that pool is ready.
	<-hc.ready

	// windowSize is the number of sampling rounds covered by one window.
	const windowSize uint64 = 10
	var (
		iteration uint64
		// maxSessionsInUse is the maximum number of sessions in use
		// concurrently within the current window. It has to live outside the
		// sampling loop: a per-round variable would restart at zero every
		// round and the window would degenerate to the latest sample.
		maxSessionsInUse uint64
	)

	// replenishPool creates sessions until numOpened reaches sessionsToKeep or
	// ctx is done.
	replenishPool := func(ctx context.Context, sessionsToKeep uint64) {
		p := hc.pool
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}

			p.mu.Lock()
			// Take budget before the actual session creation.
			if sessionsToKeep <= p.numOpened {
				p.mu.Unlock()
				return
			}
			p.numOpened++
			p.createReqs++
			shouldPrepareWrite := p.shouldPrepareWrite()
			p.mu.Unlock()

			s, err := p.createSession(ctx)
			if err != nil {
				// NOTE(review): the budget taken above is not rolled back
				// here; presumably createSession returns it on failure — confirm.
				log.Warningf("Failed to create session, error: %v", toSpannerError(err))
				continue
			}
			if shouldPrepareWrite {
				if err = s.prepareForWrite(ctx); err != nil {
					log.Warningf("Failed to prepare session, error: %v", toSpannerError(err))
				}
			}
			// Hand the session to the pool even when preparing it for write
			// failed (as worker does); otherwise the freshly created session
			// would stay counted in numOpened without ever becoming usable.
			p.recycle(s)
		}
	}

	// shrinkPool retires idle sessions until numOpened drops to sessionsToKeep,
	// no idle session is left, or ctx is done.
	shrinkPool := func(ctx context.Context, sessionsToKeep uint64) {
		p := hc.pool
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}

			p.mu.Lock()
			if sessionsToKeep >= p.numOpened {
				p.mu.Unlock()
				return
			}
			// Prefer a read-only idle session over a write-prepared one.
			var s *session
			if p.idleList.Len() > 0 {
				s = p.idleList.Front().Value.(*session)
			} else if p.idleWriteList.Len() > 0 {
				s = p.idleWriteList.Front().Value.(*session)
			}
			p.mu.Unlock()

			if s == nil {
				// Every opened session is in use; nothing can be retired now.
				return
			}
			// destroy session as expire.
			s.destroy(true)
		}
	}

	for {
		if hc.isClosing() {
			hc.waitWorkers.Done()
			return
		}

		// Updates metrics.
		hc.pool.mu.Lock()
		currSessionsInUse := hc.pool.numOpened - uint64(hc.pool.idleList.Len()) - uint64(hc.pool.idleWriteList.Len())
		currSessionsOpened := hc.pool.numOpened
		hc.pool.mu.Unlock()

		hc.mu.Lock()
		// Start a new window every windowSize rounds; within a window only
		// ever raise the maximum.
		if iteration%windowSize == 0 || maxSessionsInUse < currSessionsInUse {
			maxSessionsInUse = currSessionsInUse
		}
		sessionsToKeep := maxUint64(hc.pool.MinOpened,
			minUint64(currSessionsOpened, hc.pool.MaxIdle+maxSessionsInUse))
		hc.mu.Unlock()

		// One context bounds the whole round. Its Done channel is closed (not
		// drained) on expiry, so the helpers and the wait below can all
		// observe the same deadline; a time.After channel delivers a single
		// value and would leave the wait below blocked until hc.done once a
		// helper had consumed it.
		ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)

		// Replenish or Shrink pool if needed.
		// Note: we don't need to worry about pending create session requests, we only need to sample the current sessions in use.
		// the routines will not try to create extra / delete creating sessions.
		if sessionsToKeep > currSessionsOpened {
			replenishPool(ctx, sessionsToKeep)
		} else {
			shrinkPool(ctx, sessionsToKeep)
		}

		// Sleep out the rest of the sampling interval unless we are closing.
		select {
		case <-ctx.Done():
		case <-hc.done:
		}
		// Release the context's timer right away instead of leaking it until
		// the deadline.
		cancel()
		iteration++
	}
}
2017-05-11 22:39:54 +08:00
// shouldDropSession reports whether err means the session must be removed
// instead of being reused. That is the case only for a NotFound error whose
// description contains "Session not found:"; a nil error never drops the
// session.
func shouldDropSession(err error) bool {
	// If Cloud Spanner can no longer locate the session (for example, if the
	// session is garbage collected), then the caller should not try to return
	// the session back into the session pool.
	// TODO: once gRPC can return auxiliary error information, stop parsing the error message.
	return err != nil &&
		ErrCode(err) == codes.NotFound &&
		strings.Contains(ErrDesc(err), "Session not found:")
}