// Path prefix for on-disk tables: prepended to "sstable_<timestamp>" when new
// tables are named, created as a directory at start-up, and scanned when
// existing tables are loaded.
39 std::string SS_TABLE_PATH;
// Memtable that put() writes to and get() consults first.
40 std::unique_ptr<MemTable> activeMemTable;
// Memtables moved out of activeMemTable by rotate_memtable(); the flush code
// consumes them from the front, get() scans them back-to-front.
41 std::deque<std::unique_ptr<MemTable>> memTables;
// Known SS tables; new tables are appended at the back and get() scans them
// back-to-front.
42 std::deque<std::unique_ptr<SS_Table>> sstables;
// Locked by put() and get() around activeMemTable access.
44 std::mutex active_memtable_mtx;
// Locked around memTables access; also the mutex paired with `cv`.
45 std::mutex memtables_mtx;
// Locked around sstables access in flush, compaction and get().
46 std::mutex sstables_mtx;
// Mutex paired with `compaction_cv` in compaction_worker().
47 std::mutex compaction_mtx;
// Waited on by the flush code until `running` is false or memTables is non-empty.
49 std::condition_variable cv;
// Waited on by compaction_worker(); notified after a flush, after loading
// existing tables, and during shutdown.
50 std::condition_variable compaction_cv;
// Run flag read by the background wait predicates; where it is initialised
// and cleared is not visible in this excerpt.
52 std::atomic<bool> running;
// Background threads started on LSMTree::flush_worker / LSMTree::compaction_worker.
53 std::thread flush_thread;
54 std::thread compaction_thread;
// Retires the current active memtable and installs a fresh, empty one.
// NOTE(review): no lock is taken in the visible lines and the end of the
// function is outside this excerpt — confirm that callers hold the relevant
// mutexes and that `cv` is notified so the flush thread wakes up.
61 void rotate_memtable() {
// Allocate the replacement first so activeMemTable is only null between the
// two moves below.
62 std::unique_ptr<MemTable> new_memtable = std::make_unique<MemTable>();
// Queue the old memtable at the back of memTables (the flush code takes from the front).
63 memTables.push_back(std::move(activeMemTable));
64 activeMemTable = std::move(new_memtable);
// Body of the background flush loop. The function header and the enclosing
// loop/scope braces are not visible in this excerpt — presumably this is
// LSMTree::flush_worker, which flush_thread is started on; confirm.
// Holds the memtable taken from the queue for this iteration, if any.
74 std::unique_ptr<MemTable> memtable_to_flush;
76 std::unique_lock<std::mutex> lock(memtables_mtx);
// Sleep until shutdown is requested or there is a queued memtable to flush.
77 cv.wait(lock, [
this] {
return !running || !memTables.empty(); });
// Stopped and nothing left to flush; the action taken is outside this view
// (presumably the loop exits).
79 if (!running && memTables.empty()) {
83 if (!memTables.empty()) {
// Take ownership of the oldest queued memtable and drop its (now null) slot.
84 memtable_to_flush = std::move(memTables.front());
// NOTE(review): erase(begin()) on a deque does the same thing as pop_front().
85 memTables.erase(memTables.begin());
// NOTE(review): whether memtables_mtx has been released by this point depends
// on scope braces that are not visible here — confirm the flush runs unlocked.
89 if (memtable_to_flush) {
90 flush_memtable(std::move(memtable_to_flush));
// Registers a new SS table for a memtable that is being flushed and wakes the
// compaction thread.
// @param memtable  memtable to flush; this function takes ownership. The
//                  visible lines never read it — presumably it is written
//                  out in lines omitted from this excerpt; confirm.
99 void flush_memtable(std::unique_ptr<MemTable> memtable) {
// Wall-clock time since the epoch, in milliseconds, used to name the table.
100 long long timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
101 std::chrono::system_clock::now().time_since_epoch())
// NOTE(review): the millisecond timestamp is the only distinguishing part of
// the name, and perform_compaction() uses the same scheme — two tables created
// within the same millisecond would get the same name unless something outside
// this view prevents it.
103 std::string filename = SS_TABLE_PATH +
"sstable_" + std::to_string(timestamp);
// sstables is shared with get() and the compaction code, so it is modified under its mutex.
106 std::lock_guard<std::mutex> lock(sstables_mtx);
107 std::unique_ptr<SS_Table> sstable = std::make_unique<SS_Table>(filename);
108 sstables.push_back(std::move(sstable));
// A new table exists; let the compaction worker re-evaluate its wait condition.
111 compaction_cv.notify_one();
// Background compaction loop: waits on compaction_cv, runs one compaction,
// then pauses. The loop construct, the rest of the wait predicate and the
// exit condition are not visible in this excerpt.
120 void compaction_worker() {
123 std::unique_lock<std::mutex> lock(compaction_mtx);
// The wait predicate inspects shared state under sstables_mtx; the condition
// it actually tests is in omitted lines.
124 compaction_cv.wait(lock, [
this] {
125 std::lock_guard<std::mutex> sstables_lock(sstables_mtx);
// NOTE(review): whether compaction_mtx is still held during the compaction and
// the sleep depends on scope braces not visible here — confirm.
132 perform_compaction();
// Fixed two-second pause after each compaction pass.
133 std::this_thread::sleep_for(std::chrono::seconds(2));
// Merges a batch of SS tables taken from the front of `sstables` into one new
// table: reads every record from the batch's data files, keeps one value per
// key, deletes the old files and registers a table under a new timestamped
// name. Several lines (declarations, branch bodies, closing braces, and
// whatever persists the merged memtable) are omitted from this excerpt.
142 void perform_compaction() {
143 std::vector<std::unique_ptr<SS_Table>> tables_to_compact;
145 std::lock_guard<std::mutex> lock(sstables_mtx);
// Move up to `tables_to_take` tables (defined in an omitted line) out of the
// shared deque, oldest-inserted first.
153 for (
size_t i = 0; i < tables_to_take && !sstables.empty(); ++i) {
154 tables_to_compact.push_back(std::move(sstables.front()));
// NOTE(review): erase(begin()) on a deque does the same thing as pop_front().
155 sstables.erase(sstables.begin());
// Nothing was taken; the branch body is outside this view (presumably an early return).
159 if (tables_to_compact.empty()) {
// Order the batch by index-file name, ascending (plain string comparison).
163 std::sort(tables_to_compact.begin(), tables_to_compact.end(),
164 [](
const std::unique_ptr<SS_Table> &a,
const std::unique_ptr<SS_Table> &b) {
165 return a->getIndexFile() < b->getIndexFile();
167 std::unique_ptr<MemTable> merged_memtable = std::make_unique<MemTable>();
// key -> value chosen for the merged table.
169 std::map<std::string, std::string> merged_data;
170 for (std::unique_ptr<SS_Table> &sstable : tables_to_compact) {
171 std::ifstream data_file(sstable->getDataFilename(), std::ios::binary);
// Handling of an unopenable data file is in omitted lines.
172 if (!data_file.is_open()) {
// Records are read as: key_size, key bytes, value_size, value bytes.
// key_size / value_size are declared in omitted lines.
// NOTE(review): only the key_size read is followed by a visible eof() check;
// a truncated file could leave value_size or the buffers partially read —
// confirm whether omitted lines validate the stream.
175 while (!data_file.eof()) {
177 data_file.read(
reinterpret_cast<char *
>(&key_size),
sizeof(key_size));
// Reading the size hit end-of-file: no more records (branch body not visible).
178 if (data_file.eof()) {
182 std::string key(key_size,
'\0');
183 data_file.read(&key[0], key_size);
186 data_file.read(
reinterpret_cast<char *
>(&value_size),
sizeof(value_size));
188 std::string value(value_size,
'\0');
189 data_file.read(&value[0], value_size);
// First occurrence of a key wins; later tables in the sorted batch do not
// overwrite it.
// NOTE(review): the batch is sorted ascending by index-file name, so if names
// sort oldest-first this keeps the OLDEST value for a duplicated key — confirm
// this is intended.
190 if (merged_data.find(key) == merged_data.end()) {
191 merged_data[key] = value;
// Copy the merged entries into the new memtable.
// NOTE(review): `x` is taken by value, so every pair is copied per iteration;
// a const reference would avoid that.
196 for (std::pair<std::string, std::string> x : merged_data) {
198 merged_memtable->put(x.first, x.second);
// Name the merged table from the current wall-clock time in milliseconds,
// the same scheme flush_memtable() uses.
202 std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
203 long long timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
204 now.time_since_epoch())
206 std::string filename = SS_TABLE_PATH +
"sstable_" + std::to_string(timestamp);
// Delete the index and data files of every table in the batch.
210 for (
auto &sstable : tables_to_compact) {
211 std::string index_file = sstable->getIndexFile();
212 std::string data_file = sstable->getDataFilename();
216 std::filesystem::remove(index_file);
217 std::filesystem::remove(data_file);
// Publish the merged table at the back of the shared deque under its mutex.
220 std::lock_guard<std::mutex> lock(sstables_mtx);
221 std::unique_ptr<SS_Table> new_sstable = std::make_unique<SS_Table>(filename);
222 sstables.push_back(std::move(new_sstable));
// Scans SS_TABLE_PATH, builds an SS_Table for directory entries and keeps the
// ones whose index loads, then sorts the tables by index-file name and
// notifies the compaction thread. Filtering of entries and the derivation of
// `data_file` are in lines omitted from this excerpt.
// NOTE(review): sstables is modified here without a visible lock on
// sstables_mtx, while the call site shows the background threads already
// started — confirm this cannot race with the workers.
230 void load_existing_sstables() {
231 std::vector<std::string> files;
// Collect the path of every entry in the table directory.
232 for (
const std::filesystem::directory_entry &entry : std::filesystem::directory_iterator(SS_TABLE_PATH)) {
233 files.push_back(entry.path().string());
235 for (std::string
const &file : files) {
// `file` is passed as the first constructor argument, and `data_file`
// (computed in omitted lines) plus DATA_EXTENSION as the second.
238 std::unique_ptr<SS_Table> sstable = std::make_unique<SS_Table>(file, data_file +
DATA_EXTENSION);
// Keep only tables whose index could be loaded.
239 if (sstable->loadIndex()) {
240 sstables.push_back(std::move(sstable));
// Order tables by index-file name, ascending (plain string comparison); get()
// scans this deque back-to-front.
245 std::sort(sstables.begin(), sstables.end(), [](
const std::unique_ptr<SS_Table> &a,
const std::unique_ptr<SS_Table> &b) {
246 return a->getIndexFile() < b->getIndexFile();
// Let the compaction worker re-check its condition now that tables are loaded.
249 compaction_cv.notify_one();
// Start-up sequence; the enclosing signature is not visible in this excerpt
// (presumably the LSMTree constructor — confirm).
// Make sure the table directory exists.
260 std::filesystem::create_directories(SS_TABLE_PATH);
// NOTE(review): both background threads are started before activeMemTable is
// assigned and before existing tables are loaded, and the initialisation of
// `running` is not visible here — confirm the workers cannot observe
// partially initialised state.
262 flush_thread = std::thread(&LSMTree::flush_worker,
this);
263 compaction_thread = std::thread(&LSMTree::compaction_worker,
this);
264 activeMemTable = std::make_unique<MemTable>();
266 load_existing_sstables();
// Shutdown sequence; the enclosing signature and the lines that clear
// `running` are not visible in this excerpt (presumably the LSMTree
// destructor — confirm).
// Wake every waiter on compaction_cv so the compaction worker re-checks its predicate.
275 compaction_cv.notify_all();
// The body of this branch is omitted from the excerpt (presumably flush_thread.join()).
277 if (flush_thread.joinable()) {
280 if (compaction_thread.joinable()) {
281 compaction_thread.join();
// Stores a key/value pair in the active memtable.
// @param key    key to write
// @param value  value to associate with the key
// Lines after the insert are omitted from this excerpt (presumably a size
// check that triggers rotate_memtable — confirm).
290 void put(
const std::string &key,
const std::string &value) {
// Serialises writers and readers of activeMemTable.
292 std::lock_guard<std::mutex> lock(active_memtable_mtx);
293 activeMemTable->put(key, value);
// Looks a key up in three tiers, in order: the active memtable, the queued
// memtables (back-to-front), then the SS tables (back-to-front). In every
// tier, a result whose flag is false and whose string equals TOMBSTONE stops
// the search and is returned as {false, TOMBSTONE value}.
// The "found" branches, the scope braces around each tier, and the final
// not-found return are omitted from this excerpt.
// @param key  key to look up
// @return     pair of (flag, string) as produced by the tier that answered
305 std::pair<bool, std::string>
get(
const std::string &key) {
// Tier 1: active memtable, under its mutex.
307 std::lock_guard<std::mutex> lock(active_memtable_mtx);
308 std::pair<bool, std::string> result = activeMemTable->get(key);
// The branch closed below is not visible; the else-if handles a tombstone.
311 }
else if (!result.first && result.second ==
TOMBSTONE) {
312 return {
false, result.second};
// Tier 2: queued memtables. New ones are pushed at the back, so reverse
// iteration visits the most recently queued first.
// NOTE(review): this `lock` shares its name with the one above, so each tier
// must sit in its own scope — those braces are not visible here.
317 std::lock_guard<std::mutex> lock(memtables_mtx);
318 for (std::reverse_iterator it = memTables.rbegin(); it != memTables.rend(); ++it) {
319 std::pair<bool, std::string> result = (*it)->get(key);
322 }
else if (!result.first && result.second ==
TOMBSTONE) {
323 return {
false, result.second};
// Tier 3: SS tables, scanned from the back of the deque toward the front.
329 std::lock_guard<std::mutex> lock(sstables_mtx);
330 for (std::reverse_iterator it = sstables.rbegin(); it != sstables.rend(); ++it) {
331 std::pair<bool, std::string> result = (*it)->getValue(key);
334 }
else if (!result.first && result.second ==
TOMBSTONE) {
335 return {
false, result.second};
347 std::lock_guard<std::mutex> lock(active_memtable_mtx);