Blink DB Documentation v1.0.0
Blink DB Documentation
Loading...
Searching...
No Matches
kqueue_server.hpp
Go to the documentation of this file.
1
12#ifndef KQUEUE_SERVER_HPP
13#define KQUEUE_SERVER_HPP
16#include "engine/lsm.hpp"
17
18#include <arpa/inet.h>
19#include <fcntl.h>
20#include <iostream>
21#include <sys/event.h>
22#include <unistd.h>
23
/**
 * @brief Per-connection receive state.
 *
 * One instance is kept for every client descriptor. It stores the bytes
 * appended from the socket and a running count of how many were appended.
 */
struct ClientData {
    /// Bytes accumulated from the client that have not been decoded yet.
    std::vector<char> buffer;
    /// Number of bytes appended to @ref buffer since it was last cleared.
    size_t total_bytes{0};
};
33
43private:
44 static constexpr size_t INITIAL_BUFFER_SIZE = 4 * 1024;
45 static constexpr size_t INITIAL_EVENT_LIST_SIZE = 512;
46 const size_t CHUNK_SIZE = 4096;
47
48 int server_socket;
49 int kq;
50 std::vector<struct kevent> event_list;
51 std::unordered_map<int, ClientData> client_buffers;
52 LSMTree lsm;
53
/**
 * @brief Create a non-blocking IPv4 TCP listening socket bound to addr:port.
 *
 * @param addr IPv4 address in text form, parsed with inet_pton().
 * @param port TCP port to bind; must be within 0-65535.
 * @return The listening socket descriptor, or -1 on any failure. A diagnostic
 *         is written to std::cerr and no descriptor is leaked on failure.
 */
int createServerSocket(const std::string &addr, int port) {
    // htons() takes a 16-bit value; reject ports that would be silently truncated.
    if (port < 0 || port > 65535) {
        std::cerr << "Invalid port: " << port << std::endl;
        return -1;
    }

    // Value-initialise so padding and platform-specific fields are zero.
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(static_cast<unsigned short>(port));

    if (inet_pton(AF_INET, addr.c_str(), &server_addr.sin_addr) <= 0) {
        std::cerr << "Invalid address: " << addr << std::endl;
        return -1;
    }

    const int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        std::cerr << "Failed to create socket" << std::endl;
        return -1;
    }

    const int opt = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        std::cerr << "Socket option setting failed" << std::endl;
        close(sock);
        return -1;
    }

    // Add O_NONBLOCK to the current status flags instead of overwriting them.
    const int status_flags = fcntl(sock, F_GETFL, 0);
    if (status_flags < 0 ||
        fcntl(sock, F_SETFL, status_flags | O_NONBLOCK) < 0 ||
        bind(sock, reinterpret_cast<sockaddr *>(&server_addr), sizeof(server_addr)) < 0 ||
        listen(sock, SOMAXCONN) < 0) {
        std::cerr << "Socket setup failed" << std::endl;
        close(sock);
        return -1;
    }

    return sock;
}
101
/**
 * @brief Open a new kernel event queue.
 *
 * @return The descriptor returned by kqueue(), or -1 after logging to
 *         std::cerr when kqueue() fails.
 */
int createKqueue() {
    const int queue_fd = kqueue();
    if (queue_fd != -1) {
        return queue_fd;
    }
    std::cerr << "Failed to create kqueue" << std::endl;
    return -1;
}
114
/**
 * @brief Submit a single change for a descriptor to the server's kqueue.
 *
 * Despite the name, the action performed is whatever @p flags requests
 * (callers in this file pass both EV_ADD and EV_DELETE).
 *
 * @param fd     Descriptor the change applies to.
 * @param filter kqueue filter, e.g. EVFILT_READ.
 * @param flags  kqueue action flags, e.g. EV_ADD or EV_DELETE.
 * @return true if kevent() accepted the change, false (after logging to
 *         std::cerr) otherwise.
 */
bool addToKqueue(int fd, int filter, int flags) {
    struct kevent change;
    EV_SET(&change, fd, filter, flags, 0, 0, nullptr);

    const int rc = kevent(kq, &change, 1, nullptr, 0, nullptr);
    if (rc != -1) {
        return true;
    }

    std::cerr << "Failed to add to kqueue for fd " << fd << std::endl;
    return false;
}
131
/**
 * @brief Accept every connection currently pending on the listening socket.
 *
 * Loops on accept() until the (non-blocking) listening socket reports that
 * nothing is pending. Each accepted socket is switched to non-blocking mode,
 * registered with the kqueue for read events, and given a fresh ClientData
 * entry with INITIAL_BUFFER_SIZE bytes reserved. A socket that cannot be set
 * up is closed and skipped.
 */
