diff options
author | KunoiSayami <[email protected]> | 2022-01-19 19:40:07 +0800 |
---|---|---|
committer | KunoiSayami <[email protected]> | 2022-01-19 19:40:07 +0800 |
commit | 4fea7d70a4d1dbccb62e04f80106e4980aed60fa (patch) | |
tree | c88743c7e4b5ad6a162c447f0b56e1e4bc01cf29 | |
parent | b0d2dff85a05a5ced0f910e220daef108358289e (diff) |
test(skiplist): Fix parallel insert error
Signed-off-by: KunoiSayami <[email protected]>
-rw-r--r-- | db/skiplist.cuh | 80 | ||||
-rw-r--r-- | db/skiplist_test.cu | 85 |
2 files changed, 102 insertions, 63 deletions
diff --git a/db/skiplist.cuh b/db/skiplist.cuh index 9f1cdaa..3f7b7e9 100644 --- a/db/skiplist.cuh +++ b/db/skiplist.cuh @@ -37,6 +37,43 @@ namespace leveldb { +class CudaSpinLock { + static constexpr int UNLOCKED = 0; + static constexpr int LOCKED = 1; + + cuda::atomic<int> m_value; + bool isFake; + + public: + + __device__ __host__ explicit CudaSpinLock(): m_value(UNLOCKED), isFake(false) {} + + __device__ __host__ explicit CudaSpinLock(bool fake): m_value(UNLOCKED), isFake(fake) {} + + __device__ void lock() + { + if (!isFake) { + while (true) { + int expected = UNLOCKED; + // this->m_value.wait(LOCKED); + if (this->m_value.compare_exchange_weak(expected, LOCKED)) break; + } + } + } + + __device__ void unlock() + { + if (!isFake) { + m_value.store(UNLOCKED); + } + } + + __device__ bool isLock() { + //printf("%d\n", this->m_value.load()); + return !isFake && this->m_value.load() == LOCKED; + } +}; + class Arena; template <typename Key, class Comparator> @@ -139,6 +176,9 @@ class SkipList { // values are ok. cuda::atomic<int> max_height_; // Height of the entire list + CudaSpinLock arena_lock_; + CudaSpinLock find_lock_; + // Read/written only by Insert(). Random rnd_; }; @@ -185,18 +225,21 @@ struct SkipList<Key, Comparator>::Node { next_[n].store(x, cuda::memory_order_release); } - __device__ bool SetNextSafe(int n, Node *x, Node * origin) { + __device__ bool SetNextSafe(int n, Node *x) { assert(n >= 0); + Node * origin = next_[n].load(cuda::memory_order_acquire); + //assert(origin != x); if (origin == nullptr) { - next_[n].store(x, cuda::memory_order_release); + next_[n].store(x); + x->NoBarrier_SetNext(n, origin); return true; } - printf("%d %p %p\n", n, x, origin); + if (origin->key < x->key) { + return false; + } bool ret = next_[n].compare_exchange_weak(origin, x, cuda::memory_order_acquire); - if (!ret) { - Node * rep = next_[n].load(cuda::memory_order_acquire); - printf("%p %p\n", rep, origin); - assert(rep != origin); + if (ret) { + x->SetNext(n, origin); } return ret; } @@ -219,9 +262,12 @@ struct SkipList<Key, Comparator>::Node { template <typename Key, class Comparator> __device__ typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode( const Key& key, int height) { + arena_lock_.lock(); char* const node_memory = arena_->AllocateAligned( sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1)); - return new (node_memory) Node(key); + Node* ret = new (node_memory) Node(key); + arena_lock_.unlock(); + return ret; } template <typename Key, class Comparator> @@ -303,6 +349,8 @@ __device__ SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key, int level = GetMaxHeight() - 1; while (true) { Node* next = x->Next(level); + assert(x != next); + //printf("%u %lu %x %p\n", threadIdx.x, key, x, next); if (KeyIsAfterNode(key, next)) { // Keep searching in this list x = next; @@ -365,7 +413,9 @@ __device__ SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena) arena_(arena), head_(NewNode(0 /* any key will do */, kMaxHeight)), max_height_(1), - rnd_(0xdeadbeef) { + rnd_(0xdeadbeef), + arena_lock_(), + find_lock_() { for (int i = 0; i < kMaxHeight; i++) { head_->SetNext(i, nullptr); } @@ -381,6 +431,7 @@ __device__ void SkipList<Key, Comparator>::Insert(const Key& key) { // Our data structure does not allow duplicate insertion assert(x == nullptr || !Equal(key, x->key)); + int height = RandomHeight(); if (height > GetMaxHeight()) { for (int i = GetMaxHeight(); i < height; i++) { @@ -396,19 +447,20 @@ __device__ void SkipList<Key, Comparator>::Insert(const Key& key) { max_height_.store(height, cuda::memory_order_relaxed); } - Node * original_next; x = NewNode(key, height); + //printf("x: %p\n", x); for (int i = 0; i < height; i++) { // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. while (true) { - original_next = prev[i]->NoBarrier_Next(i); - x->NoBarrier_SetNext(i, original_next); - if (prev[i]->SetNextSafe(i, x, original_next)) + assert(prev[i]->key < key); + if (prev[i]->SetNextSafe(i, x)) break; - printf("failed %p\n", original_next); + //printf("failed\n"); FindGreaterOrEqual(key, prev); } + /*x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); + prev[i]->SetNext(i, x);*/ } } diff --git a/db/skiplist_test.cu b/db/skiplist_test.cu index 8d924f4..3499cd0 100644 --- a/db/skiplist_test.cu +++ b/db/skiplist_test.cu @@ -363,47 +363,6 @@ __global__ void insert_and_lookup(SkipList<Key, Comparator> * list) { } -constexpr size_t SKIPLIST_TEST_SIZE = 10000; -constexpr size_t TEST_STEP = SKIPLIST_TEST_SIZE / 10; - -// source: https://stackoverflow.com/a/22598599 -class CudaSpinLock { - static constexpr int UNLOCKED = 0; - static constexpr int LOCKED = 1; - - cuda::atomic<int> m_value; - bool isFake; - - public: - - __device__ __host__ explicit CudaSpinLock(): m_value(UNLOCKED), isFake(false) {} - - __device__ __host__ explicit CudaSpinLock(bool fake): m_value(UNLOCKED), isFake(fake) {} - - __device__ void lock() - { - if (!isFake) { - while (true) { - int expected = UNLOCKED; - // this->m_value.wait(LOCKED); - if (this->m_value.compare_exchange_weak(expected, LOCKED)) break; - } - } - } - - __device__ void unlock() - { - if (!isFake) { - m_value.store(UNLOCKED); - } - } - - __device__ bool isLock() { - //printf("%d\n", this->m_value.load()); - return !isFake && this->m_value.load() == LOCKED; - } -}; - __global__ void testCudaAtomic() { cuda::atomic<void *> node; void * test_point = reinterpret_cast<void*>(0xdeadbeef); @@ -438,18 +397,24 @@ TEST(SkipTest, TestLock) { cudaDeviceSynchronize(); } +constexpr size_t BLOCK_COUNT_X = 40; +constexpr size_t BLOCK_COUNT_Y = 256; +constexpr size_t TEST_STEP = 1; +constexpr size_t SKIPLIST_TEST_SIZE = BLOCK_COUNT_X * BLOCK_COUNT_Y * TEST_STEP; + __global__ void testParallel(SkipList<Key, Comparator> * skipList, Key * keys, CudaSpinLock * lock) { - unsigned int start = threadIdx.x; - printf("start: %u\n", start); + unsigned int start = blockIdx.x * blockDim.x + threadIdx.x; + //printf("start: %u %d %d %d\n", start, blockIdx.x ,blockDim.x, threadIdx.x); //lock->lock(); //printf("start insert: %u\n", start); - for (unsigned i = start * TEST_STEP; i < (start + 1) * TEST_STEP; i++) { - printf("%u %02u %lu\n", start, i, keys[i]); + /*for (unsigned i = start * TEST_STEP; i < (start + 1) * TEST_STEP; i++) { + //printf("%u %02u %lu\n", start, i, keys[i]); //printf("key: %lu\n", keys[i]); skipList->Insert(keys[i]); - } + }*/ + skipList->Insert(keys[start]); //lock->unlock(); - printf("done: %u\n", start); + //printf("done: %u\n", start); } __global__ void testSingle(SkipList<Key, Comparator>* skipList, Key * keys) { @@ -466,6 +431,7 @@ __global__ void testKeysIsEqualLists(SkipList<Key, Comparator> * skiplist, const for (unsigned i = 0; i < SKIPLIST_TEST_SIZE ; i++ ) { assert(iter.Valid()); + //printf("%d %lu %lu\n", i, iter.key(), sorted_keys[i]); assert(iter.key() == sorted_keys[i]); iter.Next(); } @@ -508,6 +474,16 @@ __global__ void resetLock(CudaSpinLock * lock) { lock->unlock(); } +#ifdef SHOW_TIME +#include <chrono> +void showTimeSpan(const std::chrono::high_resolution_clock::time_point & start_time) { + std::chrono::duration<double> time_span = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::high_resolution_clock::now() - start_time); + printf("used time: %.4lf seconds\n", time_span.count()); +} +#else +#define showTimeSpan(x) +#endif + TEST(SkipTest, TestSingleCudaInsert) { //Key * keys; //SkipList<Key, Comparator> list(cmp, &arena); @@ -557,8 +533,14 @@ TEST(SkipTest, TestSingleCudaInsert) { //insert_skiplist<<<gridSize, blockSize>>>(skipList, device_rnd); //testParallel<<<gridSize, blockSize>>>(*skipList, device_keys); +#ifdef SHOW_TIME + const std::chrono::high_resolution_clock::time_point start_time = + std::chrono::high_resolution_clock::now(); +#endif testSingle<<<1, 1>>>(*skipList, device_keys); cudaDeviceSynchronize(); + printf("%d\n", cudaGetLastError()); + showTimeSpan(start_time); std::sort(sorted_keys, sorted_keys + SKIPLIST_TEST_SIZE); cudaMemcpy(device_keys, sorted_keys, SKIPLIST_TEST_SIZE * sizeof(Key), cudaMemcpyHostToDevice); @@ -610,7 +592,7 @@ TEST(SkipTest, TestMultiThreadInsert) { memcpy(sorted_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key)); cudaMemcpy(device_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key), cudaMemcpyHostToDevice); dim3 gridSize(1, 1); - dim3 blockSize(10, 1); + dim3 blockSize(BLOCK_COUNT_X, BLOCK_COUNT_Y); initSkipList<<<1, 1>>>(pArena, pSkipList); //sleep(5); @@ -618,8 +600,13 @@ TEST(SkipTest, TestMultiThreadInsert) { //insert_skiplist<<<gridSize, blockSize>>>(skipList, device_rnd); //testParallel<<<gridSize, blockSize>>>(*skipList, device_keys); - testParallel<<<gridSize, blockSize>>>(*pSkipList, device_keys, device_lock); +#ifdef SHOW_TIME + const std::chrono::high_resolution_clock::time_point start_time = + std::chrono::high_resolution_clock::now(); +#endif + testParallel<<<BLOCK_COUNT_X, BLOCK_COUNT_Y>>>(*pSkipList, device_keys, device_lock); cudaDeviceSynchronize(); + showTimeSpan(start_time); std::sort(sorted_keys, sorted_keys + SKIPLIST_TEST_SIZE); cudaMemcpy(device_keys, sorted_keys, SKIPLIST_TEST_SIZE * sizeof(Key), cudaMemcpyHostToDevice); |