From ffdc63181fde3b38a02c6117afe366151efe1ec4 Mon Sep 17 00:00:00 2001 From: varunrmantri23 Date: Wed, 21 May 2025 16:08:35 +0530 Subject: [PATCH] feat: implement redis-style transactions (multi/exec/discard) --- Makefile | 2 +- src/commands.c | 173 ++++++++++++++++++++++++++++++++++++++------- src/commands.h | 3 + src/crimsoncache.h | 9 +++ src/main.c | 40 ++++++++++- src/replication.c | 1 - src/transaction.c | 123 ++++++++++++++++++++++++++++++++ src/transaction.h | 21 ++++++ 8 files changed, 341 insertions(+), 31 deletions(-) create mode 100644 src/transaction.c create mode 100644 src/transaction.h diff --git a/Makefile b/Makefile index c67efbc..fd17344 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ CC = gcc -CFLAGS = -Wall -Wextra -pedantic -std=c11 -g +CFLAGS = -Wall -Wextra -pedantic -g LDFLAGS = -pthread SRC_DIR = src diff --git a/src/commands.c b/src/commands.c index 93c6c4d..d4ee10f 100644 --- a/src/commands.c +++ b/src/commands.c @@ -13,9 +13,11 @@ #include #include #include "crimsoncache.h" +#include "transaction.h" extern void track_command_change(void); extern volatile sig_atomic_t server_running; +extern client_t *get_client_by_socket(int socket); // fallback implementations if not available #ifndef HAVE_STRDUP @@ -56,6 +58,9 @@ static command_def commands[] = { {"role", role_command, 1, 1}, {"incr", incr_command, 2, 2}, {"replconf", replconf_command, 2, -1}, + {"multi", multi_command, 1, 1}, + {"exec", exec_command, 1, 1}, + {"discard", discard_command, 1, 1}, {NULL, NULL, 0, 0} // sentinel to mark end of array }; @@ -197,58 +202,102 @@ void free_tokens(char **tokens, int count) { extern void track_command_change(); +// this is where we figure out what the client wants to do cmd_result execute_command(int client_sock, char *input, dict *db) { - int argc = 0; - char **argv = tokenize_command(input, &argc); - cmd_result result = CMD_UNKNOWN; - + int argc = 0; // to count how many parts the command has + char *input_copy = strdup(input); // make a copy so strtok doesn't mess up the original + if (!input_copy) { + // oops, couldn't make a copy, something's wrong + if (client_sock >= 0) { + reply_error(client_sock, "err out of memory"); + } + return CMD_ERR; + } + + char **argv = tokenize_command(input_copy, &argc); // break the command into pieces + cmd_result result = CMD_UNKNOWN; // let's assume we don't know the command yet + + // if we didn't get any pieces, or something went wrong tokenizing if (!argv || argc == 0) { - if (argv) free_tokens(argv, argc); - reply_error(client_sock, "ERR empty command"); + if (argv) free_tokens(argv, argc); // clean up if argv was allocated + free(input_copy); // clean up the copy + if (client_sock >= 0) { + reply_error(client_sock, "err empty command"); + } return CMD_ERR; } - - for (size_t i = 0; i < strlen(argv[0]); i++) { + + // make the command name lowercase so "SET" and "set" are the same + for (size_t i = 0; argv[0][i]; i++) { argv[0][i] = tolower(argv[0][i]); } + + // see if this client is in a transaction + client_t *client = client_sock >= 0 ? get_client_by_socket(client_sock) : NULL; + + // is this a command that controls transactions, like multi, exec, or discard? + int is_tx_command = strcmp(argv[0], "multi") == 0 || + strcmp(argv[0], "exec") == 0 || + strcmp(argv[0], "discard") == 0; + + // if we're in a transaction and this isn't a transaction control command, just queue it + if (client && client->in_transaction && !is_tx_command) { + // use the original input string for queuing to preserve quotes and stuff + if (tx_queue_command(client, input)) { + reply_string(client_sock, "queued"); + } else { + reply_error(client_sock, "err queue command failed"); + client->transaction_errors = 1; // mark that something went wrong + } + free_tokens(argv, argc); // clean up the pieces + free(input_copy); // clean up the copy + return CMD_OK; // we're done for now, it's queued + } + + // okay, not queuing, let's find the actual command to run for (int i = 0; commands[i].name != NULL; i++) { if (strcmp(argv[0], commands[i].name) == 0) { - // check argument count + // found it! now check if they gave the right number of arguments if ((commands[i].min_args > 0 && argc < commands[i].min_args) || - (commands[i].max_args > 0 && argc > commands[i].max_args)) { - reply_error(client_sock, "ERR wrong number of arguments"); + (commands[i].max_args > 0 && argc > commands[i].max_args && commands[i].max_args != -1)) { // -1 means any number of args is fine + if (client_sock >= 0) { + reply_error(client_sock, "err wrong number of arguments"); + } result = CMD_ERR; } else { + // looks good, run the command's handler function result = commands[i].handler(client_sock, argc, argv, db); } - break; + break; // no need to check other commands } } - + + // if we went through all commands and didn't find it if (result == CMD_UNKNOWN) { - reply_error(client_sock, "ERR unknown command"); - result = CMD_ERR; + if (client_sock >= 0) { // only send error if it's a real client connection + reply_error(client_sock, "err unknown command"); + } + result = CMD_ERR; // make sure we return an error status } - // Add this after applying the commands but before free_tokens - // Propagate write commands to replicas - if (result == CMD_OK && server_repl.role == ROLE_PRIMARY && client_sock >= 0) { - // Commands that modify data + // if the command was okay, and we're the primary server, and it was a write command... + // then we need to tell our replicas about it + // but only if we're not in a transaction (exec will handle propagation for transactions) + if (result == CMD_OK && (!client || !client->in_transaction) && server_repl.role == ROLE_PRIMARY && client_sock >= 0) { if (strcasecmp(argv[0], "set") == 0 || strcasecmp(argv[0], "del") == 0 || strcasecmp(argv[0], "expire") == 0 || strcasecmp(argv[0], "incr") == 0) { - - // Track for persistence - track_command_change(); - - // Propagate to replicas with proper formatting + + track_command_change(); // for persistence, like auto-saving + // use the original input for replication to keep quotes and exact format replication_feed_slaves(input, strlen(input)); } } - free_tokens(argv, argc); - return result; + free_tokens(argv, argc); // always clean up the token pieces + free(input_copy); // and the copy we made + return result; // tell the caller how it went } // response formatters @@ -624,6 +673,78 @@ cmd_result replconf_command(int client_sock, int argc, char **argv, dict *db) { return CMD_OK; } +// MULTI command - begin transaction +cmd_result multi_command(int client_sock, int argc, char **argv, dict *db) { + (void)argc; + (void)argv; + (void)db; + + client_t *client = get_client_by_socket(client_sock); + if (!client) { + reply_error(client_sock, "ERR client not found"); + return CMD_ERR; + } + + if (client->in_transaction) { + reply_error(client_sock, "ERR MULTI calls can not be nested"); + return CMD_ERR; + } + + client->in_transaction = 1; + client->transaction_errors = 0; + reply_string(client_sock, "OK"); + return CMD_OK; +} + +// EXEC command - execute transaction +cmd_result exec_command(int client_sock, int argc, char **argv, dict *db) { + (void)argc; + (void)argv; + + client_t *client = get_client_by_socket(client_sock); + if (!client) { + reply_error(client_sock, "ERR client not found"); + return CMD_ERR; + } + + if (!client->in_transaction) { + reply_error(client_sock, "ERR EXEC without MULTI"); + return CMD_ERR; + } + + printf("Executing transaction with %d commands\n", client->queue_size); + tx_execute_commands(client, db); + + if (client->in_transaction != 0) { + printf("WARNING: Transaction state not properly reset, forcing to 0\n"); + client->in_transaction = 0; + } + + printf("Transaction execution complete, in_transaction=%d\n", client->in_transaction); + return CMD_OK; +} + +// DISCARD command - discard transaction +cmd_result discard_command(int client_sock, int argc, char **argv, dict *db) { + (void)argc; + (void)argv; + (void)db; + + client_t *client = get_client_by_socket(client_sock); + if (!client) { + reply_error(client_sock, "ERR client not found"); + return CMD_ERR; + } + + if (!client->in_transaction) { + reply_error(client_sock, "ERR DISCARD without MULTI"); + return CMD_ERR; + } + + tx_discard_commands(client); + return CMD_OK; +} + diff --git a/src/commands.h b/src/commands.h index b78a813..4a7f4de 100644 --- a/src/commands.h +++ b/src/commands.h @@ -55,5 +55,8 @@ cmd_result replicaof_command(int client_sock, int argc, char **argv, dict *db); cmd_result role_command(int client_sock, int argc, char **argv, dict *db); cmd_result incr_command(int client_sock, int argc, char **argv, dict *db); cmd_result replconf_command(int client_sock, int argc, char **argv, dict *db); +cmd_result multi_command(int client_sock, int argc, char **argv, dict *db); +cmd_result exec_command(int client_sock, int argc, char **argv, dict *db); +cmd_result discard_command(int client_sock, int argc, char **argv, dict *db); #endif /* COMMANDS_H */ \ No newline at end of file diff --git a/src/crimsoncache.h b/src/crimsoncache.h index 443ae75..62e839e 100644 --- a/src/crimsoncache.h +++ b/src/crimsoncache.h @@ -16,11 +16,20 @@ typedef struct client { struct sockaddr_in address; char buffer[BUFFER_SIZE]; int buffer_pos; + + int in_transaction; // flag to indicate if in MULTI state + int transaction_errors; // tracks if any errors occurred during MULTI + char **queued_commands; + int queue_size; // current size of queue + int queue_capacity; // allocated capacity of queue } client_t; // function prototypes void handle_signal(int sig); void *handle_client(void *arg); +void register_client(client_t *client); +void unregister_client(client_t *client); +client_t *get_client_by_socket(int socket); // global dictionary extern dict *server_db; diff --git a/src/main.c b/src/main.c index e4c8ebb..aeb2774 100644 --- a/src/main.c +++ b/src/main.c @@ -15,7 +15,39 @@ #include "commands.h" #include "persistence.h" #include "replication.h" +#include "transaction.h" +// #define MAX_CLIENTS 1000 +client_t *client_list[MAX_CLIENTS]; + +void register_client(client_t *client) { + for (int i = 0; i < MAX_CLIENTS; i++) { + if (client_list[i] == NULL) { + client_list[i] = client; + tx_init(client); // Initialize transaction state + break; + } + } +} + +void unregister_client(client_t *client) { + for (int i = 0; i < MAX_CLIENTS; i++) { + if (client_list[i] == client) { + tx_cleanup(client); // Clean up transaction resources + client_list[i] = NULL; + break; + } + } +} + +client_t *get_client_by_socket(int socket) { + for (int i = 0; i < MAX_CLIENTS; i++) { + if (client_list[i] && client_list[i]->socket == socket) { + return client_list[i]; + } + } + return NULL; +} // function prototypes for background threads void *cleanup_expired_keys(void *arg); @@ -88,9 +120,9 @@ void *auto_save_thread(void *arg) { // thread to handle replication tasks void *replication_thread(void *arg) { - (void)arg; // unused parameter + (void)arg; - char buffer[BUFFER_SIZE * 4]; // Larger buffer to handle multiple commands + char buffer[BUFFER_SIZE * 4]; // buffer for incoming data int buffer_pos = 0; while (server_running) { @@ -190,6 +222,8 @@ void *handle_client(void *arg) { printf("new client connected: %s:%d\n", client_addr, ntohs(client->address.sin_port)); + register_client(client); + while (server_running) { int bytes_read = recv(client_sock, buffer, BUFFER_SIZE - 1, 0); if (bytes_read <= 0) { @@ -200,10 +234,10 @@ void *handle_client(void *arg) { buffer[bytes_read] = '\0'; - // process command using the command module execute_command(client_sock, buffer, server_db); } + unregister_client(client); close(client_sock); free(client); return NULL; diff --git a/src/replication.c b/src/replication.c index 1c142c7..0039cce 100644 --- a/src/replication.c +++ b/src/replication.c @@ -100,7 +100,6 @@ void sync_replica(int fd) { if (written == cmd_len) { synced_keys++; - // Small delay to allow command to be processed struct timespec ts = {0, 10000000}; // 10ms nanosleep(&ts, NULL); diff --git a/src/transaction.c b/src/transaction.c new file mode 100644 index 0000000..85fe643 --- /dev/null +++ b/src/transaction.c @@ -0,0 +1,123 @@ +#define _POSIX_C_SOURCE 200809L +#include "transaction.h" +#include "commands.h" +#include +#include +#include +#include + +// initialize transaction state for a client +void tx_init(client_t *client) { + client->in_transaction = 0; + client->transaction_errors = 0; + client->queued_commands = NULL; + client->queue_size = 0; + client->queue_capacity = 0; +} + +// clean up transaction resources +void tx_cleanup(client_t *client) { + printf("Cleaning up transaction state for client %p\n", (void*)client); + + if (!client) { + printf("ERROR: Null client in tx_cleanup\n"); + return; + } + + if (client->queued_commands) { + for (int i = 0; i < client->queue_size; i++) { + if (client->queued_commands[i]) { + free(client->queued_commands[i]); + } + } + free(client->queued_commands); + client->queued_commands = NULL; + } + + // Reset ALL transaction-related fields - CRITICAL + client->queue_size = 0; + client->queue_capacity = 0; + client->in_transaction = 0; // Most important field to reset + client->transaction_errors = 0; + + printf("Transaction cleanup complete, in_transaction=%d\n", client->in_transaction); +} + +// queue a command for later execution +int tx_queue_command(client_t *client, char *cmd) { + // expand queue if needed + if (client->queue_size >= client->queue_capacity) { + int new_capacity = client->queue_capacity == 0 ? 10 : client->queue_capacity * 2; + char **new_queue = realloc(client->queued_commands, new_capacity * sizeof(char*)); + if (!new_queue) { + return 0; // out of memory + } + client->queued_commands = new_queue; + client->queue_capacity = new_capacity; + } + + // add debug output to see the exact command being queued + printf("Queueing full command: '%s'\n", cmd); + + // store the full command + client->queued_commands[client->queue_size] = strdup(cmd); + if (!client->queued_commands[client->queue_size]) { + return 0; // out of memory + } + client->queue_size++; + return 1; +} + +// execute all queued commands +void tx_execute_commands(client_t *client, dict *db) { + if (client->transaction_errors) { + reply_error(client->socket, "EXECABORT Transaction discarded because of previous errors"); + tx_cleanup(client); + return; + } + + int queue_size = client->queue_size; + printf("Executing transaction with %d commands\n", queue_size); + + char **commands = malloc(sizeof(char*) * queue_size); + if (!commands) { + reply_error(client->socket, "ERR out of memory during transaction execution"); + tx_cleanup(client); + return; + } + + for (int i = 0; i < queue_size; i++) { + commands[i] = strdup(client->queued_commands[i]); + if (!commands[i]) { + for (int j = 0; j < i; j++) { + free(commands[j]); + } + free(commands); + reply_error(client->socket, "ERR out of memory during transaction execution"); + tx_cleanup(client); + return; + } + printf("Command to execute: '%s'\n", commands[i]); + } + + char buffer[32]; + snprintf(buffer, sizeof(buffer), "*%d\r\n", queue_size); + write(client->socket, buffer, strlen(buffer)); + + tx_cleanup(client); + + for (int i = 0; i < queue_size; i++) { + printf("Executing full command: '%s'\n", commands[i]); + execute_command(client->socket, commands[i], db); + free(commands[i]); + } + + free(commands); + printf("Transaction execution complete\n"); +} + +// discard all queued commands +void tx_discard_commands(client_t *client) { + tx_cleanup(client); + reply_string(client->socket, "OK"); +} \ No newline at end of file diff --git a/src/transaction.h b/src/transaction.h new file mode 100644 index 0000000..6497d69 --- /dev/null +++ b/src/transaction.h @@ -0,0 +1,21 @@ +#ifndef TRANSACTION_H +#define TRANSACTION_H + +#include "crimsoncache.h" +#include "dict.h" + +void tx_init(client_t *client); + + +void tx_cleanup(client_t *client); + + +int tx_queue_command(client_t *client, char *cmd); + + +void tx_execute_commands(client_t *client, dict *db); + + +void tx_discard_commands(client_t *client); + +#endif /* TRANSACTION_H */ \ No newline at end of file