From 847d446186a364f81dbd28cc8dcfeee05eb68630 Mon Sep 17 00:00:00 2001 From: Apostolof Date: Tue, 24 Dec 2019 01:55:07 +0200 Subject: [PATCH] Rewrite wip --- Makefile | 20 +--- lib/circ_buff.c | 88 +-------------- lib/circ_buff.h | 8 +- {src => lib}/helpers.c | 245 +++++++++++++++++++++++------------------ lib/helpers.h | 51 +++++++++ lib/message.c | 112 +++++++------------ lib/message.h | 35 +++--- lib/node.c | 120 +++++++++----------- lib/node.h | 50 +++------ lib/types.h | 10 ++ src/helpers.h | 59 ---------- src/zaqar.c | 101 +++++++++++++---- src/zaqar.h | 29 +---- 13 files changed, 410 insertions(+), 518 deletions(-) rename {src => lib}/helpers.c (52%) create mode 100644 lib/helpers.h create mode 100644 lib/types.h delete mode 100644 src/helpers.h diff --git a/Makefile b/Makefile index beeb262..16324e4 100644 --- a/Makefile +++ b/Makefile @@ -1,17 +1,6 @@ -ifeq ($(OS),Windows_NT) - ifeq ($(shell uname -s),) # not in a bash-like shell - CLEANUP = del /F /Q - MKDIR = mkdir - else # in a bash-like shell, like msys - CLEANUP = rm -f - MKDIR = mkdir -p - endif - TARGET_EXTENSION=.exe -else - CLEANUP = rm -f - MKDIR = mkdir -p - TARGET_EXTENSION=out -endif +CLEANUP = rm -f +MKDIR = mkdir -p +TARGET_EXTENSION=out .PHONY: cleandep .PHONY: clean @@ -29,7 +18,8 @@ CROSSLINK = arm-linux-gnueabihf-gcc DEPEND = gcc -MM -MG -MF -CROSSFLAGS = -I. -I$(PATHS) -I$(PATHL) -D_POSIX_C_SOURCE +CROSSFLAGS = -I. -I$(PATHS) -I$(PATHL) -D_DEFAULT_SOURCE +# CROSSFLAGS = -I. -I$(PATHS) -I$(PATHL) -D_POSIX_C_SOURCE -D_DEFAULT_SOURCE CROSSFLAGS += -std=c99 CROSSFLAGS += -march=armv6 CROSSFLAGS += -mfloat-abi=hard diff --git a/lib/circ_buff.c b/lib/circ_buff.c index d48c13e..2c76f97 100644 --- a/lib/circ_buff.c +++ b/lib/circ_buff.c @@ -1,6 +1,5 @@ #include #include -#include #include #include @@ -94,7 +93,7 @@ size_t circ_buf_capacity(cbuf_handle_t cbuf) { void circ_buf_put(cbuf_handle_t cbuf, const message_handle_t *data) { assert(cbuf && cbuf->buffer); - // TODO: sort and check if new data actually older!! + message_free(cbuf->buffer[cbuf->head]); cbuf->buffer[cbuf->head] = *data; advance_pointer(cbuf); @@ -115,7 +114,7 @@ int circ_buf_get(cbuf_handle_t cbuf, message_handle_t *data) { return r; } -int circ_buf_read(cbuf_handle_t cbuf, size_t position, message_handle_t *data) { +int circ_buf_peek(cbuf_handle_t cbuf, size_t position, message_handle_t *data) { assert(cbuf && data && cbuf->buffer && (position < circ_buf_size(cbuf))); int r = -1; @@ -140,86 +139,3 @@ bool circ_buf_full(cbuf_handle_t cbuf) { return cbuf->full; } - -/*void circ_buf_mul_add(cbuf_handle_t cbuf, char **data, uint8_t size, - int (*compar)(const void *, const void *)) { - assert(cbuf && data && cbuf->buffer); - - qsort(data, size, sizeof(char*), compar); - - char *last_element = (char*) malloc(circ_buf_element_size(cbuf) * sizeof(char)); - for (uint8_t i = 0; i < size; ++i) { - circ_buf_read(cbuf, 0, last_element); - - if (compar(&data[i], &last_element) < 0) { - continue; - } - - circ_buf_put(cbuf, data[i]); - } - - free(last_element); - - int end_buffer_size = circ_buf_size(cbuf); - char **temp_array = (char **) malloc(end_buffer_size * sizeof(char *)); - for (uint8_t buff_el = 0; buff_el < end_buffer_size; ++buff_el) { - temp_array[buff_el] = (char *) malloc(circ_buf_element_size(cbuf) * sizeof(char)); - circ_buf_get(cbuf, temp_array[buff_el]); - } - - qsort(temp_array, end_buffer_size, sizeof(char*), compar); - - for (uint8_t i = 0; i < end_buffer_size; ++i) { - circ_buf_put(cbuf, temp_array[i]); - } - - for (uint8_t buff_el = 0; buff_el < end_buffer_size; ++buff_el) { - free(temp_array[buff_el]); - } - free(temp_array); -}*/ - - -/*int circ_buf_serialize(cbuf_handle_t cbuf, char **serialized) { - char *temp = (char*) malloc(circ_buf_element_size(cbuf) * sizeof(char)); - const char separator[2] = "\r"; - uint8_t char_sum = circ_buf_size(cbuf) - 1; - uint8_t i; - - for (i = 0; i < circ_buf_size(cbuf); ++i) { - circ_buf_read(cbuf, i, temp); - char_sum += strlen(temp); - } - - (*serialized) = (char*) malloc((char_sum + 1) * sizeof(char)); - strcpy((*serialized), ""); - - for (i = 0; i < circ_buf_size(cbuf) - 1; ++i) { - circ_buf_read(cbuf, i, temp); - strcat((*serialized), temp); - strcat((*serialized), separator); - } - - circ_buf_read(cbuf, i, temp); - strcat((*serialized), temp); - - free(temp); - - return strlen((*serialized)); -} - -int circ_buf_deserialize(cbuf_handle_t cbuf, const char *serialized) { - char *str = calloc(strlen(serialized) + 1, sizeof(char)); - strcpy(str, serialized); - const char separator[2] = "\r"; - char *token; - - token = strtok(str, separator); - - while (token != NULL) { - circ_buf_put(cbuf, token); - token = strtok(NULL, separator); - } - - return circ_buf_size(cbuf); -}*/ \ No newline at end of file diff --git a/lib/circ_buff.h b/lib/circ_buff.h index 723c04e..71212fb 100644 --- a/lib/circ_buff.h +++ b/lib/circ_buff.h @@ -41,7 +41,7 @@ void circ_buf_put(cbuf_handle_t cbuf, const message_handle_t *data); int circ_buf_get(cbuf_handle_t cbuf, message_handle_t *data); // Reads a value from the buffer. Does NOT retrieve, size is not reduced! -int circ_buf_read(cbuf_handle_t cbuf, size_t position, message_handle_t *data); +int circ_buf_peek(cbuf_handle_t cbuf, size_t position, message_handle_t *data); // Checks if the buffer is empty. bool circ_buf_empty(cbuf_handle_t cbuf); @@ -55,10 +55,4 @@ size_t circ_buf_capacity(cbuf_handle_t cbuf); // Returns the number of elements stored in the buffer. size_t circ_buf_size(cbuf_handle_t cbuf); -// Serializes the whole buffer to a single string -// int circ_buf_serialize(cbuf_handle_t cbuf, char **serialized); - -// De-serializes a string to a buffer -// int circ_buf_deserialize(cbuf_handle_t cbuf, const char *serialized); - #endif //CIRC_BUFF_H_ diff --git a/src/helpers.c b/lib/helpers.c similarity index 52% rename from src/helpers.c rename to lib/helpers.c index 46f9bba..177079e 100644 --- a/src/helpers.c +++ b/lib/helpers.c @@ -1,65 +1,37 @@ -#include "helpers.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include -/* - * Function based on this example: - * http://man7.org/linux/man-pages/man3/getifaddrs.3.html#EXAMPLE -*/ -int get_own_id(void) { - int id = -1; - struct ifaddrs *ifaddr, *ifa; - int family, s, n; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) == -1) { - perror("Couldn't get network interfaces."); - exit(EXIT_FAILURE); - } - - // Walks through linked list, maintaining head pointer so we can free list later - for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) { - if (ifa->ifa_addr == NULL) { - continue; - } - - family = ifa->ifa_addr->sa_family; - - // Gets the address of an AF_INET* interface address - if (family == AF_INET || family == AF_INET6) { - s = getnameinfo(ifa->ifa_addr, - (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), - host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); - if (s != 0) { - printf("getnameinfo() failed: %s\n", gai_strerror(s)); - exit(EXIT_FAILURE); - } - - id = extract_id_from_ip(host); - if (id < 0) { - continue; - } - - break; - } - } - - freeifaddrs(ifaddr); +#include "helpers.h" - return(id); -} +// Private Functions -char get_ip_from_sockaddr(sockaddr_in addr) { +char *get_ip_from_sockaddr(address_t addr) { // Beware: The inet_ntoa() function returns a string in a statically allocated buffer, which // subsequent calls will overwrite! return inet_ntoa(addr.sin_addr); // TODO: maybe change to this instead: - // #define MAKE_FN_NAME(x) void Callback_ ## x (void) + // #define MAKE_FN_NAME(x) void Callback_ ## x (void) // #define FUNCTION_NAME(signal) MAKE_FN_NAME(signal) } -int extract_id_from_ip(const char *ip) { +// APIs + +node_id_t extract_id_from_ip(const ipv4_t ip) { + assert(ip); + const char separator[2] = "."; - int id = 0; + node_id_t id = 0; char *rest, *token, *ip_cp; ip_cp = malloc(strlen(ip) * sizeof(char)); @@ -68,30 +40,34 @@ int extract_id_from_ip(const char *ip) { rest = ip_cp; token = strtok_r(rest, separator, &rest); if (!token || atoi(token) != 10) { - return -1; + return 0; } token = strtok_r(rest, separator, &rest); if (!token || atoi(token) != 0) { - return -1; + return 0; } token = strtok_r(rest, separator, &rest); if (!token) { - return -1; + return 0; } id = atoi(token) * 100; token = strtok_r(rest, separator, &rest); if (!token) { - return -1; + return 0; } id += atoi(token); + free(ip_cp); + return id; } void set_timer_and_handler(void (*handler)(int), long int wakeup_time) { + assert(handler); + struct itimerval timer; struct sigaction signal_action; @@ -116,27 +92,30 @@ void set_timer_and_handler(void (*handler)(int), long int wakeup_time) { } } -void create_message(node_handle_t *neighbors, char *new_message, int own_id, - uint8_t num_neighbors, uint16_t max_message_length) { +uint16_t create_message(char *new_message, uint16_t max_message_length, node_handle_t *neighbors, + uint8_t num_neighbors, node_id_t own_id) { + assert(new_message && neighbors); node_handle_t random_node = neighbors[rand() % (num_neighbors)]; - int peer_id = extract_id_from_ip(get_ip_from_sockaddr(node_get_addr(random_node))); - if (peer_id < 0) { + node_id_t peer_id = node_get_id(random_node); + if (!peer_id) { perror("Couldn't extract peer's ID."); exit(EXIT_FAILURE); } snprintf(new_message, max_message_length, "%04d_%04d_%ld_%s", own_id, peer_id, time(NULL), "It's amazing... It's fantastic!"); + + return strlen(new_message); } /* * Function based on this example: * https://www.gnu.org/software/libc/manual/html_node/Inet-Example.html#Inet-Example */ -int create_socket_and_listen(uint16_t port, uint8_t backlog_size) { +int create_socket_and_listen(port_t port, uint8_t backlog_size) { int in_sock; - struct sockaddr_in own_name; + address_t own_name; // Creates the socket in_sock = socket(PF_INET, SOCK_STREAM, 0); @@ -164,10 +143,11 @@ int create_socket_and_listen(uint16_t port, uint8_t backlog_size) { return in_sock; } -// TODO: change soccaddr_in to node -// TODO: change message to struct -void send_message(struct sockaddr_in peer_name, const char *message) { +void send_message(node_handle_t peer, message_handle_t message) { + assert(peer && message); + int out_sock; + address_t peer_address = node_get_addr(peer); // Creates the socket out_sock = socket(PF_INET, SOCK_STREAM, 0); @@ -177,124 +157,179 @@ void send_message(struct sockaddr_in peer_name, const char *message) { } // Connects to the peer - if (connect(out_sock, (struct sockaddr *) &peer_name, sizeof(peer_name))) { - printf("Couldn't connect to peer : %d.\n", get_ip_from_sockaddr(peer_name)); - // TODO: add timestamp to node + if (connect(out_sock, (struct sockaddr *) &(peer_address), sizeof(node_get_addr(peer)))) { + printf("Couldn't connect to peer : %s.\n", get_ip_from_sockaddr(node_get_addr(peer))); + node_add_timestamp(peer, time(NULL), false); } else { // Sends data to the peer - write_to_peer(out_sock, message); - // TODO: add timestamp to node - // TODO: add sent_to to message + int ignored; + write_to_peer(out_sock, message_get(message, &ignored)); + node_add_timestamp(peer, time(NULL), true); + message_add_sent_to(message, node_get_id(peer)); } close(out_sock); } -// TODO: add neighbors array as arg -void accept_connection(int sock, struct sockaddr_in *peer_name, fd_set *active_fd_set) { +node_id_t accept_connection(int sock, address_t *peer_name, fd_set *active_fd_set, + int *listens_on) { + assert(peer_name && active_fd_set && listens_on); + size_t peer_name_size = sizeof((*peer_name)); int comm_socket = accept(sock, (struct sockaddr *) peer_name, &peer_name_size); if (comm_socket < 0) { perror("Couldn't accept the connection."); + // TODO maye be remove this? exit(EXIT_FAILURE); } + *(listens_on) = comm_socket; + + char *node_addr = get_ip_from_sockaddr(*peer_name); fprintf(stderr, "Connected to host %s, port %hd.\n", - get_ip_from_sockaddr(*peer_name), ntohs((*peer_name).sin_port)); - // TODO: add timestamp to node - // Maybe do that in zaqar (main) + node_addr, ntohs((*peer_name).sin_port)); + FD_SET(comm_socket, active_fd_set); + + return extract_id_from_ip(node_addr); +} + +uint16_t checkAddNode(node_handle_t **neighbors, uint16_t num_neighbors , + address_t peer_name, node_id_t node_id) { + assert(neighbors); + + bool node_exists = false; + + for (int i = 0; i < num_neighbors; ++i) { + if (node_get_id((*neighbors)[i]) == node_id) { + continue; + } + } + + if (!node_exists) { + node_handle_t *r_neighbors = realloc((*neighbors), + (num_neighbors + 1) * sizeof(node_handle_t)); + if (!r_neighbors) { + free(r_neighbors); + perror("Unable to reallocate memory for neighbor IP."); + exit(EXIT_FAILURE); + } + + (*neighbors) = r_neighbors; + (*neighbors)[num_neighbors] = node_init(peer_name, node_id); + + return num_neighbors + 1; + } + + return num_neighbors; } void write_to_peer(int file_desc, const char *message) { + assert(message); + int num_bytes = write(file_desc, message, strlen(message) + 1); if (num_bytes < 0) { perror("Couldn't write to peer."); + // TODO maye be remove this? exit(EXIT_FAILURE); } } -int read_from_peer(int file_des, uint16_t max_line) { +int read_from_peer(int file_des, uint16_t max_line, cbuf_handle_t *message_buffer, + node_handle_t **neighbors, uint16_t num_neighbors) { char buffer[max_line]; int num_bytes; num_bytes = read(file_des, buffer, sizeof(buffer)); if (num_bytes < 0) { perror("Couldn't read from peer."); + // TODO maye be remove this? exit(EXIT_FAILURE); - } else if (num_bytes == 0) + } else if (num_bytes == 0) { // End-of-file return -1; - else { + } else { fprintf(stderr, "Got message: `%s'\n", buffer); - // TODO: create new message struct + + node_id_t received_from = -1; + for (int i = 0; i < num_neighbors; ++i) { + if (node_get_comm_socket((*neighbors)[i])) { + received_from = node_get_id((*neighbors)[i]); + node_set_comm_socket((*neighbors)[i], -1); + break; + } + } + + message_handle_t new_message = message_init(buffer, num_bytes, received_from); + circ_buf_put(*(message_buffer), &new_message); + return 0; } } -void fread_neighbors(char *AEM_file, node_handle_t **neighbors, uint16_t *num_neighbors, uint16_t port) { +uint16_t fread_neighbors(char *AEM_file, node_handle_t **neighbors, port_t port) { + assert(AEM_file && neighbors); + uint16_t num_neighbors = 0; + // Reads the ARP file checking for connected devices FILE *aems_file = fopen(AEM_file, "r"); if (!aems_file) { - perror("Neighbors file: Failed to open file \"" AEM_file "\""); + perror("Neighbors file: Failed to open file containing the AEMs."); exit(EXIT_FAILURE); } // Extracts IP addresses found in the file char ip_addr[AEM_BUFFER_LEN]; - char **neighbors_ips = (char **) malloc(sizeof(char *)); - if (!neighbors_ips) { - perror("Unable to allocate memory for neighbors array."); - exit(EXIT_FAILURE); - } - - int (*num_neighbors) = 0; + char **neighbors_ips; while (1 == fscanf(aems_file, AEM_LINE_FORMAT, ip_addr)) { - ++(*num_neighbors); - if ((*num_neighbors) > 1) { - char **r_neighbors_ips = realloc(neighbors_ips, (*num_neighbors) * sizeof(char *)); - if (!r_neighbors_ips) { - free(r_neighbors_ips); - perror("Unable to reallocate memory for neighbor IP."); - exit(EXIT_FAILURE); - } + ++num_neighbors; - neighbors_ips = r_neighbors_ips; + char **r_neighbors_ips = realloc(neighbors_ips, num_neighbors * sizeof(char *)); + if (!r_neighbors_ips) { + free(r_neighbors_ips); + perror("Unable to reallocate memory for neighbor IP."); + exit(EXIT_FAILURE); } - neighbors_ips[(*num_neighbors) - 1] = (char *) malloc(AEM_BUFFER_LEN * sizeof(char)); - strcpy(neighbors_ips[(*num_neighbors) - 1], ip_addr); + neighbors_ips = r_neighbors_ips; + + neighbors_ips[num_neighbors - 1] = (char *) malloc(AEM_BUFFER_LEN * sizeof(char)); + strcpy(neighbors_ips[num_neighbors - 1], ip_addr); } // Allocates memory for the new neighbors structs - (*neighbors) = (node_handle_t *) malloc((*num_neighbors) * sizeof(node_handle_t)); + (*neighbors) = (node_handle_t *) malloc(num_neighbors * sizeof(node_handle_t)); if (!neighbors_ips) { perror("Unable to allocate memory for nodes."); exit(EXIT_FAILURE); } // Creates the neighbors structs - for (uint8_t i = 0; i < (*num_neighbors); ++i) { - struct sockaddr_in peer_name; + for (uint8_t i = 0; i < num_neighbors; ++i) { + address_t peer_name; init_sockaddr(&peer_name, neighbors_ips[i], port); - (*neighbors)[(*num_neighbors)++] = node_init(peer_name); + (*neighbors)[num_neighbors++] = node_init(peer_name, + extract_id_from_ip(get_ip_from_sockaddr(peer_name))); } fclose(aems_file); + + return num_neighbors; } /* * Function based on this example: * https://www.gnu.org/software/libc/manual/html_node/Inet-Example.html#Inet-Example */ -void init_sockaddr(struct sockaddr_in *peer_name, const char *ipv4, uint16_t port) { +void init_sockaddr(address_t *peer_name, const ipv4_t hostname, port_t port) { + assert(peer_name && hostname); struct hostent *hostinfo; peer_name->sin_family = AF_INET; peer_name->sin_port = htons(port); - hostinfo = gethostbyname(ipv4); + hostinfo = gethostbyname(hostname); if (hostinfo == NULL) { - fprintf(stderr, "Unknown host %s.\n", ipv4); + fprintf(stderr, "Unknown host %s.\n", hostname); exit(EXIT_FAILURE); } diff --git a/lib/helpers.h b/lib/helpers.h new file mode 100644 index 0000000..a6635e0 --- /dev/null +++ b/lib/helpers.h @@ -0,0 +1,51 @@ +#ifndef HELPERS_H_ +#define HELPERS_H_ + +#define _GNU_SOURCE + +#include + +#include "types.h" +#include "node.h" +#include "message.h" +#include "circ_buff.h" + +// Macros to turn a numeric macro into a string literal +#define xstr(s) str(s) +#define str(s) #s + +// AEMs file related definitions +#define AEM_ID_LEN 4 +#define AEM_BUFFER_LEN 5 +// Format for fscanf() to read the AEM numbers +#define AEM_LINE_FORMAT "%" xstr(AEM_ID_LEN) "s\n" + +node_id_t extract_id_from_ip(const ipv4_t ip); + +void set_timer_and_handler(void (*handler)(int), long int timer_interval); + +uint16_t create_message(char *new_message, uint16_t max_message_length, + node_handle_t *neighbors, uint8_t num_neighbors, node_id_t own_id); + +int create_socket_and_listen(port_t port, uint8_t backlog_size); + +void send_message(node_handle_t peer, message_handle_t message); + +node_id_t accept_connection(int sock, address_t *client_name, + fd_set *active_fd_set, int *listens_on); + +uint16_t checkAddNode(node_handle_t **neighbors, uint16_t num_neighbors , + address_t peer_name, node_id_t node_id); + +void write_to_peer(int filedes, const char *message); + +int read_from_peer(int file_des, uint16_t max_line, + cbuf_handle_t *message_buffer, node_handle_t **neighbors, + uint16_t num_neighbors); + +uint16_t fread_neighbors(char *AEM_file, node_handle_t **neighbors, + port_t port); + +void init_sockaddr(address_t *peer_name, const ipv4_t hostname, port_t port); + +#endif //HELPERS_H_ diff --git a/lib/message.c b/lib/message.c index 98c45e3..ad3f979 100644 --- a/lib/message.c +++ b/lib/message.c @@ -5,45 +5,30 @@ #include "message.h" -/* Private functions */ - // Defines the node structure struct message_t { char *message; - uint16_t received_from, *sent_to; - observer_func *observers; - int message_length, num_sent_to, num_observers; + node_id_t received_from, *sent_to; + uint16_t message_length, num_sent_to; }; -unit_static void message_state_changed(message_handle_t message_handle) { - assert(message_handle); - - for (uint16_t i = 0; i < message_handle->num_observers; ++i) { - (message_handle->observers[i])(); - } -} - /* API */ -message_handle_t message_init(const char *message, int message_length, uint16_t received_from) { - //assert(addr); - assert(message_length <= MAX_MESSAGE_LENGTH); +message_handle_t message_init(const char *message, uint16_t length, node_id_t received_from) { + assert(message && length <= MAX_MESSAGE_LENGTH); message_handle_t message_handle = malloc(sizeof(message_t)); assert(message_handle); - - message_handle->message = malloc((message_length + 1) * sizeof(char)); + + message_handle->message = malloc((length + 1) * sizeof(char)); strcpy(message_handle->message, message); - message_handle->message_length = message_length; + message_handle->message_length = length; message_handle->received_from = received_from; message_handle->sent_to = NULL; message_handle->num_sent_to = 0; - message_handle->observers = NULL; - message_handle->num_observers = 0; - return message_handle; } @@ -51,82 +36,59 @@ void message_free(message_handle_t message_handle) { assert(message_handle); free(message_handle->sent_to); - free(message_handle->observers); free(message_handle->message); free(message_handle); } -void message_add_sent_to(message_handle_t message_handle, uint16_t sent_to) { +void message_add_sent_to(message_handle_t message_handle, node_id_t sent_to) { assert(message_handle); - if (message_handle->sent_to == NULL) { - message_handle->sent_to = (uint16_t *) malloc(sizeof(uint16_t)); - if (!message_handle->sent_to) { - perror("Unable to allocate memory for message receivers."); - exit(EXIT_FAILURE); - } - ++(message_handle->num_sent_to); - } else { - uint16_t *r_sent_to = realloc(message_handle->sent_to, ++(message_handle->num_sent_to) * sizeof(uint16_t)); - if (!r_sent_to) { - free(r_sent_to); - perror("Unable to reallocate memory for message receivers."); - exit(EXIT_FAILURE); - } - - message_handle->sent_to = r_sent_to; + // Expands array by re-allocation space + node_id_t *r_sent_to = realloc(message_handle->sent_to, ++(message_handle->num_sent_to) * sizeof(node_id_t)); + if (!r_sent_to) { + free(r_sent_to); + perror("Unable to reallocate memory for message receivers."); + exit(EXIT_FAILURE); } + message_handle->sent_to = r_sent_to; + // ++(message_handle->num_sent_to); + message_handle->sent_to[message_handle->num_sent_to - 1] = sent_to; - message_state_changed(message_handle); } -char *message_get(message_handle_t message_handle, int *message_length) { - assert(message_handle); +char *message_get(message_handle_t message_handle, int *length) { + assert(message_handle && length); - char *message = (char *) malloc((message_handle->message_length + 1) * sizeof(char)); - if (!message) { - perror("Unable to allocate memory for message return."); - exit(EXIT_FAILURE); + static char *message_buffer = NULL; + static int message_buffer_len = 0; + + if (message_buffer_len != message_handle->message_length + 1) { + char *r_message_buffer = (char *) realloc(message_buffer, (message_handle->message_length + 1) * sizeof(char)); + if (!r_message_buffer) { + free(r_message_buffer); + perror("Unable to reallocate memory for message receivers."); + exit(EXIT_FAILURE); + } + + message_buffer = r_message_buffer; + message_buffer_len = message_handle->message_length + 1; } - strcpy(message, message_handle->message); - (*message_length) = message_handle->message_length; + strcpy(message_buffer, message_handle->message); + (*length) = message_handle->message_length; - return message; + return message_buffer; } -bool message_sent_to(message_handle_t message_handle, uint16_t node) { +bool message_sent_to(message_handle_t message_handle, node_id_t id) { assert(message_handle); for (uint16_t i = 0; i < message_handle->num_sent_to; ++i) { - if (message_handle->sent_to[i] == node) { + if (message_handle->sent_to[i] == id) { return true; } } return false; } - -void message_attach_observer(message_handle_t message_handle, observer_func observer) { - assert(message_handle); - - if (message_handle->observers == NULL) { - message_handle->observers = (observer_func *) malloc(sizeof(observer_func)); - if (!message_handle->observers) { - perror("Unable to allocate memory for message observers."); - exit(EXIT_FAILURE); - } - ++(message_handle->num_observers); - } else { - uint16_t *r_observers = realloc(message_handle->observers, - ++(message_handle->num_observers) * sizeof(observer_func)); - if (!r_observers) { - free(r_observers); - perror("Unable to reallocate memory for message observers."); - exit(EXIT_FAILURE); - } - } - - message_handle->observers[message_handle->num_observers - 1] = observer; -} \ No newline at end of file diff --git a/lib/message.h b/lib/message.h index c478b2f..e6f3036 100644 --- a/lib/message.h +++ b/lib/message.h @@ -4,6 +4,16 @@ #include #include +#include "types.h" + +// max_message_length = sender_id 4 chars +// + underscore 1 char +// + receiver_id 4 chars +// + underscore 1 char +// + time-stamp 10 chars +// + underscore 1 char +// + message_text 256 chars (max) +// = 277 chars #define MAX_MESSAGE_LENGTH 277 // Message structure @@ -11,34 +21,21 @@ typedef struct message_t message_t; // and handle type typedef message_t *message_handle_t; -typedef void (*observer_func)(void); - -#ifdef TEST //This is a test build -// Makes private functions reachable by the tester -#define unit_static -// Calls all observer functions attached to this message. -unit_static void message_state_changed(message_handle_t message_handle); -#else -#define unit_static static -#endif - // Initializes a message structure and returns the message handle. User must provide the length of // the message without the null termination character (strlen); -message_handle_t message_init(const char *message, int message_length, uint16_t received_from); +message_handle_t message_init(const char *message, uint16_t length, node_id_t received_from); // Frees a message structure. void message_free(message_handle_t message_handle); // Adds an new receiver to the message. -void message_add_sent_to(message_handle_t message_handle, uint16_t sent_to); +void message_add_sent_to(message_handle_t message_handle, node_id_t sent_to); -// Returns the message. +// Returns the message. This functions returns a string in a statically allocated buffer, which +// subsequent calls will overwrite! char *message_get(message_handle_t message_handle, int *message_length); -// Returns true if the message has been previously sent to this node, false otherwise. -bool message_sent_to(message_handle_t message_handle, uint16_t node); - -// Attaches an observer function to the message. -void message_attach_observer(message_handle_t message_handle, observer_func observer); +// Returns true if the message has been previously sent to this ip, false otherwise. +bool message_sent_to(message_handle_t message_handle, node_id_t id); #endif //MESSAGE_H_ diff --git a/lib/node.c b/lib/node.c index 6d9bf19..4f9e1a7 100644 --- a/lib/node.c +++ b/lib/node.c @@ -1,34 +1,34 @@ #include #include +#include #include #include +#include #include "node.h" // Defines the node structure struct node_t { - struct sockaddr_in addr; - uint64_t **events; - uint64_t appearance_duration; - uint8_t events_size; - node_status _node_status; + address_t addr; + node_id_t id; + timestamp_t *events_timestamps; + node_status *events_status; + uint16_t events_size; + int comm_socket; }; -node_handle_t node_init(struct sockaddr_in addr) { - //assert(addr); - +node_handle_t node_init(address_t addr, node_id_t id) { node_handle_t node = malloc(sizeof(node_t)); assert(node); node->addr = addr; - node->events_size = 0; - node->events = (uint64_t **) malloc(2 * sizeof(uint64_t)); - node->events[0] = (uint64_t *) malloc(sizeof(uint64_t)); - node->events[1] = (uint64_t *) malloc(sizeof(uint64_t)); - node->events[0][0] = 0; - node->events[1][0] = 0; - node->appearance_duration = 0; - node->_node_status = NODE_INITIALIAZED; + node->id = id; + node->events_timestamps = (timestamp_t *) malloc(sizeof(timestamp_t)); + node->events_timestamps[0] = time(NULL); + node->events_status = (node_status *) malloc(sizeof(node_status)); + node->events_status[0] = NODE_INITIALIAZED; + node->events_size = 1; + node->comm_socket = -1; return node; } @@ -36,89 +36,67 @@ node_handle_t node_init(struct sockaddr_in addr) { void node_free(node_handle_t node) { assert(node); - free(node->events[0]); - free(node->events[1]); - free(node->events); + free(node->events_timestamps); + free(node->events_status); free(node); } -void node_add_timestamp(node_handle_t node, time_t timestamp, bool visible) { +void node_add_timestamp(node_handle_t node, timestamp_t timestamp, bool visible) { assert(node && timestamp); - if ((visible && !node->events[1][node->events_size - 1]) || - (!visible && node->events[1][node->events_size - 1])) { - return; + int *realloc_r = realloc(node->events_timestamps, node->events_size + 1); + if (!realloc_r) { + node_free(node); + perror("Error trying to reallocate memory for event timestamps."); + exit(EXIT_FAILURE); } - if (visible) { - int *realloc_r = realloc(node->events[0], node->events_size + 1); - if (!realloc_r) { - node_free(node); - perror("Error trying to reallocate memory for event timestamps!"); - exit(EXIT_FAILURE); - } - realloc_r = realloc(node->events[1], node->events_size + 1); - if (!realloc_r) { - node_free(node); - perror("Error trying to reallocate memory for event timestamps!"); - exit(EXIT_FAILURE); - } - - node->events[0][node->events_size] = timestamp; - node->events[1][node->events_size] = 0; - node->_node_status = NODE_PRESENT; - ++node->events_size; - } else { - node->events[1][node->events_size - 1] = timestamp; - node->_node_status = NODE_GONE; - node->appearance_duration += node->events[1][node->events_size - 1] - - node->events[0][node->events_size - 1]; + realloc_r = realloc(node->events_status, node->events_size + 1); + if (!realloc_r) { + node_free(node); + perror("Error trying to reallocate memory for event statuses."); + exit(EXIT_FAILURE); } + + node->events_timestamps[node->events_size] = timestamp; + node->events_status[node->events_size] = visible ? NODE_PRESENT : NODE_GONE; + ++node->events_size; } -struct sockaddr_in node_get_addr(node_handle_t node) { +address_t node_get_addr(node_handle_t node) { assert(node); return node->addr; } -enum node_status node_get_status(node_handle_t node) { +node_id_t node_get_id(node_handle_t node) { assert(node); - return node->_node_status; + return node->id; } -uint8_t node_get_latest_appearance_duration(node_handle_t node) { +node_status node_get_status(node_handle_t node) { assert(node); - if (node->_node_status == NODE_INITIALIAZED) { - return 0; - } else if (node->events[1][node->events_size - 1] == 0) { - return (uint64_t)time(NULL) - node->events[0][node->events_size - 1]; - } else { - return node->events[1][node->events_size - 1] - node->events[0][node->events_size - 1]; - } + return node->events_status[node->events_size]; } -uint8_t node_get_total_appearance_duration(node_handle_t node) { - assert(node); - - return node->appearance_duration; +int node_get_comm_socket(node_handle_t node) { + return node->comm_socket; } -uint8_t node_get_event_table(node_handle_t node, time_t ***event_table) { - assert(node && event_table); +void node_set_comm_socket(node_handle_t node, int comm_socket) { + node->comm_socket = comm_socket; +} - if (node->events_size < 1) { - return 0; - } +uint16_t node_get_events(node_handle_t node, timestamp_t **events_timestamps, node_status **events_status) { + assert(node && events_timestamps && events_status); - (*event_table) = (time_t **) malloc(2 * sizeof(time_t *)); - (*event_table)[0] = (time_t *) malloc(node->events_size * sizeof(time_t)); - (*event_table)[1] = (time_t *) malloc(node->events_size * sizeof(time_t)); + (*events_timestamps) = (timestamp_t *) malloc(node->events_size * sizeof(timestamp_t)); + (*events_status) = (node_status *) malloc(node->events_size * sizeof(node_status)); - memcpy((*event_table)[0], node->events[0], node->events_size * sizeof(time_t)); - memcpy((*event_table)[1], node->events[1], node->events_size * sizeof(time_t)); + memcpy((*events_timestamps), node->events_timestamps, node->events_size * sizeof(timestamp_t)); + memcpy((*events_status), node->events_status, node->events_size * sizeof(node_status)); return node->events_size; } \ No newline at end of file diff --git a/lib/node.h b/lib/node.h index b506445..2a0acaf 100644 --- a/lib/node.h +++ b/lib/node.h @@ -2,8 +2,9 @@ #define NODE_H_ #include -#include -#include +#include + +#include "types.h" // Node structure typedef struct node_t node_t; @@ -12,48 +13,33 @@ typedef node_t *node_handle_t; typedef enum node_status { NODE_INITIALIAZED, NODE_PRESENT, NODE_GONE } node_status; -#ifdef TEST //This is a test build -// Makes private functions reachable by the tester -#define unit_static -#else -#define unit_static static -#endif - // Initializes a node structure and returns the node handle. -node_handle_t node_init(struct sockaddr_in addr); +node_handle_t node_init(address_t addr, node_id_t id); // Frees a node structure. void node_free(node_handle_t node); -// Adds an event timestamp to the node. Either the (re)appearance or the disappearance of the node. -void node_add_timestamp(node_handle_t node, time_t timestamp, bool visible); +// Adds an event timestamp to the node. +void node_add_timestamp(node_handle_t node, timestamp_t timestamp, bool visible); // Returns the address of the node -struct sockaddr_in node_get_addr(node_handle_t node); - -node_status node_get_status(node_handle_t node); +address_t node_get_addr(node_handle_t node); -//uint8_t node_get_appear_count(node_handle_t node); +// Returns the id of the node +node_id_t node_get_id(node_handle_t node); -//uint8_t node_get_disappear_count(node_handle_t node); - -//void node_get_latest_appear(node_handle_t node, ); - -//void node_get_latest_disappear(node_handle_t node, ); - -// Returns the duration (in seconds) of the latest stretch of time that this node has been visible. -uint8_t node_get_latest_appearance_duration(node_handle_t node); +// Returns the latest node status. +node_status node_get_status(node_handle_t node); -// Returns the total duration (in seconds) of time that this node was visible. -uint8_t node_get_total_appearance_duration(node_handle_t node); +// Returns the current communication socket id of the node +int node_get_comm_socket(node_handle_t node); -// Returns the event timestamps table for this node. -uint8_t node_get_event_table(node_handle_t node, time_t ***event_table); +// Sets the current communication socket id of the node +void node_set_comm_socket(node_handle_t node, int comm_socket); -// Serializes the whole node to a single string -//int circ_buf_serialize(node_handle_t node, char **serialized); +// Returns the event tables for this node. +uint16_t node_get_events(node_handle_t node, timestamp_t **events_timestamps, node_status **events_status); -// De-serializes a string to a node -//int circ_buf_deserialize(node_handle_t node, const char *serialized); +//TODO node save thingy #endif //NODE_H_ diff --git a/lib/types.h b/lib/types.h new file mode 100644 index 0000000..d63ed8b --- /dev/null +++ b/lib/types.h @@ -0,0 +1,10 @@ +#ifndef CUSTOM_TYPES_H_ +#define CUSTOM_TYPES_H_ + +typedef uint16_t node_id_t; +typedef struct sockaddr_in address_t; +typedef uint64_t timestamp_t; +typedef char *ipv4_t; +typedef uint16_t port_t; + +#endif //CUSTOM_TYPES_H_ diff --git a/src/helpers.h b/src/helpers.h deleted file mode 100644 index 7181f9b..0000000 --- a/src/helpers.h +++ /dev/null @@ -1,59 +0,0 @@ -#ifndef HELPERS_H_ -#define HELPERS_H_ - -#define _GNU_SOURCE - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "node.h" - -// Macros to turn a numeric macro into a string literal -#define xstr(s) str(s) -#define str(s) #s - -// AEMs file related definitions -#define AEM_BUFFER_LEN 5 - -// Format for fscanf() to read the AEM numbers -#define ARP_LINE_FORMAT "%" xstr(ARP_STRING_LEN) "s\n" - - -int get_own_id(void); - -int extract_id_from_ip(const char *ip); - -void set_timer_and_handler(void (*handler)(int), long int timer_interval); - -void create_message(char *new_message, uint16_t max_message_length, node_handle_t *neighbors, - uint8_t num_neighbors, int own_id); - -int create_socket_and_listen(uint16_t port, uint8_t backlog_size); - -void send_message(struct sockaddr_in peer_name, const char *message); - -void accept_connection(int sock, struct sockaddr_in *client_name, fd_set *active_fd_set); - -void write_to_peer(int filedes, const char *message); - -int read_from_peer(int file_des, uint16_t max_line); - -void fread_neighbors(char *AEM_file, node_handle_t **neighbors, uint16_t *num_neighbors, uint16_t port); - -void init_sockaddr(struct sockaddr_in *name, const char *hostname, uint16_t port); - -#endif //HELPERS_H_ diff --git a/src/zaqar.c b/src/zaqar.c index 1358f39..166b224 100644 --- a/src/zaqar.c +++ b/src/zaqar.c @@ -1,3 +1,15 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "types.h" +#include "message.h" +#include "circ_buff.h" +#include "helpers.h" #include "zaqar.h" #define MESSAGE "You got mail!!! xo xo xo" @@ -8,22 +20,24 @@ int main(int argc, char **argv) { check_args(argc, argv); // Socket communication variables - int in_sock, own_id; + int in_sock; fd_set active_fd_set, read_fd_set; - struct sockaddr_in peer_name; + address_t peer_name; + node_id_t own_id = atoi(argv[1]); // Business logic variables node_handle_t *neighbors; uint16_t num_neighbors = 0; + message_handle_t *buffer; + cbuf_handle_t message_buffer; - own_id = get_own_id(); - if (own_id < 0) { - perror("Couldn't extract own ID."); - exit(EXIT_FAILURE); - } + bool exhaustive_send = false; + + buffer = (message_handle_t *) malloc(MESSAGE_BUFFER_SIZE * sizeof(message_handle_t)); + message_buffer = circ_buf_init(buffer, MESSAGE_BUFFER_SIZE); // Reads neighbors' IPs from file - void fread_neighbors(argv[1], &neighbors, &num_neighbors, PORT); + num_neighbors = fread_neighbors(argv[1], &neighbors, PORT); // Sets a timer and handler to produce an interrupt and send a message set_timer_and_handler(handle_alarm, MIN_TIMER_VAL + rand() % (MAX_TIMER_VAL - MIN_TIMER_VAL)); @@ -40,18 +54,37 @@ int main(int argc, char **argv) { while (1) { if (sigalrm_flag) { // It's time to send a message! - char new_message[MAX_MESSAGE_LENGTH]; - create_message(new_message, MAX_MESSAGE_LENGTH, neighbors, num_neighbors, own_id); + char new_message_payload[MAX_MESSAGE_LENGTH]; + uint16_t new_message_length = create_message(new_message_payload, MAX_MESSAGE_LENGTH, + neighbors, num_neighbors, own_id); - for (uint8_t i = 0; i < num_neighbors; ++i) { - if (node_get_status(neighbors[i]) == NODE_PRESENT) { - send_message(node_get_addr(neighbors[i]), new_message); - } + message_handle_t new_message = message_init(new_message_payload, new_message_length, own_id); + circ_buf_put(message_buffer, &new_message); + + for (uint16_t i = 0; i < num_neighbors; ++i) { + send_message(neighbors[i], new_message); } // Resets timer sigalrm_flag = false; set_timer_and_handler(handle_alarm, MIN_TIMER_VAL + rand() % (MAX_TIMER_VAL - MIN_TIMER_VAL)); + + // Checks for new neighbors/unsent messages while awake + exhaustive_send = true; + } + + if (exhaustive_send) { + for (uint16_t node_index = 0; node_index < num_neighbors; ++node_index) { + node_id_t current_node_id = node_get_id(neighbors[node_index]); + + for (uint message_index = 0; message_index < circ_buf_size(message_buffer); ++message_index) { + message_handle_t current_message; + + if (!circ_buf_peek(message_buffer, message_index, ¤t_message) && + !message_sent_to(current_message, current_node_id)) + send_message(neighbors[node_index], current_message); + } + } } // Shallow copies the readset @@ -68,33 +101,59 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } - // TODO: sent new messages to all neighbors - // TODO: if found new neighbor, sent all messages - // Services all the sockets with input pending for (int i = 0; i < FD_SETSIZE; ++i) { if (FD_ISSET(i, &read_fd_set)) { if (i == in_sock) { // Connection request on original socket - accept_connection(in_sock, &peer_name, &active_fd_set); + int comm_socket; + node_id_t peer_id = accept_connection(in_sock, &peer_name, &active_fd_set, + &comm_socket); + + // Updates neighbors array + num_neighbors = checkAddNode(&neighbors, num_neighbors , peer_name, peer_id); + if (comm_socket >= 0) { + for (int i = 0; i < num_neighbors; ++i) { + if(node_get_id(neighbors[i]) == peer_id) { + node_set_comm_socket(neighbors[i], comm_socket); + break; + } + } + } } else { // Data arriving on an already-connected socket - if (read_from_peer(i, MAX_MESSAGE_LENGTH) < 0) { + if (read_from_peer(i, MAX_MESSAGE_LENGTH + 1, &message_buffer, &neighbors, num_neighbors) < 0) { close(i); FD_CLR(i, &active_fd_set); + + // Checks for new neighbors/unsent messages while awake + exhaustive_send = true; } } } } } + //Frees memory + for (uint i = 0; i < circ_buf_size(message_buffer); ++i) { + message_handle_t current_message; + + circ_buf_get(message_buffer, ¤t_message); + message_free(current_message); + } + circ_buf_free(message_buffer); + for (int i = 0; i < num_neighbors; ++i) { + node_free(neighbors[i]); + } + free(buffer); + return 0; } void check_args(int argc, char **argv) { - if (argc < 2) { + if (argc != 3) { printf("Correct usage is:\n"); - printf("\t%s \n", argv[0]); + printf("\t%s \n", argv[0]); exit(EXIT_FAILURE); } } diff --git a/src/zaqar.h b/src/zaqar.h index b64fb50..bb62fc5 100644 --- a/src/zaqar.h +++ b/src/zaqar.h @@ -1,38 +1,11 @@ #ifndef ZAQAR_H_ #define ZAQAR_H_ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "helpers.h" -#include "message.h" -#include "circ_buff.h" - #define MIN_TIMER_VAL 60 #define MAX_TIMER_VAL 300 #define PORT 5000 -// max_message_length = sender_id 4 chars -// + underscore 1 char -// + receiver_id 4 chars -// + underscore 1 char -// + time-stamp 10 chars -// + underscore 1 char -// + message_text 256 chars (max) -// + null_terminator 1 char -// = 278 chars -#define MAX_MESSAGE_LENGTH 278 #define BACKLOG_SIZE 2 +#define MESSAGE_BUFFER_SIZE 2000 void check_args(int argc, char **argv);