// Capacity, in bytes (4 KiB), reserved for a client's receive buffer when the
// connection is accepted.
44 static constexpr size_t INITIAL_BUFFER_SIZE = 4 * 1024;
// Initial number of kevent slots in event_list; the event loop doubles the
// list whenever one kevent() call fills every slot.
45 static constexpr size_t INITIAL_EVENT_LIST_SIZE = 512;
// Upper bound on the bytes fetched by a single recv() call, and the size of
// the stack buffer that receives them.
// NOTE(review): unlike the two constants above this one is not declared
// `static constexpr`, yet it is used as an array bound (`char temp_buffer[CHUNK_SIZE]`)
// — confirm it is a constant expression in that context.
46 const size_t CHUNK_SIZE = 4096;
// Output array handed to kevent(); receives the events reported by each poll.
50 std::vector<struct kevent> event_list;
// Per-connection state (receive buffer and byte counter), keyed by the
// client's socket descriptor.
51 std::unordered_map<int, ClientData> client_buffers;
// Creates the server's listening socket.
//
// Parses `addr` as an IPv4 address, pairs it with `port`, then creates a TCP
// socket, enables SO_REUSEADDR, switches the socket to non-blocking mode,
// binds it and starts listening. Every failing step writes a message to
// std::cerr.
//
// addr: IPv4 address in text form (parsed with inet_pton/AF_INET).
// port: TCP port in host byte order (converted with htons).
// Returns the socket descriptor. Presumably -1 signals failure, since the
// caller compares the result against -1 — confirm.
63 int createServerSocket(
const std::string &addr,
int port) {
// NOTE(review): server_addr is declared without an initializer; only
// sin_family, sin_port and sin_addr are assigned below. Confirm the remaining
// bytes are zeroed somewhere before bind().
64 sockaddr_in server_addr;
65 server_addr.sin_family = AF_INET;
66 server_addr.sin_port = htons(port);
// inet_pton yields 1 on success; both 0 (text is not a valid address) and
// -1 (error) are treated as an invalid address here.
68 if (inet_pton(AF_INET, addr.c_str(), &server_addr.sin_addr) <= 0) {
69 std::cerr <<
"Invalid address: " << addr << std::endl;
73 int sock = socket(AF_INET, SOCK_STREAM, 0);
75 std::cerr <<
"Failed to create socket" << std::endl;
// SO_REUSEADDR lets the server rebind the address without waiting for old
// connections in TIME_WAIT to expire.
80 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(opt)) < 0) {
81 std::cerr <<
"Socket option setting failed" << std::endl;
// Short-circuit chain: the first of fcntl/bind/listen to fail skips the rest,
// and all three share one generic error message.
// NOTE(review): F_SETFL with a bare O_NONBLOCK replaces the whole file-status
// flag set instead of adding to the current flags (no F_GETFL first).
86 if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0 ||
87 bind(sock,
reinterpret_cast<sockaddr *
>(&server_addr),
sizeof(server_addr)) < 0 ||
88 listen(sock, SOMAXCONN) < 0) {
89 std::cerr <<
"Socket setup failed" << std::endl;
// NOTE(review): listen() was already called as part of the combined check
// above; this second call looks redundant — confirm and consider removing.
94 if (listen(sock, SOMAXCONN) == -1) {
95 std::cerr <<
"Failed to listen on socket" << std::endl;
// Reports on std::cerr that the kqueue could not be created.
109 std::cerr <<
"Failed to create kqueue" << std::endl;
// Submits one change record for `fd` to the kqueue `kq`.
//
// fd:     descriptor the change applies to.
// filter: kevent filter to apply (callers pass EVFILT_READ).
// flags:  kevent action flags (callers pass EV_ADD or EV_DELETE).
// Returns bool; presumably true on success and false when kevent() fails,
// since callers branch on the result — confirm.
122 bool addToKqueue(
int fd,
int filter,
int flags) {
123 struct kevent change_event;
// fflags, data and udata are left at 0/NULL: no filter-specific options and
// no user pointer are attached to the event.
124 EV_SET(&change_event, fd, filter, flags, 0, 0, NULL);
// Change list of one entry and an empty event list: this call only applies
// the change and retrieves no pending events.
125 if (kevent(kq, &change_event, 1, NULL, 0, NULL) == -1) {
// NOTE(review): the message says "add" even when `flags` is EV_DELETE (as
// passed by closeConnection), and errno is not included.
126 std::cerr <<
"Failed to add to kqueue for fd " << fd << std::endl;
// Accepts a pending connection on server_socket and registers it for reading.
//
// The accepted socket is switched to non-blocking mode and added to the
// kqueue with EVFILT_READ. Once registered, its receive buffer is
// pre-allocated to INITIAL_BUFFER_SIZE bytes.
136 void handleNewConnection() {
137 sockaddr_in client_addr;
138 socklen_t client_len =
sizeof(client_addr);
141 int client_socket = accept(server_socket, (sockaddr *)&client_addr, &client_len);
142 if (client_socket == -1) {
// EAGAIN/EWOULDBLOCK on a non-blocking listener means nothing is waiting to
// be accepted; it is distinguished from genuine failures, which are logged.
143 if (errno == EAGAIN || errno == EWOULDBLOCK) {
146 std::cerr <<
"Connection failed: " << strerror(errno) << std::endl;
// NOTE(review): F_SETFL with a bare O_NONBLOCK overwrites the socket's other
// file-status flags instead of OR-ing into the value from F_GETFL.
151 if (fcntl(client_socket, F_SETFL, O_NONBLOCK) < 0) {
152 std::cerr <<
"Failed to set non-blocking on client socket" << std::endl;
// Release the descriptor so a half-configured client socket is not leaked.
153 close(client_socket);
157 if (addToKqueue(client_socket, EVFILT_READ, EV_ADD)) {
// operator[] creates the client's ClientData entry; reserving up front avoids
// reallocations for the first INITIAL_BUFFER_SIZE bytes received.
159 client_buffers[client_socket].buffer.reserve(INITIAL_BUFFER_SIZE);
// Presumably the path taken when kqueue registration fails — confirm.
161 close(client_socket);
// Executes one parsed request against the `lsm` store and replies to the client.
//
// resp:      parsed request; `operation` selects the action, while `key` and
//            `value` supply its arguments.
// client_fd: socket the reply is written to.
//
// The reply text is assembled in an ostringstream and sent with send().
// std::exception errors are caught and logged, and `message` is sent to the
// client instead.
172 void handle_op(
Resp &resp,
int client_fd) {
174 std::ostringstream response;
175 switch (resp.operation) {
// Lookup: yields a (bool, string) pair — presumably (found, value); confirm.
177 std::pair<bool, std::string> result = lsm.get(resp.key);
// Store resp.value under resp.key.
182 lsm.put(resp.key, resp.value);
// Remove resp.key from the store.
186 lsm.remove(resp.key);
// response.str() is evaluated twice, producing two temporary strings; both
// stay alive until the send() call completes, so the pointer remains valid.
// NOTE(review): send()'s return value is ignored, so a short write or an
// error on the non-blocking socket goes unnoticed.
193 send(client_fd, response.str().c_str(), response.str().size(), 0);
194 }
catch (
const std::exception &e) {
195 std::cerr <<
"Exception in handle_op: " << e.what() << std::endl;
// NOTE(review): as above, the result of send() is not checked.
197 send(client_fd, message.c_str(), message.size(), 0);
// Reads available data from a client socket and processes it as a request.
//
// client_fd: descriptor of the client whose socket became readable.
//
// Received bytes are appended to the client's buffer. When bytes have
// accumulated, the buffer is NUL-terminated, then cleared and the counter
// reset. A request with an error is logged and answered with `message`;
// otherwise it is dispatched to handle_op. A recv() failure or a peer
// shutdown closes the connection.
207 void handleClientMessage(
int client_fd) {
// operator[] inserts a default ClientData if the descriptor has no entry yet.
// NOTE(review): closeConnection() erases this entry, so `client_data` must
// not be touched after any closeConnection(client_fd) call below — confirm
// each such path leaves the function immediately.
208 ClientData &client_data = client_buffers[client_fd];
210 char temp_buffer[CHUNK_SIZE];
213 ssize_t bytes_read = recv(client_fd, temp_buffer, CHUNK_SIZE, 0);
215 if (bytes_read < 0) {
// On a non-blocking socket EAGAIN/EWOULDBLOCK only means no more data is
// available right now; it is not treated as a failure.
216 if (errno == EAGAIN || errno == EWOULDBLOCK) {
219 std::cerr <<
"recv error for client " << client_fd <<
": " << strerror(errno) << std::endl;
220 closeConnection(client_fd);
223 }
else if (bytes_read == 0) {
// recv() returning 0 on a stream socket means the peer closed its side.
224 closeConnection(client_fd);
228 client_data.buffer.insert(client_data.buffer.end(), temp_buffer, temp_buffer + bytes_read);
229 client_data.total_bytes += bytes_read;
// Grow ahead of need: double the capacity once there is no longer room for
// another full chunk.
// NOTE(review): if capacity() is 0, doubling it reserves nothing.
231 if (client_data.buffer.size() + CHUNK_SIZE > client_data.buffer.capacity()) {
232 client_data.buffer.reserve(client_data.buffer.capacity() * 2);
245 if (client_data.total_bytes > 0) {
// NUL-terminate the accumulated bytes so they can be read as a C string.
246 client_data.buffer.push_back(
'\0');
// Reset the per-client state for the next request; clear() keeps the
// vector's capacity.
248 client_data.buffer.clear();
249 client_data.total_bytes = 0;
253 std::cerr <<
"Error: " << resp.error << std::endl;
// NOTE(review): the result of send() is not checked.
254 send(client_fd, message.c_str(), message.size(), 0);
257 handle_op(resp, client_fd);
// Tears down the server-side state of one client connection.
//
// client_fd: descriptor of the connection being dropped.
//
// Removes the descriptor's read filter from the kqueue and discards its
// buffered data.
// NOTE(review): no close(client_fd) appears among these statements — confirm
// the descriptor itself is released, otherwise it leaks.
266 void closeConnection(
int client_fd) {
// The bool result is ignored; a failed removal is only logged by addToKqueue.
267 addToKqueue(client_fd, EVFILT_READ, EV_DELETE);
// Invalidates any outstanding reference to this client's ClientData.
268 client_buffers.erase(client_fd);
// Start-up sequence: create the listening socket, size the kevent output
// array and register the listener for read events. Each failure is reported
// by throwing std::runtime_error.
280 server_socket = createServerSocket(addr, port);
281 if (server_socket == -1) {
282 throw std::runtime_error(
"Failed to create server socket");
// Close the already-open listening socket before reporting the kqueue failure.
287 close(server_socket);
288 throw std::runtime_error(
"Failed to create kqueue");
// Size the kevent output array; the event loop grows it on demand.
291 event_list.resize(INITIAL_EVENT_LIST_SIZE);
// A read event on the listening socket signals a pending connection to accept.
293 if (!addToKqueue(server_socket, EVFILT_READ, EV_ADD)) {
294 close(server_socket);
296 throw std::runtime_error(
"Failed to add server socket to kqueue");
299 std::cout <<
"Server is listening on " << addr <<
":" << port << std::endl;
// One polling round: wait for kqueue events and dispatch each one. The
// listening socket leads to handleNewConnection(); any other descriptor leads
// to handleClientMessage().
// A NULL timeout makes kevent() block until at least one event is available.
309 int new_events = kevent(kq, NULL, 0, event_list.data(), event_list.size(), NULL);
310 if (new_events == -1) {
// EINTR (call interrupted by a signal) is handled separately from other
// errors, which are logged.
311 if (errno == EINTR) {
314 std::cerr <<
"kevent error: " << strerror(errno) << std::endl;
// A completely filled list suggests more events may have been pending than
// fit, so double the capacity for later calls.
318 if (new_events ==
static_cast<int>(event_list.size())) {
319 event_list.resize(event_list.size() * 2);
322 for (
int i = 0; i < new_events; i++) {
// `ident` holds the descriptor the event refers to; it is narrowed to int here.
323 int event_fd = event_list[i].ident;
// Error or end-of-file on a client socket drops that connection; the
// listening socket is never closed from here.
// NOTE(review): EV_EOF can be reported while unread data is still queued on
// the socket, so closing at once may discard a final request — confirm this
// is intended.
325 if (event_list[i].flags & (EV_ERROR | EV_EOF)) {
326 if (event_fd != server_socket) {
327 closeConnection(event_fd);
331 event_fd == server_socket ? handleNewConnection() : handleClientMessage(event_fd);
// Shutdown: visit every tracked client entry, then close the listening socket.
337 for (
const auto &client : client_buffers) {
340 close(server_socket);