void handleNewConnection() {
    while (true) {
        // The peer address is not used anywhere, so it is not requested.
        const int client_socket = accept(server_socket, nullptr, nullptr);
        if (client_socket == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break; // nothing left to accept
            }
            if (errno == EINTR || errno == ECONNABORTED) {
                // Only this attempt failed; further connections may still be
                // pending, so keep draining.
                continue;
            }
            std::cerr << "Connection failed: " << strerror(errno) << std::endl;
            break;
        }

        // Add O_NONBLOCK to the current status flags instead of overwriting them.
        const int status_flags = fcntl(client_socket, F_GETFL, 0);
        if (status_flags < 0 ||
            fcntl(client_socket, F_SETFL, status_flags | O_NONBLOCK) < 0) {
            std::cerr << "Failed to set non-blocking on client socket" << std::endl;
            close(client_socket);
            continue;
        }

        if (!addToKqueue(client_socket, EVFILT_READ, EV_ADD)) {
            close(client_socket);
            continue;
        }

        // A reused descriptor number must start from a clean state.
        ClientData &state = client_buffers[client_socket];
        state = ClientData();
        state.buffer.reserve(INITIAL_BUFFER_SIZE);
    }
}
165
172 void handle_op(Resp &resp, int client_fd) {
173 try {
174 std::ostringstream response;
175 switch (resp.operation) {
176 case GET: {
177 std::pair<bool, std::string> result = lsm.get(resp.key);
178 response << RespEncoder::bulkString(result.second, !result.first);
179 break;
180 }
181 case SET:
182 lsm.put(resp.key, resp.value);
183 response << RespEncoder::simpleString("OK");
184 break;
185 case DEL:
186 lsm.remove(resp.key);
187 response << RespEncoder::integer(1);
188 break;
189 default:
190 response << RespEncoder::error("Unknown operation");
191 break;
192 }
193 send(client_fd, response.str().c_str(), response.str().size(), 0);
194 } catch (const std::exception &e) {
195 std::cerr << "Exception in handle_op: " << e.what() << std::endl;
196 std::string message = RespEncoder::error("Internal server error");
197 send(client_fd, message.c_str(), message.size(), 0);
198 }
199 }
200
207 void handleClientMessage(int client_fd) {
208 ClientData &client_data = client_buffers[client_fd];
209
210 char temp_buffer[CHUNK_SIZE];
211
212 while (true) {
213 ssize_t bytes_read = recv(client_fd, temp_buffer, CHUNK_SIZE, 0);
214
215 if (bytes_read < 0) {
216 if (errno == EAGAIN || errno == EWOULDBLOCK) {
217 break;
218 } else {
219 std::cerr << "recv error for client " << client_fd << ": " << strerror(errno) << std::endl;
220 closeConnection(client_fd);
221 return;
222 }
223 } else if (bytes_read == 0) {
224 closeConnection(client_fd);
225 return;
226 }
227
228 client_data.buffer.insert(client_data.buffer.end(), temp_buffer, temp_buffer + bytes_read);
229 client_data.total_bytes += bytes_read;
230
231 if (client_data.buffer.size() + CHUNK_SIZE > client_data.buffer.capacity()) {
232 client_data.buffer.reserve(client_data.buffer.capacity() * 2);
233 }
234
235 // size_t current_size = client_data.buffer.size();
236 // client_data.buffer.resize(current_size + bytes_read);
237 // memcpy(client_data.buffer.data() + current_size, temp_buffer, bytes_read);
238 // client_data.total_bytes += bytes_read;
239
240 // if (client_data.buffer.capacity() - client_data.buffer.size() < CHUNK_SIZE) {
241 // client_data.buffer.reserve(client_data.buffer.capacity() * 2);
242 // }
243 }
244
245 if (client_data.total_bytes > 0) {
246 client_data.buffer.push_back('\0');
247 Resp resp = RespDecoder::decode(client_data.buffer.data());
248 client_data.buffer.clear();
249 client_data.total_bytes = 0;
250
251 if (!resp.success) {
252 std::string message = RespEncoder::error(resp.error);
253 std::cerr << "Error: " << resp.error << std::endl;
254 send(client_fd, message.c_str(), message.size(), 0);
255 return;
256 }
257 handle_op(resp, client_fd);
258 }
259 }
260
/**
 * @brief Tear down a client connection.
 *
 * Removes the descriptor's read filter from the kqueue, drops the client's
 * buffered state, and closes the socket. The result of the kqueue removal is
 * not checked.
 *
 * @param client_fd Descriptor of the client to disconnect.
 */
