// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/skiplist.cuh"

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdio>
#include <cstring>
#include <set>

#include <cuda/atomic>

#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/arena.cuh"
#include "util/hash.h"
#include "util/random.cuh"
#include "util/testutil.h"
#include "util/cuda_gtest_plugin.h"

namespace leveldb {

typedef uint64_t Key;

struct Comparator {
  __device__ int operator()(const Key& a, const Key& b) const {
    if (a < b) {
      return -1;
    } else if (a > b) {
      return +1;
    } else {
      return 0;
    }
  }
};
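// A minimal device-side sanity check of the Comparator above, added as a
// sketch: the kernel name testComparator and the TEST below are not part of
// the original port; they only reuse the launch-and-synchronize pattern used
// by the other tests in this file.
__global__ void testComparator() {
  Comparator cmp;
  assert(cmp(Key(1), Key(2)) == -1);
  assert(cmp(Key(2), Key(1)) == +1);
  assert(cmp(Key(7), Key(7)) == 0);
}

TEST(SkipTest, TestComparator) {
  testComparator<<<1, 1>>>();
  cudaDeviceSynchronize();
}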
/*
TEST(SkipTest, InsertAndLookup) { }

// We want to make sure that with a single writer and multiple
// concurrent readers (with no synchronization other than when a
// reader's iterator is created), the reader always observes all the
// data that was present in the skip list when the iterator was
// constructed.  Because insertions are happening concurrently, we may
// also observe new values that were inserted since the iterator was
// constructed, but we should never miss any values that were present
// at iterator construction time.
//
// We generate multi-part keys:
//     <key,gen,hash>
// where:
//     key is in range [0..K-1]
//     gen is a generation number for key
//     hash is hash(key,gen)
//
// The insertion code picks a random key, sets gen to be 1 + the last
// generation number inserted for that key, and sets hash to Hash(key,gen).
//
// At the beginning of a read, we snapshot the last inserted
// generation number for each key.  We then iterate, including random
// calls to Next() and Seek().  For every key we encounter, we
// check that it is either expected given the initial snapshot or has
// been concurrently added since the iterator started.
class ConcurrentTest {
 private:
  static constexpr uint32_t K = 4;

  static uint64_t key(Key key) { return (key >> 40); }
  static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
  static uint64_t hash(Key key) { return key & 0xff; }

  static uint64_t HashNumbers(uint64_t k, uint64_t g) {
    uint64_t data[2] = {k, g};
    return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
  }

  static Key MakeKey(uint64_t k, uint64_t g) {
    static_assert(sizeof(Key) == sizeof(uint64_t), "");
    assert(k <= K);  // We sometimes pass K to seek to the end of the skiplist
    assert(g <= 0xffffffffu);
    return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
  }

  static bool IsValidKey(Key k) {
    return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
  }

  static Key RandomTarget(Random* rnd) {
    switch (rnd->Next() % 10) {
      case 0:
        // Seek to beginning
        return MakeKey(0, 0);
      case 1:
        // Seek to end
        return MakeKey(K, 0);
      default:
        // Seek to middle
        return MakeKey(rnd->Next() % K, 0);
    }
  }

  // Per-key generation
  struct State {
    std::atomic<int> generation[K];
    void Set(int k, int v) { generation[k].store(v, std::memory_order_release); }
    int Get(int k) { return generation[k].load(std::memory_order_acquire); }

    State() {
      for (int k = 0; k < K; k++) {
        Set(k, 0);
      }
    }
  };

  // Current state of the test
  State current_;

  Arena arena_;

  // SkipList is not protected by mu_.  We just use a single writer
  // thread to modify it.
  SkipList<Key, Comparator> list_;

 public:
  ConcurrentTest() : list_(Comparator(), &arena_) {}

  // REQUIRES: External synchronization
  void WriteStep(Random* rnd) {
    const uint32_t k = rnd->Next() % K;
    const intptr_t g = current_.Get(k) + 1;
    const Key key = MakeKey(k, g);
    list_.Insert(key);
    current_.Set(k, g);
  }

  void ReadStep(Random* rnd) {
    // Remember the initial committed state of the skiplist.
    State initial_state;
    for (int k = 0; k < K; k++) {
      initial_state.Set(k, current_.Get(k));
    }

    Key pos = RandomTarget(rnd);
    SkipList<Key, Comparator>::Iterator iter(&list_);
    iter.Seek(pos);
    while (true) {
      Key current;
      if (!iter.Valid()) {
        current = MakeKey(K, 0);
      } else {
        current = iter.key();
        ASSERT_TRUE(IsValidKey(current)) << current;
      }
      ASSERT_LE(pos, current) << "should not go backwards";

      // Verify that everything in [pos,current) was not present in
      // initial_state.
      while (pos < current) {
        ASSERT_LT(key(pos), K) << pos;

        // Note that generation 0 is never inserted, so it is ok if
        // <*,0,*> is missing.
        ASSERT_TRUE((gen(pos) == 0) ||
                    (gen(pos) > static_cast<Key>(initial_state.Get(key(pos)))))
            << "key: " << key(pos) << "; gen: " << gen(pos)
            << "; initgen: " << initial_state.Get(key(pos));

        // Advance to next key in the valid key space
        if (key(pos) < key(current)) {
          pos = MakeKey(key(pos) + 1, 0);
        } else {
          pos = MakeKey(key(pos), gen(pos) + 1);
        }
      }

      if (!iter.Valid()) {
        break;
      }

      if (rnd->Next() % 2) {
        iter.Next();
        pos = MakeKey(key(pos), gen(pos) + 1);
      } else {
        Key new_target = RandomTarget(rnd);
        if (new_target > pos) {
          pos = new_target;
          iter.Seek(new_target);
        }
      }
    }
  }
};
*/

// Singly linked list node used by MemorySet below.
struct Node {
  Key num;
  Node* next;
};

// A minimal device-side set of keys, used as the model container in
// insert_and_lookup (std::set is not available in device code).
template <typename Key>
class MemorySet {
 public:
  explicit __device__ MemorySet() : total(0), first(nullptr), current(nullptr) {}

  __device__ ~MemorySet() {
    Node* crt = first;
    Node* prev;
    while (crt != nullptr) {
      prev = crt;
      crt = crt->next;
      delete prev;
    }
    first = nullptr;
  }

  // Appends k if it is not already present; returns true on insertion.
  __device__ bool insert(const Key& k) {
    if (first == nullptr) {
      first = new Node;
      first->num = k;
      first->next = nullptr;
      current = first;
      return true;
    }
    if (this->find(k)) {
      return false;
    }
    current->next = new Node;
    current = current->next;
    current->num = k;
    current->next = nullptr;
    return true;
  }

  __device__ bool find(const Key& value) {
    Node* crt = first;
    while (crt != nullptr) {
      if (crt->num == value) {
        return true;
      }
      crt = crt->next;
    }
    return false;
  }

  __device__ Node* get_first() { return this->first; }

  // std::set-style count(): 1 if present, 0 otherwise.
  __device__ size_t count(const Key& k) {
    if (this->find(k)) {
      return 1;
    }
    return 0;
  }

 private:
  size_t total;
  Node* first;
  Node* current;
};

template <typename T, typename U>
__device__ void ASSERT_EQ_dev(T a, U b) {
  assert(a == b);
}

// Device-side port of the original InsertAndLookup test: insert random keys
// into the skip list and cross-check Contains() against the MemorySet model.
__global__ void insert_and_lookup(SkipList<Key, Comparator>* list) {
  printf("Lookup start\n");
  const int N = 2000;
  const int R = 5000;
  Random rnd(1000);
  MemorySet<Key> keys;
  //auto* arena = new Arena();
  //Comparator cmp;
  //SkipList<Key, Comparator> list(cmp, &*arena);
  for (int i = 0; i < N; i++) {
    Key key = rnd.Next() % R;
    if (keys.insert(key)) {
      list->Insert(key);
    }
  }

  for (int i = 0; i < R; i++) {
    if (list->Contains(i)) {
      ASSERT_EQ_dev(keys.count(i), 1);
    } else {
      ASSERT_EQ_dev(keys.count(i), 0);
    }
  }

  Node* cur = keys.get_first();
  while (cur != nullptr) {
    assert(list->Contains(cur->num));
    cur = cur->next;
  }
  printf("Lookup success\n");

  /*
  // Forward iteration test
  for (int i = 0; i < R; i++) {
    SkipList<Key, Comparator>::Iterator iter(&list);
    iter.Seek(i);

    // Compare against model iterator
    std::set<Key>::iterator model_iter = keys.lower_bound(i);
    for (int j = 0; j < 3; j++) {
      if (model_iter == keys.end()) {
        ASSERT_TRUE(!iter.Valid());
        break;
      } else {
        ASSERT_TRUE(iter.Valid());
        ASSERT_EQ(*model_iter, iter.key());
        ++model_iter;
        iter.Next();
      }
    }
  }

  // Backward iteration test
  {
    SkipList<Key, Comparator>::Iterator iter(&list);
    iter.SeekToLast();

    // Compare against model iterator
    for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
         model_iter != keys.rend(); ++model_iter) {
      ASSERT_TRUE(iter.Valid());
      ASSERT_EQ(*model_iter, iter.key());
      iter.Prev();
    }
    ASSERT_TRUE(!iter.Valid());
  }
  */
}
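// Sketch of a direct test for the MemorySet helper above (the kernel and
// TEST names are additions, not part of the original port). It checks the
// insert/find/count contract that insert_and_lookup relies on.
__global__ void testMemorySet() {
  MemorySet<Key> keys;
  assert(keys.insert(1));
  assert(keys.insert(2));
  assert(!keys.insert(1));  // duplicates are rejected
  assert(keys.find(1));
  assert(!keys.find(3));
  assert(keys.count(2) == 1);
  assert(keys.count(3) == 0);
}

TEST(SkipTest, TestMemorySet) {
  testMemorySet<<<1, 1>>>();
  cudaDeviceSynchronize();
}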
// Unused helper type kept from the port; it is not referenced by the tests
// below.
class TestClass {
 public:
  explicit __device__ TestClass()
      : alloc_ptr_(nullptr),
        alloc_bytes_remaining_(0),
        head_(nullptr),
        blocks_(nullptr),
        atomic(0) {}

  TestClass(const TestClass&) = delete;
  TestClass& operator=(const TestClass&) = delete;

  char* alloc_ptr_;
  size_t alloc_bytes_remaining_;
  // Array of new[] allocated memory blocks
  //thrust::host_vector<char*> blocks_;
  //std::vector<char*> blocks_;
  void* head_;
  void* blocks_;
  cuda::atomic<int> atomic;
};

constexpr size_t SKIPLIST_TEST_SIZE = 10000;
constexpr size_t TEST_STEP = SKIPLIST_TEST_SIZE / 10;
constexpr unsigned UNLOCKED = 0;

// A simple device spin lock built on cuda::atomic, used to serialize the
// per-thread insert loops in testParallel.
class CudaSpinLock {
  static constexpr int UNLOCKED = 0;
  static constexpr int LOCKED = 1;

  cuda::atomic<int> m_value;

 public:
  __device__ __host__ explicit CudaSpinLock() : m_value(0) {}

  __device__ void lock() {
    while (true) {
      int expected = UNLOCKED;
      if (this->m_value.compare_exchange_strong(expected, LOCKED)) break;
    }
  }

  __device__ void unlock() { m_value.store(UNLOCKED); }

  __device__ bool isLock() { return this->m_value.load() == LOCKED; }
};

__global__ void testLock() {
  CudaSpinLock lock;
  lock.lock();
  assert(lock.isLock());
  lock.unlock();
  assert(!lock.isLock());
}

TEST(SkipTest, TestLock) {
  testLock<<<1, 1>>>();
  cudaDeviceSynchronize();
}

// Each thread of the block inserts its own TEST_STEP-sized slice of keys;
// the spin lock serializes the insert loops, since the skip list assumes a
// single writer at a time.
__global__ void testParallel(SkipList<Key, Comparator>* skipList, Key* keys,
                             CudaSpinLock* lock) {
  unsigned int start = threadIdx.x;
  //printf("start: %u\n", start);
  lock->lock();
  //printf("start insert: %u\n", start);
  for (unsigned i = start * TEST_STEP; i < (start + 1) * TEST_STEP; i++) {
    //printf("%u %02u %lu\n", start, i, keys[i]);
    //printf("key: %lu\n", keys[i]);
    skipList->Insert(keys[i]);
  }
  lock->unlock();
  //printf("done: %u\n", start);
}

__global__ void testSingle(SkipList<Key, Comparator>* skipList, Key* keys) {
  for (unsigned i = 0; i < SKIPLIST_TEST_SIZE; i++) {
    skipList->Insert(keys[i]);
  }
}

// Walks the skip list from the front and checks that it yields exactly the
// given sorted key sequence.
__global__ void testKeysIsEqualLists(SkipList<Key, Comparator>* skiplist,
                                     const Key* sorted_keys) {
  SkipList<Key, Comparator>::Iterator iter(skiplist);
  iter.SeekToFirst();
  for (unsigned i = 0; i < SKIPLIST_TEST_SIZE; i++) {
    assert(iter.Valid());
    assert(iter.key() == sorted_keys[i]);
    iter.Next();
  }
  assert(!iter.Valid());
}

// Constructs the Arena and the SkipList on the device heap; the managed
// pointers let the host hand the objects to later kernel launches.
__global__ void initSkipList(Arena** pArena,
                             SkipList<Key, Comparator>** pSkipList) {
  Comparator cmp;
  *pArena = new Arena();
  *pSkipList = new SkipList<Key, Comparator>(cmp, *pArena);
}

// Destroys the objects created by initSkipList. They were allocated with
// device-side new, so they must be released with device-side delete rather
// than cudaFree.
__global__ void freeSkipList(Arena** pArena,
                             SkipList<Key, Comparator>** pSkipList) {
  delete *pSkipList;
  delete *pArena;
}

TEST(SkipTest, TestInitSkiplist) {
  Arena** pArena;
  SkipList<Key, Comparator>** pSkipList;
  cudaMallocManaged((void**)&pArena, sizeof(void*));
  cudaMallocManaged((void**)&pSkipList, sizeof(void*));

  initSkipList<<<1, 1>>>(pArena, pSkipList);
  cudaDeviceSynchronize();
  //printf("%p %p\n", *pArena, *pSkipList);

  freeSkipList<<<1, 1>>>(pArena, pSkipList);
  cudaDeviceSynchronize();
  cudaFree(pArena);
  cudaFree(pSkipList);
}

// Sanity-checks that the device copy of the lock starts out released.
__global__ void resetLock(CudaSpinLock* lock) {
  assert(!lock->isLock());
  lock->unlock();
}
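// The insert_and_lookup kernel above is only referenced by commented-out
// launches further down; this host-side wrapper is a sketch of how it could
// be driven, reusing initSkipList/freeSkipList. The test name is an addition.
TEST(SkipTest, InsertAndLookup) {
  Arena** pArena;
  SkipList<Key, Comparator>** pSkipList;
  cudaMallocManaged((void**)&pArena, sizeof(void*));
  cudaMallocManaged((void**)&pSkipList, sizeof(void*));

  initSkipList<<<1, 1>>>(pArena, pSkipList);
  cudaDeviceSynchronize();

  insert_and_lookup<<<1, 1>>>(*pSkipList);
  cudaDeviceSynchronize();

  freeSkipList<<<1, 1>>>(pArena, pSkipList);
  cudaDeviceSynchronize();
  cudaFree(pArena);
  cudaFree(pSkipList);
}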
TEST(SkipTest, TestSingleCudaInsert) {
  //Key* keys;
  //SkipList<Key, Comparator> list(cmp, &arena);
  /* for (int i = 0; i < 1000; i++) { keys[i] = .Next(); } */
  Key* keys = new Key[SKIPLIST_TEST_SIZE];
  Key* sorted_keys = new Key[SKIPLIST_TEST_SIZE];
  Arena** pArena;
  SkipList<Key, Comparator>** skipList;
  std::set<Key> k;
  cudaMallocManaged((void**)&pArena, sizeof(void*));
  cudaMallocManaged((void**)&skipList, sizeof(void*));
  auto* device_rnd = new Random(test::RandomSeed());
  Key* device_keys = nullptr;
  cudaMallocManaged((void**)&device_keys, sizeof(Key) * SKIPLIST_TEST_SIZE);

  // Draw SKIPLIST_TEST_SIZE distinct random keys; the std::set rejects
  // duplicates.
  for (size_t i = 0; i < SKIPLIST_TEST_SIZE; i++) {
    Key tmp;
    size_t current = k.size();
    do {
      tmp = device_rnd->Next();
      k.insert(tmp);
    } while (k.size() == current);
    /*do { tmp = device_rnd->Next(); } while (k.find(tmp) != k.end());*/
    keys[i] = tmp;
    //printf("%ld\n", tmp);
  }
  /* for (size_t i = 0; i < SKIPLIST_TEST_SIZE; i++) { keys[i] = i; } */

  memcpy(sorted_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key));
  cudaMemcpy(device_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key),
             cudaMemcpyHostToDevice);

  dim3 gridSize(1, 1);
  dim3 blockSize(1, 1);
  initSkipList<<<1, 1>>>(pArena, skipList);
  //sleep(5);
  cudaDeviceSynchronize();

  //insert_skiplist<<<gridSize, blockSize>>>(skipList, device_rnd);
  //testParallel<<<gridSize, blockSize>>>(*skipList, device_keys);
  testSingle<<<1, 1>>>(*skipList, device_keys);
  cudaDeviceSynchronize();

  // The list must yield the keys in sorted order.
  std::sort(sorted_keys, sorted_keys + SKIPLIST_TEST_SIZE);
  cudaMemcpy(device_keys, sorted_keys, SKIPLIST_TEST_SIZE * sizeof(Key),
             cudaMemcpyHostToDevice);
  testKeysIsEqualLists<<<1, 1>>>(*skipList, device_keys);
  //insert_and_lookup<<<1, 1>>>(*skipList);
  cudaDeviceSynchronize();

  freeSkipList<<<1, 1>>>(pArena, skipList);
  cudaDeviceSynchronize();
  cudaFree(device_keys);
  cudaFree(skipList);
  cudaFree(pArena);
  delete device_rnd;
  delete[] sorted_keys;
  delete[] keys;
}

TEST(SkipTest, TestMultiThreadInsert) {
  Key* keys = new Key[SKIPLIST_TEST_SIZE];
  Key* sorted_keys = new Key[SKIPLIST_TEST_SIZE];
  Arena** pArena;
  SkipList<Key, Comparator>** pSkipList;
  std::set<Key> k;
  Key* device_keys = nullptr;
  cudaMallocManaged((void**)&pArena, sizeof(void*));
  cudaMallocManaged((void**)&pSkipList, sizeof(void*));
  auto* device_rnd = new Random(test::RandomSeed());
  cudaMallocManaged((void**)&device_keys, sizeof(Key) * SKIPLIST_TEST_SIZE);

  //cuda::atomic<int> lock;
  CudaSpinLock lock;
  CudaSpinLock* device_lock = nullptr;
  cudaMallocManaged((void**)&device_lock, sizeof(CudaSpinLock));
  cudaMemcpy(device_lock, &lock, sizeof(CudaSpinLock), cudaMemcpyHostToDevice);
  resetLock<<<1, 1>>>(device_lock);
  cudaDeviceSynchronize();

  // Draw SKIPLIST_TEST_SIZE distinct random keys.
  for (size_t i = 0; i < SKIPLIST_TEST_SIZE; i++) {
    Key tmp;
    size_t current = k.size();
    do {
      tmp = device_rnd->Next();
      k.insert(tmp);
    } while (k.size() == current);
    keys[i] = tmp;
  }

  memcpy(sorted_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key));
  cudaMemcpy(device_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key),
             cudaMemcpyHostToDevice);

  dim3 gridSize(1, 1);
  dim3 blockSize(10, 1);
  initSkipList<<<1, 1>>>(pArena, pSkipList);
  //sleep(5);
  cudaDeviceSynchronize();

  // Ten threads insert disjoint slices of the key array under the spin lock.
  //insert_skiplist<<<gridSize, blockSize>>>(skipList, device_rnd);
  //testParallel<<<gridSize, blockSize>>>(*skipList, device_keys);
  testParallel<<<1, blockSize>>>(*pSkipList, device_keys, device_lock);
  cudaDeviceSynchronize();

  std::sort(sorted_keys, sorted_keys + SKIPLIST_TEST_SIZE);
  cudaMemcpy(device_keys, sorted_keys, SKIPLIST_TEST_SIZE * sizeof(Key),
             cudaMemcpyHostToDevice);
  testKeysIsEqualLists<<<1, 1>>>(*pSkipList, device_keys);
  //insert_and_lookup<<<1, 1>>>(*pSkipList);
  cudaDeviceSynchronize();

  freeSkipList<<<1, 1>>>(pArena, pSkipList);
  cudaDeviceSynchronize();
  cudaFree(device_keys);
  cudaFree(pSkipList);
  cudaFree(pArena);
  cudaFree(device_lock);
  delete device_rnd;
  delete[] sorted_keys;
  delete[] keys;
}
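// Sketch of a reverse-order counterpart to testKeysIsEqualLists (the kernel
// name is an addition, not part of the original port). It assumes the device
// iterator supports SeekToLast() and Prev(), as used by globalTestEmpty below
// and by the commented-out backward iteration test above; it could be
// launched with <<<1, 1>>> right after testKeysIsEqualLists in the insert
// tests.
__global__ void testKeysIsEqualListsBackward(
    SkipList<Key, Comparator>* skiplist, const Key* sorted_keys) {
  SkipList<Key, Comparator>::Iterator iter(skiplist);
  iter.SeekToLast();
  for (unsigned i = SKIPLIST_TEST_SIZE; i > 0; i--) {
    assert(iter.Valid());
    assert(iter.key() == sorted_keys[i - 1]);
    iter.Prev();
  }
  assert(!iter.Valid());
}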
__global__ void globalTestEmpty() {
  Arena arena;
  Comparator cmp;
  SkipList<Key, Comparator> list(cmp, &arena);
  assert(!list.Contains(10));

  SkipList<Key, Comparator>::Iterator iter(&list);
  assert(!iter.Valid());
  iter.SeekToFirst();
  assert(!iter.Valid());
  iter.Seek(100);
  assert(!iter.Valid());
  iter.SeekToLast();
  assert(!iter.Valid());
}

TEST(SkipTest, TestCudaEmpty) {
  globalTestEmpty<<<1, 1>>>();
  cudaDeviceSynchronize();
}

/*
// Needed when building in C++11 mode.
constexpr uint32_t ConcurrentTest::K;

// Simple test that does single-threaded testing of the ConcurrentTest
// scaffolding.
TEST(SkipTest, ConcurrentWithoutThreads) {
  ConcurrentTest test;
  Random rnd(test::RandomSeed());
  for (int i = 0; i < 10000; i++) {
    test.ReadStep(&rnd);
    test.WriteStep(&rnd);
  }
}

class TestState {
 public:
  ConcurrentTest t_;
  int seed_;
  std::atomic<bool> quit_flag_;

  enum ReaderState { STARTING, RUNNING, DONE };

  explicit TestState(int s)
      : seed_(s), quit_flag_(false), state_(STARTING), state_cv_(&mu_) {}

  void Wait(ReaderState s) LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    while (state_ != s) {
      state_cv_.Wait();
    }
    mu_.Unlock();
  }

  void Change(ReaderState s) LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    state_ = s;
    state_cv_.Signal();
    mu_.Unlock();
  }

 private:
  port::Mutex mu_;
  ReaderState state_ GUARDED_BY(mu_);
  port::CondVar state_cv_ GUARDED_BY(mu_);
};

static void ConcurrentReader(void* arg) {
  TestState* state = reinterpret_cast<TestState*>(arg);
  Random rnd(state->seed_);
  int64_t reads = 0;
  state->Change(TestState::RUNNING);
  while (!state->quit_flag_.load(std::memory_order_acquire)) {
    state->t_.ReadStep(&rnd);
    ++reads;
  }
  state->Change(TestState::DONE);
}

static void RunConcurrent(int run) {
  const int seed = test::RandomSeed() + (run * 100);
  Random rnd(seed);
  const int N = 1000;
  const int kSize = 1000;
  for (int i = 0; i < N; i++) {
    if ((i % 100) == 0) {
      std::fprintf(stderr, "Run %d of %d\n", i, N);
    }
    TestState state(seed + 1);
    Env::Default()->Schedule(ConcurrentReader, &state);
    state.Wait(TestState::RUNNING);
    for (int i = 0; i < kSize; i++) {
      state.t_.WriteStep(&rnd);
    }
    state.quit_flag_.store(true, std::memory_order_release);
    state.Wait(TestState::DONE);
  }
}

TEST(SkipTest, Concurrent1) { RunConcurrent(1); }
TEST(SkipTest, Concurrent2) { RunConcurrent(2); }
TEST(SkipTest, Concurrent3) { RunConcurrent(3); }
TEST(SkipTest, Concurrent4) { RunConcurrent(4); }
TEST(SkipTest, Concurrent5) { RunConcurrent(5); }
*/

}  // namespace leveldb

int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
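// Build sketch (the file name, include path, and GPU architecture below are
// assumptions, not taken from this repository). cuda::atomic comes from
// libcu++, so a recent CUDA toolkit and a sufficiently new GPU are needed,
// e.g.:
//   nvcc -std=c++14 -arch=sm_70 -I. skiplist_test.cu -lgtest -o skiplist_test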