package golmdb_test

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math/rand"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/matryer/is"
	"wellquite.org/golmdb"
)
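
// The soak and seed flags used below are registered elsewhere in this
// test package. A minimal sketch of what their definitions might look
// like (names and types inferred from their use here, not confirmed):
//
//	var (
//		soak uint
//		seed int64
//	)
//
//	func init() {
//		flag.UintVar(&soak, "soak", 0, "number of soak rounds; 0 skips the test")
//		flag.Int64Var(&seed, "seed", time.Now().UnixNano(), "seed for the deterministic RNG")
//	}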

// Running this with -soak=100 takes around 100 seconds. Don't forget
// the default timeout is 10m, so you might want -timeout=0 for long
// runs.
//
// This will also need about 200MB of space in your TMPDIR
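//
// An example invocation (-soak is read below; -seed is assumed to be
// registered elsewhere in this package):
//
//	go test -run TestSoak -soak=100 -seed=1 -timeout=0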
func TestSoak(t *testing.T) {
	if soak == 0 {
		t.Skip("-soak flag not provided or 0, so skipping.")
	}

	SetGlobalLogLevelDebug()
	log := NewTestLogger(t)
	is := is.New(t)

	log.Info().Int64("seed", seed).Send()

	rng := rand.New(rand.NewSource(seed))

	// create a million keys (numbers) with random values
	keys, keyValueMap := makeAllKeys(rng)
	// shuffle them
	rng.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] })

	client, dir, err := createDatabase(log, 16)
	is.NoErr(err)
	defer os.RemoveAll(dir)
	defer client.TerminateSync()

	log.Info().Msg("Loading key-value pairs")
	dbRef, err := createDBRef(client, t.Name(), 0)
	is.NoErr(err)
	// Chuck all the key-value pairs in. We could range through
	// keyValueMap, but map iteration order is non-deterministic, and
	// I'd prefer to keep as much as possible deterministic.
	err = client.Update(func(rwtxn *golmdb.ReadWriteTxn) (err error) {
		for _, key := range keys {
			val := keyValueMap[key]
			if err = rwtxn.Put(dbRef, key[:], val[:], 0); err != nil {
				return err
			}
		}
		return err
	})
	is.NoErr(err)

	// We're going to spawn a number of goroutines to read from the
	// database. They need to report errors back to the main test
	// goroutine.
	errLock := new(sync.Mutex)
	var errs []error

	maxViewersPerRound := 32 // must not be greater than numReaders
	totalGetCount := 0
	totalRePutCount := 0

	log.Info().Msg("Reading and modifying concurrently")

	// The "big idea" here is that for each "round", spawn a number of
	// readers. They all concurrently start View transactions, and
	// verify that some prefix of keys can all be read and match what
	// we think should be there.
	//
	// Once they've all started, LMDB should give them all a snapshot
	// of the database, so we can then go on and modify the database,
	// but the viewers should see the previous version - the snapshot.
	//
	// Once the modifications are done, we update our expectations
	// (carefully - viewers could still be running and they need to
	// continue to expect the previous set of expectations), and then
	// go on to the next "round".
	//
	// So reads and writes can happen at the same time, but writes
	// should only affect reads that _start_ _after_ the _write_ has
	// _finished_ (committed).
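	//
	// The in-memory expectations use the same idea: each round's
	// viewers capture the current map, and the writer swaps in a fresh
	// one rather than mutating it. See snapshotSwizzleSketch, after
	// this test, for a standalone illustration of that pattern.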

	start := time.Now()
	allViewersFinishedWG := new(sync.WaitGroup)

	for roundNum := uint(0); roundNum < soak; roundNum++ {
		// how many viewers to launch this round?
		viewersCount := 1 + rng.Intn(maxViewersPerRound)
		// to wait for this round's viewers to start:
		viewersStartedWG := new(sync.WaitGroup)
		viewersStartedWG.Add(viewersCount)
		allViewersFinishedWG.Add(viewersCount)

		// Capture the current expectations by reference. This stays
		// valid for in-flight viewers because modify builds a fresh map
		// rather than mutating this one.
		keyValueMapSnapshot := keyValueMap
		for viewNum := 0; viewNum < viewersCount; viewNum++ {
			// each viewer can check a different prefix of keys
			toCheck := keys[0 : 1+rng.Intn(len(keys))]
			totalGetCount += len(toCheck)
			go runViewer(allViewersFinishedWG, viewersStartedWG, client, dbRef, toCheck, keyValueMapSnapshot, errLock, &errs)
		}

		// wait until all the viewers for this round have started, and
		// thus established their snapshot with LMDB.
		viewersStartedWG.Wait()

		// what are we going to modify?
		toModify := keys[0 : 1+rng.Intn(len(keys))]
		keyValueMapModified, modifiedCount, err := modify(rng, client, dbRef, keyValueMap, toModify)
		is.NoErr(err)
		// Copy into keyValueMapModified every entry from keyValueMap
		// that we've not modified (i.e. keys after toModify).
		for _, key := range keys[len(toModify):] {
			keyValueMapModified[key] = keyValueMap[key]
		}
		// Pointer swizzle: future rounds and newly-launched viewers see
		// the new expectations; viewers still running keep the old map.
		keyValueMap = keyValueMapModified

		totalRePutCount += modifiedCount
		log.Debug().Uint("round", roundNum).Int("viewers", viewersCount).Int("re-puts", modifiedCount).Send()
	}

	allViewersFinishedWG.Wait()
	end := time.Now()
	errLock.Lock()
	if len(errs) > 0 {
		t.Error(errs)
	}
	errLock.Unlock()

	elapsed := end.Sub(start)

	log.Info().Str("duration", elapsed.String()).Int("total gets", totalGetCount).Int("total re-puts", totalRePutCount).Send()
	log.Info().Float64("rough gets per second", float64(totalGetCount)*float64(time.Second)/float64(elapsed)).Float64("rough re-puts per second", float64(totalRePutCount)*float64(time.Second)/float64(elapsed)).Send()
}
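
// TestSoak depends on modify never mutating the expectations map it is
// given: it builds a fresh map, and the caller swaps the pointer. That
// way viewers launched earlier keep reading the old map, mirroring
// LMDB's own MVCC snapshots. Below is a minimal, stdlib-only sketch of
// the pattern; the function is illustrative only and not part of the
// original test.
func snapshotSwizzleSketch() {
	current := map[string]int{"a": 1}
	snapshot := current // a viewer captures the map by reference

	// A writer builds a fresh map rather than mutating current.
	next := make(map[string]int, len(current))
	for k, v := range current {
		next[k] = v
	}
	next["a"] = 2
	current = next // pointer swizzle: later readers see the update

	_ = current["a"]  // 2: a reader arriving after the swizzle
	_ = snapshot["a"] // 1: the earlier snapshot is unaffected
}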

// makeAllKeys returns 1<<20 (i.e. 1,048,576) sequential 8-byte
// big-endian keys, together with a map from each key to a random
// 8-byte value.
func makeAllKeys(rng *rand.Rand) ([][8]byte, map[[8]byte][8]byte) {
	const lim = 1024 * 1024
	keys := make([][8]byte, lim)
	keyValueMap := make(map[[8]byte][8]byte, lim)
	for idx := 0; idx < lim; idx++ {
		key := [8]byte{}
		binary.BigEndian.PutUint64(key[:], uint64(idx))
		keys[idx] = key
		val := [8]byte{}
		binary.BigEndian.PutUint64(val[:], rng.Uint64())
		keyValueMap[key] = val
	}
	return keys, keyValueMap
}

// runViewer opens a read-only View transaction, signals via
// viewersStartedWG that its snapshot is established, and then verifies
// that every key in toCheck still maps to the value recorded in
// keyValueMapSnapshot. Failures are appended to errs under errLock.
func runViewer(allViewersFinishedWG, viewersStartedWG *sync.WaitGroup, client *golmdb.LMDBClient, dbRef golmdb.DBRef, toCheck [][8]byte, keyValueMapSnapshot map[[8]byte][8]byte, errLock *sync.Mutex, errs *[]error) {
	defer allViewersFinishedWG.Done()
	err := client.View(func(rotxn *golmdb.ReadOnlyTxn) (err error) {
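		// We're now inside the read txn, so our snapshot of the
		// database is established; let the writer proceed.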
		viewersStartedWG.Done()
		for idx, key := range toCheck {
			expected := keyValueMapSnapshot[key]
			val, err := rotxn.Get(dbRef, key[:])
			if err != nil {
				return err
			}
			if !bytes.Equal(expected[:], val) {
				return fmt.Errorf("(%d) For key %v, expected %v, got %v", idx, key, expected[:], val)
			}
		}
		return err
	})
	if err != nil {
		errLock.Lock()
		*errs = append(*errs, err)
		errLock.Unlock()
	}
}

// modify re-puts a random subset (each key with probability 1/3) of
// the keys in toModify with fresh random values, inside a single
// read-write transaction. It returns a new expectations map and the
// number of keys actually rewritten.
func modify(rng *rand.Rand, client *golmdb.LMDBClient, dbRef golmdb.DBRef, keyValueMap map[[8]byte][8]byte, toModify [][8]byte) (keyValueMapModified map[[8]byte][8]byte, modifiedCount int, err error) {
	// Because the txn can run multiple times (the txn could fail due
	// to out of space and so gets re-run once the database size is
	// increased), we want to form these skips outside the txn so that
	// the rng stays deterministic across multiple runs.
	//
	// Also because the txn can run multiple times, we need to be
	// careful about resetting keyValueMapModified and modifiedCount at
	// the start of each txn run.
	// Each key in toModify is rewritten with probability 1/3 and
	// skipped otherwise.
	skips := make([]bool, len(toModify))
	for idx := range skips {
		skips[idx] = rng.Intn(3) != 0
	}
	err = client.Update(func(rwtxn *golmdb.ReadWriteTxn) (err error) {
		keyValueMapModified = make(map[[8]byte][8]byte, len(keyValueMap))
		modifiedCount = 0
		for idx, key := range toModify {
			val := keyValueMap[key]
			if skips[idx] {
				keyValueMapModified[key] = val
				continue
			}
			binary.BigEndian.PutUint64(val[:], rng.Uint64())
			keyValueMapModified[key] = val
			if err = rwtxn.Put(dbRef, key[:], val[:], 0); err != nil {
				return err
			}
			modifiedCount++
		}
		return err
	})
	return keyValueMapModified, modifiedCount, err
}