void closeConnection(int client_fd) {
    // Deregister while the descriptor is still open.
    static_cast<void>(addToKqueue(client_fd, EVFILT_READ, EV_DELETE));

    const auto entry = client_buffers.find(client_fd);
    if (entry != client_buffers.end()) {
        client_buffers.erase(entry);
    }

    close(client_fd);
}
271
272public:
/**
 * @brief Set up the listening socket and the kqueue.
 *
 * Creates the listening socket, creates the kqueue, sizes the event array to
 * INITIAL_EVENT_LIST_SIZE, and registers the listening socket for read
 * events. Descriptors opened so far are closed before an exception is thrown.
 *
 * @param addr IPv4 address to listen on.
 * @param port TCP port to listen on.
 * @throws std::runtime_error if the socket or the kqueue cannot be created,
 *         or if the socket cannot be registered with the kqueue.
 */
KqueueServer(const std::string &addr, int port) {
    server_socket = createServerSocket(addr, port);
    if (server_socket == -1) {
        throw std::runtime_error("Failed to create server socket");
    }

    kq = createKqueue();
    if (kq == -1) {
        close(server_socket);
        throw std::runtime_error("Failed to create kqueue");
    }

    event_list.resize(INITIAL_EVENT_LIST_SIZE);

    const bool listener_registered = addToKqueue(server_socket, EVFILT_READ, EV_ADD);
    if (!listener_registered) {
        close(server_socket);
        close(kq);
        throw std::runtime_error("Failed to add server socket to kqueue");
    }

    std::cout << "Server is listening on " << addr << ":" << port << std::endl;
}
301
/**
 * @brief Run the event loop.
 *
 * Blocks in kevent() with no timeout and dispatches every returned event:
 * - an event carrying EV_ERROR or EV_EOF closes the client it belongs to
 *   (such events on the listening socket are ignored);
 * - a readable listening socket accepts the pending connections;
 * - a readable client socket has its message read and handled.
 *
 * The event array is doubled whenever a call fills it completely. The loop
 * restarts on EINTR and returns on any other kevent() failure.
 */
void run() {
    for (;;) {
        const int ready = kevent(kq, NULL, 0, event_list.data(), event_list.size(), NULL);
        if (ready == -1) {
            if (errno != EINTR) {
                std::cerr << "kevent error: " << strerror(errno) << std::endl;
                break;
            }
            continue;
        }

        // A completely filled array suggests more events were waiting; grow it
        // for the next call.
        if (static_cast<int>(event_list.size()) == ready) {
            event_list.resize(2 * event_list.size());
        }

        for (int idx = 0; idx < ready; ++idx) {
            const int event_fd = event_list[idx].ident;
            const bool is_listener = (event_fd == server_socket);
            const bool failed_or_closed = (event_list[idx].flags & (EV_ERROR | EV_EOF)) != 0;

            if (failed_or_closed) {
                if (!is_listener) {
                    closeConnection(event_fd);
                }
                continue;
            }

            if (is_listener) {
                handleNewConnection();
            } else {
                handleClientMessage(event_fd);
            }
        }
    }
}
335
336 ~KqueueServer() {
337 for (const auto &client : client_buffers) {
338 close(client.first);
339 }
340 close(server_socket);
341 close(kq);
342 }
343};
344#endif
Kqueue-based server class.
Definition kqueue_server.hpp:42
void run()
Run the server loop.
Definition kqueue_server.hpp:307
KqueueServer(const std::string &addr, int port)
KqueueServer constructor.
Definition kqueue_server.hpp:279
LSM Tree implementation.
Definition lsm.hpp:37
static Resp decode(const char *buffer, size_t length)
Decode a RESP command from a buffer.
Definition resp_decoder.hpp:62
static std::string simpleString(const std::string &str)
Converts a simple string to RESP format.
Definition resp_encoder.hpp:30
static std::string bulkString(const std::string &str, const bool isNull)
Converts a bulk string to RESP format.
Definition resp_encoder.hpp:75
static std::string error(const std::string &message)
Converts an error message to RESP format.
Definition resp_encoder.hpp:44
static std::string integer(int value)
Used to encode an integer to RESP format.
Definition resp_encoder.hpp:57
Response object for RESP commands.
Definition resp_decoder.hpp:37
LSM Tree implementation.
Used to encode messages into RESP (REdis Serialization Protocol) format.
Redis Serialization Protocol (RESP) Decoder.
Client Data.
Definition kqueue_server.hpp:29