Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CC = gcc
CFLAGS = -Wall -Wextra -pedantic -std=c11 -g
CFLAGS = -Wall -Wextra -pedantic -g
LDFLAGS = -pthread

SRC_DIR = src
Expand Down
173 changes: 147 additions & 26 deletions src/commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
#include <arpa/inet.h>
#include <pthread.h>
#include "crimsoncache.h"
#include "transaction.h"

extern void track_command_change(void);
extern volatile sig_atomic_t server_running;
extern client_t *get_client_by_socket(int socket);

// fallback implementations if not available
#ifndef HAVE_STRDUP
Expand Down Expand Up @@ -56,6 +58,9 @@ static command_def commands[] = {
{"role", role_command, 1, 1},
{"incr", incr_command, 2, 2},
{"replconf", replconf_command, 2, -1},
{"multi", multi_command, 1, 1},
{"exec", exec_command, 1, 1},
{"discard", discard_command, 1, 1},
{NULL, NULL, 0, 0} // sentinel to mark end of array
};

Expand Down Expand Up @@ -197,58 +202,102 @@ void free_tokens(char **tokens, int count) {

extern void track_command_change();

// this is where we figure out what the client wants to do
cmd_result execute_command(int client_sock, char *input, dict *db) {
int argc = 0;
char **argv = tokenize_command(input, &argc);
cmd_result result = CMD_UNKNOWN;

int argc = 0; // to count how many parts the command has
char *input_copy = strdup(input); // make a copy so strtok doesn't mess up the original
if (!input_copy) {
// oops, couldn't make a copy, something's wrong
if (client_sock >= 0) {
reply_error(client_sock, "err out of memory");
}
return CMD_ERR;
}

char **argv = tokenize_command(input_copy, &argc); // break the command into pieces
cmd_result result = CMD_UNKNOWN; // let's assume we don't know the command yet

// if we didn't get any pieces, or something went wrong tokenizing
if (!argv || argc == 0) {
if (argv) free_tokens(argv, argc);
reply_error(client_sock, "ERR empty command");
if (argv) free_tokens(argv, argc); // clean up if argv was allocated
free(input_copy); // clean up the copy
if (client_sock >= 0) {
reply_error(client_sock, "err empty command");
}
return CMD_ERR;
}

for (size_t i = 0; i < strlen(argv[0]); i++) {

// make the command name lowercase so "SET" and "set" are the same
for (size_t i = 0; argv[0][i]; i++) {
argv[0][i] = tolower(argv[0][i]);
}

// see if this client is in a transaction
client_t *client = client_sock >= 0 ? get_client_by_socket(client_sock) : NULL;

// is this a command that controls transactions, like multi, exec, or discard?
int is_tx_command = strcmp(argv[0], "multi") == 0 ||
strcmp(argv[0], "exec") == 0 ||
strcmp(argv[0], "discard") == 0;

// if we're in a transaction and this isn't a transaction control command, just queue it
if (client && client->in_transaction && !is_tx_command) {
// use the original input string for queuing to preserve quotes and stuff
if (tx_queue_command(client, input)) {
reply_string(client_sock, "queued");
} else {
reply_error(client_sock, "err queue command failed");
client->transaction_errors = 1; // mark that something went wrong
}
free_tokens(argv, argc); // clean up the pieces
free(input_copy); // clean up the copy
return CMD_OK; // we're done for now, it's queued
}

// okay, not queuing, let's find the actual command to run
for (int i = 0; commands[i].name != NULL; i++) {
if (strcmp(argv[0], commands[i].name) == 0) {
// check argument count
// found it! now check if they gave the right number of arguments
if ((commands[i].min_args > 0 && argc < commands[i].min_args) ||
(commands[i].max_args > 0 && argc > commands[i].max_args)) {
reply_error(client_sock, "ERR wrong number of arguments");
(commands[i].max_args > 0 && argc > commands[i].max_args && commands[i].max_args != -1)) { // -1 means any number of args is fine
if (client_sock >= 0) {
reply_error(client_sock, "err wrong number of arguments");
}
result = CMD_ERR;
} else {
// looks good, run the command's handler function
result = commands[i].handler(client_sock, argc, argv, db);
}
break;
break; // no need to check other commands
}
}


// if we went through all commands and didn't find it
if (result == CMD_UNKNOWN) {
reply_error(client_sock, "ERR unknown command");
result = CMD_ERR;
if (client_sock >= 0) { // only send error if it's a real client connection
reply_error(client_sock, "err unknown command");
}
result = CMD_ERR; // make sure we return an error status
}

// Add this after applying the commands but before free_tokens
// Propagate write commands to replicas
if (result == CMD_OK && server_repl.role == ROLE_PRIMARY && client_sock >= 0) {
// Commands that modify data
// if the command was okay, and we're the primary server, and it was a write command...
// then we need to tell our replicas about it
// but only if we're not in a transaction (exec will handle propagation for transactions)
if (result == CMD_OK && (!client || !client->in_transaction) && server_repl.role == ROLE_PRIMARY && client_sock >= 0) {
if (strcasecmp(argv[0], "set") == 0 ||
strcasecmp(argv[0], "del") == 0 ||
strcasecmp(argv[0], "expire") == 0 ||
strcasecmp(argv[0], "incr") == 0) {

// Track for persistence
track_command_change();

// Propagate to replicas with proper formatting

track_command_change(); // for persistence, like auto-saving
// use the original input for replication to keep quotes and exact format
replication_feed_slaves(input, strlen(input));
}
}

free_tokens(argv, argc);
return result;
free_tokens(argv, argc); // always clean up the token pieces
free(input_copy); // and the copy we made
return result; // tell the caller how it went
}

// response formatters
Expand Down Expand Up @@ -624,6 +673,78 @@ cmd_result replconf_command(int client_sock, int argc, char **argv, dict *db) {
return CMD_OK;
}

// MULTI command - begin transaction
cmd_result multi_command(int client_sock, int argc, char **argv, dict *db) {
(void)argc;
(void)argv;
(void)db;

client_t *client = get_client_by_socket(client_sock);
if (!client) {
reply_error(client_sock, "ERR client not found");
return CMD_ERR;
}

if (client->in_transaction) {
reply_error(client_sock, "ERR MULTI calls can not be nested");
return CMD_ERR;
}

client->in_transaction = 1;
client->transaction_errors = 0;
reply_string(client_sock, "OK");
return CMD_OK;
}

// EXEC command - execute transaction
cmd_result exec_command(int client_sock, int argc, char **argv, dict *db) {
(void)argc;
(void)argv;

client_t *client = get_client_by_socket(client_sock);
if (!client) {
reply_error(client_sock, "ERR client not found");
return CMD_ERR;
}

if (!client->in_transaction) {
reply_error(client_sock, "ERR EXEC without MULTI");
return CMD_ERR;
}

printf("Executing transaction with %d commands\n", client->queue_size);
tx_execute_commands(client, db);

if (client->in_transaction != 0) {
printf("WARNING: Transaction state not properly reset, forcing to 0\n");
client->in_transaction = 0;
}

printf("Transaction execution complete, in_transaction=%d\n", client->in_transaction);
return CMD_OK;
}

// DISCARD command - discard transaction
cmd_result discard_command(int client_sock, int argc, char **argv, dict *db) {
(void)argc;
(void)argv;
(void)db;

client_t *client = get_client_by_socket(client_sock);
if (!client) {
reply_error(client_sock, "ERR client not found");
return CMD_ERR;
}

if (!client->in_transaction) {
reply_error(client_sock, "ERR DISCARD without MULTI");
return CMD_ERR;
}

tx_discard_commands(client);
return CMD_OK;
}




3 changes: 3 additions & 0 deletions src/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ cmd_result replicaof_command(int client_sock, int argc, char **argv, dict *db);
cmd_result role_command(int client_sock, int argc, char **argv, dict *db);
cmd_result incr_command(int client_sock, int argc, char **argv, dict *db);
cmd_result replconf_command(int client_sock, int argc, char **argv, dict *db);
cmd_result multi_command(int client_sock, int argc, char **argv, dict *db);
cmd_result exec_command(int client_sock, int argc, char **argv, dict *db);
cmd_result discard_command(int client_sock, int argc, char **argv, dict *db);

#endif /* COMMANDS_H */
9 changes: 9 additions & 0 deletions src/crimsoncache.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@ typedef struct client {
struct sockaddr_in address;
char buffer[BUFFER_SIZE];
int buffer_pos;

int in_transaction; // flag to indicate if in MULTI state
int transaction_errors; // tracks if any errors occurred during MULTI
char **queued_commands;
int queue_size; // current size of queue
int queue_capacity; // allocated capacity of queue
} client_t;

// function prototypes
void handle_signal(int sig);
void *handle_client(void *arg);
void register_client(client_t *client);
void unregister_client(client_t *client);
client_t *get_client_by_socket(int socket);

// global dictionary
extern dict *server_db;
Expand Down
40 changes: 37 additions & 3 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,39 @@
#include "commands.h"
#include "persistence.h"
#include "replication.h"
#include "transaction.h"

// #define MAX_CLIENTS 1000
client_t *client_list[MAX_CLIENTS];

void register_client(client_t *client) {
for (int i = 0; i < MAX_CLIENTS; i++) {
if (client_list[i] == NULL) {
client_list[i] = client;
tx_init(client); // Initialize transaction state
break;
}
}
}

void unregister_client(client_t *client) {
for (int i = 0; i < MAX_CLIENTS; i++) {
if (client_list[i] == client) {
tx_cleanup(client); // Clean up transaction resources
client_list[i] = NULL;
break;
}
}
}

client_t *get_client_by_socket(int socket) {
for (int i = 0; i < MAX_CLIENTS; i++) {
if (client_list[i] && client_list[i]->socket == socket) {
return client_list[i];
}
}
return NULL;
}

// function prototypes for background threads
void *cleanup_expired_keys(void *arg);
Expand Down Expand Up @@ -88,9 +120,9 @@ void *auto_save_thread(void *arg) {

// thread to handle replication tasks
void *replication_thread(void *arg) {
(void)arg; // unused parameter
(void)arg;

char buffer[BUFFER_SIZE * 4]; // Larger buffer to handle multiple commands
char buffer[BUFFER_SIZE * 4]; // buffer for incoming data
int buffer_pos = 0;

while (server_running) {
Expand Down Expand Up @@ -190,6 +222,8 @@ void *handle_client(void *arg) {
printf("new client connected: %s:%d\n",
client_addr, ntohs(client->address.sin_port));

register_client(client);

while (server_running) {
int bytes_read = recv(client_sock, buffer, BUFFER_SIZE - 1, 0);
if (bytes_read <= 0) {
Expand All @@ -200,10 +234,10 @@ void *handle_client(void *arg) {

buffer[bytes_read] = '\0';

// process command using the command module
execute_command(client_sock, buffer, server_db);
}

unregister_client(client);
close(client_sock);
free(client);
return NULL;
Expand Down
1 change: 0 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ void sync_replica(int fd) {
if (written == cmd_len) {
synced_keys++;

// Small delay to allow command to be processed
struct timespec ts = {0, 10000000}; // 10ms
nanosleep(&ts, NULL);

Expand Down
Loading