By Brian Sutioso
Any development team will come across many different kinds of bugs that are hard to eliminate. In this post, we will try to tackle one of the most common problems most programmers will face: race conditions. Using multiple threads in a computer program may serve as a convenient way of increasing performance by allowing tasks to run in parallel. This is especially useful in computers with multiple processors but it comes with the cost of potential race condition errors.
What are race conditions?
Data races occur when a system outputs an undesirable result due to improper sequencing of events. In Go, this happens when more than one goroutine accesses a value more than once. This results in a ‘race’ to modify the value which, when executed incorrectly, will cause a logical error.
For example, consider a program with the following order. We start with an integer 100, subtract 50, then divide the result by 2. Now if we put the first and second operation into separate goroutines and run them in parallel, chances are that the final integer will end up being 0 instead of 25.
Mutexes and WaitGroup
When performing an audit, it is important to note the uses of mutexes and waitgroups. Also note that there are two forms of locking in Go: coarse-grained and fine-grained. Coarse-grained locking involves locking an entire data structure with one lock while fine-grained locking focuses on locking individual components of a data structure.
Mutexes provide a thread safe way of accessing a shared resource. Think of mutexes as locks with a single key scenario. When a goroutine attempts to manipulate a particular value, mutexes ensure that it is unlocked before allowing any modifications. It then locks the data value to prevent other goroutines from making changes. This ensures that operations flow in the correct order as intended.
WaitGroup allows a program to wait for a set of concurrent operations to complete before executing the following lines of code. It emulates a thread-safe counter which keeps track of the number of goroutines to complete before it exits. This is done by using the Add function to increment the counter and calling the Done function within a goroutine to decrement it. The Wait function is used to block until the counter hits 0.
How to find race conditions in Go programs
Extensive logging: Verbosely printing out every variable manipulation proves a simple but effective manner of keeping race conditions in check. In the event of a blocking program, you would be able to pinpoint the location of error easily by looking at debug logs and mapping out what happens sequentially.
Go -race flag: Go provides a race detector you can use to check for race conditions. Add a -race flag in the command line and let Go do the work. (Note: Go only detects race conditions when it is consistently occurring)
Concurrent test cases: It’s as simple as it sounds. Running a function in a goroutine 20 times tests for concurrency well. Especially when done with extensive logging.
Read through the code: Make sure every value is properly locked before manipulating it in a goroutine. Refer to Mutexes and WaitGroups to see how concurrency can be achieved in a multithreaded scenario.
Let’s take a look at Meson as an example
One of the important ways of ensuring concurrency is by making sure every data structure is thread-safe. We can take a look at a queue data structure within the Meson codebase that is specially designed as an egress FIFO queue for messages sent by the client.
// Queue is our in-memory queue implementation used as our egress FIFO queue
// for messages sent by the client.
type Queue struct {
sync.Mutex
content \[constants.MaxEgressQueueSize\]Item
readHead int
writeHead int
len int
}
// Push pushes the given message ref onto the queue and returns nil
// on success, otherwise an error is returned.
func (q *Queue) Push(e Item) error {
q.Lock()
defer q.Unlock()
if q.len >= constants.MaxEgressQueueSize {
return ErrQueueFull
}
q.content\[q.writeHead\] = e
q.writeHead = (q.writeHead + 1) % constants.MaxEgressQueueSize
q.len++
return nil
}
// Pop pops the next message ref off the queue and returns nil
// upon success, otherwise an error is returned.
func (q *Queue) Pop() (Item, error) {
q.Lock()
defer q.Unlock()
if q.len <= 0 {
return nil, ErrQueueEmpty
}
result := q.content\[q.readHead\]
q.content\[q.readHead\] = &Message{}
q.readHead = (q.readHead + 1) % constants.MaxEgressQueueSize
q.len--
return result, nil
}
// Peek returns the next message ref from the queue without
// modifying the queue.
func (q *Queue) Peek() (Item, error) {
q.Lock()
defer q.Unlock()
if q.len <= 0 {
return nil, ErrQueueEmpty
}
result := q.content\[q.readHead\]
return result, nil
}
From this code snippet, we can clearly see that the data structure requires the employment of coarse-grained locking. For every push, pop, or peek operation, the queue needs to be locked and unlocked. This prevents any of the fields in the data structure from being modified in multiple threads at the same time.
However, more concretely, how do we test for concurrency issues? If we scan through the codebase, the main potential for race conditions would lie in the send functions in client/send.go.
func (s *Session) sendDropDecoy() {
s.log.Info("sending drop decoy")
serviceDesc, err := s.GetService(cConstants.LoopService)
if err != nil {
s.fatalErrCh <- errors.New("failure to get loop service")
return
}
payload := make([]byte, constants.UserForwardPayloadLength)
id := [cConstants.MessageIDLength]byte{}
_, err = io.ReadFull(rand.Reader, id[:])
if err != nil {
s.fatalErrCh <- errors.New("failure to generate message ID for drop decoy")
return
}
msg := &Message{
ID: &id,
Recipient: serviceDesc.Name,
Provider: serviceDesc.Provider,
Payload: payload[:],
WithSURB: false,
IsDecoy: true,
}
s.doSend(msg)
}
func (s *Session) sendLoopDecoy() {
s.log.Info("sending loop decoy")
serviceDesc, err := s.GetService(cConstants.LoopService)
if err != nil {
s.fatalErrCh <- errors.New("failure to get loop service")
return
}
payload := make([]byte, constants.UserForwardPayloadLength)
id := [cConstants.MessageIDLength]byte{}
_, err = io.ReadFull(rand.Reader, id[:])
if err != nil {
s.fatalErrCh <- errors.New("failure to generate message ID for loop decoy")
return
}
msg := &Message{
ID: &id,
Recipient: serviceDesc.Name,
Provider: serviceDesc.Provider,
Payload: payload[:],
WithSURB: true,
IsDecoy: true,
}
defer s.incrementDecoyLoopTally()
s.doSend(msg)
}
// SendUnreliableMessage asynchronously sends message without any automatic retransmissions.
func (s *Session) SendUnreliableMessage(recipient, provider string, message []byte) (*[cConstants.MessageIDLength]byte, error) {
msg, err := s.composeMessage(recipient, provider, message, false)
if err != nil {
return nil, err
}
err = s.egressQueue.Push(msg)
if err != nil {
return nil, err
}
return msg.ID, nil
}
func (s *Session) BlockingSendUnreliableMessage(recipient, provider string, message []byte) ([]byte, error) {
msg, err := s.composeMessage(recipient, provider, message, true)
if err != nil {
return nil, err
}
sentWaitChan := make(chan *Message)
s.sentWaitChanMap.Store(*msg.ID, sentWaitChan)
defer s.sentWaitChanMap.Delete(*msg.ID)
replyWaitChan := make(chan []byte)
s.replyWaitChanMap.Store(*msg.ID, replyWaitChan)
defer s.replyWaitChanMap.Delete(*msg.ID)
err = s.egressQueue.Push(msg)
if err != nil {
return nil, err
}
// wait until sent so that we know the ReplyETA for the waiting below
sentMessage := <-sentWaitChan
// if the message failed to send we will receive a nil message
if sentMessage == nil {
return nil, ErrMessageNotSent
}
// wait for reply or round trip timeout
select {
case reply := <-replyWaitChan:
return reply, nil
// these timeouts are often far too aggressive
case <-time.After(sentMessage.ReplyETA + cConstants.RoundTripTimeSlop):
return nil, ErrReplyTimeout
}
// unreachable
}
Here, we can see that we have several functions to send messages. First, we set up a register function to set up the configurations. Then, we can go straight into testing.
package client
import (
"fmt"
"testing"
"time"
"github.com/hashcloak/Meson/client/config"
"github.com/katzenpost/client/utils"
)
func register() (*Client, *Session, *utils.ServiceDescriptor) {
cfg, err := config.LoadFile("client_test.toml")
if err != nil {
panic(err)
}
_ = cfg.UpdateTrust()
_ = cfg.SaveConfig("client_test.toml")
linkKey := AutoRegisterRandomClient(cfg)
c, err := NewFromConfig(cfg, "echo")
if err != nil {
c.Shutdown()
panic(err)
}
s, err := c.NewSession(linkKey)
if err != nil {
c.Shutdown()
panic(err)
}
serviceDesc, err := s.GetService("echo")
if err != nil {
c.Shutdown()
panic(err)
}
return c, s, serviceDesc
}
func TestBasicBlockingSend(t *testing.T) {
c, s, serviceDesc := register()
fmt.Printf("Sending Sphinx packet payload to: %s@%s\n", serviceDesc.Name, serviceDesc.Provider)
resp, err := s.BlockingSendUnreliableMessage(serviceDesc.Name, serviceDesc.Provider, []byte(`Data encryption is used widely today!`))
if err != nil {
c.Shutdown()
panic(err)
}
payload, err := ValidateReply(resp)
if err != nil {
c.Shutdown()
panic(err)
}
fmt.Printf("Return: %s\n", payload)
c.Shutdown()
}
func TestBasicNonBlockingSend(t *testing.T) {
c, s, serviceDesc := register()
fmt.Printf("Sending Sphinx packet payload to: %s@%s\n", serviceDesc.Name, serviceDesc.Provider)
id, err := s.SendUnreliableMessage(serviceDesc.Name, serviceDesc.Provider, []byte(`Data encryption is used widely today!`))
if err != nil {
c.Shutdown()
panic(err)
}
fmt.Printf("Return: %s\n", id)
c.Shutdown()
}
func TestSendingBlockingConcurrently(t *testing.T) {
c, s, serviceDesc := register()
done := make(chan struct{})
timeout := time.After(time.Second * 80)
fmt.Printf("Concurrently Sending 20 Sphinx packet payloads to: %s@%s\n", serviceDesc.Name, serviceDesc.Provider)
for i := 0; i < 20; i++ {
go func(i int) {
fmt.Printf("Iteration %d\n", i+1)
resp, err := s.BlockingSendUnreliableMessage(serviceDesc.Name, serviceDesc.Provider, []byte(fmt.Sprintf("Data encryption is used widely today! Iteration: %d", i+1)))
if err != nil {
c.Shutdown()
panic(err)
}
payload, err := ValidateReply(resp)
if err != nil {
c.Shutdown()
panic(err)
}
fmt.Printf("Return: %s\n", payload)
done <- struct{}{}
}(i)
}
for i := 0; i < 20; i++ {
select {
case <-done:
case <-timeout:
c.Shutdown()
panic("Timeout")
}
}
c.Shutdown()
}
func TestSendingNonBlockingConcurrently(t *testing.T) {
c, s, serviceDesc := register()
done := make(chan struct{})
timeout := time.After(time.Second * 80)
fmt.Printf("Concurrently Sending 20 Sphinx packet payloads to: %s@%s\n", serviceDesc.Name, serviceDesc.Provider)
for i := 0; i < 20; i++ {
go func(i int) {
fmt.Printf("Iteration %d\n", i+1)
id, err := s.SendUnreliableMessage(serviceDesc.Name, serviceDesc.Provider, []byte(fmt.Sprintf("Data encryption is used widely today! Iteration: %d", i+1)))
if err != nil {
c.Shutdown()
panic(err)
}
fmt.Printf("Msg ID: %s\n", id)
done <- struct{}{}
}(i)
}
for i := 0; i < 20; i++ {
select {
case <-done:
case <-timeout:
c.Shutdown()
panic("Timeout")
}
}
c.Shutdown()
}
func TestSendingDropLoopDecoy(t *testing.T) {
c, s, serviceDesc := register()
done := make(chan struct{})
timeout := time.After(time.Second * 80)
fmt.Printf("Sending 20 Drop and Loop decoys to: %s@%s\n", serviceDesc.Name, serviceDesc.Provider)
for i := 0; i < 20; i++ {
go func(i int) {
fmt.Printf("Iteration %d\n", i+1)
s.sendDropDecoy()
s.sendLoopDecoy()
done <- struct{}{}
}(i)
}
for i := 0; i < 20; i++ {
select {
case <-done:
case <-timeout:
c.Shutdown()
panic("Timeout")
case <-s.fatalErrCh:
c.Shutdown()
panic("Fatal error")
}
}
c.Shutdown()
}
The test cases above check for concurrency issues during sending blocking, non-blocking, drop, and loop messages. We can see that we employed a goroutine to run those functions 20 times every time the parent function is called. A 'done' channel is used to keep track of the number of times the goroutine successfully runs.
One potential red flag for a concurrency error within the code is when the timeout counter runs out. This happens when the 'done' channel blocks. (Which should not be occuring when running a thread safe function) Multiple logging should then be done to pinpoint the bugs.
Conclusion
In this article, we introduced the concept of race conditions along with important relevant terminologies. We also touched upon several methods in which we can catch concurrency bugs and worked through sample test cases that were used to audit Meson.
Check out the Meson repository by clicking this here!