aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author    KunoiSayami <[email protected]>    2022-01-19 19:40:07 +0800
committer KunoiSayami <[email protected]>    2022-01-19 19:40:07 +0800
commit4fea7d70a4d1dbccb62e04f80106e4980aed60fa (patch)
treec88743c7e4b5ad6a162c447f0b56e1e4bc01cf29
parentb0d2dff85a05a5ced0f910e220daef108358289e (diff)
test(skiplist): Fix parallel insert error
Signed-off-by: KunoiSayami <[email protected]>
-rw-r--r--  db/skiplist.cuh       | 80
-rw-r--r--  db/skiplist_test.cu   | 85
2 files changed, 102 insertions(+), 63 deletions(-)
diff --git a/db/skiplist.cuh b/db/skiplist.cuh
index 9f1cdaa..3f7b7e9 100644
--- a/db/skiplist.cuh
+++ b/db/skiplist.cuh
@@ -37,6 +37,43 @@
namespace leveldb {
+class CudaSpinLock {
+ static constexpr int UNLOCKED = 0;
+ static constexpr int LOCKED = 1;
+
+ cuda::atomic<int> m_value;
+ bool isFake;
+
+ public:
+
+ __device__ __host__ explicit CudaSpinLock(): m_value(UNLOCKED), isFake(false) {}
+
+ __device__ __host__ explicit CudaSpinLock(bool fake): m_value(UNLOCKED), isFake(fake) {}
+
+ __device__ void lock()
+ {
+ if (!isFake) {
+ while (true) {
+ int expected = UNLOCKED;
+ // this->m_value.wait(LOCKED);
+ if (this->m_value.compare_exchange_weak(expected, LOCKED)) break;
+ }
+ }
+ }
+
+ __device__ void unlock()
+ {
+ if (!isFake) {
+ m_value.store(UNLOCKED);
+ }
+ }
+
+ __device__ bool isLock() {
+ //printf("%d\n", this->m_value.load());
+ return !isFake && this->m_value.load() == LOCKED;
+ }
+};
+
class Arena;
template <typename Key, class Comparator>
@@ -139,6 +176,9 @@ class SkipList {
// values are ok.
cuda::atomic<int> max_height_; // Height of the entire list
+ CudaSpinLock arena_lock_;
+ CudaSpinLock find_lock_;
+
// Read/written only by Insert().
Random rnd_;
};
@@ -185,18 +225,21 @@ struct SkipList<Key, Comparator>::Node {
next_[n].store(x, cuda::memory_order_release);
}
- __device__ bool SetNextSafe(int n, Node *x, Node * origin) {
+ __device__ bool SetNextSafe(int n, Node *x) {
assert(n >= 0);
+ Node * origin = next_[n].load(cuda::memory_order_acquire);
+ //assert(origin != x);
if (origin == nullptr) {
- next_[n].store(x, cuda::memory_order_release);
+ next_[n].store(x);
+ x->NoBarrier_SetNext(n, origin);
return true;
}
- printf("%d %p %p\n", n, x, origin);
+ if (origin->key < x->key) {
+ return false;
+ }
bool ret = next_[n].compare_exchange_weak(origin, x, cuda::memory_order_acquire);
- if (!ret) {
- Node * rep = next_[n].load(cuda::memory_order_acquire);
- printf("%p %p\n", rep, origin);
- assert(rep != origin);
+ if (ret) {
+ x->SetNext(n, origin);
}
return ret;
}
@@ -219,9 +262,12 @@ struct SkipList<Key, Comparator>::Node {
template <typename Key, class Comparator>
__device__ typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
+ arena_lock_.lock();
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
- return new (node_memory) Node(key);
+ Node* ret = new (node_memory) Node(key);
+ arena_lock_.unlock();
+ return ret;
}
template <typename Key, class Comparator>
@@ -303,6 +349,8 @@ __device__ SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
+ assert(x != next);
+ //printf("%u %lu %x %p\n", threadIdx.x, key, x, next);
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
@@ -365,7 +413,9 @@ __device__ SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena)
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
max_height_(1),
- rnd_(0xdeadbeef) {
+ rnd_(0xdeadbeef),
+ arena_lock_(),
+ find_lock_() {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr);
}
@@ -381,6 +431,7 @@ __device__ void SkipList<Key, Comparator>::Insert(const Key& key) {
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
+
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
@@ -396,19 +447,20 @@ __device__ void SkipList<Key, Comparator>::Insert(const Key& key) {
max_height_.store(height, cuda::memory_order_relaxed);
}
- Node * original_next;
x = NewNode(key, height);
+ //printf("x: %p\n", x);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
while (true) {
- original_next = prev[i]->NoBarrier_Next(i);
- x->NoBarrier_SetNext(i, original_next);
- if (prev[i]->SetNextSafe(i, x, original_next))
+ assert(prev[i]->key < key);
+ if (prev[i]->SetNextSafe(i, x))
break;
- printf("failed %p\n", original_next);
+ //printf("failed\n");
FindGreaterOrEqual(key, prev);
}
+ /*x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
+ prev[i]->SetNext(i, x);*/
}
}
diff --git a/db/skiplist_test.cu b/db/skiplist_test.cu
index 8d924f4..3499cd0 100644
--- a/db/skiplist_test.cu
+++ b/db/skiplist_test.cu
@@ -363,47 +363,6 @@ __global__ void insert_and_lookup(SkipList<Key, Comparator> * list) {
}
-constexpr size_t SKIPLIST_TEST_SIZE = 10000;
-constexpr size_t TEST_STEP = SKIPLIST_TEST_SIZE / 10;
-
-// source: https://stackoverflow.com/a/22598599
-class CudaSpinLock {
- static constexpr int UNLOCKED = 0;
- static constexpr int LOCKED = 1;
-
- cuda::atomic<int> m_value;
- bool isFake;
-
- public:
-
- __device__ __host__ explicit CudaSpinLock(): m_value(UNLOCKED), isFake(false) {}
-
- __device__ __host__ explicit CudaSpinLock(bool fake): m_value(UNLOCKED), isFake(fake) {}
-
- __device__ void lock()
- {
- if (!isFake) {
- while (true) {
- int expected = UNLOCKED;
- // this->m_value.wait(LOCKED);
- if (this->m_value.compare_exchange_weak(expected, LOCKED)) break;
- }
- }
- }
-
- __device__ void unlock()
- {
- if (!isFake) {
- m_value.store(UNLOCKED);
- }
- }
-
- __device__ bool isLock() {
- //printf("%d\n", this->m_value.load());
- return !isFake && this->m_value.load() == LOCKED;
- }
-};
-
__global__ void testCudaAtomic() {
cuda::atomic<void *> node;
void * test_point = reinterpret_cast<void*>(0xdeadbeef);
@@ -438,18 +397,24 @@ TEST(SkipTest, TestLock) {
cudaDeviceSynchronize();
}
+constexpr size_t BLOCK_COUNT_X = 40;
+constexpr size_t BLOCK_COUNT_Y = 256;
+constexpr size_t TEST_STEP = 1;
+constexpr size_t SKIPLIST_TEST_SIZE = BLOCK_COUNT_X * BLOCK_COUNT_Y * TEST_STEP;
+
__global__ void testParallel(SkipList<Key, Comparator> * skipList, Key * keys, CudaSpinLock * lock) {
- unsigned int start = threadIdx.x;
- printf("start: %u\n", start);
+ unsigned int start = blockIdx.x * blockDim.x + threadIdx.x;
+ //printf("start: %u %d %d %d\n", start, blockIdx.x ,blockDim.x, threadIdx.x);
//lock->lock();
//printf("start insert: %u\n", start);
- for (unsigned i = start * TEST_STEP; i < (start + 1) * TEST_STEP; i++) {
- printf("%u %02u %lu\n", start, i, keys[i]);
+ /*for (unsigned i = start * TEST_STEP; i < (start + 1) * TEST_STEP; i++) {
+ //printf("%u %02u %lu\n", start, i, keys[i]);
//printf("key: %lu\n", keys[i]);
skipList->Insert(keys[i]);
- }
+ }*/
+ skipList->Insert(keys[start]);
//lock->unlock();
- printf("done: %u\n", start);
+ //printf("done: %u\n", start);
}
__global__ void testSingle(SkipList<Key, Comparator>* skipList, Key * keys) {
@@ -466,6 +431,7 @@ __global__ void testKeysIsEqualLists(SkipList<Key, Comparator> * skiplist, const
for (unsigned i = 0; i < SKIPLIST_TEST_SIZE ; i++ ) {
assert(iter.Valid());
+ //printf("%d %lu %lu\n", i, iter.key(), sorted_keys[i]);
assert(iter.key() == sorted_keys[i]);
iter.Next();
}
@@ -508,6 +474,16 @@ __global__ void resetLock(CudaSpinLock * lock) {
lock->unlock();
}
+#ifdef SHOW_TIME
+#include <chrono>
+void showTimeSpan(const std::chrono::high_resolution_clock::time_point & start_time) {
+ std::chrono::duration<double> time_span = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::high_resolution_clock::now() - start_time);
+ printf("used time: %.4lf seconds\n", time_span.count());
+}
+#else
+#define showTimeSpan(x)
+#endif
+
TEST(SkipTest, TestSingleCudaInsert) {
//Key * keys;
//SkipList<Key, Comparator> list(cmp, &arena);
@@ -557,8 +533,14 @@ TEST(SkipTest, TestSingleCudaInsert) {
//insert_skiplist<<<gridSize, blockSize>>>(skipList, device_rnd);
//testParallel<<<gridSize, blockSize>>>(*skipList, device_keys);
+#ifdef SHOW_TIME
+ const std::chrono::high_resolution_clock::time_point start_time =
+ std::chrono::high_resolution_clock::now();
+#endif
testSingle<<<1, 1>>>(*skipList, device_keys);
cudaDeviceSynchronize();
+ printf("%d\n", cudaGetLastError());
+ showTimeSpan(start_time);
std::sort(sorted_keys, sorted_keys + SKIPLIST_TEST_SIZE);
cudaMemcpy(device_keys, sorted_keys, SKIPLIST_TEST_SIZE * sizeof(Key), cudaMemcpyHostToDevice);
@@ -610,7 +592,7 @@ TEST(SkipTest, TestMultiThreadInsert) {
memcpy(sorted_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key));
cudaMemcpy(device_keys, keys, SKIPLIST_TEST_SIZE * sizeof(Key), cudaMemcpyHostToDevice);
dim3 gridSize(1, 1);
- dim3 blockSize(10, 1);
+ dim3 blockSize(BLOCK_COUNT_X, BLOCK_COUNT_Y);
initSkipList<<<1, 1>>>(pArena, pSkipList);
//sleep(5);
@@ -618,8 +600,13 @@ TEST(SkipTest, TestMultiThreadInsert) {
//insert_skiplist<<<gridSize, blockSize>>>(skipList, device_rnd);
//testParallel<<<gridSize, blockSize>>>(*skipList, device_keys);
- testParallel<<<gridSize, blockSize>>>(*pSkipList, device_keys, device_lock);
+#ifdef SHOW_TIME
+ const std::chrono::high_resolution_clock::time_point start_time =
+ std::chrono::high_resolution_clock::now();
+#endif
+ testParallel<<<BLOCK_COUNT_X, BLOCK_COUNT_Y>>>(*pSkipList, device_keys, device_lock);
cudaDeviceSynchronize();
+ showTimeSpan(start_time);
std::sort(sorted_keys, sorted_keys + SKIPLIST_TEST_SIZE);
cudaMemcpy(device_keys, sorted_keys, SKIPLIST_TEST_SIZE * sizeof(Key), cudaMemcpyHostToDevice);