7 #include <sys/socket.h>
9 #include <openssl/sha.h>
17 char *raw_addr = getenv(
"NODE_SHARED_CACHE" );
21 raw_addr =
"127.0.0.1:9874";
29 char *pswd = getenv(
"NODE_SHARED_CACHE_PWD" );
45 char * host_port = strdup( phost_port );
47 char *sep = strstr( host_port ,
":");
51 printf(
"Error address must be in the 'ip:port' format\n");
57 ctx->
addr = strdup(host_port);
58 ctx->
s_port = strdup(sep + 1);
67 if( endptr == ctx->
s_port )
69 printf(
"Error parsing port number\n");
82 ctx->
passwd = strdup(password);
117 static inline int _nodeSharedCache_send(
int socket,
char *key, uint32_t len_key,
char *json, uint32_t len_json, uint32_t action );
134 if( setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (
char *)&tv,
sizeof(
struct timeval) ) != -1 )
136 if( socket_flagged_read )
137 *socket_flagged_read = 1;
140 if( setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (
char *)&tv,
sizeof(
struct timeval) ) != -1 )
142 if( socket_flagged_write )
143 *socket_flagged_write = 1;
150 read_buff[data_len] =
'\0';
166 size_t left_to_write = size;
170 struct sigaction new, old;
172 new.sa_handler = SIG_IGN;
173 sigemptyset (&
new.sa_mask);
176 sigaction( SIGPIPE, &
new, &old );
178 int ret = write( fd, src, size );
180 sigaction( SIGPIPE, &old, NULL );
184 if( errno == EAGAIN )
187 else if( errno != 0 )
197 left_to_write -= ret;
201 }
while( left_to_write );
207 static inline int _nodeSharedCache_send(
int socket,
char *key, uint32_t len_key,
char *json, uint32_t len_json, uint32_t action )
216 if(
safe_write( socket, sizes,
sizeof(uint32_t) * 4 ) < 0 )
258 int len_key = strlen( key );
259 int len_json = strlen( json );
261 if( !len_key || !len_json )
295 int len_key = strlen( key );
296 int len_json = strlen( json );
298 if( !len_key || !len_json )
311 int ret = read( socket, header,
sizeof(uint32_t) * 4 );
313 if( ret !=
sizeof(uint32_t) * 4 )
315 if( ret == 0 || errno == EBADF )
318 printf(
"BAD protocol : Could not read header\n");
323 if( header[0] != 0x47 )
325 printf(
"BAD protocol : sync byte is erroneous\n");
330 *key_len = header[2];
331 *data_len = header[3];
340 ret = read( socket, key_buff, key_len );
344 if( ret == 0 || errno == EBADF )
347 printf(
"Could not read KEY\n");
352 key_buff[key_len] =
'\0';
359 if( ( max_buffer_size - 1 ) <= data_len )
361 printf(
"ERROR Message too large BAD PROTOCOL");
367 int ret = read( socket, read_buff, data_len );
369 if( ret != data_len )
371 if( ret == 0 || errno == EBADF )
374 printf(
"COULD not read payload");
411 uint32_t key_len = 0;
412 uint32_t data_len = 0;
440 printf(
"BCAST %s\n", key_buff);
447 printf(
"REDUCE %s\n", key_buff);
450 printf(
"=> SET %s\n", key_buff);
455 printf(
"DEL %s\n", key_buff);
479 int pwdlen = strlen(passwd);
486 unsigned char CRCS[64];
488 sprintf(CRCS,
"%016llx%016llx",crc1, crc2 );
499 if( read( nsc->
socket, challenge, 512 ) != 512 )
507 if(
safe_write( nsc->
socket, &hex_digest,
sizeof(uint64_t) ) !=
sizeof(uint64_t) )
515 if( read( nsc->
socket, &ret,
sizeof(uint32_t) ) !=
sizeof(uint32_t) )
552 struct addrinfo *results;
558 printf(
"Error resolving name : %s\n", gai_strerror(ret) );
562 struct addrinfo *current = results;
569 nsc->
socket =socket( current->ai_family, current->ai_socktype, current->ai_protocol );
578 if (connect(nsc->
socket, current->ai_addr, current->ai_addrlen) != -1)
593 current = current->ai_next;
596 freeaddrinfo(results);
600 printf(
"Failed to connect to %s:%d\n", nsc->
ctx.
addr, nsc->
ctx.
port);
605 char *greetings =
"Hello NodeSharedCache";
606 int greetings_len = strlen( greetings );
610 printf(
"Failed to register to node.js server\n");
619 printf(
"Server refused password\n");
625 uint32_t remote_config[3] = { 0 , 0 , 0};
627 if( read( nsc->
socket, remote_config, 3 *
sizeof( uint32_t ) ) != (3 *
sizeof( uint32_t )) )
629 perror(
"Failed to read remote configuration...\n");
634 if( remote_config[0] != 777 )
636 printf(
"Error bad server protocol\n");
680 char *greetings =
"Hello NodeSharedCache";
681 int greetings_len = strlen( greetings );
685 if( (ret = recv( socket, buff, greetings_len, 0 ) ) == -1 )
691 if( ret != greetings_len )
693 printf(
"Bad protocol in proxy (BAD LENGTH got %d)\n", ret);
697 if( memcmp( buff, greetings, greetings_len ) )
699 printf(
"Bad protocol in proxy (BAD greetings)\n");
712 for( i = 0 ; i < 512 ; i++ )
713 challenge[i] = rand();
718 if( send( socket, challenge, 512, 0 ) == -1 )
725 if( recv( socket, &buff,
sizeof(uint64_t), 0 ) == -1 )
730 uint32_t bad_password = 0;
732 if( memcmp(&auth_digest, &buff,
sizeof(uint64_t)) )
734 printf(
"Bad password\n");
739 if( send( socket, (
char *)&bad_password,
sizeof(uint32_t), 0 ) == -1 )
746 return !bad_password;
754 if( send( socket, (
char *)remote_config, 3*
sizeof(uint32_t), 0 ) == -1 )
756 printf(
"Failed to send nodeSharedCache config\n");
797 uint32_t key_len = 0;
798 uint32_t data_len = 0;
868 socklen_t len =
sizeof(
struct sockaddr );
869 struct sockaddr remote_infos;
873 if( client_socket == -1 )
885 close(client_socket);
895 close(client_socket);
901 printf(
"Client failed authentification\n");
902 close(client_socket);
909 close(client_socket);
993 if( expected_clients != 0 )
997 struct addrinfo hints, *results;
999 memset( &hints, 0,
sizeof(
struct addrinfo ) );
1000 hints.ai_family = AF_UNSPEC;
1001 hints.ai_socktype = SOCK_STREAM;
1002 hints.ai_flags = AI_PASSIVE;
1005 int ret = getaddrinfo( NULL,
"0", &hints, &results);
1009 printf(
"Could not find a binding method : %s\n", gai_strerror(ret) );
1013 struct addrinfo *current = results;
1017 nsc->
clients = malloc( expected_clients *
sizeof(
int ) );
1028 nsc->
listening_socket =socket( current->ai_family, current->ai_socktype, current->ai_protocol );
1039 if (bind(nsc->
listening_socket, current->ai_addr, current->ai_addrlen) != -1)
1045 struct sockaddr_in socket_infos;
1046 socklen_t infolen =
sizeof(
struct sockaddr);
1048 if(getsockname(nsc->
listening_socket, (
struct sockaddr*)&socket_infos, &infolen) == -1 )
1050 perror(
"getsockname" );
1055 if( infolen ==
sizeof(
struct sockaddr_in) )
1056 proxy_port = ntohs(socket_infos.sin_port);
1080 current = current->ai_next;
1083 freeaddrinfo(results);
1087 printf(
"Failed to set a proxy server and establish connections\n");
1115 pthread_join( nsc->
th, NULL );
1220 return strdup(
"{}");
static int nodeSharedCacheRead_Payload(int socket, char *read_buff, uint32_t data_len, uint32_t max_buffer_size)
void jsonCache_set(struct jsonCache *c, char *key, json_t *elem)
json_t * nodeSharedCache_get(struct nodeSharedCache *nsc, char *key)
void nodeSharedCache_delete(struct nodeSharedCache *nsc, char *key)
void jsonCache_delete(struct jsonCache *c, char *key)
static uint64_t MALP_Trace_crc64(char *source, uint64_t size)
Computes the hash of the given data.
int nodeSharedCache_proxy_init(struct nodeSharedCache *nsc, char *destination_server, int expected_clients)
void nodeSharedCache_commit(struct nodeSharedCache *nsc, char *key)
void nodeSharedCache_reduce(struct nodeSharedCache *nsc, char *key, json_t *elem)
static int nodeSharedCacheRead_Header(int socket, uint32_t *action, uint32_t *key_len, uint32_t *data_len)
void nodeSharedCache_reduce_json(struct nodeSharedCache *nsc, char *key, char *json)
int nodeSharedCache_init(struct nodeSharedCache *nsc)
void nodeSharedCache_set_handler(char *key, json_t *elem, void *ctx)
void jsonCache_init(struct jsonCache *c, void(*set_handler)(char *, json_t *, void *), void(*delete_handler)(char *, void *), void *ctx)
pthread_mutex_t socket_w_lock
void json_decref(json_t *json)
static int _nodeSharedCache_send(int socket, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action)
int nodeSharedCache_proxy_auth(struct nodeSharedCache *nsc, int socket)
char * nodeSharedCache_get_json(struct nodeSharedCache *nsc, char *key)
uint64_t nodeSharedCache_compute_auth_digest(char *passwd, char *challenge)
void * nodeSharedCache_read_loop(void *pnsc)
void nodeSharedCache_set(struct nodeSharedCache *nsc, char *key, json_t *json)
void socket_flag_timeouts(int socket, int *socket_flagged_read, int *socket_flagged_write)
void nodeSharedCache_delete_handler(char *key, void *ctx)
void jsonCache_commit(struct jsonCache *c, char *key)
static int nodeSharedCacheRead_Key(int socket, char *key_buff, uint32_t key_len)
int nodeSharedCache_send_down(struct nodeSharedCache *nsc, char *key, char *json, uint32_t action)
void jsonCache_set_json_nocommit(struct jsonCache *c, char *key, char *json)
void * nodeSharedCache_proxy_routing_thread(void *arg)
void jsonCache_release(struct jsonCache *c)
pthread_mutex_t * client_socket_w_lock
json_t * jsonCache_get(struct jsonCache *c, char *key)
int nodeSharedCache_incoming_set(struct nodeSharedCache *nsc, char *key_buff, char *read_buff, uint32_t data_len)
int nodeSharedCache_send_up(struct nodeSharedCache *nsc, char *key, char *json, uint32_t action)
static int _nodeSharedCache_send_down(struct nodeSharedCache *nsc, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action)
char * json_dump(json_t *json, json_format mode)
pthread_t * client_threads
int nodeSharedCache_proxy_send_config(struct nodeSharedCache *nsc, int socket)
int nodeSharedCache_proxy_accept_clients(struct nodeSharedCache *nsc)
int safe_write(int fd, void *src, size_t size)
void jsonCache_delete_nocommit(struct jsonCache *c, char *key)
void jsonCache_set_nocommit(struct jsonCache *c, char *key, json_t *elem)
void nodeSharedCache_release(struct nodeSharedCache *nsc)
char * Context_host_from_env()
void nodeSharedCache_wait(struct nodeSharedCache *nsc)
char * Context_passwd_from_env()
int Context_init_data(struct Context *ctx, char *phost_port, char *password)
void nodeSharedCache_set_json(struct nodeSharedCache *nsc, char *key, char *json)
int nodeSharedCache_do_auth(struct nodeSharedCache *nsc)
static int _nodeSharedCache_send_up(struct nodeSharedCache *nsc, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action)
void Context_release(struct Context *ctx)
int Context_init(struct Context *ctx)
int nodeSharedCache_proxy_greetings(int socket)
void MALP_Trace_crc64_init()
This initializes the CRC64 internals.
int _nodeSharedCache_init(struct nodeSharedCache *nsc)
struct nodeSharedCache * nsc