author | KunoiSayami <[email protected]> | 2022-05-16 15:47:09 +0800 |
---|---|---|
committer | KunoiSayami <[email protected]> | 2022-05-16 15:47:09 +0800 |
commit | 1bb82039b44cb6d7be5e60a3a6b96b3808ee0725 (patch) | |
tree | 60223f32c08ac77851a3b2ee65e0aefd7477a7d0 | |
parent | 0b1a8e9299df42ad22ac889dc9907c8efe2dd9d3 (diff) |
Signed-off-by: KunoiSayami <[email protected]>
-rw-r--r-- | db/memtable.cu | 246
-rw-r--r-- | db/memtable.cuh | 325
2 files changed, 533 insertions, 38 deletions
diff --git a/db/memtable.cu b/db/memtable.cu
index 241a35b..51ca578 100644
--- a/db/memtable.cu
+++ b/db/memtable.cu
@@ -20,7 +20,6 @@ static Slice GetLengthPrefixedSlice(const char* data) {
 
 MemTable::MemTable(const InternalKeyComparator& comparator)
     : comparator_(comparator), refs_(0) {
-
 }
 
 MemTable::~MemTable() { assert(refs_ == 0); }
@@ -266,4 +265,249 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
   return false;
 }
 
+__device__ const char* GetVarint32PtrFallbackCuda(const char* p, const char* limit,
+                                                  uint32_t* value) {
+  uint32_t result = 0;
+  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
+    uint32_t byte = *(reinterpret_cast<const uint8_t*>(p));
+    p++;
+    if (byte & 128) {
+      // More bytes are present
+      result |= ((byte & 127) << shift);
+    } else {
+      result |= (byte << shift);
+      *value = result;
+      return reinterpret_cast<const char*>(p);
+    }
+  }
+  return nullptr;
+}
+
+__device__ inline const char* GetVarint32PtrCuda(const char* p, const char* limit,
+                                                 uint32_t* value) {
+  if (p < limit) {
+    uint32_t result = *(reinterpret_cast<const uint8_t*>(p));
+    if ((result & 128) == 0) {
+      *value = result;
+      return p + 1;
+    }
+  }
+  return GetVarint32PtrFallbackCuda(p, limit, value);
+}
+
+struct SizedSlice {
+  const char * ptr;
+  size_t len;
+
+  __device__ __host__ explicit SizedSlice(const char * ptr, size_t len) {
+    this->ptr = ptr;
+    this->len = len;
+  }
+};
+
+__device__ static SizedSlice GetLengthPrefixedSliceCuda(const char* data) {
+  uint32_t len;
+  const char* p = data;
+  p = GetVarint32PtrCuda(p, p + 5, &len);  // +5: we assume "p" is not corrupted
+  return SizedSlice(p, len);
+}
+
+
+__device__ MemTableCuda::MemTableCuda(const InternalKeyComparator& comparator)
+    : comparator_(comparator), refs_(0), table_(comparator_, &arena_) {
+}
+
+MemTableCuda::~MemTableCuda() { assert(refs_ == 0); }
+
+size_t MemTableCuda::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
+
+__device__ int MemTableCuda::KeyComparator::operator()(const char* aptr,
+                                                       const char* bptr) const {
+  // Internal keys are encoded as length-prefixed strings.
+  Slice a = GetLengthPrefixedSliceCuda(aptr);
+  Slice b = GetLengthPrefixedSliceCuda(bptr);
+  return comparator.Compare(a, b);
+}
+
+class MemTableCudaIterator : public Iterator {
+ public:
+  explicit MemTableCudaIterator(MemTableCuda::Table* table) : iter_(table) {}
+
+  MemTableCudaIterator(const MemTableCudaIterator&) = delete;
+  MemTableCudaIterator& operator=(const MemTableCudaIterator&) = delete;
+
+  ~MemTableCudaIterator() override = default;
+
+  bool Valid() const override { return iter_.Valid(); }
+  void Seek(const Slice& k) override {
+    char * key = nullptr;
+    EncodeKey(&tmp_, k);
+    Seek_<<<1, 1>>>(this, key);
+  }
+  void SeekToFirst() override { iter_.SeekToFirst(); }
+  void SeekToLast() override { iter_.SeekToLast(); }
+  void Next() override { iter_.Next(); }
+  void Prev() override { Prev_<<<1, 1>>>(this); }
+  Slice key() const override { return GetLengthPrefixedSlice(iter_.key()); }
+  Slice value() const override {
+    Slice key_slice = GetLengthPrefixedSlice(iter_.key());
+    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
+  }
+
+  Status status() const override { return Status::OK(); }
+
+  static __global__ void Prev_(MemTableCudaIterator* this_) {
+    this_->iter_.Prev();
+  }
+
+  static __global__ void Seek_(MemTableCudaIterator* this_, const char * key) {
+    this_->iter_.Seek(key);
+  }
+
+ private:
+  MemTableCuda::Table::Iterator iter_;
+  std::string tmp_;  // For passing to EncodeKey
+};
+
+Iterator* MemTableCuda::NewIterator() { return new MemTableCudaIterator(&table_); }
+
+
+__global__ void MemTableCudaAdd_(MemTableCuda * mtb, size_t encoded_len, char * encode_data) {
+  char* buf = mtb->arena_.Allocate(encoded_len);
+  memcpy(buf, encode_data, encoded_len);
+  mtb->table_.Insert(buf);
+}
+
+
+void MemTableCuda::Add(SequenceNumber s, ValueType type, const Slice& key,
+                       const Slice& value) {
+  // Format of an entry is concatenation of:
+  //  key_size      : varint32 of internal_key.size()
+  //  key bytes     : char[internal_key.size()]
+  //  tag           : uint64((sequence << 8) | type)
+  //  value_size    : varint32 of value.size()
+  //  value pointer : fixed64 pointing to the value bytes kept in host memory
+  size_t key_size = key.size();
+  size_t val_size = value.size();
+  size_t internal_key_size = key_size + 8;
+  //const size_t encoded_len = VarintLength(internal_key_size) +
+  //                           internal_key_size + VarintLength(val_size) +
+  //                           val_size;
+  const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size +
+                             VarintLength(val_size) + 8;
+
+
+  char * key_mem = this->arena_.Allocate(key_size);
+  std::memcpy(key_mem, key.data(), key_size);
+
+  //char * tag_mem = key_mem + key_size;
+  //EncodeFixed64(tag_mem, (s << 8) | type);
+
+  char * val_mem = this->arena_.Allocate(val_size);
+  std::memcpy(val_mem, value.data(), val_size);
+
+  char * insert_val = new char[encoded_len], *cuda_insert = nullptr;
+  // EncodeVarint32(insert_val, encoded_len);
+  char * p = EncodeVarint32(insert_val, internal_key_size);
+  //EncodeFixed64(p, reinterpret_cast<uint64_t>(key_mem));
+  memcpy(p, key_mem, key_size);
+  p += key_size;
+  EncodeFixed64(p, (s << 8) | type);
+  p += 8;
+  //EncodeFixed64(p, reinterpret_cast<uint64_t>(val_size));
+  p = EncodeVarint32(p, val_size);
+  EncodeFixed64(p, reinterpret_cast<uint64_t>(val_mem));
+
+  assert(p + 8 == insert_val + encoded_len);
+
+  cudaMalloc((void**)&cuda_insert, encoded_len);
+  cudaMemcpy(cuda_insert, insert_val, encoded_len, cudaMemcpyHostToDevice);
+
+  MemTableCudaAdd_<<<1, 1>>>(this, encoded_len, cuda_insert);
+  cudaDeviceSynchronize();
+
+  cudaFree(cuda_insert);
+  delete[] insert_val;
+}
+
+__global__ void MemTableCudaGet_(MemTableCuda * met, char * memkey, char ** data, size_t* malloc_size) {
+  *data = nullptr;
+  auto iter = met->getIter();
+  iter.Seek(memkey);
+  if (iter.Valid()) {
+    // entry format is:
+    //    klength  varint32
+    //    userkey  char[klength]
+    //    tag      uint64
+    //    vlength  varint32
+    //    value    char[vlength]
+    // Check that it belongs to same user key. We do not check the
+    // sequence number since the Seek() call above should have skipped
+    // all entries with overly large sequence numbers.
+    size_t key_size;
+    const char *entry = iter.key();
+    const char * p = GetVarint32PtrCuda(entry, entry + 5,
+                                        reinterpret_cast<uint32_t*>(&key_size));
+    p = GetVarint32PtrCuda(p + key_size, p + key_size + 5, nullptr);
+    *malloc_size = (p - entry + 8);
+    cudaMalloc((void**)data, *malloc_size);
+    memcpy(*data, entry, *malloc_size);
+  }
+}
+
+bool MemTableCuda::Get(const LookupKey& key, std::string* value, Status* s) {
+
+  Slice memkey = key.memtable_key();
+  char * cuda_mem_key = nullptr;
+  cudaMalloc((void**)&cuda_mem_key, memkey.size());
+  cudaMemcpy(cuda_mem_key, memkey.data(), memkey.size(), cudaMemcpyHostToDevice);
+
+  char ** cuda_skiplist_key = nullptr;
+  cudaMalloc((void**)&cuda_skiplist_key, sizeof(char *));
+
+  size_t * cuda_malloc_size = nullptr;
+  cudaMalloc((void**)&cuda_malloc_size, sizeof(size_t));
+
+  MemTableCudaGet_<<<1,1>>>(this, cuda_mem_key, cuda_skiplist_key, cuda_malloc_size);
+  cudaDeviceSynchronize();
+  cudaFree(cuda_mem_key);
+
+  if (*cuda_skiplist_key == nullptr) {
+    cudaFree(cuda_skiplist_key);
+    cudaFree(cuda_malloc_size);
+    return false;
+  }
+  auto * malloc_size = new size_t;
+  cudaMemcpy(malloc_size, cuda_malloc_size, sizeof(size_t), cudaMemcpyDeviceToHost);
+
+  char * entry = this->host_arena_.Allocate(*malloc_size);
+  cudaMemcpy(entry, cuda_skiplist_key, *malloc_size, cudaMemcpyDeviceToHost);
+
+  cudaFree(cuda_skiplist_key);
+  cudaFree(cuda_malloc_size);
+  delete malloc_size;
+
+  uint32_t key_length;
+  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+  if (comparator_.comparator.user_comparator()->Compare(
+          Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
+    // Correct user key
+    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+    switch (static_cast<ValueType>(tag & 0xff)) {
+      case kTypeValue: {
+        // HOW CAN WE GET IT?
+        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+        value->assign(v.data(), v.size());
+        return true;
+      }
+      case kTypeDeletion:
+        *s = Status::NotFound(Slice());
+        return true;
+    }
+  }
+
+
+  return false;
+}
+
 }  // namespace leveldb
diff --git a/db/memtable.cuh b/db/memtable.cuh
index 766eb03..6f0b2cd 100644
--- a/db/memtable.cuh
+++ b/db/memtable.cuh
@@ -18,36 +18,6 @@ namespace leveldb {
 class InternalKeyComparator;
 class MemTableIterator;
 
-__device__ const char* GetVarint32PtrFallbackCuda(const char* p, const char* limit,
-                                                  uint32_t* value) {
-  uint32_t result = 0;
-  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
-    uint32_t byte = *(reinterpret_cast<const uint8_t*>(p));
-    p++;
-    if (byte & 128) {
-      // More bytes are present
-      result |= ((byte & 127) << shift);
-    } else {
-      result |= (byte << shift);
-      *value = result;
-      return reinterpret_cast<const char*>(p);
-    }
-  }
-  return nullptr;
-}
-
-__device__ inline const char* GetVarint32PtrCuda(const char* p, const char* limit,
-                                                 uint32_t* value) {
-  if (p < limit) {
-    uint32_t result = *(reinterpret_cast<const uint8_t*>(p));
-    if ((result & 128) == 0) {
-      *value = result;
-      return p + 1;
-    }
-  }
-  return GetVarint32PtrFallbackCuda(p, limit, value);
-}
-
 struct SizedString {
   char * data;
   size_t length;
@@ -73,13 +43,6 @@ struct SizedString {
 };
 
-__device__ static SizedString GetLengthPrefixedSliceCuda(const char* data) {
-  uint32_t len;
-  const char* p = data;
-  p = GetVarint32PtrCuda(p, p + 5, &len);  // +5: we assume "p" is not corrupted
-  return SizedString(p, len);
-}
-
 class MemTable {
  public:
   // MemTables are reference counted. The initial reference count
@@ -154,6 +117,294 @@ class MemTable {
   Table table_;
 };
 
+
+class CudaSlice;
+
+// A Comparator object provides a total order across slices that are
+// used as keys in an sstable or a database. A Comparator implementation
+// must be thread-safe since leveldb may invoke its methods concurrently
+// from multiple threads.
+class LEVELDB_EXPORT CudaComparator {
+ public:
+  virtual ~CudaComparator();
+
+  virtual __device__ int Compare(const CudaSlice& a, const CudaSlice& b) const = 0;
+
+  virtual __device__ __host__ const char* Name() const = 0;
+
+  virtual __device__ void FindShortestSeparator(const CudaSlice* start,
+                                                const CudaSlice& limit) const = 0;
+
+  virtual __device__ void FindShortSuccessor(CudaSlice* key) const = 0;
+};
+
+// Return a builtin comparator that uses lexicographic byte-wise
+// ordering. The result remains the property of this module and
+// must not be deleted.
+LEVELDB_EXPORT const Comparator* BytewiseComparator();
+
+
+// A comparator for internal keys that uses a specified comparator for
+// the user key portion and breaks ties by decreasing sequence number.
+class CudaInternalKeyComparator : public CudaComparator {
+ private:
+  const CudaComparator* user_comparator_;
+
+ public:
+  explicit CudaInternalKeyComparator(const CudaComparator* c) : user_comparator_(c) {}
+  __device__ __host__ const char* Name() const override;
+  __device__ int Compare(const CudaSlice& a, const CudaSlice& b) const override;
+  __device__ void FindShortestSeparator(const CudaSlice* start,
+                                        const CudaSlice& limit) const override;
+  __device__ void FindShortSuccessor(CudaSlice* key) const override;
+
+  __device__ __host__ const CudaComparator* user_comparator() const { return user_comparator_; }
+
+  __device__ int Compare(const InternalKey& a, const InternalKey& b) const;
+};
+
+
+class LEVELDB_EXPORT CudaSlice {
+ public:
+  // Create an empty slice.
+  CudaSlice() : data_(""), size_(0) {}
+
+  // Create a slice that refers to d[0,n-1].
+  CudaSlice(const char* d, size_t n) : data_(d), size_(n) {}
+
+  // Create a slice that refers to the contents of "s"
+  //CudaSlice(const std::string& s) : data_(s.data()), size_(s.size()) {}
+
+  // Create a slice that refers to s[0,strlen(s)-1]
+  CudaSlice(const char* s) : data_(s), size_(strlen(s)) {}
+
+  // Intentionally copyable.
+  CudaSlice(const CudaSlice&) = default;
+  CudaSlice& operator=(const CudaSlice&) = default;
+
+  // Return a pointer to the beginning of the referenced data
+  __device__ const char* data() const { return data_; }
+
+  // Return the length (in bytes) of the referenced data
+  __device__ __host__ size_t size() const { return size_; }
+
+  // Return true iff the length of the referenced data is zero
+  __device__ __host__ bool empty() const { return size_ == 0; }
+
+  // Return the ith byte in the referenced data.
+  // REQUIRES: n < size()
+  /*__device__ char operator[](size_t n) const {
+    assert(n < size());
+    return data_[n];
+  }*/
+  __device__ char get_index(size_t n) const {
+    assert(n < size());
+    return data_[n];
+  }
+
+  // Change this slice to refer to an empty array
+  __device__ __host__ void clear() {
+    data_ = nullptr;
+    size_ = 0;
+  }
+
+  // Drop the first "n" bytes from this slice.
+  void remove_prefix(size_t n) {
+    assert(n <= size());
+    data_ += n;
+    size_ -= n;
+  }
+
+  // Return a string that contains the copy of the referenced data.
+  //std::string ToString() const { return std::string(data_, size_); }
+
+  // Three-way comparison. Returns value:
+  //   <  0 iff "*this" <  "b",
+  //   == 0 iff "*this" == "b",
+  //   >  0 iff "*this" >  "b"
+  __device__ int compare(const CudaSlice& b) const;
+
+  // Return true iff "x" is a prefix of "*this"
+  bool starts_with(const CudaSlice& x) const {
+    return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0));
+  }
+
+ private:
+  const char* data_;
+  size_t size_;
+};
+
+
+template<typename T>
+__device__ T cudaMin(const T & a, const T &b) {
+  return a < b ? a : b;
+}
+
+CudaComparator::~CudaComparator() = default;
+
+namespace {
+class BytewiseComparatorImpl : public CudaComparator {
+ public:
+  BytewiseComparatorImpl() = default;
+
+
+
+  __device__ __host__ const char* Name() const override { return "leveldb.BytewiseComparator"; }
+
+  __device__ int Compare(const CudaSlice& a, const CudaSlice& b) const override {
+    return a.compare(b);
+  }
+
+  __device__ void FindShortestSeparator(const CudaSlice * start,
+                                        const CudaSlice& limit) const override {
+    // Find length of common prefix
+    size_t min_length = std::min(start->size(), limit.size());
+    size_t diff_index = 0;
+    while ((diff_index < min_length) &&
+           ((*start)[diff_index] == limit[diff_index])) {
+      diff_index++;
+    }
+
+    if (diff_index >= min_length) {
+      // Do not shorten if one string is a prefix of the other
+    } else {
+      uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
+      if (diff_byte < static_cast<uint8_t>(0xff) &&
+          diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
+        (*start)[diff_index]++;
+        start->resize(diff_index + 1);
+        assert(Compare(*start, limit) < 0);
+      }
+    }
+  }
+
+  void FindShortSuccessor(CudaSlice* key) const override {
+    // Find first character that can be incremented
+    size_t n = key->size();
+    for (size_t i = 0; i < n; i++) {
+      const uint8_t byte = (*key)[i];
+      if (byte != static_cast<uint8_t>(0xff)) {
+        (*key)[i] = byte + 1;
+        key->resize(i + 1);
+        return;
+      }
+    }
+    // *key is a run of 0xffs. Leave it alone.
+  }
+};
+}  // namespace
+
+const Comparator* BytewiseComparator() {
+  static NoDestructor<BytewiseComparatorImpl> singleton;
+  return singleton.get();
+}
+
+template<typename T>
+__device__ int cudaMemcmp(const T * a, const T * b, size_t min_length) {
+  for (size_t i = 0; i < min_length; i++) {
+    if (a[i] == b[i])
+      continue;
+    return a[i] < b[i] ? -1 : 1;
+  }
+  return 0;
+}
+
+/*__device__ inline bool operator==(const CudaSlice& x, const CudaSlice& y) {
+  return ((x.size() == y.size()) &&
+          (cudaMemcmp(x.data(), y.data(), x.size()) == 0));
+}*/
+
+/*inline bool operator!=(const CudaSlice& x, const CudaSlice& y) { return !(x == y); }*/
+
+
+
+__device__ inline int CudaSlice::compare(const CudaSlice& b) const {
+  const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
+  int r = cudaMemcmp(data_, b.data_, min_len);
+  if (r == 0) {
+    if (size_ < b.size_)
+      r = -1;
+    else if (size_ > b.size_)
+      r = +1;
+  }
+  return r;
+}
+
+
+
+class MemTableCuda {
+ public:
+  // MemTables are reference counted. The initial reference count
+  // is zero and the caller must call Ref() at least once.
+  __device__ explicit MemTableCuda(const InternalKeyComparator& comparator);
+
+  MemTableCuda(const MemTableCuda&) = delete;
+  MemTableCuda& operator=(const MemTableCuda&) = delete;
+
+  // Increase reference count.
+  void Ref() { ++refs_; }
+
+  // Drop reference count. Delete if no more references exist.
+  void Unref() {
+    --refs_;
+    assert(refs_ >= 0);
+    if (refs_ <= 0) {
+      delete this;
+    }
+  }
+
+  // Returns an estimate of the number of bytes of data in use by this
+  // data structure. It is safe to call when MemTable is being modified.
+  size_t ApproximateMemoryUsage();
+
+  // Return an iterator that yields the contents of the memtable.
+  //
+  // The caller must ensure that the underlying MemTable remains live
+  // while the returned iterator is live. The keys returned by this
+  // iterator are internal keys encoded by AppendInternalKey in the
+  // db/format.{h,cc} module.
+  Iterator* NewIterator();
+
+  // Add an entry into memtable that maps key to value at the
+  // specified sequence number and with the specified type.
+  // Typically value will be empty if type==kTypeDeletion.
+  __device__ void Add(SequenceNumber seq, ValueType type, const CudaSlice& key,
+                      const Slice& value);
+
+  // If memtable contains a value for key, store it in *value and return true.
+  // If memtable contains a deletion for key, store a NotFound() error
+  // in *status and return true.
+  // Else, return false.
+  __device__ bool Get(const LookupKey& key, std::string* value, Status* s);
+
+ private:
+  friend class MemTableCudaIterator;
+  friend class MemTableCudaBackwardIterator;
+  friend __global__ void MemTableCudaAdd_(MemTableCuda *, size_t, char *);
+  friend __global__ void MemTableCudaGet_(MemTableCuda *, char *, char **, size_t* malloc_size);
+
+  struct KeyComparator {
+    const InternalKeyComparator comparator;
+    __device__ __host__ explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
+    __device__ int operator()(const char* a, const char* b) const;
+    __device__ __host__ ~KeyComparator() = default;
+  };
+
+  typedef SkipList<const char*, KeyComparator> Table;
+
+  __device__ Table::Iterator getIter() {
+    Table::Iterator iter(&this->table_);
+    return iter;
+  }
+
+  ~MemTableCuda();  // Private since only Unref() should be used to delete it
+
+  KeyComparator comparator_;
+  int refs_;
+  ArenaCuda arena_;
+  Table table_;
+};
+
 }  // namespace leveldb
 
 #endif  // STORAGE_LEVELDB_DB_MEMTABLE_H_
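For reference, a minimal host-only sketch of the entry layout that MemTableCuda::Add builds above: a varint32 internal-key length, the user-key bytes, an 8-byte tag of (sequence << 8) | type, a varint32 value length, and a trailing fixed64 that holds a pointer to the value bytes, which stay in host memory. PutVarint32, PutFixed64 and VarintLen are simplified stand-ins for leveldb's EncodeVarint32, EncodeFixed64 and VarintLength from util/coding.h, and the key, value and sequence number are made up for illustration.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Simplified stand-ins for the util/coding.h helpers.
static char* PutVarint32(char* dst, uint32_t v) {
  while (v >= 128) {
    *dst++ = static_cast<char>(v | 128);
    v >>= 7;
  }
  *dst++ = static_cast<char>(v);
  return dst;
}

static void PutFixed64(char* dst, uint64_t v) {
  std::memcpy(dst, &v, sizeof(v));  // assumes a little-endian host, like EncodeFixed64
}

static size_t VarintLen(uint64_t v) {
  size_t n = 1;
  while (v >= 128) {
    v >>= 7;
    n++;
  }
  return n;
}

int main() {
  const std::string key = "user_key";       // hypothetical user key
  const std::string value = "value_bytes";  // hypothetical value
  const uint64_t sequence = 42;
  const uint8_t kTypeValue = 0x1;

  // Stand-in for the host-arena copy of the value that Add() keeps alive.
  std::vector<char> val_mem(value.begin(), value.end());

  const size_t internal_key_size = key.size() + 8;  // user key + 8-byte tag
  const size_t encoded_len = VarintLen(internal_key_size) + internal_key_size +
                             VarintLen(value.size()) + 8;  // trailing 8: host pointer

  std::vector<char> entry(encoded_len);
  char* p = PutVarint32(entry.data(), static_cast<uint32_t>(internal_key_size));
  std::memcpy(p, key.data(), key.size());                   // key bytes
  p += key.size();
  PutFixed64(p, (sequence << 8) | kTypeValue);              // tag
  p += 8;
  p = PutVarint32(p, static_cast<uint32_t>(value.size()));  // value_size
  PutFixed64(p, reinterpret_cast<uint64_t>(val_mem.data()));  // only the pointer crosses over
  assert(p + 8 == entry.data() + encoded_len);
  return 0;
}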
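Both MemTableCuda::Add and MemTableCuda::Get lean on the same handoff pattern: stage a buffer with cudaMalloc/cudaMemcpy, launch a single-thread kernel that works on the device-resident skip list, synchronize, then copy the result back. A self-contained sketch of that pattern, assuming a hypothetical ProcessEntry_ kernel and payload in place of MemTableCudaAdd_/MemTableCudaGet_:

#include <cstdio>
#include <cuda_runtime.h>

// Hypothetical stand-in for the single-thread insert/lookup kernels: it
// receives a staged buffer and writes one result back to device memory.
__global__ void ProcessEntry_(const char* entry, size_t len, size_t* out_len) {
  // A real kernel would insert `entry` into (or look it up in) the
  // device-resident skip list; here it only reports the length.
  *out_len = len;
}

int main() {
  const char host_entry[] = "encoded-entry-bytes";  // hypothetical payload
  const size_t len = sizeof(host_entry);

  char* dev_entry = nullptr;
  size_t* dev_out = nullptr;
  cudaMalloc((void**)&dev_entry, len);
  cudaMalloc((void**)&dev_out, sizeof(size_t));
  cudaMemcpy(dev_entry, host_entry, len, cudaMemcpyHostToDevice);

  // Single thread, mirroring the <<<1, 1>>> launches in Add()/Get().
  ProcessEntry_<<<1, 1>>>(dev_entry, len, dev_out);
  cudaDeviceSynchronize();

  size_t out = 0;
  cudaMemcpy(&out, dev_out, sizeof(size_t), cudaMemcpyDeviceToHost);
  std::printf("kernel saw %zu bytes\n", out);

  cudaFree(dev_entry);
  cudaFree(dev_out);
  return 0;
}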