Browse Source

Rewrite wip

rewrite
Apostolos Fanakis 5 years ago
parent
commit
847d446186
  1. 14
      Makefile
  2. 88
      lib/circ_buff.c
  3. 8
      lib/circ_buff.h
  4. 233
      lib/helpers.c
  5. 51
      lib/helpers.h
  6. 96
      lib/message.c
  7. 35
      lib/message.h
  8. 106
      lib/node.c
  9. 50
      lib/node.h
  10. 10
      lib/types.h
  11. 59
      src/helpers.h
  12. 101
      src/zaqar.c
  13. 29
      src/zaqar.h

14
Makefile

@ -1,17 +1,6 @@
ifeq ($(OS),Windows_NT)
ifeq ($(shell uname -s),) # not in a bash-like shell
CLEANUP = del /F /Q
MKDIR = mkdir
else # in a bash-like shell, like msys
CLEANUP = rm -f
MKDIR = mkdir -p
endif
TARGET_EXTENSION=.exe
else
CLEANUP = rm -f CLEANUP = rm -f
MKDIR = mkdir -p MKDIR = mkdir -p
TARGET_EXTENSION=out TARGET_EXTENSION=out
endif
.PHONY: cleandep .PHONY: cleandep
.PHONY: clean .PHONY: clean
@ -29,7 +18,8 @@ CROSSLINK = arm-linux-gnueabihf-gcc
DEPEND = gcc -MM -MG -MF DEPEND = gcc -MM -MG -MF
CROSSFLAGS = -I. -I$(PATHS) -I$(PATHL) -D_POSIX_C_SOURCE CROSSFLAGS = -I. -I$(PATHS) -I$(PATHL) -D_DEFAULT_SOURCE
# CROSSFLAGS = -I. -I$(PATHS) -I$(PATHL) -D_POSIX_C_SOURCE -D_DEFAULT_SOURCE
CROSSFLAGS += -std=c99 CROSSFLAGS += -std=c99
CROSSFLAGS += -march=armv6 CROSSFLAGS += -march=armv6
CROSSFLAGS += -mfloat-abi=hard CROSSFLAGS += -mfloat-abi=hard

88
lib/circ_buff.c

