From a85e75b2b19b1defa90364e8131b373489025ee6 Mon Sep 17 00:00:00 2001
From: zeripath <art27@cantab.net>
Date: Fri, 19 Nov 2021 01:13:25 +0000
Subject: [PATCH] Prevent deadlock in TestPersistableChannelQueue (#17717)

* Prevent deadlock in TestPersistableChannelQueue

There is a potential deadlock in TestPersistableChannelQueue due to attempting to
shutdown the test queue before it is ready.

Signed-off-by: Andrew Thornton <art27@cantab.net>

* prevent npe

Signed-off-by: Andrew Thornton <art27@cantab.net>
---
 modules/queue/queue_disk_channel_test.go | 33 ++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index d464a183a0f..c90d715a73c 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -18,6 +18,9 @@ func TestPersistableChannelQueue(t *testing.T) {
 	handleChan := make(chan *testData)
 	handle := func(data ...Data) {
 		for _, datum := range data {
+			if datum == nil {
+				continue
+			}
 			testDatum := datum.(*testData)
 			handleChan <- testDatum
 		}
@@ -42,13 +45,26 @@ func TestPersistableChannelQueue(t *testing.T) {
 	}, &testData{})
 	assert.NoError(t, err)
 
+	readyForShutdown := make(chan struct{})
+	readyForTerminate := make(chan struct{})
+
 	go queue.Run(func(shutdown func()) {
 		lock.Lock()
 		defer lock.Unlock()
+		select {
+		case <-readyForShutdown:
+		default:
+			close(readyForShutdown)
+		}
 		queueShutdown = append(queueShutdown, shutdown)
 	}, func(terminate func()) {
 		lock.Lock()
 		defer lock.Unlock()
+		select {
+		case <-readyForTerminate:
+		default:
+			close(readyForTerminate)
+		}
 		queueTerminate = append(queueTerminate, terminate)
 	})
 
@@ -74,6 +90,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 	err = queue.Push(test1)
 	assert.Error(t, err)
 
+	<-readyForShutdown
 	// Now shutdown the queue
 	lock.Lock()
 	callbacks := make([]func(), len(queueShutdown))
@@ -97,6 +114,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 	}
 
 	// terminate the queue
+	<-readyForTerminate
 	lock.Lock()
 	callbacks = make([]func(), len(queueTerminate))
 	copy(callbacks, queueTerminate)
@@ -123,13 +141,26 @@ func TestPersistableChannelQueue(t *testing.T) {
 	}, &testData{})
 	assert.NoError(t, err)
 
+	readyForShutdown = make(chan struct{})
+	readyForTerminate = make(chan struct{})
+
 	go queue.Run(func(shutdown func()) {
 		lock.Lock()
 		defer lock.Unlock()
+		select {
+		case <-readyForShutdown:
+		default:
+			close(readyForShutdown)
+		}
 		queueShutdown = append(queueShutdown, shutdown)
 	}, func(terminate func()) {
 		lock.Lock()
 		defer lock.Unlock()
+		select {
+		case <-readyForTerminate:
+		default:
+			close(readyForTerminate)
+		}
 		queueTerminate = append(queueTerminate, terminate)
 	})
 
@@ -141,6 +172,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 	assert.Equal(t, test2.TestString, result4.TestString)
 	assert.Equal(t, test2.TestInt, result4.TestInt)
 
+	<-readyForShutdown
 	lock.Lock()
 	callbacks = make([]func(), len(queueShutdown))
 	copy(callbacks, queueShutdown)
@@ -148,6 +180,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 	for _, callback := range callbacks {
 		callback()
 	}
+	<-readyForTerminate
 	lock.Lock()
 	callbacks = make([]func(), len(queueTerminate))
 	copy(callbacks, queueTerminate)