Multi-ApplicationOnlineProfiling  2.1
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
nsc.c
Go to the documentation of this file.
1 #include "nsc.h"
2 
3 #include <string.h>
4 #include <errno.h>
5 #include <unistd.h>
6 #include <sys/types.h>
7 #include <sys/socket.h>
8 #include <netdb.h>
9 #include <openssl/sha.h>
10 #include <signal.h>
11 
12 #include "CRC64.h"
13 
14 /* ############ Simple Context MGT ############ */
16 {
17  char *raw_addr = getenv( "NODE_SHARED_CACHE" );
18 
19  if( !raw_addr )
20  {
21  raw_addr = "127.0.0.1:9874";
22  }
23 
24  return raw_addr;
25 }
26 
28 {
29  char *pswd = getenv( "NODE_SHARED_CACHE_PWD" );
30 
31  if( !pswd )
32  {
33  pswd = "";
34  }
35 
36  return pswd;
37 }
38 
39 
40 int Context_init_data( struct Context *ctx, char *phost_port, char *password )
41 {
42  if( phost_port )
43  {
44 
45  char * host_port = strdup( phost_port );
46 
47  char *sep = strstr( host_port , ":");
48 
49  if( !sep )
50  {
51  printf("Error address must be in the 'ip:port' format\n");
52  return 1;
53  }
54 
55  *sep = '\0';
56 
57  ctx->addr = strdup(host_port);
58  ctx->s_port = strdup(sep + 1);
59 
60  free( host_port );
61 
62  char *endptr;
63 
64  ctx->port = strtol(ctx->s_port, &endptr, 10);
65 
66 
67  if( endptr == ctx->s_port )
68  {
69  printf("Error parsing port number\n");
70  return 1;
71  }
72 
73  }
74  else
75  {
76  ctx->addr = NULL;
77  ctx->s_port = 0;
78  }
79 
80  if( password )
81  {
82  ctx->passwd = strdup(password);
83  }
84  else
85  {
86  ctx->passwd = strdup("");
87  }
88 
89  return 0;
90 }
91 
92 int Context_init( struct Context *ctx )
93 {
95 }
96 
97 void Context_release( struct Context *ctx )
98 {
99  free( ctx->addr );
100  ctx->addr = NULL;
101 
102  free( ctx->s_port );
103  ctx->s_port = NULL;
104 
105  free( ctx->passwd );
106  ctx->passwd = NULL;
107 
108  ctx->port = -1;
109 }
110 
111 /*
112  NODE SHARED CACHE
113 */
114 
115 
116 /* Internals */
117 static inline int _nodeSharedCache_send( int socket, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action );
118 
119 static inline int _nodeSharedCache_send_up( struct nodeSharedCache *nsc , char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action );
120 int nodeSharedCache_send_up( struct nodeSharedCache *nsc , char *key, char *json, uint32_t action );
121 
122 static inline int _nodeSharedCache_send_down( struct nodeSharedCache *nsc , char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action );
123 int nodeSharedCache_send_down( struct nodeSharedCache *nsc , char *key, char *json, uint32_t action );
124 
125 void socket_flag_timeouts( int socket, int *socket_flagged_read, int *socket_flagged_write)
126 {
127 
128  struct timeval tv;
129  tv.tv_sec = 5;
130  tv.tv_usec = 0;
131 
132 
133  /* Try to set some time out flags */
134  if( setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,sizeof(struct timeval) ) != -1 )
135  {
136  if( socket_flagged_read )
137  *socket_flagged_read = 1;
138  }
139 
140  if( setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,sizeof(struct timeval) ) != -1 )
141  {
142  if( socket_flagged_write )
143  *socket_flagged_write = 1;
144  }
145 }
146 
147 
148 int nodeSharedCache_incoming_set( struct nodeSharedCache *nsc , char * key_buff, char *read_buff, uint32_t data_len )
149 {
150  read_buff[data_len] = '\0';
151 
152  //printf("==> SET { %s : %s }\n", key_buff, read_buff );
153 
154  jsonCache_set_json_nocommit( &nsc->cache, key_buff, read_buff );
155 
156  read_buff[0] = '\0';
157 
158  return 0;
159 }
160 
161 
162 
163 int safe_write( int fd, void *src, size_t size )
164 {
165  errno = 0;
166  size_t left_to_write = size;
167 
168  do
169  {
170  struct sigaction new, old;
171 
172  new.sa_handler = SIG_IGN;
173  sigemptyset (&new.sa_mask);
174  new.sa_flags = 0;
175 
176  sigaction( SIGPIPE, &new, &old );
177 
178  int ret = write( fd, src, size );
179 
180  sigaction( SIGPIPE, &old, NULL );
181 
182  if( ret <= 0 )
183  {
184  if( errno == EAGAIN )
185  continue;
186 
187  else if( errno != 0 )
188  {
189  //perror("write");
190  return -1;
191  }
192 
193  return 0;
194  }
195  else
196  {
197  left_to_write -= ret;
198  }
199 
200 
201  }while( left_to_write );
202 
203 
204  return size;
205 }
206 
207 static inline int _nodeSharedCache_send( int socket, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action )
208 {
209  uint32_t sizes[4];
210  sizes[0] = 0x47; /* Sync byte (just in case) */
211  sizes[1] = action;
212  sizes[2] = len_key;
213  sizes[3] = len_json;
214 
215  /* Send sizes */
216  if( safe_write( socket, sizes, sizeof(uint32_t) * 4 ) < 0 )
217  {
218  return 1;
219  }
220 
221  /* Send key */
222  if( safe_write( socket, key, sizes[2]) < 0 )
223  {
224  return 1;
225  }
226 
227  /* Send data */
228  if( safe_write( socket, json, sizes[3]) < 0 )
229  {
230  return 1;
231  }
232 
233  return 0;
234 }
235 
236 static inline int _nodeSharedCache_send_up( struct nodeSharedCache *nsc , char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action )
237 {
238  int ret = 0;
239 
240  if( !nsc->connected )
241  {
242  /* In this case we are the root so we have nothing to send up */
243  return 0;
244  }
245 
246  pthread_mutex_lock( &nsc->socket_w_lock );
247 
248  ret = _nodeSharedCache_send( nsc->socket, key, len_key, json, len_json, action );
249 
250  pthread_mutex_unlock( &nsc->socket_w_lock );
251 
252  return ret;
253 }
254 
255 
256 int nodeSharedCache_send_up( struct nodeSharedCache *nsc , char *key, char *json, uint32_t action )
257 {
258  int len_key = strlen( key );
259  int len_json = strlen( json );
260 
261  if( !len_key || !len_json )
262  return 1;
263 
264  return _nodeSharedCache_send_up( nsc, key, len_key, json, len_json, action );
265 }
266 
267 
268 static inline int _nodeSharedCache_send_down( struct nodeSharedCache *nsc , char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action )
269 {
270  int ret = 0;
271  int i;
272 
273  for( i = 0 ; i < nsc->connected_clients ; i++ )
274  {
275  //printf("Sending to %d\n", i );
276  pthread_mutex_lock( &nsc->client_socket_w_lock[i] );
277 
278  ret = _nodeSharedCache_send( nsc->clients[i], key, len_key, json, len_json, action );
279 
280  if( ret )
281  {
282  pthread_mutex_unlock( &nsc->client_socket_w_lock[i] );
283  break;
284  }
285 
286  pthread_mutex_unlock( &nsc->client_socket_w_lock[i] );
287  }
288 
289  return ret;
290 }
291 
292 
293 int nodeSharedCache_send_down( struct nodeSharedCache *nsc , char *key, char *json, uint32_t action )
294 {
295  int len_key = strlen( key );
296  int len_json = strlen( json );
297 
298  if( !len_key || !len_json )
299  return 1;
300 
301  return _nodeSharedCache_send_down( nsc, key, len_key, json, len_json, action );
302 }
303 
304 
305 
306 static inline int nodeSharedCacheRead_Header( int socket, uint32_t *action, uint32_t *key_len, uint32_t *data_len )
307 {
308  uint32_t header[4];
309 
310  errno = 0;
311  int ret = read( socket, header, sizeof(uint32_t) * 4 );
312 
313  if( ret != sizeof(uint32_t) * 4 )
314  {
315  if( ret == 0 || errno == EBADF )
316  return 1;
317 
318  printf("BAD protocol : Could not read header\n");
319  //perror("read");
320  return 1;
321  }
322 
323  if( header[0] != 0x47 )
324  {
325  printf("BAD protocol : sync byte is erroneous\n");
326  return 1;
327  }
328 
329  *action = header[1];
330  *key_len = header[2];
331  *data_len = header[3];
332 
333  return 0;
334 }
335 
336 static inline int nodeSharedCacheRead_Key( int socket, char *key_buff, uint32_t key_len )
337 {
338  int ret = 0;
339  errno = 0;
340  ret = read( socket, key_buff, key_len );
341 
342  if( ret != key_len )
343  {
344  if( ret == 0 || errno == EBADF )
345  return 1;
346 
347  printf("Could not read KEY\n");
348  //perror("read");
349  return 1;
350  }
351 
352  key_buff[key_len] = '\0';
353 
354  return 0;
355 }
356 
357 static inline int nodeSharedCacheRead_Payload( int socket, char *read_buff, uint32_t data_len, uint32_t max_buffer_size )
358 {
359  if( ( max_buffer_size - 1 ) <= data_len )
360  {
361  printf("ERROR Message too large BAD PROTOCOL");
362  return 1;
363  }
364  else
365  {
366  errno = 0;
367  int ret = read( socket, read_buff, data_len );
368 
369  if( ret != data_len )
370  {
371  if( ret == 0 || errno == EBADF )
372  return 1;
373 
374  printf("COULD not read payload");
375  //perror("read");
376  return 1;
377  }
378 
379  }
380 
381  return 0;
382 }
383 
384 
385 
386 
387 void * nodeSharedCache_read_loop( void *pnsc )
388 {
389  struct nodeSharedCache *nsc = (struct nodeSharedCache *) pnsc;
390 
391  char *key_buff = malloc( nsc->max_key_len + 1 );
392 
393  if( !key_buff )
394  {
395  perror("malloc");
396  abort();
397  }
398 
399  char *read_buff = malloc( nsc->max_buffer_size );
400 
401  if( !read_buff )
402  {
403  perror("malloc");
404  abort();
405  }
406 
407 
408  while( 1 )
409  {
410  uint32_t action = 0;
411  uint32_t key_len = 0;
412  uint32_t data_len = 0;
413 
414  if( nodeSharedCacheRead_Header( nsc->socket, &action, &key_len, &data_len ) )
415  {
416  break;
417  }
418 
419  //printf("INCOMING NSC UP %d\n", action );
420 
421  if( nodeSharedCacheRead_Key( nsc->socket, key_buff, key_len ) )
422  {
423  break;
424  }
425 
426  if( nodeSharedCacheRead_Payload( nsc->socket, read_buff, data_len, nsc->max_buffer_size ) )
427  {
428  break;
429  }
430 
431 
432  if( action == NSC_CLOSE )
433  {
434  break;
435  }
436 
437  switch( action )
438  {
439  case NSC_BCAST:
440  printf("BCAST %s\n", key_buff);
441  /* Send DOWN the tree and set LOCAL */
442  if( _nodeSharedCache_send_down( nsc , key_buff, key_len, read_buff, data_len, NSC_BCAST ) )
443  {
444  break;
445  }
446  case NSC_REDUCE:
447  printf("REDUCE %s\n", key_buff);
448  /* As reduce are just supposed to go up just set local */
449  case NSC_SET :
450  printf("=> SET %s\n", key_buff);
451  /* Set local from up the tree */
452  nodeSharedCache_incoming_set( nsc , key_buff, read_buff, data_len );
453  break;
454  case NSC_DELETE:
455  printf("DEL %s\n", key_buff);
456  /* Delete local from up the tree */
457  jsonCache_delete_nocommit( &nsc->cache, key_buff );
458  break;
459  }
460 
461  sched_yield();
462  }
463 
464  free( read_buff );
465  free( key_buff );
466 
467  close( nsc->socket );
468 
469  //printf("Closed connection\n");
470 
471  return NULL;
472 }
473 
474 
475 uint64_t nodeSharedCache_compute_auth_digest( char *passwd, char *challenge )
476 {
478 
479  int pwdlen = strlen(passwd);
480 
481  uint64_t crc1, crc2;
482 
483  crc1 = MALP_Trace_crc64(passwd, pwdlen );
484  crc2 = MALP_Trace_crc64(challenge, 512 );
485 
486  unsigned char CRCS[64];
487 
488  sprintf(CRCS, "%016llx%016llx",crc1, crc2 );
489 
490  return MALP_Trace_crc64(CRCS, 32 );
491 }
492 
493 
494 
496 {
497  char challenge[512];
498 
499  if( read( nsc->socket, challenge, 512 ) != 512 )
500  {
501  //perror("read");
502  return 1;
503  }
504 
505  uint64_t hex_digest = nodeSharedCache_compute_auth_digest( nsc->ctx.passwd, challenge );
506 
507  if( safe_write( nsc->socket, &hex_digest, sizeof(uint64_t) ) != sizeof(uint64_t) )
508  {
509  //perror("write");
510  return 1;
511  }
512 
513  uint32_t ret;
514 
515  if( read( nsc->socket, &ret, sizeof(uint32_t) ) != sizeof(uint32_t) )
516  {
517  //perror("read");
518  return 1;
519  }
520 
521  return ret;
522 }
523 
524 
526 {
527  if( !nsc )
528  return 1;
529 
530  nsc->been_waited = 0;
531 
532  nsc->client_socket_w_lock = NULL;
533  nsc->client_threads = NULL;
534  nsc->clients = NULL;
535  nsc->connected = 0;
536 
537  /* ############ Setup Datastructures ############ */
539 
540  /* If we do not connect to a server just return */
541  if( !nsc->ctx.addr )
542  {
543  /* Set a default config for future clients */
544  nsc->max_key_len = 1024;
545  nsc->max_buffer_size = 1024*1024;
546  return 0;
547  }
548  /* ############ Connect to NODE.js ############ */
549 
550  errno = 0;
551 
552  struct addrinfo *results;
553 
554  int ret = getaddrinfo( nsc->ctx.addr, nsc->ctx.s_port, NULL, &results);
555 
556  if( ret != 0 )
557  {
558  printf("Error resolving name : %s\n", gai_strerror(ret) );
559  return 1;
560  }
561 
562  struct addrinfo *current = results;
563 
564  int connected = 0;
565 
566  while( current )
567  {
568  errno = 0;
569  nsc->socket =socket( current->ai_family, current->ai_socktype, current->ai_protocol );
570 
571  if( nsc->socket < 0 )
572  {
573  perror( "socket" );
574  }
575  else
576  {
577  errno = 0;
578  if (connect(nsc->socket, current->ai_addr, current->ai_addrlen) != -1)
579  {
580  //printf("Connected to %s:%d\n", nsc->ctx.addr, nsc->ctx.port );
581  connected = 1;
582  break;
583  }
584  else
585  {
586  perror("connect");
587  }
588 
589  }
590 
591  close( nsc->socket );
592 
593  current = current->ai_next;
594  }
595 
596  freeaddrinfo(results);
597 
598  if( connected == 0 )
599  {
600  printf("Failed to connect to %s:%d\n", nsc->ctx.addr, nsc->ctx.port);
601  return 1;
602  }
603 
604  /* SAY HELLO and LOAD REMOTE CONFIG */
605  char *greetings = "Hello NodeSharedCache";
606  int greetings_len = strlen( greetings );
607 
608  if( safe_write( nsc->socket, greetings, greetings_len ) != greetings_len )
609  {
610  printf("Failed to register to node.js server\n");
611  close( nsc->socket );
612  return 1;
613  }
614 
615  /* Authenticate */
616 
617  if( nodeSharedCache_do_auth( nsc ) )
618  {
619  printf("Server refused password\n");
620  close( nsc->socket );
621  return 1;
622  }
623 
624  /* Retrieve remote config */
625  uint32_t remote_config[3] = { 0 , 0 , 0};
626 
627  if( read( nsc->socket, remote_config, 3 * sizeof( uint32_t ) ) != (3 * sizeof( uint32_t )) )
628  {
629  perror("Failed to read remote configuration...\n");
630  close( nsc->socket );
631  return 1;
632  }
633 
634  if( remote_config[0] != 777 )
635  {
636  printf("Error bad server protocol\n");
637  close( nsc->socket );
638  return 1;
639  }
640 
641  nsc->max_key_len = remote_config[1];
642  nsc->max_buffer_size = remote_config[2];
643 
644  pthread_mutex_init( &nsc->socket_w_lock, NULL);
645 
646 
647  /* ############ Start Reading/writing Threads ############ */
648  pthread_create( &nsc->th, NULL, nodeSharedCache_read_loop, (void *)nsc);
649 
650  //pthread_detach( nsc->th );
651 
652  nsc->connected = 1;
653 
654  return 0;
655 }
656 
658 {
659  if( !nsc )
660  return 1;
661 
662  /* Fill in ctx with default values */
663  if( Context_init( &nsc->ctx ) )
664  return 1;
665 
666  nsc->expected_clients = 0;
667  nsc->connected_clients = 0;
668 
669  /* Now connect to the server/proxy */
670  if( _nodeSharedCache_init( nsc) )
671  return 1;
672 
673  return 0;
674 }
675 
676 
678 {
679  int ret = 0;
680  char *greetings = "Hello NodeSharedCache";
681  int greetings_len = strlen( greetings );
682 
683  char buff[50];
684 
685  if( (ret = recv( socket, buff, greetings_len, 0 ) ) == -1 )
686  {
687  return -1;
688  }
689 
690  /* Check greetings format */
691  if( ret != greetings_len )
692  {
693  printf("Bad protocol in proxy (BAD LENGTH got %d)\n", ret);
694  return -1;
695  }
696 
697  if( memcmp( buff, greetings, greetings_len ) )
698  {
699  printf("Bad protocol in proxy (BAD greetings)\n");
700  return -1;
701  }
702 
703  return 0;
704 }
705 
706 int nodeSharedCache_proxy_auth( struct nodeSharedCache *nsc, int socket )
707 {
708  char challenge[512];
709  int i;
710 
711  /* Prepare a challenge */
712  for( i = 0 ; i < 512 ; i++ )
713  challenge[i] = rand();
714 
715  uint64_t auth_digest = nodeSharedCache_compute_auth_digest( nsc->ctx.passwd, challenge );
716 
717  /* Send the challenge */
718  if( send( socket, challenge, 512, 0 ) == -1 )
719  {
720  return -1;
721  }
722 
723  uint64_t buff;
724  /* Read the digest */
725  if( recv( socket, &buff, sizeof(uint64_t), 0 ) == -1 )
726  {
727  return -1;
728  }
729 
730  uint32_t bad_password = 0;
731 
732  if( memcmp(&auth_digest, &buff, sizeof(uint64_t)) )
733  {
734  printf("Bad password\n");
735  bad_password = 1;
736  }
737 
738  /* Inform the client of our decision */
739  if( send( socket, (char *)&bad_password, sizeof(uint32_t), 0 ) == -1 )
740  {
741  return -1;
742  }
743 
744 
745 
746  return !bad_password;
747 }
748 
750 {
751  /* Retrieve remote config */
752  uint32_t remote_config[3] = { 777 , nsc->max_key_len , nsc->max_buffer_size};
753 
754  if( send( socket, (char *)remote_config, 3*sizeof(uint32_t), 0 ) == -1 )
755  {
756  printf("Failed to send nodeSharedCache config\n");
757  return -1;
758  }
759 
760  return 0;
761 }
762 
763 
765 {
767  int id;
768 };
769 
770 
772 {
773  struct nscRoutingThreadCtx *ctx = (struct nscRoutingThreadCtx *)arg;
774 
775  int my_socket = ctx->nsc->clients[ ctx->id ];
776 
777  char *key_buff = malloc( ctx->nsc->max_key_len + 1 );
778 
779  if( !key_buff )
780  {
781  perror("malloc");
782  abort();
783  }
784 
785  char *read_buff = malloc( ctx->nsc->max_buffer_size );
786 
787  if( !read_buff )
788  {
789  perror("malloc");
790  abort();
791  }
792 
793 
794  while( 1 )
795  {
796  uint32_t action = 0;
797  uint32_t key_len = 0;
798  uint32_t data_len = 0;
799 
800  if( nodeSharedCacheRead_Header( my_socket, &action, &key_len, &data_len ) )
801  {
802  break;
803  }
804 
805 
806  if( nodeSharedCacheRead_Key( my_socket, key_buff, key_len ) )
807  {
808  break;
809  }
810 
811  if( nodeSharedCacheRead_Payload( my_socket, read_buff, data_len, ctx->nsc->max_buffer_size ) )
812  {
813  break;
814  }
815 
816 
817  //printf("[DOWN %d] (%s : %s) over %d\n", action, key_buff, read_buff, ctx->id );
818  /* Here we want to speedup the closing process */
819  if( action == NSC_CLOSE )
820  {
821  _nodeSharedCache_send( my_socket, "EOF", 4, "{}", 3, NSC_CLOSE );
822  break;
823  }
824 
825  switch( action )
826  {
827  case NSC_REDUCE:
828  /* Send UP the tree */
829  if( _nodeSharedCache_send_up( ctx->nsc , key_buff, key_len, read_buff, data_len, NSC_REDUCE ) )
830  {
831  break;
832  }
833  case NSC_BCAST:
834  /* As bcast are just supposed to go down just set local */
835  case NSC_SET :
836  /* Set local from down the tree */
837  nodeSharedCache_incoming_set( ctx->nsc , key_buff, read_buff, data_len );
838  break;
839  case NSC_DELETE:
840  /* Delete local from down the tree */
841  jsonCache_delete_nocommit( &ctx->nsc->cache, key_buff );
842  break;
843  }
844 
845  sched_yield();
846  }
847 
848  free( read_buff );
849  free( key_buff );
850  free( ctx );
851 
852  close( my_socket );
853 
854  //printf("Proxy client disconnected\n");
855 
856  return NULL;
857 }
858 
859 
861 {
862 
863  if( nsc->expected_clients != 0 )
864  {
865  /* We now accept clients */
866  while( nsc->connected_clients < nsc->expected_clients )
867  {
868  socklen_t len = sizeof( struct sockaddr );
869  struct sockaddr remote_infos;
870 
871  int client_socket = accept(nsc->listening_socket, &remote_infos, &len);
872 
873  if( client_socket == -1 )
874  {
875  perror("accept");
876  continue;
877  }
878  else
879  {
880  /* Here a client is connected */
881 
882  /* Receive protocol greetings */
883  if( nodeSharedCache_proxy_greetings( client_socket ) == -1 )
884  {
885  close(client_socket);
886  continue;
887  }
888 
889  /* Here the client seems to be a proxy
890  * Lets do the authentification */
891  int did_auth = 0;
892 
893  if( (did_auth = nodeSharedCache_proxy_auth( nsc, client_socket )) == -1 )
894  {
895  close(client_socket);
896  continue;
897  }
898 
899  if( did_auth == 0 )
900  {
901  printf("Client failed authentification\n");
902  close(client_socket);
903  continue;
904  }
905 
906  /* We now send the config */
907  if( nodeSharedCache_proxy_send_config( nsc, client_socket ) == -1 )
908  {
909  close(client_socket);
910  continue;
911  }
912 
913  /* Client is sucessfully authentified
914  * start routing loop */
915  nsc->clients[nsc->connected_clients] = client_socket;
916  nsc->connected_clients ++;
917  }
918 
919  }
920 
921  /* Here we close the listening socket
922  * as every clients are connected */
923  close(nsc->listening_socket);
924 
925 
926  /* We now create the threads for clients */
927  nsc->client_threads = malloc( nsc->expected_clients * sizeof( pthread_t ) );
928 
929  if( !nsc->client_threads )
930  {
931  perror( "malloc" );
932  return -1;
933  }
934 
935  int i;
936  for( i = 0 ; i < nsc->expected_clients ; i++ )
937  {
938  struct nscRoutingThreadCtx *ctx = malloc( sizeof( struct nscRoutingThreadCtx ) );
939 
940  if( !ctx )
941  {
942  perror( "malloc" );
943  return -1;
944  }
945 
946  ctx->nsc = nsc;
947  ctx->id = i;
948 
949  pthread_create( &nsc->client_threads[i], NULL, nodeSharedCache_proxy_routing_thread, (void *) ctx );
950 
951  }
952 
953  }
954 
955  return 0;
956 }
957 
958 
959 
960 
961 
962 int nodeSharedCache_proxy_init( struct nodeSharedCache *nsc, char *destination_server, int expected_clients )
963 {
964  if( Context_init_data( &nsc->ctx, destination_server, Context_passwd_from_env() ) )
965  return -1;
966 
967  /* Fill in the suplementary expected clients attribute */
968  nsc->expected_clients = expected_clients;
969  nsc->connected_clients = 0;
970 
971  /* Connect to server/relay */
972  if( _nodeSharedCache_init( nsc) )
973  return -1;
974 
975  nsc->client_socket_w_lock = malloc( nsc->expected_clients * sizeof( pthread_mutex_t ) + sizeof(int) );
976 
977  if( !nsc->client_socket_w_lock )
978  {
979  perror("malloc");
980  return -1;
981  }
982 
983  int i;
984  for( i = 0 ; i < nsc->expected_clients ; i++ )
985  pthread_mutex_init( &nsc->client_socket_w_lock[i], NULL);
986 
987 
988 
989  int proxy_port = 0;
990 
991 
992  /* Start the proxy server to receive #expected_clients connections*/
993  if( expected_clients != 0 )
994  {
995  errno = 0;
996 
997  struct addrinfo hints, *results;
998 
999  memset( &hints, 0, sizeof( struct addrinfo ) );
1000  hints.ai_family = AF_UNSPEC;
1001  hints.ai_socktype = SOCK_STREAM;
1002  hints.ai_flags = AI_PASSIVE;
1003 
1004  /* Note that we bind ourselves to port 0 in order to get a free port */
1005  int ret = getaddrinfo( NULL, "0", &hints, &results);
1006 
1007  if( ret != 0 )
1008  {
1009  printf("Could not find a binding method : %s\n", gai_strerror(ret) );
1010  return -1;
1011  }
1012 
1013  struct addrinfo *current = results;
1014 
1015  nsc->listening_socket = 0;
1016 
1017  nsc->clients = malloc( expected_clients * sizeof( int ) );
1018 
1019  if( !nsc->clients )
1020  {
1021  perror("malloc");
1022  return -1;
1023  }
1024 
1025  while( current )
1026  {
1027  errno = 0;
1028  nsc->listening_socket =socket( current->ai_family, current->ai_socktype, current->ai_protocol );
1029 
1030  if( nsc->listening_socket < 0 )
1031  {
1032  perror( "socket" );
1033  continue;
1034  }
1035 
1036  errno = 0;
1037 
1038  /* Bind the socket to the port */
1039  if (bind(nsc->listening_socket, current->ai_addr, current->ai_addrlen) != -1)
1040  {
1041  /* Start listening on that port */
1042  if (listen(nsc->listening_socket, expected_clients) != -1)
1043  {
1044  /* As we are bound to "0" we want to get the actual port */
1045  struct sockaddr_in socket_infos;
1046  socklen_t infolen = sizeof(struct sockaddr);
1047 
1048  if(getsockname(nsc->listening_socket, (struct sockaddr*)&socket_infos, &infolen) == -1 )
1049  {
1050  perror( "getsockname" );
1051  return -1;
1052  }
1053  else
1054  {
1055  if( infolen == sizeof(struct sockaddr_in) )
1056  proxy_port = ntohs(socket_infos.sin_port);
1057  }
1058 
1059  //printf("Proxy listening on port %d\n", proxy_port);
1060 
1061  /* Proxy server is now up and listening for clients */
1062  break;
1063 
1064  }
1065  else
1066  {
1067  perror("listen");
1068  close(nsc->listening_socket);
1069  return -1;
1070 
1071  }
1072  }
1073  else
1074  {
1075  perror("bind");
1076  close(nsc->listening_socket);
1077  return -1;
1078  }
1079 
1080  current = current->ai_next;
1081  }
1082 
1083  freeaddrinfo(results);
1084 
1085  if( nsc->listening_socket == 0 )
1086  {
1087  printf("Failed to set a proxy server and establish connections\n");
1088  return -1;
1089  }
1090  }
1091 
1092  return proxy_port;
1093 }
1094 
1095 
1097 {
1098  if( ! nsc->been_waited )
1099  {
1100  if( nsc->expected_clients == 0 )
1101  nodeSharedCache_send_up( nsc , "EOF", "{}", NSC_CLOSE );
1102 
1103  int i;
1104 
1105  for( i = 0 ; i < nsc->expected_clients ; i++ )
1106  {
1107  pthread_join( nsc->client_threads[i], NULL );
1108  }
1109 
1110  if( nsc->expected_clients != 0 )
1111  nodeSharedCache_send_up( nsc , "EOF", "{}", NSC_CLOSE );
1112 
1113  if( nsc->connected )
1114  {
1115  pthread_join( nsc->th, NULL );
1116  }
1117 
1118 
1119  nsc->been_waited = 1;
1120  }
1121 }
1122 
1124 {
1125  nodeSharedCache_wait( nsc );
1126 
1127  jsonCache_release( &nsc->cache );
1128 
1129  Context_release( &nsc->ctx );
1130  pthread_mutex_destroy( &nsc->socket_w_lock );
1131 
1132  int i;
1133  for( i = 0 ; i < nsc->expected_clients ; i++ )
1134  pthread_mutex_destroy( &nsc->client_socket_w_lock[i] );
1135 
1136  free( nsc->client_socket_w_lock );
1137  free( nsc->client_threads );
1138  free( nsc->clients );
1139 }
1140 
1141 
1142 
1143 void nodeSharedCache_set_handler( char *key, json_t *elem, void *ctx )
1144 {
1145  struct nodeSharedCache *nsc = (struct nodeSharedCache *)ctx;
1146 
1147  char *json = json_dump( elem, JSON_COMPACT | JSON_NO_LOCK );
1148 
1149  if( json )
1150  {
1151  nodeSharedCache_send_up( nsc, key, json, NSC_SET );
1152  }
1153 
1154  free( json );
1155 
1156 
1157 }
1158 
1159 void nodeSharedCache_delete_handler( char *key, void *ctx )
1160 {
1161  struct nodeSharedCache *nsc = (struct nodeSharedCache *)ctx;
1162 
1163  nodeSharedCache_send_up( nsc, key, "", NSC_DELETE );
1164 }
1165 
1166 
1167 void nodeSharedCache_set_json( struct nodeSharedCache *nsc, char *key, char *json )
1168 {
1169  /* Here we directly send to avoid a sumplementary DUMPING */
1170  nodeSharedCache_send_up( nsc, key, json, NSC_SET );
1171 
1172  jsonCache_set_json_nocommit( &nsc->cache, key, json );
1173 }
1174 
1175 void nodeSharedCache_set( struct nodeSharedCache *nsc, char *key, json_t *json )
1176 {
1177  jsonCache_set( &nsc->cache, key, json );
1178 }
1179 
1180 
1181 void nodeSharedCache_reduce_json( struct nodeSharedCache *nsc, char *key, char *json )
1182 {
1183  /* Here we directly send to avoid a sumplementary DUMPING */
1184  nodeSharedCache_send_up( nsc, key, json, NSC_REDUCE );
1185 
1186  jsonCache_set_json_nocommit( &nsc->cache, key, json );
1187 }
1188 
1189 void nodeSharedCache_reduce( struct nodeSharedCache *nsc, char *key, json_t *elem )
1190 {
1191  /* Here we explicitly do the parsing as we have no way to change the flag
1192  * to NSC_REDUCE in the set_handler */
1193  char *json = json_dump( elem, JSON_COMPACT | JSON_NO_LOCK );
1194 
1195  //printf("%d REDUCING KEY %s\n =============== \n %s \n ===================\n", getpid(), key, json);
1196 
1197  if( json )
1198  {
1199  nodeSharedCache_send_up( nsc, key, json, NSC_REDUCE );
1200  }
1201 
1202  free( json );
1203 
1204  jsonCache_set_nocommit( &nsc->cache, key, elem );
1205 }
1206 
1207 
1208 json_t * nodeSharedCache_get( struct nodeSharedCache *nsc, char *key )
1209 {
1210  return jsonCache_get( &nsc->cache, key );
1211 }
1212 
1213 
1214 
1215 char * nodeSharedCache_get_json( struct nodeSharedCache *nsc, char *key )
1216 {
1217  json_t *ret = jsonCache_get( &nsc->cache, key );
1218 
1219  if( !ret )
1220  return strdup("{}");
1221 
1222  char *json = json_dump( ret, JSON_COMPACT );
1223 
1224  json_decref( ret );
1225 
1226  return json;
1227 }
1228 
1229 
1230 void nodeSharedCache_delete( struct nodeSharedCache *nsc, char *key )
1231 {
1232  jsonCache_delete( &nsc->cache, key );
1233 }
1234 
1235 void nodeSharedCache_commit( struct nodeSharedCache *nsc, char *key )
1236 {
1237  jsonCache_commit( &nsc->cache, key );
1238 }
1239 
1240 
static int nodeSharedCacheRead_Payload(int socket, char *read_buff, uint32_t data_len, uint32_t max_buffer_size)
Definition: nsc.c:357
void jsonCache_set(struct jsonCache *c, char *key, json_t *elem)
Definition: jsonCache.c:1619
json_t * nodeSharedCache_get(struct nodeSharedCache *nsc, char *key)
Definition: nsc.c:1208
struct Context ctx
Definition: nsc.h:32
void nodeSharedCache_delete(struct nodeSharedCache *nsc, char *key)
Definition: nsc.c:1230
void jsonCache_delete(struct jsonCache *c, char *key)
Definition: jsonCache.c:1565
static uint64_t MALP_Trace_crc64(char *source, uint64_t size)
Computes the hash of a given data.
Definition: CRC64.h:54
int nodeSharedCache_proxy_init(struct nodeSharedCache *nsc, char *destination_server, int expected_clients)
Definition: nsc.c:962
void nodeSharedCache_commit(struct nodeSharedCache *nsc, char *key)
Definition: nsc.c:1235
void nodeSharedCache_reduce(struct nodeSharedCache *nsc, char *key, json_t *elem)
Definition: nsc.c:1189
int connected_clients
Definition: nsc.h:47
static int nodeSharedCacheRead_Header(int socket, uint32_t *action, uint32_t *key_len, uint32_t *data_len)
Definition: nsc.c:306
void nodeSharedCache_reduce_json(struct nodeSharedCache *nsc, char *key, char *json)
Definition: nsc.c:1181
int nodeSharedCache_init(struct nodeSharedCache *nsc)
Definition: nsc.c:657
void nodeSharedCache_set_handler(char *key, json_t *elem, void *ctx)
Definition: nsc.c:1143
void jsonCache_init(struct jsonCache *c, void(*set_handler)(char *, json_t *, void *), void(*delete_handler)(char *, void *), void *ctx)
Definition: jsonCache.c:1522
pthread_mutex_t socket_w_lock
Definition: nsc.h:41
void json_decref(json_t *json)
Definition: jsonCache.c:23
int port
Definition: nsc.h:12
int socket
Definition: nsc.h:40
Definition: nsc.h:24
static int _nodeSharedCache_send(int socket, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action)
Definition: nsc.c:207
int * clients
Definition: nsc.h:45
int nodeSharedCache_proxy_auth(struct nodeSharedCache *nsc, int socket)
Definition: nsc.c:706
char * nodeSharedCache_get_json(struct nodeSharedCache *nsc, char *key)
Definition: nsc.c:1215
Definition: nsc.h:22
uint32_t max_key_len
Definition: nsc.h:36
uint64_t nodeSharedCache_compute_auth_digest(char *passwd, char *challenge)
Definition: nsc.c:475
char * s_port
Definition: nsc.h:11
void * nodeSharedCache_read_loop(void *pnsc)
Definition: nsc.c:387
void nodeSharedCache_set(struct nodeSharedCache *nsc, char *key, json_t *json)
Definition: nsc.c:1175
void socket_flag_timeouts(int socket, int *socket_flagged_read, int *socket_flagged_write)
Definition: nsc.c:125
Definition: nsc.h:25
void nodeSharedCache_delete_handler(char *key, void *ctx)
Definition: nsc.c:1159
void jsonCache_commit(struct jsonCache *c, char *key)
Definition: jsonCache.c:1578
static int nodeSharedCacheRead_Key(int socket, char *key_buff, uint32_t key_len)
Definition: nsc.c:336
int nodeSharedCache_send_down(struct nodeSharedCache *nsc, char *key, char *json, uint32_t action)
Definition: nsc.c:293
int connected
Definition: nsc.h:42
char * passwd
Definition: nsc.h:9
void jsonCache_set_json_nocommit(struct jsonCache *c, char *key, char *json)
Definition: jsonCache.c:1653
void * nodeSharedCache_proxy_routing_thread(void *arg)
Definition: nsc.c:771
void jsonCache_release(struct jsonCache *c)
Definition: jsonCache.c:1531
uint32_t max_buffer_size
Definition: nsc.h:37
pthread_mutex_t * client_socket_w_lock
Definition: nsc.h:48
json_t * jsonCache_get(struct jsonCache *c, char *key)
Definition: jsonCache.c:1539
int listening_socket
Definition: nsc.h:44
int nodeSharedCache_incoming_set(struct nodeSharedCache *nsc, char *key_buff, char *read_buff, uint32_t data_len)
Definition: nsc.c:148
int expected_clients
Definition: nsc.h:46
pthread_t th
Definition: nsc.h:39
int nodeSharedCache_send_up(struct nodeSharedCache *nsc, char *key, char *json, uint32_t action)
Definition: nsc.c:256
struct jsonCache cache
Definition: nsc.h:51
int been_waited
Definition: nsc.h:34
static int _nodeSharedCache_send_down(struct nodeSharedCache *nsc, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action)
Definition: nsc.c:268
char * json_dump(json_t *json, json_format mode)
Definition: jsonCache.c:938
pthread_t * client_threads
Definition: nsc.h:49
Definition: nsc.h:7
int nodeSharedCache_proxy_send_config(struct nodeSharedCache *nsc, int socket)
Definition: nsc.c:749
int nodeSharedCache_proxy_accept_clients(struct nodeSharedCache *nsc)
Definition: nsc.c:860
int safe_write(int fd, void *src, size_t size)
Definition: nsc.c:163
void jsonCache_delete_nocommit(struct jsonCache *c, char *key)
Definition: jsonCache.c:1570
void jsonCache_set_nocommit(struct jsonCache *c, char *key, json_t *elem)
Definition: jsonCache.c:1625
void nodeSharedCache_release(struct nodeSharedCache *nsc)
Definition: nsc.c:1123
char * Context_host_from_env()
Definition: nsc.c:15
void nodeSharedCache_wait(struct nodeSharedCache *nsc)
Definition: nsc.c:1096
char * Context_passwd_from_env()
Definition: nsc.c:27
int Context_init_data(struct Context *ctx, char *phost_port, char *password)
Definition: nsc.c:40
void nodeSharedCache_set_json(struct nodeSharedCache *nsc, char *key, char *json)
Definition: nsc.c:1167
int nodeSharedCache_do_auth(struct nodeSharedCache *nsc)
Definition: nsc.c:495
char * addr
Definition: nsc.h:10
static int _nodeSharedCache_send_up(struct nodeSharedCache *nsc, char *key, uint32_t len_key, char *json, uint32_t len_json, uint32_t action)
Definition: nsc.c:236
void Context_release(struct Context *ctx)
Definition: nsc.c:97
Definition: nsc.h:23
Definition: nsc.h:21
int Context_init(struct Context *ctx)
Definition: nsc.c:92
int nodeSharedCache_proxy_greetings(int socket)
Definition: nsc.c:677
void MALP_Trace_crc64_init()
This initializes the CRC64 internals.
Definition: CRC64.c:26
int _nodeSharedCache_init(struct nodeSharedCache *nsc)
Definition: nsc.c:525
struct nodeSharedCache * nsc
Definition: nsc.c:766