@ -1,6 +1,5 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h> #include <stdint.h>
#include <stddef.h>
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
@ -94,7 +93,7 @@ size_t circ_buf_capacity(cbuf_handle_t cbuf) {
void circ_buf_put(cbuf_handle_t cbuf, const message_handle_t *data) { void circ_buf_put(cbuf_handle_t cbuf, const message_handle_t *data) {
assert(cbuf && cbuf->buffer); assert(cbuf && cbuf->buffer);
// TODO: sort and check if new data actually older!! message_free(cbuf->buffer[cbuf->head]);
cbuf->buffer[cbuf->head] = *data; cbuf->buffer[cbuf->head] = *data;
advance_pointer(cbuf); advance_pointer(cbuf);
@ -115,7 +114,7 @@ int circ_buf_get(cbuf_handle_t cbuf, message_handle_t *data) {
return r; return r;
} }
int circ_buf_read(cbuf_handle_t cbuf, size_t position, message_handle_t *data) { int circ_buf_peek(cbuf_handle_t cbuf, size_t position, message_handle_t *data) {
assert(cbuf && data && cbuf->buffer && (position < circ_buf_size(cbuf))); assert(cbuf && data && cbuf->buffer && (position < circ_buf_size(cbuf)));
int r = -1; int r = -1;
@ -140,86 +139,3 @@ bool circ_buf_full(cbuf_handle_t cbuf) {
return cbuf->full; return cbuf->full;
} }
/*void circ_buf_mul_add(cbuf_handle_t cbuf, char **data, uint8_t size,
int (*compar)(const void *, const void *)) {
assert(cbuf && data && cbuf->buffer);
qsort(data, size, sizeof(char*), compar);
char *last_element = (char*) malloc(circ_buf_element_size(cbuf) * sizeof(char));
for (uint8_t i = 0; i < size; ++i) {
circ_buf_read(cbuf, 0, last_element);
if (compar(&data[i], &last_element) < 0) {
continue;
}
circ_buf_put(cbuf, data[i]);
}
free(last_element);
int end_buffer_size = circ_buf_size(cbuf);
char **temp_array = (char **) malloc(end_buffer_size * sizeof(char *));
for (uint8_t buff_el = 0; buff_el < end_buffer_size; ++buff_el) {
temp_array[buff_el] = (char *) malloc(circ_buf_element_size(cbuf) * sizeof(char));
circ_buf_get(cbuf, temp_array[buff_el]);
}
qsort(temp_array, end_buffer_size, sizeof(char*), compar);
for (uint8_t i = 0; i < end_buffer_size; ++i) {
circ_buf_put(cbuf, temp_array[i]);
}
for (uint8_t buff_el = 0; buff_el < end_buffer_size; ++buff_el) {
free(temp_array[buff_el]);
}
free(temp_array);
}*/
/*int circ_buf_serialize(cbuf_handle_t cbuf, char **serialized) {
char *temp = (char*) malloc(circ_buf_element_size(cbuf) * sizeof(char));
const char separator[2] = "\r";
uint8_t char_sum = circ_buf_size(cbuf) - 1;
uint8_t i;
for (i = 0; i < circ_buf_size(cbuf); ++i) {
circ_buf_read(cbuf, i, temp);
char_sum += strlen(temp);
}
(*serialized) = (char*) malloc((char_sum + 1) * sizeof(char));
strcpy((*serialized), "");
for (i = 0; i < circ_buf_size(cbuf) - 1; ++i) {
circ_buf_read(cbuf, i, temp);
strcat((*serialized), temp);
strcat((*serialized), separator);
}
circ_buf_read(cbuf, i, temp);
strcat((*serialized), temp);
free(temp);
return strlen((*serialized));
}
int circ_buf_deserialize(cbuf_handle_t cbuf, const char *serialized) {
char *str = calloc(strlen(serialized) + 1, sizeof(char));
strcpy(str, serialized);
const char separator[2] = "\r";
char *token;
token = strtok(str, separator);
while (token != NULL) {
circ_buf_put(cbuf, token);
token = strtok(NULL, separator);
}
return circ_buf_size(cbuf);
}*/

8
lib/circ_buff.h

@ -41,7 +41,7 @@ void circ_buf_put(cbuf_handle_t cbuf, const message_handle_t *data);
int circ_buf_get(cbuf_handle_t cbuf, message_handle_t *data); int circ_buf_get(cbuf_handle_t cbuf, message_handle_t *data);
// Reads a value from the buffer. Does NOT retrieve, size is not reduced! // Reads a value from the buffer. Does NOT retrieve, size is not reduced!
int circ_buf_read(cbuf_handle_t cbuf, size_t position, message_handle_t *data); int circ_buf_peek(cbuf_handle_t cbuf, size_t position, message_handle_t *data);
// Checks if the buffer is empty. // Checks if the buffer is empty.
bool circ_buf_empty(cbuf_handle_t cbuf); bool circ_buf_empty(cbuf_handle_t cbuf);
@ -55,10 +55,4 @@ size_t circ_buf_capacity(cbuf_handle_t cbuf);
// Returns the number of elements stored in the buffer. // Returns the number of elements stored in the buffer.
size_t circ_buf_size(cbuf_handle_t cbuf); size_t circ_buf_size(cbuf_handle_t cbuf);
// Serializes the whole buffer to a single string
// int circ_buf_serialize(cbuf_handle_t cbuf, char **serialized);
// De-serializes a string to a buffer
// int circ_buf_deserialize(cbuf_handle_t cbuf, const char *serialized);
#endif //CIRC_BUFF_H_ #endif //CIRC_BUFF_H_

233
src/helpers.c → lib/helpers.c

@ -1,53 +1,21 @@
#include "helpers.h" #include <unistd.h>
#include <ifaddrs.h>
/* #include <arpa/inet.h>
* Function based on this example: #include <signal.h>
* http://man7.org/linux/man-pages/man3/getifaddrs.3.html#EXAMPLE #include <stdio.h>
*/ #include <stdlib.h>
int get_own_id(void) { #include <string.h>
int id = -1; #include <sys/select.h>
struct ifaddrs *ifaddr, *ifa; #include <sys/time.h>
int family, s, n; #include <netdb.h>
char host[NI_MAXHOST]; #include <time.h>
#include <assert.h>
if (getifaddrs(&ifaddr) == -1) {
perror("Couldn't get network interfaces.");
exit(EXIT_FAILURE);
}
// Walks through linked list, maintaining head pointer so we can free list later
for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) {
if (ifa->ifa_addr == NULL) {
continue;
}
family = ifa->ifa_addr->sa_family;
// Gets the address of an AF_INET* interface address
if (family == AF_INET || family == AF_INET6) {
s = getnameinfo(ifa->ifa_addr,
(family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6),
host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
if (s != 0) {
printf("getnameinfo() failed: %s\n", gai_strerror(s));
exit(EXIT_FAILURE);
}
id = extract_id_from_ip(host);
if (id < 0) {
continue;
}
break;
}
}
freeifaddrs(ifaddr); #include "helpers.h"
return(id); // Private Functions
}
char get_ip_from_sockaddr(sockaddr_in addr) { char *get_ip_from_sockaddr(address_t addr) {
// Beware: The inet_ntoa() function returns a string in a statically allocated buffer, which // Beware: The inet_ntoa() function returns a string in a statically allocated buffer, which
// subsequent calls will overwrite! // subsequent calls will overwrite!
return inet_ntoa(addr.sin_addr); return inet_ntoa(addr.sin_addr);
@ -57,9 +25,13 @@ char get_ip_from_sockaddr(sockaddr_in addr) {
// #define FUNCTION_NAME(signal) MAKE_FN_NAME(signal) // #define FUNCTION_NAME(signal) MAKE_FN_NAME(signal)
} }
int extract_id_from_ip(const char *ip) { // APIs
node_id_t extract_id_from_ip(const ipv4_t ip) {
assert(ip);
const char separator[2] = "."; const char separator[2] = ".";
int id = 0; node_id_t id = 0;
char *rest, *token, *ip_cp; char *rest, *token, *ip_cp;
ip_cp = malloc(strlen(ip) * sizeof(char)); ip_cp = malloc(strlen(ip) * sizeof(char));
@ -68,30 +40,34 @@ int extract_id_from_ip(const char *ip) {
rest = ip_cp; rest = ip_cp;
token = strtok_r(rest, separator, &rest); token = strtok_r(rest, separator, &rest);
if (!token || atoi(token) != 10) { if (!token || atoi(token) != 10) {
return -1; return 0;
} }
token = strtok_r(rest, separator, &rest); token = strtok_r(rest, separator, &rest);
if (!token || atoi(token) != 0) { if (!token || atoi(token) != 0) {
return -1; return 0;
} }
token = strtok_r(rest, separator, &rest); token = strtok_r(rest, separator, &rest);
if (!token) { if (!token) {
return -1; return 0;
} }
id = atoi(token) * 100; id = atoi(token) * 100;
token = strtok_r(rest, separator, &rest); token = strtok_r(rest, separator, &rest);
if (!token) { if (!token) {
return -1; return 0;
} }
id += atoi(token); id += atoi(token);
free(ip_cp);
return id; return id;
} }
void set_timer_and_handler(void (*handler)(int), long int wakeup_time) { void set_timer_and_handler(void (*handler)(int), long int wakeup_time) {
assert(handler);
struct itimerval timer; struct itimerval timer;
struct sigaction signal_action; struct sigaction signal_action;
@ -116,27 +92,30 @@ void set_timer_and_handler(void (*handler)(int), long int wakeup_time) {
} }
} }
void create_message(node_handle_t *neighbors, char *new_message, int own_id, uint16_t create_message(char *new_message, uint16_t max_message_length, node_handle_t *neighbors,
uint8_t num_neighbors, uint16_t max_message_length) { uint8_t num_neighbors, node_id_t own_id) {
assert(new_message && neighbors);
node_handle_t random_node = neighbors[rand() % (num_neighbors)]; node_handle_t random_node = neighbors[rand() % (num_neighbors)];
int peer_id = extract_id_from_ip(get_ip_from_sockaddr(node_get_addr(random_node))); node_id_t peer_id = node_get_id(random_node);
if (peer_id < 0) { if (!peer_id) {
perror("Couldn't extract peer's ID."); perror("Couldn't extract peer's ID.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
snprintf(new_message, max_message_length, "%04d_%04d_%ld_%s", own_id, peer_id, time(NULL), snprintf(new_message, max_message_length, "%04d_%04d_%ld_%s", own_id, peer_id, time(NULL),
"It's amazing... It's fantastic!"); "It's amazing... It's fantastic!");
return strlen(new_message);
} }
/* /*
* Function based on this example: * Function based on this example:
* https://www.gnu.org/software/libc/manual/html_node/Inet-Example.html#Inet-Example * https://www.gnu.org/software/libc/manual/html_node/Inet-Example.html#Inet-Example
*/ */
int create_socket_and_listen(uint16_t port, uint8_t backlog_size) { int create_socket_and_listen(port_t port, uint8_t backlog_size) {
int in_sock; int in_sock;
struct sockaddr_in own_name; address_t own_name;
// Creates the socket // Creates the socket
in_sock = socket(PF_INET, SOCK_STREAM, 0); in_sock = socket(PF_INET, SOCK_STREAM, 0);
@ -164,10 +143,11 @@ int create_socket_and_listen(uint16_t port, uint8_t backlog_size) {
return in_sock; return in_sock;
} }
// TODO: change soccaddr_in to node void send_message(node_handle_t peer, message_handle_t message) {
// TODO: change message to struct assert(peer && message);
void send_message(struct sockaddr_in peer_name, const char *message) {
int out_sock; int out_sock;
address_t peer_address = node_get_addr(peer);
// Creates the socket // Creates the socket
out_sock = socket(PF_INET, SOCK_STREAM, 0); out_sock = socket(PF_INET, SOCK_STREAM, 0);
@ -177,82 +157,134 @@ void send_message(struct sockaddr_in peer_name, const char *message) {
} }
// Connects to the peer // Connects to the peer
if (connect(out_sock, (struct sockaddr *) &peer_name, sizeof(peer_name))) { if (connect(out_sock, (struct sockaddr *) &(peer_address), sizeof(node_get_addr(peer)))) {
printf("Couldn't connect to peer : %d.\n", get_ip_from_sockaddr(peer_name)); printf("Couldn't connect to peer : %s.\n", get_ip_from_sockaddr(node_get_addr(peer)));
// TODO: add timestamp to node node_add_timestamp(peer, time(NULL), false);
} else { } else {
// Sends data to the peer // Sends data to the peer
write_to_peer(out_sock, message); int ignored;
// TODO: add timestamp to node write_to_peer(out_sock, message_get(message, &ignored));
// TODO: add sent_to to message node_add_timestamp(peer, time(NULL), true);
message_add_sent_to(message, node_get_id(peer));
} }
close(out_sock); close(out_sock);
} }
// TODO: add neighbors array as arg node_id_t accept_connection(int sock, address_t *peer_name, fd_set *active_fd_set,
void accept_connection(int sock, struct sockaddr_in *peer_name, fd_set *active_fd_set) { int *listens_on) {
assert(peer_name && active_fd_set && listens_on);
size_t peer_name_size = sizeof((*peer_name)); size_t peer_name_size = sizeof((*peer_name));
int comm_socket = accept(sock, (struct sockaddr *) peer_name, &peer_name_size); int comm_socket = accept(sock, (struct sockaddr *) peer_name, &peer_name_size);
if (comm_socket < 0) { if (comm_socket < 0) {
perror("Couldn't accept the connection."); perror("Couldn't accept the connection.");
// TODO maye be remove this?
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
*(listens_on) = comm_socket;
char *node_addr = get_ip_from_sockaddr(*peer_name);
fprintf(stderr, "Connected to host %s, port %hd.\n", fprintf(stderr, "Connected to host %s, port %hd.\n",
get_ip_from_sockaddr(*peer_name), ntohs((*peer_name).sin_port)); node_addr, ntohs((*peer_name).sin_port));
// TODO: add timestamp to node
// Maybe do that in zaqar (main)
FD_SET(comm_socket, active_fd_set); FD_SET(comm_socket, active_fd_set);
return extract_id_from_ip(node_addr);
}
uint16_t checkAddNode(node_handle_t **neighbors, uint16_t num_neighbors ,
address_t peer_name, node_id_t node_id) {
assert(neighbors);
bool node_exists = false;
for (int i = 0; i < num_neighbors; ++i) {
if (node_get_id((*neighbors)[i]) == node_id) {
continue;
}
}
if (!node_exists) {
node_handle_t *r_neighbors = realloc((*neighbors),
(num_neighbors + 1) * sizeof(node_handle_t));
if (!r_neighbors) {
free(r_neighbors);
perror("Unable to reallocate memory for neighbor IP.");
exit(EXIT_FAILURE);
}
(*neighbors) = r_neighbors;
(*neighbors)[num_neighbors] = node_init(peer_name, node_id);
return num_neighbors + 1;
}
return num_neighbors;
} }
void write_to_peer(int file_desc, const char *message) { void write_to_peer(int file_desc, const char *message) {
assert(message);
int num_bytes = write(file_desc, message, strlen(message) + 1); int num_bytes = write(file_desc, message, strlen(message) + 1);
if (num_bytes < 0) { if (num_bytes < 0) {
perror("Couldn't write to peer."); perror("Couldn't write to peer.");
// TODO maye be remove this?
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
int read_from_peer(int file_des, uint16_t max_line) { int read_from_peer(int file_des, uint16_t max_line, cbuf_handle_t *message_buffer,
node_handle_t **neighbors, uint16_t num_neighbors) {
char buffer[max_line]; char buffer[max_line];
int num_bytes; int num_bytes;
num_bytes = read(file_des, buffer, sizeof(buffer)); num_bytes = read(file_des, buffer, sizeof(buffer));
if (num_bytes < 0) { if (num_bytes < 0) {
perror("Couldn't read from peer."); perror("Couldn't read from peer.");
// TODO maye be remove this?
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} else if (num_bytes == 0) } else if (num_bytes == 0) {
// End-of-file // End-of-file
return -1; return -1;
else { } else {
fprintf(stderr, "Got message: `%s'\n", buffer); fprintf(stderr, "Got message: `%s'\n", buffer);
// TODO: create new message struct
node_id_t received_from = -1;
for (int i = 0; i < num_neighbors; ++i) {
if (node_get_comm_socket((*neighbors)[i])) {
received_from = node_get_id((*neighbors)[i]);
node_set_comm_socket((*neighbors)[i], -1);
break;
}
}
message_handle_t new_message = message_init(buffer, num_bytes, received_from);
circ_buf_put(*(message_buffer), &new_message);
return 0; return 0;
} }
} }
void fread_neighbors(char *AEM_file, node_handle_t **neighbors, uint16_t *num_neighbors, uint16_t port) { uint16_t fread_neighbors(char *AEM_file, node_handle_t **neighbors, port_t port) {
assert(AEM_file && neighbors);
uint16_t num_neighbors = 0;
// Reads the ARP file checking for connected devices // Reads the ARP file checking for connected devices
FILE *aems_file = fopen(AEM_file, "r"); FILE *aems_file = fopen(AEM_file, "r");
if (!aems_file) { if (!aems_file) {
perror("Neighbors file: Failed to open file \"" AEM_file "\""); perror("Neighbors file: Failed to open file containing the AEMs.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// Extracts IP addresses found in the file // Extracts IP addresses found in the file
char ip_addr[AEM_BUFFER_LEN]; char ip_addr[AEM_BUFFER_LEN];
char **neighbors_ips = (char **) malloc(sizeof(char *)); char **neighbors_ips;
if (!neighbors_ips) {
perror("Unable to allocate memory for neighbors array.");
exit(EXIT_FAILURE);
}
int (*num_neighbors) = 0;
while (1 == fscanf(aems_file, AEM_LINE_FORMAT, ip_addr)) { while (1 == fscanf(aems_file, AEM_LINE_FORMAT, ip_addr)) {
++(*num_neighbors); ++num_neighbors;
if ((*num_neighbors) > 1) {
char **r_neighbors_ips = realloc(neighbors_ips, (*num_neighbors) * sizeof(char *)); char **r_neighbors_ips = realloc(neighbors_ips, num_neighbors * sizeof(char *));
if (!r_neighbors_ips) { if (!r_neighbors_ips) {
free(r_neighbors_ips); free(r_neighbors_ips);
perror("Unable to reallocate memory for neighbor IP."); perror("Unable to reallocate memory for neighbor IP.");
@ -260,41 +292,44 @@ void fread_neighbors(char *AEM_file, node_handle_t **neighbors, uint16_t *num_ne
} }
neighbors_ips = r_neighbors_ips; neighbors_ips = r_neighbors_ips;
}
neighbors_ips[(*num_neighbors) - 1] = (char *) malloc(AEM_BUFFER_LEN * sizeof(char)); neighbors_ips[num_neighbors - 1] = (char *) malloc(AEM_BUFFER_LEN * sizeof(char));
strcpy(neighbors_ips[(*num_neighbors) - 1], ip_addr); strcpy(neighbors_ips[num_neighbors - 1], ip_addr);
} }
// Allocates memory for the new neighbors structs // Allocates memory for the new neighbors structs
(*neighbors) = (node_handle_t *) malloc((*num_neighbors) * sizeof(node_handle_t)); (*neighbors) = (node_handle_t *) malloc(num_neighbors * sizeof(node_handle_t));
if (!neighbors_ips) { if (!neighbors_ips) {
perror("Unable to allocate memory for nodes."); perror("Unable to allocate memory for nodes.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// Creates the neighbors structs // Creates the neighbors structs
for (uint8_t i = 0; i < (*num_neighbors); ++i) { for (uint8_t i = 0; i < num_neighbors; ++i) {
struct sockaddr_in peer_name; address_t peer_name;
init_sockaddr(&peer_name, neighbors_ips[i], port); init_sockaddr(&peer_name, neighbors_ips[i], port);
(*neighbors)[(*num_neighbors)++] = node_init(peer_name); (*neighbors)[num_neighbors++] = node_init(peer_name,
extract_id_from_ip(get_ip_from_sockaddr(peer_name)));
} }
fclose(aems_file); fclose(aems_file);
return num_neighbors;
} }
/* /*
* Function based on this example: * Function based on this example:
* https://www.gnu.org/software/libc/manual/html_node/Inet-Example.html#Inet-Example * https://www.gnu.org/software/libc/manual/html_node/Inet-Example.html#Inet-Example
*/ */
void init_sockaddr(struct sockaddr_in *peer_name, const char *ipv4, uint16_t port) { void init_sockaddr(address_t *peer_name, const ipv4_t hostname, port_t port) {
assert(peer_name && hostname);
struct hostent *hostinfo; struct hostent *hostinfo;
peer_name->sin_family = AF_INET; peer_name->sin_family = AF_INET;
peer_name->sin_port = htons(port); peer_name->sin_port = htons(port);
hostinfo = gethostbyname(ipv4); hostinfo = gethostbyname(hostname);
if (hostinfo == NULL) { if (hostinfo == NULL) {
fprintf(stderr, "Unknown host %s.\n", ipv4); fprintf(stderr, "Unknown host %s.\n", hostname);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }

51
lib/helpers.h

@ -0,0 +1,51 @@
#ifndef HELPERS_H_
#define HELPERS_H_
#define _GNU_SOURCE
#include <stdint.h>
#include "types.h"
#include "node.h"
#include "message.h"
#include "circ_buff.h"
// Macros to turn a numeric macro into a string literal
#define xstr(s) str(s)
#define str(s) #s
// AEMs file related definitions
#define AEM_ID_LEN 4
#define AEM_BUFFER_LEN 5
// Format for fscanf() to read the AEM numbers
#define AEM_LINE_FORMAT "%" xstr(AEM_ID_LEN) "s\n"
node_id_t extract_id_from_ip(const ipv4_t ip);
void set_timer_and_handler(void (*handler)(int), long int timer_interval);
uint16_t create_message(char *new_message, uint16_t max_message_length,
node_handle_t *neighbors, uint8_t num_neighbors, node_id_t own_id);
int create_socket_and_listen(port_t port, uint8_t backlog_size);
void send_message(node_handle_t peer, message_handle_t message);
node_id_t accept_connection(int sock, address_t *client_name,
fd_set *active_fd_set, int *listens_on);
uint16_t checkAddNode(node_handle_t **neighbors, uint16_t num_neighbors ,
address_t peer_name, node_id_t node_id);
void write_to_peer(int filedes, const char *message);
int read_from_peer(int file_des, uint16_t max_line,
cbuf_handle_t *message_buffer, node_handle_t **neighbors,
uint16_t num_neighbors);
uint16_t fread_neighbors(char *AEM_file, node_handle_t **neighbors,
port_t port);
void init_sockaddr(address_t *peer_name, const ipv4_t hostname, port_t port);
#endif //HELPERS_H_

96
lib/message.c

@ -5,45 +5,30 @@
#include "message.h" #include "message.h"
/* Private functions */
// Defines the node structure // Defines the node structure
struct message_t { struct message_t {
char *message; char *message;
uint16_t received_from, *sent_to; node_id_t received_from, *sent_to;
observer_func *observers; uint16_t message_length, num_sent_to;
int message_length, num_sent_to, num_observers;
}; };
unit_static void message_state_changed(message_handle_t message_handle) {
assert(message_handle);
for (uint16_t i = 0; i < message_handle->num_observers; ++i) {
(message_handle->observers[i])();
}
}
/* API */ /* API */
message_handle_t message_init(const char *message, int message_length, uint16_t received_from) { message_handle_t message_init(const char *message, uint16_t length, node_id_t received_from) {
//assert(addr); assert(message && length <= MAX_MESSAGE_LENGTH);
assert(message_length <= MAX_MESSAGE_LENGTH);
message_handle_t message_handle = malloc(sizeof(message_t)); message_handle_t message_handle = malloc(sizeof(message_t));
assert(message_handle); assert(message_handle);
message_handle->message = malloc((message_length + 1) * sizeof(char)); message_handle->message = malloc((length + 1) * sizeof(char));
strcpy(message_handle->message, message); strcpy(message_handle->message, message);
message_handle->message_length = message_length; message_handle->message_length = length;
message_handle->received_from = received_from; message_handle->received_from = received_from;
message_handle->sent_to = NULL; message_handle->sent_to = NULL;
message_handle->num_sent_to = 0; message_handle->num_sent_to = 0;
message_handle->observers = NULL;
message_handle->num_observers = 0;
return message_handle; return message_handle;
} }
@ -51,23 +36,15 @@ void message_free(message_handle_t message_handle) {
assert(message_handle); assert(message_handle);
free(message_handle->sent_to); free(message_handle->sent_to);
free(message_handle->observers);
free(message_handle->message); free(message_handle->message);
free(message_handle); free(message_handle);
} }
void message_add_sent_to(message_handle_t message_handle, uint16_t sent_to) { void message_add_sent_to(message_handle_t message_handle, node_id_t sent_to) {
assert(message_handle); assert(message_handle);
if (message_handle->sent_to == NULL) { // Expands array by re-allocation space
message_handle->sent_to = (uint16_t *) malloc(sizeof(uint16_t)); node_id_t *r_sent_to = realloc(message_handle->sent_to, ++(message_handle->num_sent_to) * sizeof(node_id_t));
if (!message_handle->sent_to) {
perror("Unable to allocate memory for message receivers.");
exit(EXIT_FAILURE);
}
++(message_handle->num_sent_to);
} else {
uint16_t *r_sent_to = realloc(message_handle->sent_to, ++(message_handle->num_sent_to) * sizeof(uint16_t));
if (!r_sent_to) { if (!r_sent_to) {
free(r_sent_to); free(r_sent_to);
perror("Unable to reallocate memory for message receivers."); perror("Unable to reallocate memory for message receivers.");
@ -75,58 +52,43 @@ void message_add_sent_to(message_handle_t message_handle, uint16_t sent_to) {
} }
message_handle->sent_to = r_sent_to; message_handle->sent_to = r_sent_to;
} // ++(message_handle->num_sent_to);
message_handle->sent_to[message_handle->num_sent_to - 1] = sent_to; message_handle->sent_to[message_handle->num_sent_to - 1] = sent_to;
message_state_changed(message_handle);
} }
char *message_get(message_handle_t message_handle, int *message_length) { char *message_get(message_handle_t message_handle, int *length) {
assert(message_handle); assert(message_handle && length);
static char *message_buffer = NULL;
static int message_buffer_len = 0;
char *message = (char *) malloc((message_handle->message_length + 1) * sizeof(char)); if (message_buffer_len != message_handle->message_length + 1) {
if (!message) { char *r_message_buffer = (char *) realloc(message_buffer, (message_handle->message_length + 1) * sizeof(char));
perror("Unable to allocate memory for message return."); if (!r_message_buffer) {
free(r_message_buffer);
perror("Unable to reallocate memory for message receivers.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
strcpy(message, message_handle->message); message_buffer = r_message_buffer;
(*message_length) = message_handle->message_length; message_buffer_len = message_handle->message_length + 1;
}
strcpy(message_buffer, message_handle->message);
(*length) = message_handle->message_length;
return message; return message_buffer;
} }
bool message_sent_to(message_handle_t message_handle, uint16_t node) { bool message_sent_to(message_handle_t message_handle, node_id_t id) {
assert(message_handle); assert(message_handle);
for (uint16_t i = 0; i < message_handle->num_sent_to; ++i) { for (uint16_t i = 0; i < message_handle->num_sent_to; ++i) {
if (message_handle->sent_to[i] == node) { if (message_handle->sent_to[i] == id) {
return true; return true;
} }
} }
return false; return false;
} }
void message_attach_observer(message_handle_t message_handle, observer_func observer) {
assert(message_handle);
if (message_handle->observers == NULL) {
message_handle->observers = (observer_func *) malloc(sizeof(observer_func));
if (!message_handle->observers) {
perror("Unable to allocate memory for message observers.");
exit(EXIT_FAILURE);
}
++(message_handle->num_observers);
} else {
uint16_t *r_observers = realloc(message_handle->observers,
++(message_handle->num_observers) * sizeof(observer_func));
if (!r_observers) {
free(r_observers);
perror("Unable to reallocate memory for message observers.");
exit(EXIT_FAILURE);
}
}
message_handle->observers[message_handle->num_observers - 1] = observer;
}

35
lib/message.h

@ -4,6 +4,16 @@
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include "types.h"
// max_message_length = sender_id 4 chars
// + underscore 1 char
// + receiver_id 4 chars
// + underscore 1 char
// + time-stamp 10 chars
// + underscore 1 char
// + message_text 256 chars (max)
// = 277 chars
#define MAX_MESSAGE_LENGTH 277 #define MAX_MESSAGE_LENGTH 277
// Message structure // Message structure
@ -11,34 +21,21 @@ typedef struct message_t message_t;
// and handle type // and handle type
typedef message_t *message_handle_t; typedef message_t *message_handle_t;
typedef void (*observer_func)(void);
#ifdef TEST //This is a test build
// Makes private functions reachable by the tester
#define unit_static
// Calls all observer functions attached to this message.
unit_static void message_state_changed(message_handle_t message_handle);
#else
#define unit_static static
#endif
// Initializes a message structure and returns the message handle. User must provide the length of // Initializes a message structure and returns the message handle. User must provide the length of
// the message without the null termination character (strlen); // the message without the null termination character (strlen);
message_handle_t message_init(const char *message, int message_length, uint16_t received_from); message_handle_t message_init(const char *message, uint16_t length, node_id_t received_from);
// Frees a message structure. // Frees a message structure.
void message_free(message_handle_t message_handle); void message_free(message_handle_t message_handle);
// Adds an new receiver to the message. // Adds an new receiver to the message.
void message_add_sent_to(message_handle_t message_handle, uint16_t sent_to); void message_add_sent_to(message_handle_t message_handle, node_id_t sent_to);
// Returns the message. // Returns the message. This functions returns a string in a statically allocated buffer, which
// subsequent calls will overwrite!
char *message_get(message_handle_t message_handle, int *message_length); char *message_get(message_handle_t message_handle, int *message_length);
// Returns true if the message has been previously sent to this node, false otherwise. // Returns true if the message has been previously sent to this ip, false otherwise.
bool message_sent_to(message_handle_t message_handle, uint16_t node); bool message_sent_to(message_handle_t message_handle, node_id_t id);
// Attaches an observer function to the message.
void message_attach_observer(message_handle_t message_handle, observer_func observer);
#endif //MESSAGE_H_ #endif //MESSAGE_H_

106
lib/node.c

@ -1,34 +1,34 @@
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <time.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <arpa/inet.h>
#include "node.h" #include "node.h"
// Defines the node structure // Defines the node structure
struct node_t { struct node_t {
struct sockaddr_in addr; address_t addr;
uint64_t **events; node_id_t id;
uint64_t appearance_duration; timestamp_t *events_timestamps;
uint8_t events_size; node_status *events_status;
node_status _node_status; uint16_t events_size;
int comm_socket;
}; };
node_handle_t node_init(struct sockaddr_in addr) { node_handle_t node_init(address_t addr, node_id_t id) {
//assert(addr);
node_handle_t node = malloc(sizeof(node_t)); node_handle_t node = malloc(sizeof(node_t));
assert(node); assert(node);
node->addr = addr; node->addr = addr;
node->events_size = 0; node->id = id;
node->events = (uint64_t **) malloc(2 * sizeof(uint64_t)); node->events_timestamps = (timestamp_t *) malloc(sizeof(timestamp_t));
node->events[0] = (uint64_t *) malloc(sizeof(uint64_t)); node->events_timestamps[0] = time(NULL);
node->events[1] = (uint64_t *) malloc(sizeof(uint64_t)); node->events_status = (node_status *) malloc(sizeof(node_status));
node->events[0][0] = 0; node->events_status[0] = NODE_INITIALIAZED;
node->events[1][0] = 0; node->events_size = 1;
node->appearance_duration = 0; node->comm_socket = -1;
node->_node_status = NODE_INITIALIAZED;
return node; return node;
} }
@ -36,89 +36,67 @@ node_handle_t node_init(struct sockaddr_in addr) {
void node_free(node_handle_t node) { void node_free(node_handle_t node) {
assert(node); assert(node);
free(node->events[0]); free(node->events_timestamps);
free(node->events[1]); free(node->events_status);
free(node->events);
free(node); free(node);
} }
void node_add_timestamp(node_handle_t node, time_t timestamp, bool visible) { void node_add_timestamp(node_handle_t node, timestamp_t timestamp, bool visible) {
assert(node && timestamp); assert(node && timestamp);
if ((visible && !node->events[1][node->events_size - 1]) || int *realloc_r = realloc(node->events_timestamps, node->events_size + 1);
(!visible && node->events[1][node->events_size - 1])) {
return;
}
if (visible) {
int *realloc_r = realloc(node->events[0], node->events_size + 1);
if (!realloc_r) { if (!realloc_r) {
node_free(node); node_free(node);
perror("Error trying to reallocate memory for event timestamps!"); perror("Error trying to reallocate memory for event timestamps.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
realloc_r = realloc(node->events[1], node->events_size + 1);
realloc_r = realloc(node->events_status, node->events_size + 1);
if (!realloc_r) { if (!realloc_r) {
node_free(node); node_free(node);
perror("Error trying to reallocate memory for event timestamps!"); perror("Error trying to reallocate memory for event statuses.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
node->events[0][node->events_size] = timestamp; node->events_timestamps[node->events_size] = timestamp;
node->events[1][node->events_size] = 0; node->events_status[node->events_size] = visible ? NODE_PRESENT : NODE_GONE;
node->_node_status = NODE_PRESENT;
++node->events_size; ++node->events_size;
} else {
node->events[1][node->events_size - 1] = timestamp;
node->_node_status = NODE_GONE;
node->appearance_duration += node->events[1][node->events_size - 1] -
node->events[0][node->events_size - 1];
}
} }
struct sockaddr_in node_get_addr(node_handle_t node) { address_t node_get_addr(node_handle_t node) {
assert(node); assert(node);
return node->addr; return node->addr;
} }
enum node_status node_get_status(node_handle_t node) { node_id_t node_get_id(node_handle_t node) {
assert(node); assert(node);
return node->_node_status; return node->id;
} }
uint8_t node_get_latest_appearance_duration(node_handle_t node) { node_status node_get_status(node_handle_t node) {
assert(node); assert(node);
if (node->_node_status == NODE_INITIALIAZED) { return node->events_status[node->events_size];
return 0;
} else if (node->events[1][node->events_size - 1] == 0) {
return (uint64_t)time(NULL) - node->events[0][node->events_size - 1];
} else {
return node->events[1][node->events_size - 1] - node->events[0][node->events_size - 1];
} }
}
uint8_t node_get_total_appearance_duration(node_handle_t node) {
assert(node);
return node->appearance_duration; int node_get_comm_socket(node_handle_t node) {
return node->comm_socket;
} }
uint8_t node_get_event_table(node_handle_t node, time_t ***event_table) { void node_set_comm_socket(node_handle_t node, int comm_socket) {
assert(node && event_table); node->comm_socket = comm_socket;
if (node->events_size < 1) {
return 0;
} }
(*event_table) = (time_t **) malloc(2 * sizeof(time_t *)); uint16_t node_get_events(node_handle_t node, timestamp_t **events_timestamps, node_status **events_status) {
(*event_table)[0] = (time_t *) malloc(node->events_size * sizeof(time_t)); assert(node && events_timestamps && events_status);
(*event_table)[1] = (time_t *) malloc(node->events_size * sizeof(time_t));
(*events_timestamps) = (timestamp_t *) malloc(node->events_size * sizeof(timestamp_t));
(*events_status) = (node_status *) malloc(node->events_size * sizeof(node_status));
memcpy((*event_table)[0], node->events[0], node->events_size * sizeof(time_t)); memcpy((*events_timestamps), node->events_timestamps, node->events_size * sizeof(timestamp_t));
memcpy((*event_table)[1], node->events[1], node->events_size * sizeof(time_t)); memcpy((*events_status), node->events_status, node->events_size * sizeof(node_status));
return node->events_size; return node->events_size;
} }

50
lib/node.h

@ -2,8 +2,9 @@
#define NODE_H_ #define NODE_H_
#include <stdbool.h> #include <stdbool.h>
#include <time.h> #include <stdint.h>
#include <arpa/inet.h>
#include "types.h"
// Node structure // Node structure
typedef struct node_t node_t; typedef struct node_t node_t;
@ -12,48 +13,33 @@ typedef node_t *node_handle_t;
typedef enum node_status { NODE_INITIALIAZED, NODE_PRESENT, NODE_GONE } node_status; typedef enum node_status { NODE_INITIALIAZED, NODE_PRESENT, NODE_GONE } node_status;
#ifdef TEST //This is a test build
// Makes private functions reachable by the tester
#define unit_static
#else
#define unit_static static
#endif
// Initializes a node structure and returns the node handle. // Initializes a node structure and returns the node handle.
node_handle_t node_init(struct sockaddr_in addr); node_handle_t node_init(address_t addr, node_id_t id);
// Frees a node structure. // Frees a node structure.
void node_free(node_handle_t node); void node_free(node_handle_t node);
// Adds an event timestamp to the node. Either the (re)appearance or the disappearance of the node. // Adds an event timestamp to the node.
void node_add_timestamp(node_handle_t node, time_t timestamp, bool visible); void node_add_timestamp(node_handle_t node, timestamp_t timestamp, bool visible);
// Returns the address of the node // Returns the address of the node
struct sockaddr_in node_get_addr(node_handle_t node); address_t node_get_addr(node_handle_t node);
node_status node_get_status(node_handle_t node);
//uint8_t node_get_appear_count(node_handle_t node); // Returns the id of the node
node_id_t node_get_id(node_handle_t node);
//uint8_t node_get_disappear_count(node_handle_t node); // Returns the latest node status.
node_status node_get_status(node_handle_t node);
//void node_get_latest_appear(node_handle_t node, );
//void node_get_latest_disappear(node_handle_t node, );
// Returns the duration (in seconds) of the latest stretch of time that this node has been visible.
uint8_t node_get_latest_appearance_duration(node_handle_t node);
// Returns the total duration (in seconds) of time that this node was visible. // Returns the current communication socket id of the node
uint8_t node_get_total_appearance_duration(node_handle_t node); int node_get_comm_socket(node_handle_t node);
// Returns the event timestamps table for this node. // Sets the current communication socket id of the node
uint8_t node_get_event_table(node_handle_t node, time_t ***event_table); void node_set_comm_socket(node_handle_t node, int comm_socket);
// Serializes the whole node to a single string // Returns the event tables for this node.
//int circ_buf_serialize(node_handle_t node, char **serialized); uint16_t node_get_events(node_handle_t node, timestamp_t **events_timestamps, node_status **events_status);
// De-serializes a string to a node //TODO node save thingy
//int circ_buf_deserialize(node_handle_t node, const char *serialized);
#endif //NODE_H_ #endif //NODE_H_

10
lib/types.h

@ -0,0 +1,10 @@
#ifndef CUSTOM_TYPES_H_
#define CUSTOM_TYPES_H_
typedef uint16_t node_id_t;
typedef struct sockaddr_in address_t;
typedef uint64_t timestamp_t;
typedef char *ipv4_t;
typedef uint16_t port_t;
#endif //CUSTOM_TYPES_H_

59
src/helpers.h

@ -1,59 +0,0 @@
#ifndef HELPERS_H_
#define HELPERS_H_
#define _GNU_SOURCE
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <ifaddrs.h>
#include <linux/if_link.h>
#include <sys/time.h>
#include <signal.h>
#include <string.h>
#include <sys/wait.h>
#include "node.h"
// Macros to turn a numeric macro into a string literal
#define xstr(s) str(s)
#define str(s) #s
// AEMs file related definitions
#define AEM_BUFFER_LEN 5
// Format for fscanf() to read the AEM numbers
#define ARP_LINE_FORMAT "%" xstr(ARP_STRING_LEN) "s\n"
int get_own_id(void);
int extract_id_from_ip(const char *ip);
void set_timer_and_handler(void (*handler)(int), long int timer_interval);
void create_message(char *new_message, uint16_t max_message_length, node_handle_t *neighbors,
uint8_t num_neighbors, int own_id);
int create_socket_and_listen(uint16_t port, uint8_t backlog_size);
void send_message(struct sockaddr_in peer_name, const char *message);
void accept_connection(int sock, struct sockaddr_in *client_name, fd_set *active_fd_set);
void write_to_peer(int filedes, const char *message);
int read_from_peer(int file_des, uint16_t max_line);
void fread_neighbors(char *AEM_file, node_handle_t **neighbors, uint16_t *num_neighbors, uint16_t port);
void init_sockaddr(struct sockaddr_in *name, const char *hostname, uint16_t port);
#endif //HELPERS_H_

101
src/zaqar.c

@ -1,3 +1,15 @@
#include <arpa/inet.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <unistd.h>
#include "types.h"
#include "message.h"
#include "circ_buff.h"
#include "helpers.h"
#include "zaqar.h" #include "zaqar.h"
#define MESSAGE "You got mail!!! xo xo xo" #define MESSAGE "You got mail!!! xo xo xo"
@ -8,22 +20,24 @@ int main(int argc, char **argv) {
check_args(argc, argv); check_args(argc, argv);
// Socket communication variables // Socket communication variables
int in_sock, own_id; int in_sock;
fd_set active_fd_set, read_fd_set; fd_set active_fd_set, read_fd_set;
struct sockaddr_in peer_name; address_t peer_name;
node_id_t own_id = atoi(argv[1]);
// Business logic variables // Business logic variables
node_handle_t *neighbors; node_handle_t *neighbors;
uint16_t num_neighbors = 0; uint16_t num_neighbors = 0;
message_handle_t *buffer;
cbuf_handle_t message_buffer;
own_id = get_own_id(); bool exhaustive_send = false;
if (own_id < 0) {
perror("Couldn't extract own ID."); buffer = (message_handle_t *) malloc(MESSAGE_BUFFER_SIZE * sizeof(message_handle_t));
exit(EXIT_FAILURE); message_buffer = circ_buf_init(buffer, MESSAGE_BUFFER_SIZE);
}
// Reads neighbors' IPs from file // Reads neighbors' IPs from file
void fread_neighbors(argv[1], &neighbors, &num_neighbors, PORT); num_neighbors = fread_neighbors(argv[1], &neighbors, PORT);
// Sets a timer and handler to produce an interrupt and send a message // Sets a timer and handler to produce an interrupt and send a message
set_timer_and_handler(handle_alarm, MIN_TIMER_VAL + rand() % (MAX_TIMER_VAL - MIN_TIMER_VAL)); set_timer_and_handler(handle_alarm, MIN_TIMER_VAL + rand() % (MAX_TIMER_VAL - MIN_TIMER_VAL));
@ -40,18 +54,37 @@ int main(int argc, char **argv) {
while (1) { while (1) {
if (sigalrm_flag) { if (sigalrm_flag) {
// It's time to send a message! // It's time to send a message!
char new_message[MAX_MESSAGE_LENGTH]; char new_message_payload[MAX_MESSAGE_LENGTH];
create_message(new_message, MAX_MESSAGE_LENGTH, neighbors, num_neighbors, own_id); uint16_t new_message_length = create_message(new_message_payload, MAX_MESSAGE_LENGTH,
neighbors, num_neighbors, own_id);
for (uint8_t i = 0; i < num_neighbors; ++i) { message_handle_t new_message = message_init(new_message_payload, new_message_length, own_id);
if (node_get_status(neighbors[i]) == NODE_PRESENT) { circ_buf_put(message_buffer, &new_message);
send_message(node_get_addr(neighbors[i]), new_message);
} for (uint16_t i = 0; i < num_neighbors; ++i) {
send_message(neighbors[i], new_message);
} }
// Resets timer // Resets timer
sigalrm_flag = false; sigalrm_flag = false;
set_timer_and_handler(handle_alarm, MIN_TIMER_VAL + rand() % (MAX_TIMER_VAL - MIN_TIMER_VAL)); set_timer_and_handler(handle_alarm, MIN_TIMER_VAL + rand() % (MAX_TIMER_VAL - MIN_TIMER_VAL));
// Checks for new neighbors/unsent messages while awake
exhaustive_send = true;
}
if (exhaustive_send) {
for (uint16_t node_index = 0; node_index < num_neighbors; ++node_index) {
node_id_t current_node_id = node_get_id(neighbors[node_index]);
for (uint message_index = 0; message_index < circ_buf_size(message_buffer); ++message_index) {
message_handle_t current_message;
if (!circ_buf_peek(message_buffer, message_index, &current_message) &&
!message_sent_to(current_message, current_node_id))
send_message(neighbors[node_index], current_message);
}
}
} }
// Shallow copies the readset // Shallow copies the readset
@ -68,33 +101,59 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// TODO: sent new messages to all neighbors
// TODO: if found new neighbor, sent all messages
// Services all the sockets with input pending // Services all the sockets with input pending
for (int i = 0; i < FD_SETSIZE; ++i) { for (int i = 0; i < FD_SETSIZE; ++i) {
if (FD_ISSET(i, &read_fd_set)) { if (FD_ISSET(i, &read_fd_set)) {
if (i == in_sock) { if (i == in_sock) {
// Connection request on original socket // Connection request on original socket
accept_connection(in_sock, &peer_name, &active_fd_set); int comm_socket;
node_id_t peer_id = accept_connection(in_sock, &peer_name, &active_fd_set,
&comm_socket);
// Updates neighbors array
num_neighbors = checkAddNode(&neighbors, num_neighbors , peer_name, peer_id);
if (comm_socket >= 0) {
for (int i = 0; i < num_neighbors; ++i) {
if(node_get_id(neighbors[i]) == peer_id) {
node_set_comm_socket(neighbors[i], comm_socket);
break;
}
}
}
} else { } else {
// Data arriving on an already-connected socket // Data arriving on an already-connected socket
if (read_from_peer(i, MAX_MESSAGE_LENGTH) < 0) { if (read_from_peer(i, MAX_MESSAGE_LENGTH + 1, &message_buffer, &neighbors, num_neighbors) < 0) {
close(i); close(i);
FD_CLR(i, &active_fd_set); FD_CLR(i, &active_fd_set);
// Checks for new neighbors/unsent messages while awake
exhaustive_send = true;
}
}
} }
} }
} }
//Frees memory
for (uint i = 0; i < circ_buf_size(message_buffer); ++i) {
message_handle_t current_message;
circ_buf_get(message_buffer, &current_message);
message_free(current_message);
} }
circ_buf_free(message_buffer);
for (int i = 0; i < num_neighbors; ++i) {
node_free(neighbors[i]);
} }
free(buffer);
return 0; return 0;
} }
void check_args(int argc, char **argv) { void check_args(int argc, char **argv) {
if (argc < 2) { if (argc != 3) {
printf("Correct usage is:\n"); printf("Correct usage is:\n");
printf("\t%s <aems_file> \n", argv[0]); printf("\t%s <node_id> <aems_file> \n", argv[0]);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }

29
src/zaqar.h

@ -1,38 +1,11 @@
#ifndef ZAQAR_H_ #ifndef ZAQAR_H_
#define ZAQAR_H_ #define ZAQAR_H_
#include <stdbool.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/signalfd.h>
#include "helpers.h"
#include "message.h"
#include "circ_buff.h"
#define MIN_TIMER_VAL 60 #define MIN_TIMER_VAL 60
#define MAX_TIMER_VAL 300 #define MAX_TIMER_VAL 300
#define PORT 5000 #define PORT 5000
// max_message_length = sender_id 4 chars
// + underscore 1 char
// + receiver_id 4 chars
// + underscore 1 char
// + time-stamp 10 chars
// + underscore 1 char
// + message_text 256 chars (max)
// + null_terminator 1 char
// = 278 chars
#define MAX_MESSAGE_LENGTH 278
#define BACKLOG_SIZE 2 #define BACKLOG_SIZE 2
#define MESSAGE_BUFFER_SIZE 2000
void check_args(int argc, char **argv); void check_args(int argc, char **argv);

Loading…
Cancel
Save