Blink DB Documentation v1.0.0
Blink DB Documentation
Loading...
Searching...
No Matches
lsm.hpp
Go to the documentation of this file.
1
9#ifndef LSM
10#define LSM
11
#include "constants.hpp"
#include "memtable.hpp"
#include "sstable.hpp"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <filesystem>
#include <fstream>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>
26
36
37class LSMTree {
38private:
39 std::string SS_TABLE_PATH;
40 std::unique_ptr<MemTable> activeMemTable;
41 std::deque<std::unique_ptr<MemTable>> memTables;
42 std::deque<std::unique_ptr<SS_Table>> sstables;
43
44 std::mutex active_memtable_mtx;
45 std::mutex memtables_mtx;
46 std::mutex sstables_mtx;
47 std::mutex compaction_mtx;
48
49 std::condition_variable cv;
50 std::condition_variable compaction_cv;
51
52 std::atomic<bool> running;
53 std::thread flush_thread;
54 std::thread compaction_thread;
55
61 void rotate_memtable() {
62 std::unique_ptr<MemTable> new_memtable = std::make_unique<MemTable>();
63 memTables.push_back(std::move(activeMemTable));
64 activeMemTable = std::move(new_memtable);
65 cv.notify_one();
66 }
67
72 void flush_worker() {
73 while (running) {
74 std::unique_ptr<MemTable> memtable_to_flush;
75 {
76 std::unique_lock<std::mutex> lock(memtables_mtx);
77 cv.wait(lock, [this] { return !running || !memTables.empty(); });
78
79 if (!running && memTables.empty()) {
80 break;
81 }
82
83 if (!memTables.empty()) {
84 memtable_to_flush = std::move(memTables.front());
85 memTables.erase(memTables.begin());
86 }
87 }
88
89 if (memtable_to_flush) {
90 flush_memtable(std::move(memtable_to_flush));
91 }
92 }
93 }
94
99 void flush_memtable(std::unique_ptr<MemTable> memtable) {
100 long long timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
101 std::chrono::system_clock::now().time_since_epoch())
102 .count();
103 std::string filename = SS_TABLE_PATH + "sstable_" + std::to_string(timestamp);
104 bool success = SS_Table::createFromMemTable(filename, memtable.get());
105 if (success) {
106 std::lock_guard<std::mutex> lock(sstables_mtx);
107 std::unique_ptr<SS_Table> sstable = std::make_unique<SS_Table>(filename);
108 sstables.push_back(std::move(sstable));
109
110 if (sstables.size() >= MAX_SSTABLE_COUNT) {
111 compaction_cv.notify_one();
112 }
113 }
114 }
115
120 void compaction_worker() {
121 while (running) {
122 {
123 std::unique_lock<std::mutex> lock(compaction_mtx);
124 compaction_cv.wait(lock, [this] {
125 std::lock_guard<std::mutex> sstables_lock(sstables_mtx);
126 return !running || sstables.size() >= MAX_SSTABLE_COUNT;
127 });
128
129 if (!running) {
130 break;
131 }
132 perform_compaction();
133 std::this_thread::sleep_for(std::chrono::seconds(2));
134 }
135 }
136 }
137
142 void perform_compaction() {
143 std::vector<std::unique_ptr<SS_Table>> tables_to_compact;
144 {
145 std::lock_guard<std::mutex> lock(sstables_mtx);
146
147 if (sstables.size() < MAX_SSTABLE_COUNT) {
148 return;
149 }
150
151 size_t tables_to_take = MAX_SSTABLE_COUNT;
152
153 for (size_t i = 0; i < tables_to_take && !sstables.empty(); ++i) {
154 tables_to_compact.push_back(std::move(sstables.front()));
155 sstables.erase(sstables.begin());
156 }
157 }
158
159 if (tables_to_compact.empty()) {
160 return;
161 }
162
163 std::sort(tables_to_compact.begin(), tables_to_compact.end(),
164 [](const std::unique_ptr<SS_Table> &a, const std::unique_ptr<SS_Table> &b) {
165 return a->getIndexFile() < b->getIndexFile();
166 });
167 std::unique_ptr<MemTable> merged_memtable = std::make_unique<MemTable>();
168
169 std::map<std::string, std::string> merged_data;
170 for (std::unique_ptr<SS_Table> &sstable : tables_to_compact) {
171 std::ifstream data_file(sstable->getDataFilename(), std::ios::binary);
172 if (!data_file.is_open()) {
173 continue;
174 }
175 while (!data_file.eof()) {
176 uint32_t key_size;
177 data_file.read(reinterpret_cast<char *>(&key_size), sizeof(key_size));
178 if (data_file.eof()) {
179 break;
180 }
181
182 std::string key(key_size, '\0');
183 data_file.read(&key[0], key_size);
184
185 uint32_t value_size;
186 data_file.read(reinterpret_cast<char *>(&value_size), sizeof(value_size));
187
188 std::string value(value_size, '\0');
189 data_file.read(&value[0], value_size);
190 if (merged_data.find(key) == merged_data.end()) {
191 merged_data[key] = value;
192 }
193 }
194 }
195
196 for (std::pair<std::string, std::string> x : merged_data) {
197 if (x.second != TOMBSTONE) {
198 merged_memtable->put(x.first, x.second);
199 }
200 }
201
202 std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
203 long long timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
204 now.time_since_epoch())
205 .count();
206 std::string filename = SS_TABLE_PATH + "sstable_" + std::to_string(timestamp);
207 bool success = SS_Table::createFromMemTable(filename, merged_memtable.get());
208
209 if (success) {
210 for (auto &sstable : tables_to_compact) {
211 std::string index_file = sstable->getIndexFile();
212 std::string data_file = sstable->getDataFilename();
213
214 sstable.reset();
215
216 std::filesystem::remove(index_file);
217 std::filesystem::remove(data_file);
218 }
219
220 std::lock_guard<std::mutex> lock(sstables_mtx);
221 std::unique_ptr<SS_Table> new_sstable = std::make_unique<SS_Table>(filename);
222 sstables.push_back(std::move(new_sstable));
223 }
224 }
225
230 void load_existing_sstables() {
231 std::vector<std::string> files;
232 for (const std::filesystem::directory_entry &entry : std::filesystem::directory_iterator(SS_TABLE_PATH)) {
233 files.push_back(entry.path().string());
234 }
235 for (std::string const &file : files) {
236 if (file.find(INDEX_EXTENSION) != std::string::npos) {
237 std::string data_file = file.substr(0, file.find(INDEX_EXTENSION));
238 std::unique_ptr<SS_Table> sstable = std::make_unique<SS_Table>(file, data_file + DATA_EXTENSION);
239 if (sstable->loadIndex()) {
240 sstables.push_back(std::move(sstable));
241 }
242 }
243 }
244 // sort sstables by filename based on the creation time the filenames are in the format data/sstable_timestamp.index and data/sstable_timestamp.data
245 std::sort(sstables.begin(), sstables.end(), [](const std::unique_ptr<SS_Table> &a, const std::unique_ptr<SS_Table> &b) {
246 return a->getIndexFile() < b->getIndexFile();
247 });
248 if (sstables.size() >= MAX_SSTABLE_COUNT) {
249 compaction_cv.notify_one();
250 }
251 }
252
253public:
258 : SS_TABLE_PATH(DATA_DIR),
259 running(true) {
260 std::filesystem::create_directories(SS_TABLE_PATH);
261
262 flush_thread = std::thread(&LSMTree::flush_worker, this);
263 compaction_thread = std::thread(&LSMTree::compaction_worker, this);
264 activeMemTable = std::make_unique<MemTable>();
265
266 load_existing_sstables();
267 }
268
273 running = false;
274 cv.notify_all();
275 compaction_cv.notify_all();
276
277 if (flush_thread.joinable()) {
278 flush_thread.join();
279 }
280 if (compaction_thread.joinable()) {
281 compaction_thread.join();
282 }
283 }
284
290 void put(const std::string &key, const std::string &value) {
291 {
292 std::lock_guard<std::mutex> lock(active_memtable_mtx);
293 activeMemTable->put(key, value);
294 if (activeMemTable->getSize() >= MAX_MEMTABLE_SIZE) {
295 rotate_memtable();
296 }
297 }
298 }
299
305 std::pair<bool, std::string> get(const std::string &key) {
306 {
307 std::lock_guard<std::mutex> lock(active_memtable_mtx);
308 std::pair<bool, std::string> result = activeMemTable->get(key);
309 if (result.first) {
310 return result;
311 } else if (!result.first && result.second == TOMBSTONE) {
312 return {false, result.second};
313 }
314 }
315
316 {
317 std::lock_guard<std::mutex> lock(memtables_mtx);
318 for (std::reverse_iterator it = memTables.rbegin(); it != memTables.rend(); ++it) {
319 std::pair<bool, std::string> result = (*it)->get(key);
320 if (result.first) {
321 return result;
322 } else if (!result.first && result.second == TOMBSTONE) {
323 return {false, result.second};
324 }
325 }
326 }
327
328 {
329 std::lock_guard<std::mutex> lock(sstables_mtx);
330 for (std::reverse_iterator it = sstables.rbegin(); it != sstables.rend(); ++it) {
331 std::pair<bool, std::string> result = (*it)->getValue(key);
332 if (result.first) {
333 return result;
334 } else if (!result.first && result.second == TOMBSTONE) {
335 return {false, result.second};
336 }
337 }
338 }
339 return {false, ""};
340 }
341
346 void remove(const std::string &key) {
347 std::lock_guard<std::mutex> lock(active_memtable_mtx);
348 activeMemTable->put(key, TOMBSTONE);
349 }
350};
351
352#endif
std::pair< bool, std::string > get(const std::string &key)
Retrieves the value associated with a given key.
Definition lsm.hpp:305
~LSMTree()
Destructor that gracefully shuts down the LSMTree, ensuring all background tasks complete.
Definition lsm.hpp:272
LSMTree()
Constructs an LSMTree instance and initializes background worker threads.
Definition lsm.hpp:257
void remove(const std::string &key)
Marks a key as deleted by inserting a tombstone value.
Definition lsm.hpp:346
void put(const std::string &key, const std::string &value)
Inserts a key-value pair into the LSM Tree.
Definition lsm.hpp:290
static bool createFromMemTable(const std::string &filename, MemTable *memTable)
Creates an SSTable from a given MemTable.
Definition sstable.hpp:84
Constants for the database engine.
const std::string INDEX_EXTENSION
Index Extension Constant.
Definition constants.hpp:29
const std::string DATA_EXTENSION
Data Extension Constant.
Definition constants.hpp:37
const size_t MAX_MEMTABLE_SIZE
MemTable size limit in bytes (default: 32MB); reaching it rotates the active MemTable.
Definition constants.hpp:49
const size_t MAX_SSTABLE_COUNT
Maximum number of SSTables; reaching it triggers compaction.
Definition constants.hpp:57
#define TOMBSTONE
TOMBSTONE constant: the value stored for a deleted key.
Definition constants.hpp:22
const std::string DATA_DIR
Data Directory Constant.
Definition constants.hpp:43
MemTable Class.
SSTable implementation.