Branch data Line data Source code
1 : : // $Id: RemoteSerializer.cc 6951 2009-12-04 22:23:28Z vern $
2 : : //
3 : : // Processes involved in the communication:
4 : : //
5 : : // (Local-Parent) <-> (Local-Child) <-> (Remote-Child) <-> (Remote-Parent)
6 : : //
7 : : // Message types (for parent<->child communication the CMsg's peer indicates
8 : : // about whom we're talking).
9 : : //
10 : : // Communication protocol version
11 : : // VERSION <version> <cache_size> <data-format-version>
12 : : // <run-time> [<class:string>]
13 : : //
14 : : // Send serialization
15 : : // SERIAL <serialization>
16 : : //
17 : : // Terminate(d) connection
18 : : // CLOSE
19 : : //
20 : : // Close(d) all connections
21 : : // CLOSE_ALL
22 : : //
23 : : // Connect to remote side
24 : : // CONNECT_TO <id-of-new-peer> <ip> <port> <retry-interval> <use-ssl>
25 : : //
26 : : // Connected to remote side
27 : : // CONNECTED <ip> <port>
28 : : //
29 : : // Request events from remote side
30 : : // REQUEST_EVENTS <list of events>
31 : : //
32 : : // Request synchronization of IDs with remote side
33 : : // REQUEST_SYNC <authorative:bool>
34 : : //
35 : : // Listen for connection on ip/port (ip may be INADDR_ANY)
36 : : // LISTEN <ip> <port> <use_ssl>
37 : : //
38 : : // Close listen ports.
39 : : // LISTEN_STOP
40 : : //
41 : : // Error caused by host
42 : : // ERROR <msg>
43 : : //
44 : : // Some statistics about the given peer connection
45 : : // STATS <string>
46 : : //
47 : : // Requests to set a new capture_filter
48 : : // CAPTURE_FILTER <string>
49 : : //
50 : : // Ping to peer
51 : : // PING <struct ping_args>
52 : : //
53 : : // Pong from peer
54 : : // PONG <struct ping_args>
55 : : //
56 : : // Announce our capabilities
57 : : // CAPS <flags> <reserved> <reserved>
58 : : //
59 : : // Activate compression (parent->child)
60 : : // COMPRESS <level>
61 : : //
62 : : // Indicate that all following blocks are compressed (child->child)
63 : : // COMPRESS
64 : : //
65 : : // Synchronize for pseudo-realtime processing.
66 : : // Signals that we have reached sync-point number <count>.
67 : : // SYNC_POINT <count>
68 : : //
69 : : // Signals the child that we want to terminate. Anything sent after this may
70 : : // get lost. When the child answers with another TERMINATE it is safe to
71 : : // shutdown.
72 : : // TERMINATE
73 : : //
74 : : // Debug-only: tell child to dump recently received/sent data to disk.
75 : : // DEBUG_DUMP
76 : : //
77 : : // Valid messages between processes:
78 : : //
79 : : // Main -> Child
80 : : // CONNECT_TO
81 : : // REQUEST_EVENTS
82 : : // SERIAL
83 : : // CLOSE
84 : : // CLOSE_ALL
85 : : // LISTEN
86 : : // LISTEN_STOP
87 : : // CAPTURE_FILTER
88 : : // VERSION
89 : : // REQUEST_SYNC
90 : : // PHASE_DONE
91 : : // PING
92 : : // PONG
93 : : // CAPS
94 : : // COMPRESS
95 : : // SYNC_POINT
96 : : // DEBUG_DUMP
97 : : // REMOTE_PRINT
98 : : //
99 : : // Child -> Main
100 : : // CONNECTED
101 : : // REQUEST_EVENTS
102 : : // SERIAL
103 : : // CLOSE
104 : : // ERROR
105 : : // STATS
106 : : // VERSION
107 : : // CAPTURE_FILTER
108 : : // REQUEST_SYNC
109 : : // PHASE_DONE
110 : : // PING
111 : : // PONG
112 : : // CAPS
113 : : // LOG
114 : : // SYNC_POINT
115 : : // REMOTE_PRINT
116 : : //
117 : : // Child <-> Child
118 : : // VERSION
119 : : // SERIAL
120 : : // REQUEST_EVENTS
121 : : // CAPTURE_FILTER
122 : : // REQUEST_SYNC
123 : : // PHASE_DONE
124 : : // PING
125 : : // PONG
126 : : // CAPS
127 : : // COMPRESS
128 : : // SYNC_POINT
129 : : // REMOTE_PRINT
130 : : //
131 : : // A connection between two peers has four phases:
132 : : //
133 : : // Setup:
134 : : // Initial phase.
135 : : // VERSION messages must be exchanged.
136 : : // Ends when both peers have sent VERSION.
137 : : // Handshake:
138 : : // REQUEST_EVENTS/REQUEST_SYNC/CAPTURE_FILTER/CAPS/selected SERIALs
139 : : // may be exchanged.
140 : : // Phase ends when both peers have sent PHASE_DONE.
141 : : // State synchronization:
142 : : // Entered iff at least one of the peers has sent REQUEST_SYNC.
143 : : // The peer with the smallest runtime (incl. in VERSION msg) sends
144 : : // SERIAL messages compromising all of its state.
145 : : // Phase ends when peer sends another PHASE_DONE.
146 : : // Running:
147 : : // Peers exchange SERIAL (and PING/PONG) messages.
148 : : // Phase ends with connection tear-down by one of the peers.
149 : :
150 : : #include <sys/types.h>
151 : : #include <sys/socket.h>
152 : : #include <sys/wait.h>
153 : : #include <netinet/in.h>
154 : : #include <unistd.h>
155 : : #include <errno.h>
156 : : #include <signal.h>
157 : : #include <arpa/inet.h>
158 : : #include <fcntl.h>
159 : : #include <signal.h>
160 : : #include <strings.h>
161 : : #include <stdarg.h>
162 : :
163 : : #include "config.h"
164 : : #ifdef TIME_WITH_SYS_TIME
165 : : # include <sys/time.h>
166 : : # include <time.h>
167 : : #else
168 : : # ifdef HAVE_SYS_TIME_H
169 : : # include <sys/time.h>
170 : : # else
171 : : # include <time.h>
172 : : # endif
173 : : #endif
174 : : #include <sys/resource.h>
175 : :
176 : : #include "RemoteSerializer.h"
177 : : #include "Func.h"
178 : : #include "EventRegistry.h"
179 : : #include "Event.h"
180 : : #include "Net.h"
181 : : #include "NetVar.h"
182 : : #include "Scope.h"
183 : : #include "Sessions.h"
184 : : #include "File.h"
185 : : #include "Conn.h"
186 : :
187 : : extern "C" {
188 : : #include "setsignal.h"
189 : : };
190 : :
191 : : // Gets incremented each time there's an incompatible change
192 : : // to the communication internals.
193 : : static const unsigned short PROTOCOL_VERSION = 0x06;
194 : :
195 : : static const char MSG_NONE = 0x00;
196 : : static const char MSG_VERSION = 0x01;
197 : : static const char MSG_SERIAL = 0x02;
198 : : static const char MSG_CLOSE = 0x03;
199 : : static const char MSG_CLOSE_ALL = 0x04;
200 : : static const char MSG_ERROR = 0x05;
201 : : static const char MSG_CONNECT_TO = 0x06;
202 : : static const char MSG_CONNECTED = 0x07;
203 : : static const char MSG_REQUEST_EVENTS = 0x08;
204 : : static const char MSG_LISTEN = 0x09;
205 : : static const char MSG_LISTEN_STOP = 0x0a;
206 : : static const char MSG_STATS = 0x0b;
207 : : static const char MSG_CAPTURE_FILTER = 0x0c;
208 : : static const char MSG_REQUEST_SYNC = 0x0d;
209 : : static const char MSG_PHASE_DONE = 0x0e;
210 : : static const char MSG_PING = 0x0f;
211 : : static const char MSG_PONG = 0x10;
212 : : static const char MSG_CAPS = 0x11;
213 : : static const char MSG_COMPRESS = 0x12;
214 : : static const char MSG_LOG = 0x13;
215 : : static const char MSG_SYNC_POINT = 0x14;
216 : : static const char MSG_TERMINATE = 0x15;
217 : : static const char MSG_DEBUG_DUMP = 0x16;
218 : : static const char MSG_REMOTE_PRINT = 0x17;
219 : :
220 : : // Update this one whenever adding a new ID:
221 : : static const char MSG_ID_MAX = MSG_REMOTE_PRINT;
222 : :
223 : : static const uint32 FINAL_SYNC_POINT = /* UINT32_MAX */ 4294967295U;
224 : :
225 : : // Buffer size for remote-print data
226 : : static const int PRINT_BUFFER_SIZE = 10 * 1024;
227 : : static const int SOCKBUF_SIZE = 1024 * 1024;
228 : :
229 : : struct ping_args {
230 : : uint32 seq;
231 : : double time1; // Round-trip time parent1<->parent2
232 : : double time2; // Round-trip time child1<->parent2
233 : : double time3; // Round-trip time child2<->parent2
234 : : };
235 : :
236 : : #ifdef DEBUG
237 : : # define DEBUG_COMM(msg) DBG_LOG(DBG_COMM, msg)
238 : : #else
239 : : # define DEBUG_COMM(msg)
240 : : #endif
241 : :
242 : : #define READ_CHUNK(i, c, do_if_eof) \
243 : : { \
244 : : if ( ! i->Read(&c) ) \
245 : : { \
246 : : if ( i->Eof() ) \
247 : : { \
248 : : do_if_eof; \
249 : : } \
250 : : else \
251 : : Error(fmt("can't read data chunk: %s", io->Error()), i == io); \
252 : : return false; \
253 : : } \
254 : : \
255 : : if ( ! c ) \
256 : : return true; \
257 : : }
258 : :
259 : : #define READ_CHUNK_FROM_CHILD(c) \
260 : : { \
261 : : if ( ! io->Read(&c) ) \
262 : : { \
263 : : if ( io->Eof() ) \
264 : : ChildDied(); \
265 : : else \
266 : : Error(fmt("can't read data chunk: %s", io->Error())); \
267 : : return false; \
268 : : } \
269 : : \
270 : : if ( ! c ) \
271 : : { \
272 : : idle = io->IsIdle();\
273 : : return true; \
274 : : } \
275 : : idle = false; \
276 : : }
277 : :
278 : 0 : static const char* msgToStr(int msg)
279 : : {
280 : : # define MSG_STR(x) case x: return #x;
281 [ # # # # # : 0 : switch ( msg ) {
# # # # #
# # # # #
# # # # #
# # # # # ]
282 : 0 : MSG_STR(MSG_VERSION)
283 : 0 : MSG_STR(MSG_NONE)
284 : 0 : MSG_STR(MSG_SERIAL)
285 : 0 : MSG_STR(MSG_CLOSE)
286 : 0 : MSG_STR(MSG_CLOSE_ALL)
287 : 0 : MSG_STR(MSG_ERROR)
288 : 0 : MSG_STR(MSG_CONNECT_TO)
289 : 0 : MSG_STR(MSG_CONNECTED)
290 : 0 : MSG_STR(MSG_REQUEST_EVENTS)
291 : 0 : MSG_STR(MSG_LISTEN)
292 : 0 : MSG_STR(MSG_LISTEN_STOP)
293 : 0 : MSG_STR(MSG_STATS)
294 : 0 : MSG_STR(MSG_CAPTURE_FILTER)
295 : 0 : MSG_STR(MSG_REQUEST_SYNC)
296 : 0 : MSG_STR(MSG_PHASE_DONE)
297 : 0 : MSG_STR(MSG_PING)
298 : 0 : MSG_STR(MSG_PONG)
299 : 0 : MSG_STR(MSG_CAPS)
300 : 0 : MSG_STR(MSG_COMPRESS)
301 : 0 : MSG_STR(MSG_LOG)
302 : 0 : MSG_STR(MSG_SYNC_POINT)
303 : 0 : MSG_STR(MSG_TERMINATE)
304 : 0 : MSG_STR(MSG_DEBUG_DUMP)
305 : 0 : MSG_STR(MSG_REMOTE_PRINT)
306 : : default:
307 : 0 : return "UNKNOWN_MSG";
308 : : }
309 : : }
310 : :
311 : : // Start of every message between two processes. We do the low-level work
312 : : // ourselves to make this 64-bit safe. (The actual layout is an artifact of
313 : : // an earlier design that depended on how a 32-bit GCC lays out its structs ...)
314 : : class CMsg {
315 : : public:
316 : 0 : CMsg(char type, RemoteSerializer::PeerID peer)
317 : : {
318 : 0 : buffer[0] = type;
319 : 0 : uint32 tmp = htonl(peer);
320 : 0 : memcpy(buffer + 4, &tmp, sizeof(tmp));
321 : 0 : }
322 : :
323 : 0 : char Type() { return buffer[0]; }
324 : 0 : RemoteSerializer::PeerID Peer()
325 : : {
326 : : // Wow, is this ugly...
327 : 0 : return ntohl(*(uint32*)(buffer + 4));
328 : : }
329 : :
330 : : const char* Raw() { return buffer; }
331 : :
332 : : private:
333 : : char buffer[8];
334 : : };
335 : :
336 : 0 : static bool sendCMsg(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id)
337 : : {
338 : : // We use the new[] operator here to avoid mismatches
339 : : // when deleting the data.
340 : 0 : CMsg* msg = (CMsg*) new char[sizeof(CMsg)];
341 [ # # ]: 0 : new (msg) CMsg(msg_type, id);
342 : :
343 : 0 : ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
344 : 0 : c->len = sizeof(CMsg);
345 : 0 : c->data = (char*) msg;
346 : :
347 : 0 : return io->Write(c);
348 : : }
349 : :
350 : 0 : static ChunkedIO::Chunk* makeSerialMsg(RemoteSerializer::PeerID id)
351 : : {
352 : : // We use the new[] operator here to avoid mismatches
353 : : // when deleting the data.
354 : 0 : CMsg* msg = (CMsg*) new char[sizeof(CMsg)];
355 [ # # ]: 0 : new (msg) CMsg(MSG_SERIAL, id);
356 : :
357 : 0 : ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
358 : 0 : c->len = sizeof(CMsg);
359 : 0 : c->data = (char*) msg;
360 : :
361 : 0 : return c;
362 : : }
363 : :
364 : 0 : inline void RemoteSerializer::SetupSerialInfo(SerialInfo* info, Peer* peer)
365 : : {
366 : 0 : info->chunk = makeSerialMsg(peer->id);
367 [ # # ]: 0 : if ( peer->caps & Peer::NO_CACHING )
368 : 0 : info->cache = false;
369 : :
370 [ # # ][ # # ]: 0 : if ( ! (peer->caps & Peer::PID_64BIT) || peer->phase != Peer::RUNNING )
371 : 0 : info->pid_32bit = true;
372 : :
373 [ # # ][ # # ]: 0 : if ( (peer->caps & Peer::NEW_CACHE_STRATEGY) &&
374 : : peer->phase == Peer::RUNNING )
375 : 0 : info->new_cache_strategy = true;
376 : :
377 : 0 : info->include_locations = false;
378 : 0 : }
379 : :
380 : 0 : static bool sendToIO(ChunkedIO* io, ChunkedIO::Chunk* c)
381 : : {
382 [ # # ]: 0 : if ( ! io->Write(c) )
383 : : {
384 : 0 : warn(fmt("can't send chunk: %s", io->Error()));
385 : 0 : return false;
386 : : }
387 : :
388 : 0 : return true;
389 : : }
390 : :
391 : : static bool sendToIO(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id,
392 : 0 : const char* str, int len = -1)
393 : : {
394 [ # # ]: 0 : if ( ! sendCMsg(io, msg_type, id) )
395 : : {
396 : 0 : warn(fmt("can't send message of type %d: %s", msg_type, io->Error()));
397 : 0 : return false;
398 : : }
399 : :
400 : 0 : ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
401 [ # # ]: 0 : c->len = len >= 0 ? len : strlen(str) + 1;
402 : 0 : c->data = const_cast<char*>(str);
403 : 0 : return sendToIO(io, c);
404 : : }
405 : :
406 : : static bool sendToIO(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id,
407 : 0 : int nargs, va_list ap)
408 : : {
409 [ # # ]: 0 : if ( ! sendCMsg(io, msg_type, id) )
410 : : {
411 : 0 : warn(fmt("can't send message of type %d: %s", msg_type, io->Error()));
412 : 0 : return false;
413 : : }
414 : :
415 [ # # ]: 0 : if ( nargs == 0 )
416 : 0 : return true;
417 : :
418 : 0 : uint32* args = new uint32[nargs];
419 : :
420 [ # # ]: 0 : for ( int i = 0; i < nargs; i++ )
421 [ # # ]: 0 : args[i] = htonl(va_arg(ap, uint32));
422 : :
423 : 0 : ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
424 : 0 : c->len = sizeof(uint32) * nargs;
425 : 0 : c->data = (char*) args;
426 : :
427 : 0 : return sendToIO(io, c);
428 : : }
429 : :
430 : : #ifdef DEBUG
431 : 0 : static inline char* fmt_uint32s(int nargs, va_list ap)
432 : : {
433 : : static char buf[512];
434 : 0 : char* p = buf;
435 : 0 : *p = '\0';
436 [ # # ]: 0 : for ( int i = 0; i < nargs; i++ )
437 : : p += snprintf(p, sizeof(buf) - (p - buf),
438 [ # # ]: 0 : " 0x%08x", va_arg(ap, uint32));
439 : 0 : buf[511] = '\0';
440 : 0 : return buf;
441 : : }
442 : : #endif
443 : :
444 : :
445 : 0 : static inline const char* ip2a(uint32 ip)
446 : : {
447 : : static char buffer[32];
448 : : struct in_addr addr;
449 : :
450 : 0 : addr.s_addr = htonl(ip);
451 : :
452 : 0 : return inet_ntop(AF_INET, &addr, buffer, 32);
453 : : }
454 : :
455 : : static pid_t child_pid = 0;
456 : :
457 : : // Return true if message type is sent by a peer (rather than the child
458 : : // process itself).
459 : 0 : static inline bool is_peer_msg(int msg)
460 : : {
461 : : return msg == MSG_VERSION ||
462 : : msg == MSG_SERIAL ||
463 : : msg == MSG_REQUEST_EVENTS ||
464 : : msg == MSG_REQUEST_SYNC ||
465 : : msg == MSG_CAPTURE_FILTER ||
466 : : msg == MSG_PHASE_DONE ||
467 : : msg == MSG_PING ||
468 : : msg == MSG_PONG ||
469 : : msg == MSG_CAPS ||
470 : : msg == MSG_COMPRESS ||
471 : : msg == MSG_SYNC_POINT ||
472 [ # # ][ # # ]: 0 : msg == MSG_REMOTE_PRINT;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
473 : : }
474 : :
475 : 0 : bool RemoteSerializer::IsConnectedPeer(PeerID id)
476 : : {
477 [ # # ]: 0 : if ( id == PEER_NONE )
478 : 0 : return true;
479 : :
480 : 0 : return LookupPeer(id, true) != 0;
481 : : }
482 : :
483 [ # # ][ # # ]: 0 : class IncrementalSendTimer : public Timer {
484 : : public:
485 : 0 : IncrementalSendTimer(double t, RemoteSerializer::Peer* p, SerialInfo* i)
486 : 0 : : Timer(t, TIMER_INCREMENTAL_SEND), info(i), peer(p) {}
487 : 0 : virtual void Dispatch(double t, int is_expire)
488 : : {
489 : : // Never suspend when we're finishing up.
490 [ # # ]: 0 : if ( terminating )
491 : 0 : info->may_suspend = false;
492 : :
493 : 0 : remote_serializer->SendAllSynchronized(peer, info);
494 : 0 : }
495 : :
496 : : SerialInfo* info;
497 : : RemoteSerializer::Peer* peer;
498 : : };
499 : :
500 : 3 : RemoteSerializer::RemoteSerializer()
501 : : {
502 : 3 : initialized = false;
503 : 3 : current_peer = 0;
504 : 3 : msgstate = TYPE;
505 : 3 : id_counter = 1;
506 : 3 : listening = false;
507 : 3 : ignore_accesses = false;
508 : 3 : propagate_accesses = 1;
509 : 3 : current_sync_point = 0;
510 : 3 : syncing_times = false;
511 : 3 : io = 0;
512 : 3 : closed = false;
513 : 3 : terminating = false;
514 : 3 : in_sync = 0;
515 : 3 : }
516 : :
517 : 1 : RemoteSerializer::~RemoteSerializer()
518 : : {
519 [ - + ][ # # ]: 1 : if ( child_pid )
[ # # ]
520 : : {
521 : 0 : kill(child_pid, SIGKILL);
522 : 0 : waitpid(child_pid, 0, 0);
523 : : }
524 : :
525 [ - + ][ # # ]: 1 : delete io;
[ # # ]
526 [ + - ][ # # ]: 1 : }
[ # # ]
527 : :
528 : 0 : void RemoteSerializer::Init()
529 : : {
530 [ # # ]: 0 : if ( initialized )
531 : 0 : return;
532 : :
533 [ # # ][ # # ]: 0 : if ( reading_traces && ! pseudo_realtime )
534 : : {
535 : 0 : using_communication = 0;
536 : 0 : return;
537 : : }
538 : :
539 : 0 : Fork();
540 : :
541 : 0 : io_sources.Register(this);
542 : :
543 : 0 : Log(LogInfo, fmt("communication started, parent pid is %d, child pid is %d", getpid(), child_pid));
544 : 0 : initialized = 1;
545 : : }
546 : :
547 : 0 : void RemoteSerializer::Fork()
548 : : {
549 [ # # ]: 0 : if ( child_pid )
550 : 0 : return;
551 : :
552 : : // If we are re-forking, remove old entries
553 [ # # ]: 0 : loop_over_list(peers, i)
554 : 0 : RemovePeer(peers[i]);
555 : :
556 : : // Create pipe for communication between parent and child.
557 : : int pipe[2];
558 : :
559 [ # # ]: 0 : if ( socketpair(AF_UNIX, SOCK_STREAM, 0, pipe) < 0 )
560 : : {
561 : 0 : Error(fmt("can't create pipe: %s", strerror(errno)));
562 : 0 : return;
563 : : }
564 : :
565 : : int bufsize;
566 : 0 : socklen_t len = sizeof(bufsize);
567 : :
568 [ # # ]: 0 : if ( getsockopt(pipe[0], SOL_SOCKET, SO_SNDBUF, &bufsize, &len ) < 0 )
569 : 0 : Log(LogInfo, fmt("warning: cannot get socket buffer size: %s", strerror(errno)));
570 : : else
571 : 0 : Log(LogInfo, fmt("pipe's socket buffer size is %d, setting to %d", bufsize, SOCKBUF_SIZE));
572 : :
573 : 0 : bufsize = SOCKBUF_SIZE;
574 : :
575 [ # # ][ # # ]: 0 : if ( setsockopt(pipe[0], SOL_SOCKET, SO_SNDBUF,
[ # # ][ # # ]
[ # # ]
576 : : &bufsize, sizeof(bufsize) ) < 0 ||
577 : : setsockopt(pipe[0], SOL_SOCKET, SO_RCVBUF,
578 : : &bufsize, sizeof(bufsize) ) < 0 ||
579 : : setsockopt(pipe[1], SOL_SOCKET, SO_SNDBUF,
580 : : &bufsize, sizeof(bufsize) ) < 0 ||
581 : : setsockopt(pipe[1], SOL_SOCKET, SO_RCVBUF,
582 : : &bufsize, sizeof(bufsize) ) < 0 )
583 : 0 : Log(LogInfo, fmt("warning: cannot set socket buffer size to %dK: %s", bufsize / 1024, strerror(errno)));
584 : :
585 : 0 : child_pid = 0;
586 : :
587 : 0 : int pid = fork();
588 : :
589 [ # # ]: 0 : if ( pid < 0 )
590 : : {
591 : 0 : Error(fmt("can't fork: %s", strerror(errno)));
592 : 0 : return;
593 : : }
594 : :
595 [ # # ]: 0 : if ( pid > 0 )
596 : : {
597 : : // Parent
598 : 0 : child_pid = pid;
599 : :
600 : 0 : io = new ChunkedIOFd(pipe[0], "parent->child", child_pid);
601 [ # # ]: 0 : if ( ! io->Init() )
602 : : {
603 : 0 : Error(fmt("can't init child io: %s", io->Error()));
604 : 0 : exit(1); // FIXME: Better way to handle this?
605 : : }
606 : :
607 : 0 : close(pipe[1]);
608 : :
609 : 0 : return;
610 : : }
611 : : else
612 : : { // child
613 : 0 : SocketComm child;
614 : :
615 : : ChunkedIOFd* io =
616 : 0 : new ChunkedIOFd(pipe[1], "child->parent", getppid());
617 [ # # ]: 0 : if ( ! io->Init() )
618 : : {
619 : 0 : Error(fmt("can't init parent io: %s", io->Error()));
620 : 0 : exit(1);
621 : : }
622 : :
623 : 0 : child.SetParentIO(io);
624 : 0 : close(pipe[0]);
625 : :
626 : : // Close file descriptors.
627 : 0 : close(0);
628 : 0 : close(1);
629 : 0 : close(2);
630 : :
631 : : // Be nice.
632 : 0 : setpriority(PRIO_PROCESS, 0, 5);
633 : :
634 : 0 : child.Run();
635 : 0 : internal_error("cannot be reached");
636 : : }
637 : : }
638 : :
639 : : RemoteSerializer::PeerID RemoteSerializer::Connect(addr_type ip, uint16 port,
640 : 0 : const char* our_class, double retry, bool use_ssl)
641 : : {
642 [ # # ]: 0 : if ( ! using_communication )
643 : 0 : return true;
644 : :
645 [ # # ]: 0 : if ( ! initialized )
646 : 0 : internal_error("remote serializer not initialized");
647 : :
648 : : #ifdef BROv6
649 : : if ( ! is_v4_addr(ip) )
650 : : Error("inter-Bro communication not supported over IPv6");
651 : :
652 : : uint32 ip4 = to_v4_addr(ip);
653 : : #else
654 : 0 : uint32 ip4 = ip;
655 : : #endif
656 : :
657 : 0 : ip4 = ntohl(ip4);
658 : :
659 [ # # ]: 0 : if ( ! child_pid )
660 : 0 : Fork();
661 : :
662 : 0 : Peer* p = AddPeer(ip4, port);
663 : 0 : p->orig = true;
664 : :
665 [ # # ]: 0 : if ( our_class )
666 : 0 : p->our_class = our_class;
667 : :
668 [ # # ]: 0 : if ( ! SendToChild(MSG_CONNECT_TO, p, 5, p->id,
669 : : ip4, port, uint32(retry), use_ssl) )
670 : : {
671 : 0 : RemovePeer(p);
672 : 0 : return false;
673 : : }
674 : :
675 : 0 : p->state = Peer::PENDING;
676 : 0 : return p->id;
677 : : }
678 : :
679 : 0 : bool RemoteSerializer::CloseConnection(Peer* peer)
680 : : {
681 [ # # ]: 0 : if ( peer->suspended_processing )
682 : : {
683 : 0 : net_continue_processing();
684 : 0 : peer->suspended_processing = false;
685 : : }
686 : :
687 [ # # ]: 0 : if ( peer->state == Peer::CLOSING )
688 : 0 : return true;
689 : :
690 : 0 : FlushPrintBuffer(peer);
691 : :
692 : 0 : Log(LogInfo, "closing connection", peer);
693 : :
694 : 0 : peer->state = Peer::CLOSING;
695 : 0 : return SendToChild(MSG_CLOSE, peer, 0);
696 : : }
697 : :
698 : 0 : bool RemoteSerializer::RequestSync(PeerID id, bool auth)
699 : : {
700 [ # # ]: 0 : if ( ! using_communication )
701 : 0 : return true;
702 : :
703 : 0 : Peer* peer = LookupPeer(id, true);
704 [ # # ]: 0 : if ( ! peer )
705 : : {
706 : 0 : run_time(fmt("unknown peer id %d for request sync", int(id)));
707 : 0 : return false;
708 : : }
709 : :
710 [ # # ]: 0 : if ( peer->phase != Peer::HANDSHAKE )
711 : : {
712 : : run_time(fmt("can't request sync from peer; wrong phase %d",
713 : 0 : peer->phase));
714 : 0 : return false;
715 : : }
716 : :
717 [ # # ][ # # ]: 0 : if ( ! SendToChild(MSG_REQUEST_SYNC, peer, 1, auth ? 1 : 0) )
718 : 0 : return false;
719 : :
720 [ # # ]: 0 : peer->sync_requested |= Peer::WE | (auth ? Peer::AUTH_WE : 0);
721 : :
722 : 0 : return true;
723 : : }
724 : :
725 : 0 : bool RemoteSerializer::RequestEvents(PeerID id, RE_Matcher* pattern)
726 : : {
727 [ # # ]: 0 : if ( ! using_communication )
728 : 0 : return true;
729 : :
730 : 0 : Peer* peer = LookupPeer(id, true);
731 [ # # ]: 0 : if ( ! peer )
732 : : {
733 : 0 : run_time(fmt("unknown peer id %d for request sync", int(id)));
734 : 0 : return false;
735 : : }
736 : :
737 [ # # ]: 0 : if ( peer->phase != Peer::HANDSHAKE )
738 : : {
739 : : run_time(fmt("can't request events from peer; wrong phase %d",
740 : 0 : peer->phase));
741 : 0 : return false;
742 : : }
743 : :
744 : 0 : EventRegistry::string_list* handlers = event_registry->Match(pattern);
745 : :
746 : : // Concat the handlers' names.
747 : 0 : int len = 0;
748 [ # # ]: 0 : loop_over_list(*handlers, i)
749 : 0 : len += strlen((*handlers)[i]) + 1;
750 : :
751 [ # # ]: 0 : if ( ! len )
752 : : {
753 : 0 : Log(LogInfo, "warning: no events to request");
754 [ # # ]: 0 : delete handlers;
755 : 0 : return true;
756 : : }
757 : :
758 : 0 : char* data = new char[len];
759 : 0 : char* d = data;
760 [ # # ]: 0 : loop_over_list(*handlers, j)
761 : : {
762 [ # # ]: 0 : for ( const char* p = (*handlers)[j]; *p; *d++ = *p++ )
763 : : ;
764 : 0 : *d++ = '\0';
765 : : }
766 : :
767 [ # # ]: 0 : delete handlers;
768 : :
769 : 0 : return SendToChild(MSG_REQUEST_EVENTS, peer, data, len);
770 : : }
771 : :
772 : 0 : bool RemoteSerializer::SetAcceptState(PeerID id, bool accept)
773 : : {
774 : 0 : Peer* p = LookupPeer(id, false);
775 [ # # ]: 0 : if ( ! p )
776 : 0 : return true;
777 : :
778 : 0 : p->accept_state = accept;
779 : 0 : return true;
780 : : }
781 : :
782 : 0 : bool RemoteSerializer::SetCompressionLevel(PeerID id, int level)
783 : : {
784 : 0 : Peer* p = LookupPeer(id, false);
785 [ # # ]: 0 : if ( ! p )
786 : 0 : return true;
787 : :
788 : 0 : p->comp_level = level;
789 : 0 : return true;
790 : : }
791 : :
792 : 0 : bool RemoteSerializer::CompleteHandshake(PeerID id)
793 : : {
794 : 0 : Peer* p = LookupPeer(id, false);
795 [ # # ]: 0 : if ( ! p )
796 : 0 : return true;
797 : :
798 [ # # ]: 0 : if ( p->phase != Peer::HANDSHAKE )
799 : : {
800 : : run_time(fmt("can't complete handshake; wrong phase %d",
801 : 0 : p->phase));
802 : 0 : return false;
803 : : }
804 : :
805 : 0 : p->handshake_done |= Peer::WE;
806 : :
807 [ # # ]: 0 : if ( ! SendToChild(MSG_PHASE_DONE, p, 0) )
808 : 0 : return false;
809 : :
810 [ # # ]: 0 : if ( p->handshake_done == Peer::BOTH )
811 : 0 : HandshakeDone(p);
812 : :
813 : 0 : return true;
814 : : }
815 : :
816 : : bool RemoteSerializer::SendCall(SerialInfo* info, PeerID id,
817 : 0 : const char* name, val_list* vl)
818 : : {
819 [ # # ][ # # ]: 0 : if ( ! using_communication || terminating )
820 : 0 : return true;
821 : :
822 : 0 : Peer* peer = LookupPeer(id, true);
823 [ # # ]: 0 : if ( ! peer )
824 : 0 : return false;
825 : :
826 : : // Do not send events back to originating peer.
827 [ # # ]: 0 : if ( current_peer == peer )
828 : 0 : return true;
829 : :
830 : 0 : return SendCall(info, peer, name, vl);
831 : : }
832 : :
833 : :
834 : : bool RemoteSerializer::SendCall(SerialInfo* info, Peer* peer,
835 : 0 : const char* name, val_list* vl)
836 : : {
837 [ # # ][ # # ]: 0 : if ( peer->phase != Peer::RUNNING || terminating )
838 : 0 : return false;
839 : :
840 : 0 : ++stats.events.out;
841 : 0 : SetCache(peer->cache_out);
842 : 0 : SetupSerialInfo(info, peer);
843 : :
844 [ # # ]: 0 : if ( ! Serialize(info, name, vl) )
845 : : {
846 : 0 : FatalError(io->Error());
847 : 0 : return false;
848 : : }
849 : :
850 : 0 : return true;
851 : : }
852 : :
853 : : bool RemoteSerializer::SendCall(SerialInfo* info, const char* name,
854 : 0 : val_list* vl)
855 : : {
856 [ # # ][ # # ]: 0 : if ( ! IsOpen() || ! PropagateAccesses() || terminating )
[ # # ][ # # ]
857 : 0 : return true;
858 : :
859 [ # # ][ # # ]: 0 : loop_over_list(peers, i)
860 : : {
861 : : // Do not send event back to originating peer.
862 [ # # ]: 0 : if ( peers[i] == current_peer )
863 : 0 : continue;
864 : :
865 : 0 : SerialInfo new_info(*info);
866 [ # # ]: 0 : if ( ! SendCall(&new_info, peers[i], name, vl) )
867 : 0 : return false;
868 : : }
869 : :
870 : 0 : return true;
871 : : }
872 : :
873 : : bool RemoteSerializer::SendAccess(SerialInfo* info, Peer* peer,
874 : 0 : const StateAccess& access)
875 : : {
876 [ # # ][ # # ]: 0 : if ( ! (peer->sync_requested & Peer::PEER) || terminating )
877 : 0 : return true;
878 : :
879 : : #ifdef DEBUG
880 : 0 : ODesc desc;
881 : 0 : access.Describe(&desc);
882 : 0 : DBG_LOG(DBG_COMM, "Sending %s", desc.Description());
883 : : #endif
884 : :
885 : 0 : ++stats.accesses.out;
886 : 0 : SetCache(peer->cache_out);
887 : 0 : SetupSerialInfo(info, peer);
888 : 0 : info->globals_as_names = true;
889 : :
890 [ # # ]: 0 : if ( ! Serialize(info, access) )
891 : : {
892 : 0 : FatalError(io->Error());
893 : 0 : return false;
894 : : }
895 : :
896 : 0 : return true;
897 : : }
898 : :
899 : : bool RemoteSerializer::SendAccess(SerialInfo* info, PeerID pid,
900 : 0 : const StateAccess& access)
901 : : {
902 : 0 : Peer* p = LookupPeer(pid, false);
903 [ # # ]: 0 : if ( ! p )
904 : 0 : return true;
905 : :
906 : 0 : return SendAccess(info, p, access);
907 : : }
908 : :
909 : 2 : bool RemoteSerializer::SendAccess(SerialInfo* info, const StateAccess& access)
910 : : {
911 [ + - ][ + - ]: 2 : if ( ! IsOpen() || ! PropagateAccesses() || terminating )
[ - + ][ - + ]
912 : 0 : return true;
913 : :
914 : : // A real broadcast would be nice here. But the different peers have
915 : : // different serialization caches, so we cannot simply send the same
916 : : // serialization to all of them ...
917 [ # # ][ - + ]: 2 : loop_over_list(peers, i)
918 : : {
919 : : // Do not send access back to originating peer.
920 [ # # ]: 0 : if ( peers[i] == source_peer )
921 : 0 : continue;
922 : :
923 : : // Only sent accesses for fully setup peers.
924 [ # # ]: 0 : if ( peers[i]->phase != Peer::RUNNING )
925 : 0 : continue;
926 : :
927 : 0 : SerialInfo new_info(*info);
928 [ # # ]: 0 : if ( ! SendAccess(&new_info, peers[i], access) )
929 : 0 : return false;
930 : : }
931 : :
932 : 2 : return true;
933 : : }
934 : :
935 : 0 : bool RemoteSerializer::SendAllSynchronized(Peer* peer, SerialInfo* info)
936 : : {
937 : : // FIXME: When suspending ID serialization works, remove!
938 : 0 : DisableSuspend suspend(info);
939 : :
940 : 0 : current_peer = peer;
941 : :
942 : 0 : Continuation* cont = &info->cont;
943 : : ptr_compat_int index;
944 : :
945 [ # # ]: 0 : if ( info->cont.NewInstance() )
946 : : {
947 : 0 : Log(LogInfo, "starting to send full state", peer);
948 : 0 : index = 0;
949 : : }
950 : :
951 : : else
952 : : {
953 : 0 : index = int(ptr_compat_int(cont->RestoreState()));
954 [ # # ]: 0 : if ( ! cont->ChildSuspended() )
955 : 0 : cont->Resume();
956 : : }
957 : :
958 [ # # ][ # # ]: 0 : for ( ; index < sync_ids.length(); ++index )
959 : : {
960 : 0 : cont->SaveContext();
961 : :
962 : : StateAccess sa(OP_ASSIGN, sync_ids[index],
963 : 0 : sync_ids[index]->ID_Val());
964 : : // FIXME: When suspending ID serialization works, we need to
965 : : // addsupport to StateAccesses, too.
966 : 0 : bool result = SendAccess(info, peer, sa);
967 : 0 : cont->RestoreContext();
968 : :
969 [ # # ]: 0 : if ( ! result )
970 : 0 : return false;
971 : :
972 [ # # ][ # # ]: 0 : if ( cont->ChildSuspended() || info->may_suspend )
[ # # ]
973 : : {
974 : 0 : double t = network_time + state_write_delay;
975 : 0 : timer_mgr->Add(new IncrementalSendTimer(t, peer, info));
976 : :
977 : 0 : cont->SaveState((void*) index);
978 [ # # ]: 0 : if ( info->may_suspend )
979 : 0 : cont->Suspend();
980 : :
981 : 0 : return true;
982 : : }
983 : : }
984 : :
985 [ # # ]: 0 : if ( ! SendToChild(MSG_PHASE_DONE, peer, 0) )
986 : 0 : return false;
987 : :
988 : 0 : suspend.Release();
989 [ # # ]: 0 : delete info;
990 : :
991 : 0 : Log(LogInfo, "done sending full state", peer);
992 : :
993 : 0 : return EnterPhaseRunning(peer);
994 : : }
995 : :
996 : 0 : bool RemoteSerializer::SendID(SerialInfo* info, Peer* peer, const ID& id)
997 : : {
998 [ # # ]: 0 : if ( terminating )
999 : 0 : return true;
1000 : :
1001 : : // FIXME: When suspending ID serialization works, remove!
1002 : 0 : DisableSuspend suspend(info);
1003 : :
1004 [ # # ]: 0 : if ( info->cont.NewInstance() )
1005 : 0 : ++stats.ids.out;
1006 : :
1007 : 0 : SetCache(peer->cache_out);
1008 : 0 : SetupSerialInfo(info, peer);
1009 : 0 : info->cont.SaveContext();
1010 : 0 : bool result = Serialize(info, id);
1011 : 0 : info->cont.RestoreContext();
1012 : :
1013 [ # # ]: 0 : if ( ! result )
1014 : : {
1015 : 0 : FatalError(io->Error());
1016 : 0 : return false;
1017 : : }
1018 : :
1019 : 0 : return true;
1020 : : }
1021 : :
1022 : 0 : bool RemoteSerializer::SendID(SerialInfo* info, PeerID pid, const ID& id)
1023 : : {
1024 [ # # ][ # # ]: 0 : if ( ! using_communication || terminating )
1025 : 0 : return true;
1026 : :
1027 : 0 : Peer* peer = LookupPeer(pid, true);
1028 [ # # ]: 0 : if ( ! peer )
1029 : 0 : return false;
1030 : :
1031 [ # # ]: 0 : if ( peer->phase != Peer::RUNNING )
1032 : 0 : return false;
1033 : :
1034 : 0 : return SendID(info, peer, id);
1035 : : }
1036 : :
1037 : : bool RemoteSerializer::SendConnection(SerialInfo* info, PeerID id,
1038 : 0 : const Connection& c)
1039 : : {
1040 [ # # ][ # # ]: 0 : if ( ! using_communication || terminating )
1041 : 0 : return true;
1042 : :
1043 : 0 : Peer* peer = LookupPeer(id, true);
1044 [ # # ]: 0 : if ( ! peer )
1045 : 0 : return false;
1046 : :
1047 [ # # ]: 0 : if ( peer->phase != Peer::RUNNING )
1048 : 0 : return false;
1049 : :
1050 : 0 : ++stats.conns.out;
1051 : 0 : SetCache(peer->cache_out);
1052 : 0 : SetupSerialInfo(info, peer);
1053 : :
1054 [ # # ]: 0 : if ( ! Serialize(info, c) )
1055 : : {
1056 : 0 : FatalError(io->Error());
1057 : 0 : return false;
1058 : : }
1059 : :
1060 : 0 : return true;
1061 : : }
1062 : :
1063 : 0 : bool RemoteSerializer::SendCaptureFilter(PeerID id, const char* filter)
1064 : : {
1065 [ # # ][ # # ]: 0 : if ( ! using_communication || terminating )
1066 : 0 : return true;
1067 : :
1068 : 0 : Peer* peer = LookupPeer(id, true);
1069 [ # # ]: 0 : if ( ! peer )
1070 : 0 : return false;
1071 : :
1072 [ # # ]: 0 : if ( peer->phase != Peer::HANDSHAKE )
1073 : : {
1074 : 0 : run_time(fmt("can't sent capture filter to peer; wrong phase %d", peer->phase));
1075 : 0 : return false;
1076 : : }
1077 : :
1078 : 0 : return SendToChild(MSG_CAPTURE_FILTER, peer, copy_string(filter));
1079 : : }
1080 : :
1081 : 0 : bool RemoteSerializer::SendPacket(SerialInfo* info, const Packet& p)
1082 : : {
1083 [ # # ][ # # ]: 0 : if ( ! IsOpen() || !PropagateAccesses() || terminating )
[ # # ][ # # ]
1084 : 0 : return true;
1085 : :
1086 [ # # ][ # # ]: 0 : loop_over_list(peers, i)
1087 : : {
1088 : : // Only sent packet for fully setup peers.
1089 [ # # ]: 0 : if ( peers[i]->phase != Peer::RUNNING )
1090 : 0 : continue;
1091 : :
1092 : 0 : SerialInfo new_info(*info);
1093 [ # # ]: 0 : if ( ! SendPacket(&new_info, peers[i], p) )
1094 : 0 : return false;
1095 : : }
1096 : :
1097 : 0 : return true;
1098 : : }
1099 : :
1100 : 0 : bool RemoteSerializer::SendPacket(SerialInfo* info, PeerID id, const Packet& p)
1101 : : {
1102 [ # # ][ # # ]: 0 : if ( ! using_communication || terminating )
1103 : 0 : return true;
1104 : :
1105 : 0 : Peer* peer = LookupPeer(id, true);
1106 [ # # ]: 0 : if ( ! peer )
1107 : 0 : return false;
1108 : :
1109 : 0 : return SendPacket(info, peer, p);
1110 : : }
1111 : :
1112 : 0 : bool RemoteSerializer::SendPacket(SerialInfo* info, Peer* peer, const Packet& p)
1113 : : {
1114 : 0 : ++stats.packets.out;
1115 : 0 : SetCache(peer->cache_out);
1116 : 0 : SetupSerialInfo(info, peer);
1117 : :
1118 [ # # ]: 0 : if ( ! Serialize(info, p) )
1119 : : {
1120 : 0 : FatalError(io->Error());
1121 : 0 : return false;
1122 : : }
1123 : :
1124 : 0 : return true;
1125 : : }
1126 : :
1127 : 0 : bool RemoteSerializer::SendPing(PeerID id, uint32 seq)
1128 : : {
1129 [ # # ][ # # ]: 0 : if ( ! using_communication || terminating )
1130 : 0 : return true;
1131 : :
1132 : 0 : Peer* peer = LookupPeer(id, true);
1133 [ # # ]: 0 : if ( ! peer )
1134 : 0 : return false;
1135 : :
1136 : 0 : char* data = new char[sizeof(ping_args)];
1137 : :
1138 : 0 : ping_args* args = (ping_args*) data;
1139 : 0 : args->seq = htonl(seq);
1140 : 0 : args->time1 = htond(current_time(true));
1141 : 0 : args->time2 = 0;
1142 : 0 : args->time3 = 0;
1143 : :
1144 : 0 : return SendToChild(MSG_PING, peer, data, sizeof(ping_args));
1145 : : }
1146 : :
1147 : 0 : bool RemoteSerializer::SendCapabilities(Peer* peer)
1148 : : {
1149 [ # # ]: 0 : if ( peer->phase != Peer::HANDSHAKE )
1150 : : {
1151 : : run_time(fmt("can't sent capabilties to peer; wrong phase %d",
1152 : 0 : peer->phase));
1153 : 0 : return false;
1154 : : }
1155 : :
1156 : 0 : uint32 caps = 0;
1157 : :
1158 : : #ifdef HAVE_LIBZ
1159 : 0 : caps |= Peer::COMPRESSION;
1160 : : #endif
1161 : :
1162 : 0 : caps |= Peer::PID_64BIT;
1163 : 0 : caps |= Peer::NEW_CACHE_STRATEGY;
1164 : :
1165 [ # # ]: 0 : return caps ? SendToChild(MSG_CAPS, peer, 3, caps, 0, 0) : true;
1166 : : }
1167 : :
1168 : 0 : bool RemoteSerializer::Listen(addr_type ip, uint16 port, bool expect_ssl)
1169 : : {
1170 [ # # ]: 0 : if ( ! using_communication )
1171 : 0 : return true;
1172 : :
1173 [ # # ]: 0 : if ( ! initialized )
1174 : 0 : internal_error("remote serializer not initialized");
1175 : :
1176 : : #ifdef BROv6
1177 : : if ( ! is_v4_addr(ip) )
1178 : : Error("inter-Bro communication not supported over IPv6");
1179 : :
1180 : : uint32 ip4 = to_v4_addr(ip);
1181 : : #else
1182 : 0 : uint32 ip4 = ip;
1183 : : #endif
1184 : :
1185 : 0 : ip4 = ntohl(ip4);
1186 : :
1187 [ # # ]: 0 : if ( ! SendToChild(MSG_LISTEN, 0, 3, ip4, port, expect_ssl) )
1188 : 0 : return false;
1189 : :
1190 : 0 : listening = true;
1191 : 0 : closed = false;
1192 : 0 : return true;
1193 : : }
1194 : :
1195 : 0 : void RemoteSerializer::SendSyncPoint(uint32 point)
1196 : : {
1197 [ # # ][ # # ]: 0 : if ( ! (remote_trace_sync_interval && pseudo_realtime) || terminating )
[ # # ]
1198 : 0 : return;
1199 : :
1200 : 0 : current_sync_point = point;
1201 : :
1202 [ # # ]: 0 : loop_over_list(peers, i)
1203 [ # # ][ # # ]: 0 : if ( peers[i]->phase == Peer::RUNNING &&
[ # # ]
1204 : : ! SendToChild(MSG_SYNC_POINT, peers[i],
1205 : : 1, current_sync_point) )
1206 : 0 : return;
1207 : :
1208 [ # # ]: 0 : if ( ! syncing_times )
1209 : : {
1210 : 0 : Log(LogInfo, "waiting for peers");
1211 : 0 : syncing_times = true;
1212 : :
1213 [ # # ]: 0 : loop_over_list(peers, i)
1214 : : {
1215 : : // Need to do this once per peer to correctly
1216 : : // track the number of suspend calls.
1217 : 0 : net_suspend_processing();
1218 : 0 : peers[i]->suspended_processing = true;
1219 : : }
1220 : : }
1221 : :
1222 : 0 : CheckSyncPoints();
1223 : : }
1224 : :
1225 : 0 : uint32 RemoteSerializer::SendSyncPoint()
1226 : : {
1227 : 0 : Log(LogInfo, fmt("reached sync-point %u", current_sync_point));
1228 : 0 : SendSyncPoint(current_sync_point + 1);
1229 : 0 : return current_sync_point;
1230 : : }
1231 : :
1232 : 0 : void RemoteSerializer::SendFinalSyncPoint()
1233 : : {
1234 : 0 : Log(LogInfo, fmt("reached end of trace, sending final sync point"));
1235 : 0 : SendSyncPoint(FINAL_SYNC_POINT);
1236 : 0 : }
1237 : :
1238 : 0 : bool RemoteSerializer::Terminate()
1239 : : {
1240 : 0 : Log(LogInfo, fmt("terminating..."));
1241 : 0 : return terminating = SendToChild(MSG_TERMINATE, 0, 0);
1242 : : }
1243 : :
1244 : 1 : bool RemoteSerializer::StopListening()
1245 : : {
1246 [ + - ]: 1 : if ( ! listening )
1247 : 1 : return true;
1248 : :
1249 [ # # ]: 0 : if ( ! SendToChild(MSG_LISTEN_STOP, 0, 0) )
1250 : 0 : return false;
1251 : :
1252 : 0 : listening = false;
1253 : 0 : closed = ! IsActive();
1254 : 1 : return true;
1255 : : }
1256 : :
1257 : 4 : void RemoteSerializer::Register(ID* id)
1258 : : {
1259 : 4 : DBG_LOG(DBG_STATE, "&synchronized %s", id->Name());
1260 : 4 : Unregister(id);
1261 : 4 : Ref(id);
1262 : 4 : sync_ids.append(id);
1263 : 4 : }
1264 : :
1265 : 4 : void RemoteSerializer::Unregister(ID* id)
1266 : : {
1267 [ + + ]: 6 : loop_over_list(sync_ids, i)
1268 [ + + ]: 4 : if ( streq(sync_ids[i]->Name(), id->Name()) )
1269 : : {
1270 : 2 : Unref(sync_ids[i]);
1271 : 2 : sync_ids.remove_nth(i);
1272 : 2 : break;
1273 : : }
1274 : 4 : }
1275 : :
1276 : 0 : void RemoteSerializer::GetFds(int* read, int* write, int* except)
1277 : : {
1278 : 0 : *read = io->Fd();
1279 : :
1280 [ # # ]: 0 : if ( io->CanWrite() )
1281 : 0 : *write = io->Fd();
1282 : 0 : }
1283 : :
1284 : 0 : double RemoteSerializer::NextTimestamp(double* local_network_time)
1285 : : {
1286 : 0 : Poll(false);
1287 : :
1288 [ # # ]: 0 : double et = events.length() ? events[0]->time : -1;
1289 [ # # ]: 0 : double pt = packets.length() ? packets[0]->time : -1;
1290 : :
1291 [ # # ]: 0 : if ( ! et )
1292 : 0 : et = timer_mgr->Time();
1293 : :
1294 [ # # ]: 0 : if ( ! pt )
1295 : 0 : pt = timer_mgr->Time();
1296 : :
1297 [ # # ]: 0 : if ( packets.length() )
1298 : 0 : idle = false;
1299 : :
1300 [ # # ][ # # ]: 0 : if ( et >= 0 && (et < pt || pt < 0) )
[ # # ]
1301 : 0 : return et;
1302 : :
1303 [ # # ]: 0 : if ( pt >= 0 )
1304 : : {
1305 : : // Return packet time as network time.
1306 : 0 : *local_network_time = packets[0]->p->time;
1307 : 0 : return pt;
1308 : : }
1309 : :
1310 : 0 : return -1;
1311 : : }
1312 : :
1313 : 0 : TimerMgr::Tag* RemoteSerializer::GetCurrentTag()
1314 : : {
1315 [ # # ]: 0 : return packets.length() ? &packets[0]->p->tag : 0;
1316 : : }
1317 : :
1318 : 0 : void RemoteSerializer::Process()
1319 : : {
1320 : 0 : Poll(false);
1321 : :
1322 : 0 : int i = 0;
1323 [ # # ]: 0 : while ( events.length() )
1324 : : {
1325 [ # # ][ # # ]: 0 : if ( max_remote_events_processed &&
[ # # ]
1326 : : ++i > max_remote_events_processed )
1327 : 0 : break;
1328 : :
1329 : 0 : BufferedEvent* be = events[0];
1330 : 0 : ::Event* event = new ::Event(be->handler, be->args, be->src);
1331 : :
1332 : 0 : Peer* old_current_peer = current_peer;
1333 : : // Prevent the source peer from getting the event back.
1334 : 0 : current_peer = LookupPeer(be->src, true); // may be null.
1335 : 0 : mgr.Dispatch(event, ! forward_remote_events);
1336 : 0 : current_peer = old_current_peer;
1337 : :
1338 [ # # ]: 0 : assert(events[0] == be);
1339 : 0 : delete be;
1340 : 0 : events.remove_nth(0);
1341 : : }
1342 : :
1343 : : // We shouldn't pass along more than one packet, as otherwise the
1344 : : // timer mgr will not advance.
1345 [ # # ]: 0 : if ( packets.length() )
1346 : : {
1347 : 0 : BufferedPacket* bp = packets[0];
1348 : 0 : Packet* p = bp->p;
1349 : :
1350 : : // FIXME: The following chunk of code is copied from
1351 : : // net_packet_dispatch(). We should change that function
1352 : : // to accept an IOSource instead of the PktSrc.
1353 : 0 : network_time = p->time;
1354 : :
1355 : 0 : SegmentProfiler(segment_logger, "expiring-timers");
1356 : 0 : TimerMgr* tmgr = sessions->LookupTimerMgr(GetCurrentTag());
1357 : : current_dispatched =
1358 : 0 : tmgr->Advance(network_time, max_timer_expires);
1359 : :
1360 : 0 : current_hdr = p->hdr;
1361 : 0 : current_pkt = p->pkt;
1362 : 0 : current_pktsrc = 0;
1363 : 0 : current_iosrc = this;
1364 : 0 : sessions->NextPacket(p->time, p->hdr, p->pkt, p->hdr_size, 0);
1365 : 0 : mgr.Drain();
1366 : :
1367 : 0 : current_hdr = 0; // done with these
1368 : 0 : current_pkt = 0;
1369 : 0 : current_iosrc = 0;
1370 : :
1371 [ # # ]: 0 : delete p;
1372 : 0 : delete bp;
1373 : 0 : packets.remove_nth(0);
1374 : : }
1375 : :
1376 [ # # ]: 0 : if ( packets.length() )
1377 : 0 : idle = false;
1378 : 0 : }
1379 : :
1380 : 1 : void RemoteSerializer::Finish()
1381 : : {
1382 [ + - ]: 1 : if ( ! using_communication )
1383 : 1 : return;
1384 : :
1385 [ # # ]: 0 : do
1386 : 0 : Poll(true);
1387 : : while ( io->CanWrite() );
1388 : :
1389 [ # # ]: 1 : loop_over_list(peers, i)
1390 : 0 : CloseConnection(peers[i]);
1391 : : }
1392 : :
1393 : 0 : bool RemoteSerializer::Poll(bool may_block)
1394 : : {
1395 [ # # ]: 0 : if ( ! child_pid )
1396 : 0 : return true;
1397 : :
1398 : : // See if there's any peer waiting for initial state synchronization.
1399 [ # # ][ # # ]: 0 : if ( sync_pending.length() && ! in_sync )
[ # # ]
1400 : : {
1401 : 0 : Peer* p = sync_pending[0];
1402 : 0 : sync_pending.remove_nth(0);
1403 : 0 : HandshakeDone(p);
1404 : : }
1405 : :
1406 : 0 : io->Flush();
1407 : 0 : idle = false;
1408 : :
1409 [ # # # ]: 0 : switch ( msgstate ) {
1410 : : case TYPE:
1411 : : {
1412 : 0 : current_peer = 0;
1413 : 0 : current_msgtype = MSG_NONE;
1414 : :
1415 : : // CMsg follows
1416 : : ChunkedIO::Chunk* c;
1417 [ # # ][ # # ]: 0 : READ_CHUNK_FROM_CHILD(c);
[ # # ]
1418 : :
1419 : 0 : CMsg* msg = (CMsg*) c->data;
1420 : 0 : current_peer = LookupPeer(msg->Peer(), false);
1421 : 0 : current_id = msg->Peer();
1422 : 0 : current_msgtype = msg->Type();
1423 : 0 : current_args = 0;
1424 : :
1425 [ # # ]: 0 : delete [] c->data;
1426 : 0 : delete c;
1427 : :
1428 [ # # # # ]: 0 : switch ( current_msgtype ) {
1429 : : case MSG_CLOSE:
1430 : : case MSG_CLOSE_ALL:
1431 : : case MSG_LISTEN_STOP:
1432 : : case MSG_PHASE_DONE:
1433 : : case MSG_TERMINATE:
1434 : : case MSG_DEBUG_DUMP:
1435 : : {
1436 : : // No further argument chunk.
1437 : 0 : msgstate = TYPE;
1438 : 0 : return DoMessage();
1439 : : }
1440 : : case MSG_VERSION:
1441 : : case MSG_SERIAL:
1442 : : case MSG_ERROR:
1443 : : case MSG_CONNECT_TO:
1444 : : case MSG_CONNECTED:
1445 : : case MSG_REQUEST_EVENTS:
1446 : : case MSG_REQUEST_SYNC:
1447 : : case MSG_LISTEN:
1448 : : case MSG_STATS:
1449 : : case MSG_CAPTURE_FILTER:
1450 : : case MSG_PING:
1451 : : case MSG_PONG:
1452 : : case MSG_CAPS:
1453 : : case MSG_COMPRESS:
1454 : : case MSG_LOG:
1455 : : case MSG_SYNC_POINT:
1456 : : case MSG_REMOTE_PRINT:
1457 : : {
1458 : : // One further argument chunk.
1459 : 0 : msgstate = ARGS;
1460 : 0 : return Poll(may_block);
1461 : : }
1462 : :
1463 : : case MSG_NONE:
1464 : : InternalCommError(fmt("unexpected msg type %d",
1465 : 0 : current_msgtype));
1466 : 0 : return true;
1467 : :
1468 : : default:
1469 : : InternalCommError(fmt("unknown msg type %d in Poll()",
1470 : 0 : current_msgtype));
1471 : 0 : return true;
1472 : : }
1473 : : }
1474 : :
1475 : : case ARGS:
1476 : : {
1477 : : // Argument chunk follows.
1478 : : ChunkedIO::Chunk* c;
1479 [ # # ][ # # ]: 0 : READ_CHUNK_FROM_CHILD(c);
[ # # ]
1480 : :
1481 : 0 : current_args = c;
1482 : 0 : msgstate = TYPE;
1483 : 0 : bool result = DoMessage();
1484 : :
1485 [ # # ]: 0 : delete [] current_args->data;
1486 : 0 : delete current_args;
1487 : 0 : current_args = 0;
1488 : :
1489 : 0 : return result;
1490 : : }
1491 : :
1492 : : default:
1493 : 0 : internal_error("unknown msgstate");
1494 : : }
1495 : :
1496 : : internal_error("cannot be reached");
1497 : : }
1498 : :
1499 : 0 : bool RemoteSerializer::DoMessage()
1500 : : {
1501 [ # # ][ # # ]: 0 : if ( current_peer &&
[ # # ][ # # ]
[ # # ]
1502 : : (current_peer->state == Peer::CLOSING ||
1503 : : current_peer->state == Peer::CLOSED) &&
1504 : : is_peer_msg(current_msgtype) )
1505 : : {
1506 : : // We shut the connection to this peer down,
1507 : : // so we ignore all further messages.
1508 [ # # ]: 0 : DEBUG_COMM(fmt("parent: ignoring %s due to shutdown of peer #%d",
1509 : : msgToStr(current_msgtype),
1510 : : current_peer ? current_peer->id : 0));
1511 : 0 : return true;
1512 : : }
1513 : :
1514 [ # # ]: 0 : DEBUG_COMM(fmt("parent: %s from child; peer is #%d",
1515 : : msgToStr(current_msgtype),
1516 : : current_peer ? current_peer->id : 0));
1517 : :
1518 [ # # # # ]: 0 : if ( current_peer &&
[ # # ]
1519 : : (current_msgtype < 0 || current_msgtype > MSG_ID_MAX) )
1520 : : {
1521 : : Log(LogError, "garbage message from peer, shutting down",
1522 : 0 : current_peer);
1523 : 0 : CloseConnection(current_peer);
1524 : 0 : return true;
1525 : : }
1526 : :
1527 : : // As long as we haven't finished the version
1528 : : // handshake, no other messages than MSG_VERSION
1529 : : // are allowed from peer.
1530 [ # # ][ # # ]: 0 : if ( current_peer && current_peer->phase == Peer::SETUP &&
[ # # ][ # # ]
[ # # ]
1531 : : is_peer_msg(current_msgtype) && current_msgtype != MSG_VERSION )
1532 : : {
1533 : 0 : Log(LogError, "peer did not send version", current_peer);
1534 : 0 : CloseConnection(current_peer);
1535 : 0 : return true;
1536 : : }
1537 : :
1538 [ # # # # # : 0 : switch ( current_msgtype ) {
# # # # #
# # # # #
# # # ]
1539 : : case MSG_CLOSE:
1540 : 0 : PeerDisconnected(current_peer);
1541 : 0 : return true;
1542 : :
1543 : : case MSG_CONNECTED:
1544 : 0 : return ProcessConnected();
1545 : :
1546 : : case MSG_SERIAL:
1547 : 0 : return ProcessSerialization();
1548 : :
1549 : : case MSG_REQUEST_EVENTS:
1550 : 0 : return ProcessRequestEventsMsg();
1551 : :
1552 : : case MSG_REQUEST_SYNC:
1553 : 0 : return ProcessRequestSyncMsg();
1554 : :
1555 : : case MSG_PHASE_DONE:
1556 : 0 : return ProcessPhaseDone();
1557 : :
1558 : : case MSG_ERROR:
1559 : 0 : return ProcessLogMsg(true);
1560 : :
1561 : : case MSG_LOG:
1562 : 0 : return ProcessLogMsg(false);
1563 : :
1564 : : case MSG_STATS:
1565 : 0 : return ProcessStatsMsg();
1566 : :
1567 : : case MSG_CAPTURE_FILTER:
1568 : 0 : return ProcessCaptureFilterMsg();
1569 : :
1570 : : case MSG_VERSION:
1571 : 0 : return ProcessVersionMsg();
1572 : :
1573 : : case MSG_PING:
1574 : 0 : return ProcessPingMsg();
1575 : :
1576 : : case MSG_PONG:
1577 : 0 : return ProcessPongMsg();
1578 : :
1579 : : case MSG_CAPS:
1580 : 0 : return ProcessCapsMsg();
1581 : :
1582 : : case MSG_SYNC_POINT:
1583 : 0 : return ProcessSyncPointMsg();
1584 : :
1585 : : case MSG_TERMINATE:
1586 [ # # ]: 0 : assert(terminating);
1587 : 0 : io_sources.Terminate();
1588 : 0 : return true;
1589 : :
1590 : : case MSG_REMOTE_PRINT:
1591 : 0 : return ProcessRemotePrint();
1592 : :
1593 : : default:
1594 : 0 : DEBUG_COMM(fmt("unexpected msg type: %d",
1595 : : int(current_msgtype)));
1596 : : InternalCommError(fmt("unexpected msg type in DoMessage(): %d",
1597 : 0 : int(current_msgtype)));
1598 : 0 : return true; // keep going
1599 : : }
1600 : :
1601 : : internal_error("cannot be reached");
1602 : : return false;
1603 : : }
1604 : :
1605 : 0 : void RemoteSerializer::PeerDisconnected(Peer* peer)
1606 : : {
1607 [ # # ]: 0 : assert(peer);
1608 : :
1609 [ # # ]: 0 : if ( peer->suspended_processing )
1610 : : {
1611 : 0 : net_continue_processing();
1612 : 0 : peer->suspended_processing = false;
1613 : : }
1614 : :
1615 [ # # ][ # # ]: 0 : if ( peer->state == Peer::CLOSED || peer->state == Peer::INIT )
1616 : 0 : return;
1617 : :
1618 [ # # ]: 0 : if ( peer->state == Peer::PENDING )
1619 : : {
1620 : 0 : peer->state = Peer::CLOSED;
1621 : 0 : Log(LogError, "could not connect", peer);
1622 : 0 : return;
1623 : : }
1624 : :
1625 : 0 : Log(LogInfo, "peer disconnected", peer);
1626 : :
1627 [ # # ]: 0 : if ( peer->phase != Peer::SETUP )
1628 : 0 : RaiseEvent(remote_connection_closed, peer);
1629 : :
1630 [ # # ]: 0 : if ( in_sync == peer )
1631 : 0 : in_sync = 0;
1632 : :
1633 : 0 : peer->state = Peer::CLOSED;
1634 : 0 : peer->phase = Peer::UNKNOWN;
1635 : 0 : peer->cache_in->Clear();
1636 : 0 : peer->cache_out->Clear();
1637 : 0 : UnregisterHandlers(peer);
1638 : : }
1639 : :
1640 : 0 : void RemoteSerializer::PeerConnected(Peer* peer)
1641 : : {
1642 [ # # ]: 0 : if ( peer->state == Peer::CONNECTED )
1643 : 0 : return;
1644 : :
1645 : 0 : peer->state = Peer::CONNECTED;
1646 : 0 : peer->phase = Peer::SETUP;
1647 : 0 : peer->sent_version = Peer::NONE;
1648 : 0 : peer->sync_requested = Peer::NONE;
1649 : 0 : peer->handshake_done = Peer::NONE;
1650 : :
1651 : 0 : peer->cache_in->Clear();
1652 : 0 : peer->cache_out->Clear();
1653 : 0 : peer->our_runtime = int(current_time(true) - bro_start_time);
1654 : 0 : peer->sync_point = 0;
1655 : :
1656 [ # # ]: 0 : if ( ! SendCMsgToChild(MSG_VERSION, peer) )
1657 : 0 : return;
1658 : :
1659 : 0 : int len = 4 * sizeof(uint32) + peer->our_class.size() + 1;
1660 : 0 : char* data = new char[len];
1661 : 0 : uint32* args = (uint32*) data;
1662 : :
1663 : 0 : *args++ = htonl(PROTOCOL_VERSION);
1664 : 0 : *args++ = htonl(peer->cache_out->GetMaxCacheSize());
1665 : 0 : *args++ = htonl(DATA_FORMAT_VERSION);
1666 : 0 : *args++ = htonl(peer->our_runtime);
1667 : 0 : strcpy((char*) args, peer->our_class.c_str());
1668 : :
1669 : 0 : ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
1670 : 0 : c->len = len;
1671 : 0 : c->data = data;
1672 : :
1673 [ # # ]: 0 : if ( peer->our_class.size() )
1674 : 0 : Log(LogInfo, fmt("sending class \"%s\"", peer->our_class.c_str()), peer);
1675 : :
1676 [ # # ]: 0 : if ( ! SendToChild(c) )
1677 : : {
1678 : 0 : Log(LogError, "can't send version message");
1679 : 0 : CloseConnection(peer);
1680 : 0 : return;
1681 : : }
1682 : :
1683 : 0 : peer->sent_version |= Peer::WE;
1684 : 0 : Log(LogInfo, "peer connected", peer);
1685 : 0 : Log(LogInfo, "phase: version", peer);
1686 : : }
1687 : :
1688 : 0 : RecordVal* RemoteSerializer::MakePeerVal(Peer* peer)
1689 : : {
1690 : 0 : RecordVal* v = new RecordVal(::peer);
1691 : 0 : v->Assign(0, new Val(uint32(peer->id), TYPE_COUNT));
1692 : : // Sic! Network order for AddrVal, host order for PortVal.
1693 : 0 : v->Assign(1, new AddrVal(htonl(peer->ip)));
1694 : 0 : v->Assign(2, new PortVal(peer->port, TRANSPORT_TCP));
1695 : 0 : v->Assign(3, new Val(false, TYPE_BOOL));
1696 : 0 : v->Assign(4, new StringVal("")); // set when received
1697 : : v->Assign(5, peer->peer_class.size() ?
1698 [ # # ]: 0 : new StringVal(peer->peer_class.c_str()) : 0);
1699 : 0 : return v;
1700 : : }
1701 : :
1702 : : RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port,
1703 : 0 : PeerID id)
1704 : : {
1705 : 0 : Peer* peer = new Peer;
1706 [ # # ]: 0 : peer->id = id != PEER_NONE ? id : id_counter++;
1707 : 0 : peer->ip = ip;
1708 : 0 : peer->port = port;
1709 : 0 : peer->state = Peer::INIT;
1710 : 0 : peer->phase = Peer::UNKNOWN;
1711 : 0 : peer->sent_version = Peer::NONE;
1712 : 0 : peer->sync_requested = Peer::NONE;
1713 : 0 : peer->handshake_done = Peer::NONE;
1714 : 0 : peer->orig = false;
1715 : 0 : peer->accept_state = false;
1716 : 0 : peer->send_state = false;
1717 : 0 : peer->caps = 0;
1718 : 0 : peer->comp_level = 0;
1719 : 0 : peer->suspended_processing = false;
1720 : 0 : peer->caps = 0;
1721 : 0 : peer->val = MakePeerVal(peer);
1722 : 0 : peer->cache_in = new SerializationCache(MAX_CACHE_SIZE);
1723 : 0 : peer->cache_out = new SerializationCache(MAX_CACHE_SIZE);
1724 : 0 : peer->sync_point = 0;
1725 : 0 : peer->print_buffer = 0;
1726 : 0 : peer->print_buffer_used = 0;
1727 : :
1728 : 0 : peers.append(peer);
1729 : 0 : Log(LogInfo, "added peer", peer);
1730 : :
1731 : 0 : return peer;
1732 : : }
1733 : :
1734 : 0 : void RemoteSerializer::UnregisterHandlers(Peer* peer)
1735 : : {
1736 : : // Unregister the peers for the EventHandlers.
1737 [ # # ]: 0 : loop_over_list(peer->handlers, i)
1738 : : {
1739 : 0 : peer->handlers[i]->RemoveRemoteHandler(peer->id);
1740 : : }
1741 : 0 : }
1742 : :
1743 : 0 : void RemoteSerializer::RemovePeer(Peer* peer)
1744 : : {
1745 [ # # ]: 0 : if ( peer->suspended_processing )
1746 : : {
1747 : 0 : net_continue_processing();
1748 : 0 : peer->suspended_processing = false;
1749 : : }
1750 : :
1751 : 0 : peers.remove(peer);
1752 : 0 : UnregisterHandlers(peer);
1753 : :
1754 : 0 : Log(LogInfo, "removed peer", peer);
1755 : :
1756 : 0 : int id = peer->id;
1757 : 0 : Unref(peer->val);
1758 [ # # ]: 0 : delete [] peer->print_buffer;
1759 [ # # ]: 0 : delete peer->cache_in;
1760 [ # # ]: 0 : delete peer->cache_out;
1761 [ # # ]: 0 : delete peer;
1762 : :
1763 : 0 : closed = ! IsActive();
1764 : :
1765 [ # # ]: 0 : if ( in_sync == peer )
1766 : 0 : in_sync = 0;
1767 : 0 : }
1768 : :
1769 : : RemoteSerializer::Peer* RemoteSerializer::LookupPeer(PeerID id,
1770 : 0 : bool only_if_connected)
1771 : : {
1772 : 0 : Peer* peer = 0;
1773 [ # # ]: 0 : loop_over_list(peers, i)
1774 [ # # ]: 0 : if ( peers[i]->id == id )
1775 : : {
1776 : 0 : peer = peers[i];
1777 : 0 : break;
1778 : : }
1779 : :
1780 [ # # ][ # # ]: 0 : if ( ! only_if_connected || (peer && peer->state == Peer::CONNECTED) )
[ # # ]
1781 : 0 : return peer;
1782 : : else
1783 : 0 : return 0;
1784 : : }
1785 : :
1786 : 0 : bool RemoteSerializer::ProcessVersionMsg()
1787 : : {
1788 : 0 : uint32* args = (uint32*) current_args->data;
1789 : 0 : uint32 version = ntohl(args[0]);
1790 : 0 : uint32 data_version = ntohl(args[2]);
1791 : :
1792 [ # # ]: 0 : if ( PROTOCOL_VERSION != version )
1793 : : {
1794 : : Log(LogError, fmt("remote protocol version mismatch: got %d, but expected %d",
1795 : 0 : version, PROTOCOL_VERSION), current_peer);
1796 : 0 : CloseConnection(current_peer);
1797 : 0 : return true;
1798 : : }
1799 : :
1800 : : // For backwards compatibility, data_version may be null.
1801 [ # # ][ # # ]: 0 : if ( data_version && DATA_FORMAT_VERSION != data_version )
1802 : : {
1803 : : Log(LogError, fmt("remote data version mismatch: got %d, but expected %d",
1804 : : data_version, DATA_FORMAT_VERSION),
1805 : 0 : current_peer);
1806 : 0 : CloseConnection(current_peer);
1807 : 0 : return true;
1808 : : }
1809 : :
1810 : 0 : uint32 cache_size = ntohl(args[1]);
1811 : 0 : current_peer->cache_in->SetMaxCacheSize(cache_size);
1812 : 0 : current_peer->runtime = ntohl(args[3]);
1813 : :
1814 : 0 : current_peer->sent_version |= Peer::PEER;
1815 : :
1816 [ # # ]: 0 : if ( current_args->len > 4 * sizeof(uint32) )
1817 : : {
1818 : : // The peer sends us a class string.
1819 : 0 : const char* pclass = (const char*) &args[4];
1820 : 0 : current_peer->peer_class = pclass;
1821 [ # # ]: 0 : if ( *pclass )
1822 : 0 : Log(LogInfo, fmt("peer sent class \"%s\"", pclass), current_peer);
1823 [ # # ]: 0 : if ( current_peer->val )
1824 : 0 : current_peer->val->Assign(5, new StringVal(pclass));
1825 : : }
1826 : :
1827 [ # # ]: 0 : assert(current_peer->sent_version == Peer::BOTH);
1828 : 0 : current_peer->phase = Peer::HANDSHAKE;
1829 : 0 : Log(LogInfo, "phase: handshake", current_peer);
1830 : :
1831 [ # # ]: 0 : if ( ! SendCapabilities(current_peer) )
1832 : 0 : return false;
1833 : :
1834 : 0 : RaiseEvent(remote_connection_established, current_peer);
1835 : :
1836 : 0 : return true;
1837 : : }
1838 : :
1839 : 0 : bool RemoteSerializer::EnterPhaseRunning(Peer* peer)
1840 : : {
1841 [ # # ]: 0 : if ( in_sync == peer )
1842 : 0 : in_sync = 0;
1843 : :
1844 : 0 : current_peer->phase = Peer::RUNNING;
1845 : 0 : Log(LogInfo, "phase: running", peer);
1846 : :
1847 : 0 : RaiseEvent(remote_connection_handshake_done, current_peer);
1848 : :
1849 [ # # ]: 0 : if ( remote_trace_sync_interval )
1850 : : {
1851 [ # # ]: 0 : loop_over_list(peers, i)
1852 : : {
1853 [ # # ]: 0 : if ( ! SendToChild(MSG_SYNC_POINT, peers[i],
1854 : : 1, current_sync_point) )
1855 : 0 : return false;
1856 : : }
1857 : : }
1858 : :
1859 : 0 : return true;
1860 : : }
1861 : :
1862 : 0 : bool RemoteSerializer::ProcessConnected()
1863 : : {
1864 : : // IP and port follow.
1865 : 0 : uint32* args = (uint32*) current_args->data;
1866 : 0 : uint32 host = ntohl(args[0]); // ### Fix: only works for IPv4
1867 : 0 : uint16 port = (uint16) ntohl(args[1]);
1868 : :
1869 [ # # ]: 0 : if ( ! current_peer )
1870 : : {
1871 : : // The other side connected to one of our listening ports.
1872 : 0 : current_peer = AddPeer(host, port, current_id);
1873 : 0 : current_peer->orig = false;
1874 : : }
1875 [ # # ]: 0 : else if ( current_peer->orig )
1876 : : {
1877 : : // It's a successful retry.
1878 : 0 : current_peer->port = port;
1879 : 0 : current_peer->accept_state = false;
1880 : 0 : Unref(current_peer->val);
1881 : 0 : current_peer->val = MakePeerVal(current_peer);
1882 : : }
1883 : :
1884 : 0 : PeerConnected(current_peer);
1885 : :
1886 : 0 : ID* descr = global_scope()->Lookup("peer_description");
1887 [ # # ]: 0 : if ( ! descr )
1888 : 0 : internal_error("peer_description not defined");
1889 : :
1890 : 0 : SerialInfo info(this);
1891 : 0 : SendID(&info, current_peer, *descr);
1892 : :
1893 : 0 : return true;
1894 : : }
1895 : :
1896 : 0 : bool RemoteSerializer::ProcessRequestEventsMsg()
1897 : : {
1898 [ # # ]: 0 : if ( ! current_peer )
1899 : 0 : return false;
1900 : :
1901 : : // Register new handlers.
1902 : 0 : char* p = current_args->data;
1903 [ # # ]: 0 : while ( p < current_args->data + current_args->len )
1904 : : {
1905 : 0 : EventHandler* handler = event_registry->Lookup(p);
1906 [ # # ]: 0 : if ( handler )
1907 : : {
1908 : 0 : handler->AddRemoteHandler(current_peer->id);
1909 : 0 : current_peer->handlers.append(handler);
1910 : 0 : RaiseEvent(remote_event_registered, current_peer, p);
1911 : : Log(LogInfo, fmt("registered for event %s", p),
1912 : 0 : current_peer);
1913 : :
1914 : : // If the other side requested the print_hook event,
1915 : : // we initialize the buffer.
1916 [ # # # # ]: 0 : if ( current_peer->print_buffer == 0 &&
[ # # ]
1917 : : streq(p, "print_hook") )
1918 : : {
1919 : : current_peer->print_buffer =
1920 : 0 : new char[PRINT_BUFFER_SIZE];
1921 : 0 : current_peer->print_buffer_used = 0;
1922 : : Log(LogInfo, "initialized print buffer",
1923 : 0 : current_peer);
1924 : : }
1925 : : }
1926 : : else
1927 : : Log(LogInfo, fmt("request for unknown event %s", p),
1928 : 0 : current_peer);
1929 : :
1930 : 0 : p += strlen(p) + 1;
1931 : : }
1932 : :
1933 : 0 : return true;
1934 : : }
1935 : :
1936 : 0 : bool RemoteSerializer::ProcessRequestSyncMsg()
1937 : : {
1938 [ # # ]: 0 : if ( ! current_peer )
1939 : 0 : return false;
1940 : :
1941 : 0 : int auth = 0;
1942 : 0 : uint32* args = (uint32*) current_args->data;
1943 [ # # ]: 0 : if ( ntohl(args[0]) != 0 )
1944 : : {
1945 : 0 : Log(LogInfo, "peer considers its state authoritative", current_peer);
1946 : 0 : auth = Peer::AUTH_PEER;
1947 : : }
1948 : :
1949 : 0 : current_peer->sync_requested |= Peer::PEER | auth;
1950 : 0 : return true;
1951 : : }
1952 : :
1953 : 0 : bool RemoteSerializer::ProcessPhaseDone()
1954 : : {
1955 [ # # # ]: 0 : switch ( current_peer->phase ) {
1956 : : case Peer::HANDSHAKE:
1957 : : {
1958 : 0 : current_peer->handshake_done |= Peer::PEER;
1959 : :
1960 [ # # ]: 0 : if ( current_peer->handshake_done == Peer::BOTH )
1961 : 0 : HandshakeDone(current_peer);
1962 : 0 : break;
1963 : : }
1964 : :
1965 : : case Peer::SYNC:
1966 : : {
1967 : : // Make sure that the other side is supposed to sent us this.
1968 [ # # ]: 0 : if ( current_peer->send_state )
1969 : : {
1970 : 0 : Log(LogError, "unexpected phase_done in sync phase from peer", current_peer);
1971 : 0 : CloseConnection(current_peer);
1972 : 0 : return false;
1973 : : }
1974 : :
1975 [ # # ]: 0 : if ( ! EnterPhaseRunning(current_peer) )
1976 : : {
1977 [ # # ]: 0 : if ( current_peer->suspended_processing )
1978 : : {
1979 : 0 : net_continue_processing();
1980 : 0 : current_peer->suspended_processing = false;
1981 : : }
1982 : :
1983 : 0 : return false;
1984 : : }
1985 : :
1986 [ # # ]: 0 : if ( current_peer->suspended_processing )
1987 : : {
1988 : 0 : net_continue_processing();
1989 : 0 : current_peer->suspended_processing = false;
1990 : : }
1991 : :
1992 : 0 : break;
1993 : : }
1994 : :
1995 : : default:
1996 : 0 : Log(LogError, "unexpected phase_done", current_peer);
1997 : 0 : CloseConnection(current_peer);
1998 : : }
1999 : :
2000 : 0 : return true;
2001 : : }
2002 : :
2003 : 0 : bool RemoteSerializer::HandshakeDone(Peer* peer)
2004 : : {
2005 : : #ifdef HAVE_LIBZ
2006 [ # # ][ # # ]: 0 : if ( peer->caps & Peer::COMPRESSION && peer->comp_level > 0 )
2007 [ # # ]: 0 : if ( ! SendToChild(MSG_COMPRESS, peer, 1, peer->comp_level) )
2008 : 0 : return false;
2009 : : #endif
2010 : :
2011 [ # # ]: 0 : if ( ! (current_peer->caps & Peer::PID_64BIT) )
2012 : 0 : Log(LogInfo, "peer does not support 64bit PIDs; using compatibility mode", current_peer);
2013 : :
2014 [ # # ]: 0 : if ( (current_peer->caps & Peer::NEW_CACHE_STRATEGY) )
2015 : : Log(LogInfo, "peer supports keep-in-cache; using that",
2016 : 0 : current_peer);
2017 : :
2018 [ # # ]: 0 : if ( peer->sync_requested != Peer::NONE )
2019 : : {
2020 [ # # ]: 0 : if ( in_sync )
2021 : : {
2022 : : Log(LogInfo, "another sync in progress, waiting...",
2023 : 0 : peer);
2024 : 0 : sync_pending.append(peer);
2025 : 0 : return true;
2026 : : }
2027 : :
2028 [ # # ][ # # ]: 0 : if ( (peer->sync_requested & Peer::AUTH_PEER) &&
2029 : : (peer->sync_requested & Peer::AUTH_WE) )
2030 : : {
2031 : : Log(LogError, "misconfiguration: authoritative state on both sides",
2032 : 0 : current_peer);
2033 : 0 : CloseConnection(current_peer);
2034 : 0 : return false;
2035 : : }
2036 : :
2037 : 0 : in_sync = peer;
2038 : 0 : peer->phase = Peer::SYNC;
2039 : :
2040 : : // If only one side has requested state synchronization,
2041 : : // it will get all the state from the peer.
2042 : : //
2043 : : // If both sides have shown interest, the one considering
2044 : : // itself authoritative will send the state. If none is
2045 : : // authoritative, the peer which is running longest sends
2046 : : // its state.
2047 : : //
2048 [ # # ]: 0 : if ( (peer->sync_requested & Peer::BOTH) != Peer::BOTH )
2049 : : {
2050 : : // One side.
2051 [ # # ]: 0 : if ( peer->sync_requested & Peer::PEER )
2052 : 0 : peer->send_state = true;
2053 [ # # ]: 0 : else if ( peer->sync_requested & Peer::WE )
2054 : 0 : peer->send_state = false;
2055 : : else
2056 : 0 : internal_error("illegal sync_requested value");
2057 : : }
2058 : : else
2059 : : {
2060 : : // Both.
2061 [ # # ]: 0 : if ( peer->sync_requested & Peer::AUTH_WE )
2062 : 0 : peer->send_state = true;
2063 [ # # ]: 0 : else if ( peer->sync_requested & Peer::AUTH_PEER )
2064 : 0 : peer->send_state = false;
2065 : : else
2066 : : {
2067 [ # # ]: 0 : if ( peer->our_runtime == peer->runtime )
2068 : 0 : peer->send_state = peer->orig;
2069 : : else
2070 : : peer->send_state = (peer->our_runtime >
2071 : 0 : peer->runtime);
2072 : : }
2073 : : }
2074 : :
2075 [ # # ]: 0 : Log(LogInfo, fmt("phase: sync (%s)", (peer->send_state ? "sender" : "receiver")), peer);
2076 : :
2077 [ # # ]: 0 : if ( peer->send_state )
2078 : : {
2079 : 0 : SerialInfo* info = new SerialInfo(this);
2080 : 0 : SendAllSynchronized(peer, info);
2081 : : }
2082 : :
2083 : : else
2084 : : {
2085 : : // Suspend until we got everything.
2086 : 0 : net_suspend_processing();
2087 : 0 : peer->suspended_processing = true;
2088 : : }
2089 : : }
2090 : : else
2091 : 0 : return EnterPhaseRunning(peer);
2092 : :
2093 : 0 : return true;
2094 : : }
2095 : :
2096 : 0 : bool RemoteSerializer::ProcessPingMsg()
2097 : : {
2098 [ # # ]: 0 : if ( ! current_peer )
2099 : 0 : return false;
2100 : :
2101 [ # # ]: 0 : if ( ! SendToChild(MSG_PONG, current_peer,
2102 : : current_args->data, current_args->len) )
2103 : 0 : return false;
2104 : :
2105 : 0 : return true;
2106 : : }
2107 : :
2108 : 0 : bool RemoteSerializer::ProcessPongMsg()
2109 : : {
2110 [ # # ]: 0 : if ( ! current_peer )
2111 : 0 : return false;
2112 : :
2113 : 0 : ping_args* args = (ping_args*) current_args->data;
2114 : :
2115 : 0 : val_list* vl = new val_list;
2116 : 0 : vl->append(current_peer->val->Ref());
2117 : 0 : vl->append(new Val((unsigned int) ntohl(args->seq), TYPE_COUNT));
2118 : : vl->append(new Val(current_time(true) - ntohd(args->time1),
2119 : 0 : TYPE_INTERVAL));
2120 : 0 : vl->append(new Val(ntohd(args->time2), TYPE_INTERVAL));
2121 : 0 : vl->append(new Val(ntohd(args->time3), TYPE_INTERVAL));
2122 : 0 : mgr.QueueEvent(remote_pong, vl);
2123 : 0 : return true;
2124 : : }
2125 : :
2126 : 0 : bool RemoteSerializer::ProcessCapsMsg()
2127 : : {
2128 [ # # ]: 0 : if ( ! current_peer )
2129 : 0 : return false;
2130 : :
2131 : 0 : uint32* args = (uint32*) current_args->data;
2132 : 0 : current_peer->caps = ntohl(args[0]);
2133 : 0 : return true;
2134 : : }
2135 : :
2136 : 0 : bool RemoteSerializer::ProcessLogMsg(bool is_error)
2137 : : {
2138 [ # # ]: 0 : Log(is_error ? LogError : LogInfo, current_args->data, 0, LogChild);
2139 : 0 : return true;
2140 : : }
2141 : :
2142 : 0 : bool RemoteSerializer::ProcessStatsMsg()
2143 : : {
2144 : : // Take the opportunity to log our stats, too.
2145 : 0 : LogStats();
2146 : :
2147 : : // Split the concatenated child stats into indiviual log messages.
2148 : 0 : int count = 0;
2149 [ # # ]: 0 : for ( char* p = current_args->data;
2150 : : p < current_args->data + current_args->len; p += strlen(p) + 1 )
2151 : : Log(LogInfo, fmt("child statistics: [%d] %s", count++, p),
2152 : 0 : current_peer);
2153 : :
2154 : 0 : return true;
2155 : : }
2156 : :
2157 : 0 : bool RemoteSerializer::ProcessCaptureFilterMsg()
2158 : : {
2159 [ # # ]: 0 : if ( ! current_peer )
2160 : 0 : return false;
2161 : :
2162 : 0 : RaiseEvent(remote_capture_filter, current_peer, current_args->data);
2163 : 0 : return true;
2164 : : }
2165 : :
2166 : 0 : bool RemoteSerializer::CheckSyncPoints()
2167 : : {
2168 [ # # ]: 0 : if ( ! current_sync_point )
2169 : 0 : return false;
2170 : :
2171 : 0 : int ready = 0;
2172 : :
2173 [ # # ]: 0 : loop_over_list(peers, i)
2174 [ # # ]: 0 : if ( peers[i]->sync_point >= current_sync_point )
2175 : 0 : ready++;
2176 : :
2177 [ # # ]: 0 : if ( ready < remote_trace_sync_peers )
2178 : 0 : return false;
2179 : :
2180 [ # # ]: 0 : if ( current_sync_point == FINAL_SYNC_POINT )
2181 : : {
2182 : 0 : Log(LogInfo, fmt("all peers reached final sync-point, going to finish"));
2183 : 0 : Terminate();
2184 : : }
2185 : : else
2186 : : Log(LogInfo, fmt("all peers reached sync-point %u",
2187 : 0 : current_sync_point));
2188 : :
2189 [ # # ]: 0 : if ( syncing_times )
2190 : : {
2191 [ # # ]: 0 : loop_over_list(peers, i)
2192 : : {
2193 [ # # ]: 0 : if ( peers[i]->suspended_processing )
2194 : : {
2195 : 0 : net_continue_processing();
2196 : 0 : peers[i]->suspended_processing = false;
2197 : : }
2198 : : }
2199 : :
2200 : 0 : syncing_times = false;
2201 : : }
2202 : :
2203 : 0 : return true;
2204 : : }
2205 : :
2206 : 0 : bool RemoteSerializer::ProcessSyncPointMsg()
2207 : : {
2208 [ # # ]: 0 : if ( ! current_peer )
2209 : 0 : return false;
2210 : :
2211 : 0 : uint32* args = (uint32*) current_args->data;
2212 : 0 : uint32 count = ntohl(args[0]);
2213 : :
2214 : 0 : current_peer->sync_point = max(current_peer->sync_point, count);
2215 : :
2216 [ # # ]: 0 : if ( current_peer->sync_point == FINAL_SYNC_POINT )
2217 : 0 : Log(LogInfo, fmt("reached final sync-point"), current_peer);
2218 : : else
2219 : 0 : Log(LogInfo, fmt("reached sync-point %u", current_peer->sync_point), current_peer);
2220 : :
2221 [ # # ]: 0 : if ( syncing_times )
2222 : 0 : CheckSyncPoints();
2223 : :
2224 : 0 : return true;
2225 : : }
2226 : :
2227 : 0 : bool RemoteSerializer::ProcessSerialization()
2228 : : {
2229 [ # # ]: 0 : if ( current_peer->state == Peer::CLOSING )
2230 : 0 : return false;
2231 : :
2232 : 0 : SetCache(current_peer->cache_in);
2233 : 0 : UnserialInfo info(this);
2234 : :
2235 : 0 : bool accept_state = current_peer->accept_state;
2236 : :
2237 : : #if 0
2238 : : // If processing is suspended, we unserialize the data but throw
2239 : : // it away.
2240 : : if ( current_peer->phase == Peer::RUNNING &&
2241 : : net_is_processing_suspended() )
2242 : : accept_state = false;
2243 : : #endif
2244 : :
2245 [ # # ]: 0 : assert(current_args);
2246 : 0 : info.chunk = current_args;
2247 : :
2248 : 0 : info.install_globals = accept_state;
2249 : 0 : info.install_conns = accept_state;
2250 : 0 : info.ignore_callbacks = ! accept_state;
2251 : :
2252 [ # # ]: 0 : if ( current_peer->phase != Peer::RUNNING )
2253 : 0 : info.id_policy = UnserialInfo::InstantiateNew;
2254 : : else
2255 : : info.id_policy = accept_state ?
2256 : : UnserialInfo::CopyNewToCurrent :
2257 [ # # ]: 0 : UnserialInfo::Keep;
2258 : :
2259 [ # # ][ # # ]: 0 : if ( ! (current_peer->caps & Peer::PID_64BIT) ||
2260 : : current_peer->phase != Peer::RUNNING )
2261 : 0 : info.pid_32bit = true;
2262 : :
2263 [ # # ][ # # ]: 0 : if ( (current_peer->caps & Peer::NEW_CACHE_STRATEGY) &&
2264 : : current_peer->phase == Peer::RUNNING )
2265 : 0 : info.new_cache_strategy = true;
2266 : :
2267 [ # # ]: 0 : if ( ! forward_remote_state_changes )
2268 : 0 : ignore_accesses = true;
2269 : :
2270 : 0 : source_peer = current_peer;
2271 : 0 : int i = Unserialize(&info);
2272 : 0 : source_peer = 0;
2273 : :
2274 [ # # ]: 0 : if ( ! forward_remote_state_changes )
2275 : 0 : ignore_accesses = false;
2276 : :
2277 [ # # ]: 0 : if ( i < 0 )
2278 : : {
2279 : 0 : Log(LogError, "unserialization error", current_peer);
2280 : 0 : CloseConnection(current_peer);
2281 : : // Error
2282 : 0 : return false;
2283 : : }
2284 : :
2285 : 0 : return true;
2286 : : }
2287 : :
2288 : 0 : bool RemoteSerializer::FlushPrintBuffer(Peer* p)
2289 : : {
2290 [ # # ]: 0 : if ( p->state == Peer::CLOSING )
2291 : 0 : return false;
2292 : :
2293 [ # # ]: 0 : if ( ! p->print_buffer )
2294 : 0 : return true;
2295 : :
2296 : 0 : SendToChild(MSG_REMOTE_PRINT, p, p->print_buffer, p->print_buffer_used);
2297 : :
2298 : 0 : p->print_buffer = new char[PRINT_BUFFER_SIZE];
2299 : 0 : p->print_buffer_used = 0;
2300 : 0 : return true;
2301 : : }
2302 : :
2303 : 0 : bool RemoteSerializer::SendPrintHookEvent(BroFile* f, const char* txt)
2304 : : {
2305 [ # # ]: 0 : loop_over_list(peers, i)
2306 : : {
2307 : 0 : Peer* p = peers[i];
2308 : :
2309 [ # # ]: 0 : if ( ! p->print_buffer )
2310 : 0 : continue;
2311 : :
2312 : 0 : const char* fname = f->Name();
2313 [ # # ]: 0 : if ( ! fname )
2314 : 0 : continue; // not a managed file.
2315 : :
2316 : 0 : int len = strlen(txt);
2317 : :
2318 : : // We cut off everything after the max buffer size. That
2319 : : // makes the code a bit easier, and we shouldn't have such
2320 : : // long lines anyway.
2321 : 0 : len = min(len, PRINT_BUFFER_SIZE - strlen(fname) - 2);
2322 : :
2323 : : // If there's not enough space in the buffer, flush it.
2324 : :
2325 : 0 : int need = strlen(fname) + 1 + len + 1;
2326 [ # # ]: 0 : if ( p->print_buffer_used + need > PRINT_BUFFER_SIZE )
2327 : : {
2328 [ # # ]: 0 : if ( ! FlushPrintBuffer(p) )
2329 : 0 : return false;
2330 : : }
2331 : :
2332 [ # # ]: 0 : assert(p->print_buffer_used + need <= PRINT_BUFFER_SIZE);
2333 : :
2334 : 0 : char* dst = p->print_buffer + p->print_buffer_used;
2335 : 0 : strcpy(dst, fname);
2336 : 0 : dst += strlen(fname) + 1;
2337 : 0 : memcpy(dst, txt, len);
2338 : 0 : dst += len;
2339 : 0 : *dst++ = '\0';
2340 : :
2341 : 0 : p->print_buffer_used = dst - p->print_buffer;
2342 : : }
2343 : :
2344 : 0 : return true;
2345 : : }
2346 : :
2347 : 0 : bool RemoteSerializer::ProcessRemotePrint()
2348 : : {
2349 [ # # ]: 0 : if ( current_peer->state == Peer::CLOSING )
2350 : 0 : return false;
2351 : :
2352 : 0 : const char* p = current_args->data;
2353 [ # # ]: 0 : while ( p < current_args->data + current_args->len )
2354 : : {
2355 : 0 : const char* fname = p;
2356 : 0 : p += strlen(p) + 1;
2357 : 0 : const char* txt = p;
2358 : 0 : p += strlen(p) + 1;
2359 : :
2360 : 0 : val_list* vl = new val_list(2);
2361 : 0 : BroFile* f = BroFile::GetFile(fname);
2362 : 0 : Ref(f);
2363 : 0 : vl->append(new Val(f));
2364 : 0 : vl->append(new StringVal(txt));
2365 : 0 : GotEvent("print_hook", -1.0, print_hook, vl);
2366 : : }
2367 : :
2368 : 0 : return true;
2369 : : }
2370 : :
2371 : :
2372 : : void RemoteSerializer::GotEvent(const char* name, double time,
2373 : 0 : EventHandlerPtr event, val_list* args)
2374 : : {
2375 [ # # ]: 0 : if ( time >= 0 )
2376 : : {
2377 : : // Marker for being called from ProcessRemotePrint().
2378 : 0 : DEBUG_COMM("parent: got event");
2379 : 0 : ++stats.events.in;
2380 : : }
2381 : :
2382 [ # # ]: 0 : if ( ! current_peer )
2383 : : {
2384 : 0 : Error("unserialized event from unknown peer");
2385 : 0 : return;
2386 : : }
2387 : :
2388 : 0 : BufferedEvent* e = new BufferedEvent;
2389 : :
2390 : : // Our time, not the time when the event was generated.
2391 : : e->time = pkt_srcs.length() ?
2392 [ # # ]: 0 : time_t(network_time) : time_t(timer_mgr->Time());
2393 : :
2394 : 0 : e->src = current_peer->id;
2395 : 0 : e->handler = event;
2396 : 0 : e->args = args;
2397 : :
2398 : 0 : events.append(e);
2399 : : }
2400 : :
2401 : : void RemoteSerializer::GotFunctionCall(const char* name, double time,
2402 : 0 : Func* function, val_list* args)
2403 : : {
2404 : 0 : DEBUG_COMM("parent: got function call");
2405 : 0 : ++stats.events.in;
2406 : :
2407 [ # # ]: 0 : if ( ! current_peer )
2408 : : {
2409 : 0 : Error("unserialized function from unknown peer");
2410 : 0 : return;
2411 : : }
2412 : :
2413 : 0 : function->Call(args);
2414 : : }
2415 : :
2416 : 0 : void RemoteSerializer::GotID(ID* id, Val* val)
2417 : : {
2418 : 0 : ++stats.ids.in;
2419 : :
2420 [ # # ]: 0 : if ( ! current_peer )
2421 : : {
2422 : 0 : Error("unserialized id from unknown peer");
2423 : 0 : Unref(id);
2424 : 0 : return;
2425 : : }
2426 : :
2427 [ # # ][ # # ]: 0 : if ( current_peer->phase == Peer::HANDSHAKE &&
[ # # ]
2428 : : streq(id->Name(), "peer_description") )
2429 : : {
2430 [ # # ]: 0 : if ( val->Type()->Tag() != TYPE_STRING )
2431 : : {
2432 : 0 : Error("peer_description not a string");
2433 : 0 : Unref(id);
2434 : 0 : return;
2435 : : }
2436 : :
2437 : 0 : const char* desc = val->AsString()->CheckString();
2438 : 0 : current_peer->val->Assign(4, new StringVal(desc));
2439 : :
2440 : : Log(LogInfo, fmt("peer_description is %s",
2441 : : (desc && *desc) ? desc : "not set"),
2442 [ # # # # ]: 0 : current_peer);
2443 : :
2444 : 0 : Unref(id);
2445 : 0 : return;
2446 : : }
2447 : :
2448 [ # # ]: 0 : if ( id->Name()[0] == '#' )
2449 : : {
2450 : : // This is a globally unique, non-user-visible ID.
2451 : :
2452 : : // Only MutableVals can be bound to names starting with '#'.
2453 [ # # ]: 0 : assert(val->IsMutableVal());
2454 : :
2455 : : // It must be already installed in the global namespace:
2456 : : // either we saw it before, or MutableVal::Unserialize()
2457 : : // installed it.
2458 [ # # ]: 0 : assert(global_scope()->Lookup(id->Name()));
2459 : :
2460 : : // Only synchronized values can arrive here.
2461 [ # # ]: 0 : assert(((MutableVal*) val)->GetProperties() & MutableVal::SYNCHRONIZED);
2462 : :
2463 : 0 : DBG_LOG(DBG_COMM, "got ID %s from peer\n", id->Name());
2464 : : }
2465 : :
2466 : 0 : Unref(id);
2467 : : }
2468 : :
2469 : 0 : void RemoteSerializer::GotConnection(Connection* c)
2470 : : {
2471 : 0 : ++stats.conns.in;
2472 : :
2473 : : // Nothing else to-do. Connection will be installed automatically
2474 : : // (if allowed).
2475 : :
2476 : 0 : Unref(c);
2477 : 0 : }
2478 : :
2479 : 0 : void RemoteSerializer::GotStateAccess(StateAccess* s)
2480 : : {
2481 : 0 : ++stats.accesses.in;
2482 : :
2483 : 0 : ODesc d;
2484 : 0 : DBG_LOG(DBG_COMM, "got StateAccess: %s", (s->Describe(&d), d.Description()));
2485 : :
2486 [ # # ]: 0 : if ( ! current_peer )
2487 : : {
2488 : 0 : Error("unserialized function from unknown peer");
2489 : 0 : return;
2490 : : }
2491 : :
2492 [ # # ]: 0 : if ( current_peer->sync_requested & Peer::WE )
2493 : 0 : s->Replay();
2494 : :
2495 [ # # ][ # # ]: 0 : delete s;
2496 : : }
2497 : :
2498 : 0 : void RemoteSerializer::GotTimer(Timer* s)
2499 : : {
2500 : 0 : run_time("RemoteSerializer::GotTimer not implemented");
2501 : 0 : }
2502 : :
2503 : 0 : void RemoteSerializer::GotPacket(Packet* p)
2504 : : {
2505 : 0 : ++stats.packets.in;
2506 : :
2507 : 0 : BufferedPacket* bp = new BufferedPacket;
2508 : 0 : bp->time = time_t(timer_mgr->Time());
2509 : 0 : bp->p = p;
2510 : 0 : packets.append(bp);
2511 : 0 : }
2512 : :
2513 : 0 : void RemoteSerializer::Log(LogLevel level, const char* msg)
2514 : : {
2515 : 0 : Log(level, msg, 0, LogParent);
2516 : 0 : }
2517 : :
2518 : : void RemoteSerializer::Log(LogLevel level, const char* msg, Peer* peer,
2519 : 0 : LogSrc src)
2520 : : {
2521 : 0 : const int BUFSIZE = 1024;
2522 : : char buffer[BUFSIZE];
2523 : :
2524 : 0 : int len = 0;
2525 : :
2526 [ # # ]: 0 : if ( peer )
2527 : : len += snprintf(buffer + len, sizeof(buffer) - len,
2528 : : "[#%d/%s:%d] ", int(peer->id), ip2a(peer->ip),
2529 : 0 : peer->port);
2530 : :
2531 : 0 : len += safe_snprintf(buffer + len, sizeof(buffer) - len, "%s", msg);
2532 : :
2533 : 0 : val_list* vl = new val_list();
2534 : 0 : vl->append(new Val(level, TYPE_COUNT));
2535 : 0 : vl->append(new Val(src, TYPE_COUNT));
2536 : 0 : vl->append(new StringVal(buffer));
2537 : 0 : mgr.QueueEvent(remote_log, vl);
2538 : :
2539 : 0 : DEBUG_COMM(fmt("parent: %.6f %s", current_time(), buffer));
2540 : 0 : }
2541 : :
2542 : : void RemoteSerializer::RaiseEvent(EventHandlerPtr event, Peer* peer,
2543 : 0 : const char* arg)
2544 : : {
2545 : 0 : val_list* vl = new val_list;
2546 : :
2547 [ # # ]: 0 : if ( peer )
2548 : : {
2549 : 0 : Ref(peer->val);
2550 : 0 : vl->append(peer->val);
2551 : : }
2552 : : else
2553 : : {
2554 : 0 : Val* v = mgr.GetLocalPeerVal();
2555 : 0 : v->Ref();
2556 : 0 : vl->append(v);
2557 : : }
2558 : :
2559 [ # # ]: 0 : if ( arg )
2560 : 0 : vl->append(new StringVal(arg));
2561 : :
2562 : : // If we only have remote sources, the network time
2563 : : // will not increase as long as no peers are connected.
2564 : : // Therefore, we send these events immediately.
2565 : 0 : mgr.Dispatch(new Event(event, vl, PEER_LOCAL));
2566 : 0 : }
2567 : :
2568 : 1 : void RemoteSerializer::LogStats()
2569 : : {
2570 [ + - ]: 1 : if ( ! io )
2571 : 1 : return;
2572 : :
2573 : : char buffer[512];
2574 : 0 : io->Stats(buffer, 512);
2575 : : Log(LogInfo, fmt("parent statistics: %s events=%lu/%lu operations=%lu/%lu",
2576 : : buffer, stats.events.in, stats.events.out,
2577 : 1 : stats.accesses.in, stats.accesses.out));
2578 : : }
2579 : :
2580 : 0 : RecordVal* RemoteSerializer::GetPeerVal(PeerID id)
2581 : : {
2582 : 0 : Peer* peer = LookupPeer(id, true);
2583 [ # # ]: 0 : if ( ! peer )
2584 : 0 : return 0;
2585 : :
2586 : 0 : Ref(peer->val);
2587 : 0 : return peer->val;
2588 : : }
2589 : :
2590 : 0 : void RemoteSerializer::ChildDied()
2591 : : {
2592 : 0 : Log(LogError, "child died");
2593 : 0 : closed = true;
2594 : 0 : child_pid = 0;
2595 : :
2596 : : // Shut down the main process as well.
2597 : 0 : terminate_processing();
2598 : 0 : }
2599 : :
2600 : 0 : bool RemoteSerializer::SendCMsgToChild(char msg_type, Peer* peer)
2601 : : {
2602 [ # # ][ # # ]: 0 : if ( ! sendCMsg(io, msg_type, peer ? peer->id : PEER_NONE) )
2603 : : {
2604 : : warn(fmt("can't send message of type %d: %s",
2605 : 0 : msg_type, io->Error()));
2606 : 0 : return false;
2607 : : }
2608 : 0 : return true;
2609 : : }
2610 : :
2611 : 0 : bool RemoteSerializer::SendToChild(char type, Peer* peer, char* str, int len)
2612 : : {
2613 [ # # ]: 0 : DEBUG_COMM(fmt("parent: (->child) %s (#%d, %s)", msgToStr(type), peer ? peer->id : PEER_NONE, str));
2614 : :
2615 [ # # ]: 0 : if ( ! child_pid )
2616 : 0 : return false;
2617 : :
2618 [ # # ][ # # ]: 0 : if ( sendToIO(io, type, peer ? peer->id : PEER_NONE, str, len) )
2619 : 0 : return true;
2620 : :
2621 [ # # ]: 0 : if ( io->Eof() )
2622 : 0 : ChildDied();
2623 : :
2624 : 0 : FatalError(io->Error());
2625 : 0 : return false;
2626 : : }
2627 : :
2628 : 0 : bool RemoteSerializer::SendToChild(char type, Peer* peer, int nargs, ...)
2629 : : {
2630 : : va_list ap;
2631 : :
2632 [ # # ]: 0 : if ( ! child_pid )
2633 : 0 : return false;
2634 : :
2635 : : #ifdef DEBUG
2636 : 0 : va_start(ap, nargs);
2637 [ # # ]: 0 : DEBUG_COMM(fmt("parent: (->child) %s (#%d,%s)",
2638 : : msgToStr(type), peer ? peer->id : PEER_NONE, fmt_uint32s(nargs, ap)));
2639 : 0 : va_end(ap);
2640 : : #endif
2641 : :
2642 : 0 : va_start(ap, nargs);
2643 [ # # ]: 0 : bool ret = sendToIO(io, type, peer ? peer->id : PEER_NONE, nargs, ap);
2644 : 0 : va_end(ap);
2645 : :
2646 [ # # ]: 0 : if ( ret )
2647 : 0 : return true;
2648 : :
2649 [ # # ]: 0 : if ( io->Eof() )
2650 : 0 : ChildDied();
2651 : :
2652 : 0 : FatalError(io->Error());
2653 : 0 : return false;
2654 : : }
2655 : :
2656 : 0 : bool RemoteSerializer::SendToChild(ChunkedIO::Chunk* c)
2657 : : {
2658 : 0 : DEBUG_COMM(fmt("parent: (->child) chunk of size %d", c->len));
2659 : :
2660 [ # # ]: 0 : if ( ! child_pid )
2661 : 0 : return false;
2662 : :
2663 [ # # ]: 0 : if ( sendToIO(io, c) )
2664 : 0 : return true;
2665 : :
2666 [ # # ]: 0 : if ( io->Eof() )
2667 : 0 : ChildDied();
2668 : :
2669 : 0 : FatalError(io->Error());
2670 : 0 : return false;
2671 : : }
2672 : :
2673 : 0 : void RemoteSerializer::FatalError(const char* msg)
2674 : : {
2675 : 0 : msg = fmt("fatal error, shutting down communication: %s", msg);
2676 : 0 : Log(LogError, msg);
2677 : 0 : error(msg);
2678 : :
2679 : 0 : closed = true;
2680 : 0 : kill(child_pid, SIGQUIT);
2681 : 0 : child_pid = 0;
2682 : 0 : using_communication = false;
2683 : 0 : io->Clear();
2684 : 0 : }
2685 : :
2686 : 0 : bool RemoteSerializer::IsActive()
2687 : : {
2688 [ # # ]: 0 : if ( listening )
2689 : 0 : return true;
2690 : :
2691 [ # # ]: 0 : loop_over_list(peers, i)
2692 [ # # ][ # # ]: 0 : if ( peers[i]->state == Peer::PENDING ||
[ # # ]
2693 : : peers[i]->state == Peer::CONNECTED )
2694 : 0 : return true;
2695 : :
2696 : 0 : return false;
2697 : : }
2698 : :
2699 : :
2700 : 3499 : const char* const* RemoteSerializer::GetBuiltins() const
2701 : : {
2702 : : static const char* builtins[] = { "connect", "listen", 0 };
2703 : 3499 : return builtins;
2704 : : }
2705 : :
2706 : 0 : void RemoteSerializer::ReportError(const char* msg)
2707 : : {
2708 [ # # ][ # # ]: 0 : if ( current_peer && current_peer->phase != Peer::SETUP )
2709 : 0 : RaiseEvent(remote_connection_error, current_peer, msg);
2710 : 0 : Log(LogError, msg, current_peer);
2711 : 0 : }
2712 : :
2713 : 0 : void RemoteSerializer::InternalCommError(const char* msg)
2714 : : {
2715 : : #ifdef DEBUG_COMMUNICATION
2716 : : DumpDebugData();
2717 : : #else
2718 : 0 : internal_error(msg);
2719 : : #endif
2720 : : }
2721 : :
2722 : : #ifdef DEBUG_COMMUNICATION
2723 : :
2724 : : void RemoteSerializer::DumpDebugData()
2725 : : {
2726 : : Log(LogError, "dumping debug data and terminating ...");
2727 : : io->DumpDebugData("comm-dump.parent", true);
2728 : : io->DumpDebugData("comm-dump.parent", false);
2729 : : SendToChild(MSG_DEBUG_DUMP, 0, 0);
2730 : : Terminate();
2731 : : }
2732 : :
2733 : : static ChunkedIO* openDump(const char* file)
2734 : : {
2735 : : int fd = open(file, O_RDONLY, 0600);
2736 : :
2737 : : if ( fd < 0 )
2738 : : {
2739 : : fprintf(stderr, "cannot open %s: %s\n", file, strerror(errno));
2740 : : return 0;
2741 : : }
2742 : :
2743 : : return new ChunkedIOFd(fd, "dump-file");
2744 : : }
2745 : :
2746 : : void RemoteSerializer::ReadDumpAsMessageType(const char* file)
2747 : : {
2748 : : ChunkedIO* io = openDump(file);
2749 : : if ( ! io )
2750 : : return;
2751 : :
2752 : : ChunkedIO::Chunk* chunk;
2753 : :
2754 : : if ( ! io->Read(&chunk, true ) )
2755 : : {
2756 : : fprintf(stderr, "cannot read %s: %s\n", file, strerror(errno));
2757 : : return;
2758 : : }
2759 : :
2760 : : CMsg* msg = (CMsg*) chunk->data;
2761 : :
2762 : : delete [] chunk->data;
2763 : : delete io;
2764 : : }
2765 : :
2766 : : void RemoteSerializer::ReadDumpAsSerialization(const char* file)
2767 : : {
2768 : : FileSerializer s;
2769 : : UnserialInfo info(&s);
2770 : : info.print = stdout;
2771 : : info.install_uniques = info.ignore_callbacks = true;
2772 : : s.Read(&info, file, false);
2773 : : }
2774 : :
2775 : : #endif
2776 : :
2777 : : ////////////////////////////
2778 : :
2779 : : // If true (set by signal handler), we will log some stats to parent.
2780 : : static bool log_stats = false;
2781 : : static bool log_prof = false;
2782 : :
2783 : : // How often stats are sent (in seconds).
2784 : : // Perhaps we should make this configurable...
2785 : : const int STATS_INTERVAL = 60;
2786 : :
2787 : 0 : static RETSIGTYPE sig_handler_log(int signo)
2788 : : {
2789 : : // SIGALRM is the only one we get.
2790 : 0 : log_stats = true;
2791 : 0 : }
2792 : :
2793 : 0 : static RETSIGTYPE sig_handler_prof(int signo)
2794 : : {
2795 : 0 : log_prof = true;
2796 : 0 : }
2797 : :
2798 : 0 : SocketComm::SocketComm()
2799 : : {
2800 : 0 : io = 0;
2801 : :
2802 : : // We start the ID counter high so that IDs assigned by us
2803 : : // (hopefully) don't conflict with those of our parent.
2804 : 0 : id_counter = 10000;
2805 : 0 : parent_peer = 0;
2806 : 0 : parent_msgstate = TYPE;
2807 : 0 : shutting_conns_down = false;
2808 : 0 : terminating = false;
2809 : 0 : killing = false;
2810 : :
2811 : 0 : listen_fd_clear = -1;
2812 : 0 : listen_fd_ssl = -1;
2813 : 0 : listen_next_try = 0;
2814 : :
2815 : : // We don't want to use the signal handlers of our parent.
2816 : 0 : (void) setsignal(SIGTERM, SIG_DFL);
2817 : 0 : (void) setsignal(SIGINT, SIG_DFL);
2818 : 0 : (void) setsignal(SIGUSR1, SIG_DFL);
2819 : 0 : (void) setsignal(SIGUSR2, SIG_DFL);
2820 : 0 : (void) setsignal(SIGCONT, SIG_DFL);
2821 : 0 : (void) setsignal(SIGCHLD, SIG_DFL);
2822 : :
2823 : : // Raping SIGPROF for profiling
2824 : 0 : (void) setsignal(SIGPROF, sig_handler_prof);
2825 : 0 : (void) setsignal(SIGALRM, sig_handler_log);
2826 : 0 : alarm(STATS_INTERVAL);
2827 : 0 : }
2828 : :
2829 : 0 : SocketComm::~SocketComm()
2830 : : {
2831 [ # # ][ # # ]: 0 : loop_over_list(peers, i)
2832 [ # # ][ # # ]: 0 : delete peers[i]->io;
2833 : :
2834 [ # # ][ # # ]: 0 : delete io;
2835 : 0 : close(listen_fd_clear);
2836 : 0 : close(listen_fd_ssl);
2837 : 0 : }
2838 : :
2839 : : static unsigned int first_rtime = 0;
2840 : :
2841 : 0 : void SocketComm::Run()
2842 : : {
2843 : 0 : first_rtime = (unsigned int) current_time(true);
2844 : :
2845 : 0 : while ( true )
2846 : : {
2847 : : // Logging signaled?
2848 [ # # ]: 0 : if ( log_stats )
2849 : 0 : LogStats();
2850 : :
2851 : : // Termination signaled
2852 [ # # ]: 0 : if ( terminating )
2853 : 0 : CheckFinished();
2854 : :
2855 : : // Build FDSets for select.
2856 : : fd_set fd_read, fd_write, fd_except;
2857 : :
2858 : 0 : FD_ZERO(&fd_read);
2859 : 0 : FD_ZERO(&fd_write);
2860 : 0 : FD_ZERO(&fd_except);
2861 : :
2862 : 0 : int max_fd = 0;
2863 : :
2864 : 0 : FD_SET(io->Fd(), &fd_read);
2865 : 0 : max_fd = io->Fd();
2866 : :
2867 [ # # ]: 0 : loop_over_list(peers, i)
2868 : : {
2869 [ # # ]: 0 : if ( peers[i]->connected )
2870 : : {
2871 : 0 : FD_SET(peers[i]->io->Fd(), &fd_read);
2872 [ # # ]: 0 : if ( peers[i]->io->Fd() > max_fd )
2873 : 0 : max_fd = peers[i]->io->Fd();
2874 : : }
2875 : : else
2876 : : {
2877 [ # # ][ # # ]: 0 : if ( peers[i]->next_try > 0 &&
[ # # ]
2878 : : time(0) > peers[i]->next_try )
2879 : : // Try reconnect.
2880 : 0 : Connect(peers[i]);
2881 : : }
2882 : : }
2883 : :
2884 [ # # ][ # # ]: 0 : if ( listen_next_try && time(0) > listen_next_try )
[ # # ]
2885 : 0 : Listen(listen_if, listen_port, listen_ssl);
2886 : :
2887 [ # # ]: 0 : if ( listen_fd_clear >= 0 )
2888 : : {
2889 : 0 : FD_SET(listen_fd_clear, &fd_read);
2890 [ # # ]: 0 : if ( listen_fd_clear > max_fd )
2891 : 0 : max_fd = listen_fd_clear;
2892 : : }
2893 : :
2894 [ # # ]: 0 : if ( listen_fd_ssl >= 0 )
2895 : : {
2896 : 0 : FD_SET(listen_fd_ssl, &fd_read);
2897 [ # # ]: 0 : if ( listen_fd_ssl > max_fd )
2898 : 0 : max_fd = listen_fd_ssl;
2899 : : }
2900 : :
2901 [ # # ][ # # ]: 0 : if ( io->IsFillingUp() && ! shutting_conns_down )
[ # # ]
2902 : : {
2903 : 0 : Error("queue to parent filling up; shutting down heaviest connection");
2904 : :
2905 : 0 : const ChunkedIO::Statistics* stats = 0;
2906 : 0 : unsigned long max = 0;
2907 : 0 : Peer* max_peer = 0;
2908 : :
2909 [ # # ]: 0 : loop_over_list(peers, i)
2910 : : {
2911 [ # # ]: 0 : if ( ! peers[i]->connected )
2912 : 0 : continue;
2913 : :
2914 : 0 : stats = peers[i]->io->Stats();
2915 [ # # ]: 0 : if ( stats->bytes_read > max )
2916 : : {
2917 : 0 : max = stats->bytes_read;
2918 : 0 : max_peer = peers[i];
2919 : : }
2920 : : }
2921 : :
2922 [ # # ]: 0 : if ( max_peer )
2923 : 0 : CloseConnection(max_peer, true);
2924 : :
2925 : 0 : shutting_conns_down = true;
2926 : : }
2927 : :
2928 [ # # ][ # # ]: 0 : if ( ! io->IsFillingUp() && shutting_conns_down )
[ # # ]
2929 : 0 : shutting_conns_down = false;
2930 : :
2931 : : // We cannot rely solely on select() as the there may
2932 : : // be some data left in our input/output queues. So, we use
2933 : : // a small timeout for select and check for data
2934 : : // manually afterwards.
2935 : :
2936 : : static long selects = 0;
2937 : : static long canwrites = 0;
2938 : : static long timeouts = 0;
2939 : :
2940 : 0 : ++selects;
2941 [ # # ]: 0 : if ( io->CanWrite() )
2942 : 0 : ++canwrites;
2943 : :
2944 : : // FIXME: Fine-tune this (timeouts, flush, etc.)
2945 : : struct timeval small_timeout;
2946 : 0 : small_timeout.tv_sec = 0;
2947 : : small_timeout.tv_usec =
2948 [ # # ][ # # ]: 0 : io->CanWrite() || io->CanRead() ? 1 : 10;
2949 : :
2950 : : int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except,
2951 : 0 : &small_timeout);
2952 : :
2953 [ # # ]: 0 : if ( a == 0 )
2954 : 0 : ++timeouts;
2955 : :
2956 [ # # ]: 0 : if ( selects % 100000 == 0 )
2957 : 0 : Log(fmt("selects=%ld canwrites=%ld timeouts=%ld", selects, canwrites, timeouts));
2958 : :
2959 [ # # ]: 0 : if ( a < 0 )
2960 : : // Ignore errors for now.
2961 : 0 : continue;
2962 : :
2963 [ # # ]: 0 : if ( io->CanRead() )
2964 : 0 : ProcessParentMessage();
2965 : :
2966 : 0 : io->Flush();
2967 : :
2968 [ # # ]: 0 : loop_over_list(peers, j)
2969 : : {
2970 : : // We have to be careful here as the peer may
2971 : : // be removed when an error occurs.
2972 : 0 : Peer* current = peers[j];
2973 : 0 : int round = 0;
2974 [ # # ][ # # ]: 0 : while ( ++round <= 10 && j < peers.length() &&
[ # # ][ # # ]
[ # # ][ # # ]
2975 : : peers[j] == current && current->connected &&
2976 : : current->io->CanRead() )
2977 : : {
2978 : 0 : ProcessRemoteMessage(current);
2979 : : }
2980 : : }
2981 : :
2982 [ # # ][ # # ]: 0 : if ( listen_fd_clear >= 0 &&
[ # # ]
2983 : : FD_ISSET(listen_fd_clear, &fd_read) )
2984 : 0 : AcceptConnection(listen_fd_clear);
2985 : :
2986 [ # # ][ # # ]: 0 : if ( listen_fd_ssl >= 0 && FD_ISSET(listen_fd_ssl, &fd_read) )
[ # # ]
2987 : 0 : AcceptConnection(listen_fd_ssl);
2988 : :
2989 : : // Hack to display CPU usage of the child, triggered via
2990 : : // SIGPROF.
2991 : : static unsigned int first_rtime = 0;
2992 [ # # ]: 0 : if ( first_rtime == 0 )
2993 : 0 : first_rtime = (unsigned int) current_time(true);
2994 : :
2995 [ # # ]: 0 : if ( log_prof )
2996 : : {
2997 : 0 : LogProf();
2998 : 0 : log_prof = false;
2999 : : }
3000 : : }
3001 : : }
3002 : :
3003 : 0 : bool SocketComm::ProcessParentMessage()
3004 : : {
3005 [ # # # ]: 0 : switch ( parent_msgstate ) {
3006 : : case TYPE:
3007 : : {
3008 : 0 : parent_peer = 0;
3009 : 0 : parent_msgtype = MSG_NONE;
3010 : :
3011 : : // CMsg follows
3012 : : ChunkedIO::Chunk* c;
3013 [ # # ]: 0 : if ( ! io->Read(&c) )
3014 : : {
3015 [ # # ]: 0 : if ( io->Eof() )
3016 : 0 : Error("parent died", true);
3017 : :
3018 : : Error(fmt("can't read parent's cmsg: %s",
3019 : 0 : io->Error()), true);
3020 : 0 : return false;
3021 : : }
3022 : :
3023 [ # # ]: 0 : if ( ! c )
3024 : 0 : return true;
3025 : :
3026 : 0 : CMsg* msg = (CMsg*) c->data;
3027 : 0 : parent_peer = LookupPeer(msg->Peer(), false);
3028 : 0 : parent_id = msg->Peer();
3029 : 0 : parent_msgtype = msg->Type();
3030 : 0 : parent_args = 0;
3031 : :
3032 [ # # ]: 0 : delete [] c->data;
3033 : 0 : delete c;
3034 : :
3035 [ # # # ]: 0 : switch ( parent_msgtype ) {
3036 : : case MSG_LISTEN_STOP:
3037 : : case MSG_CLOSE:
3038 : : case MSG_CLOSE_ALL:
3039 : : case MSG_TERMINATE:
3040 : : case MSG_PHASE_DONE:
3041 : : case MSG_DEBUG_DUMP:
3042 : : {
3043 : : // No further argument chunk.
3044 : 0 : parent_msgstate = TYPE;
3045 : 0 : return DoParentMessage();
3046 : : }
3047 : :
3048 : : case MSG_LISTEN:
3049 : : case MSG_CONNECT_TO:
3050 : : case MSG_COMPRESS:
3051 : : case MSG_PING:
3052 : : case MSG_PONG:
3053 : : case MSG_REQUEST_EVENTS:
3054 : : case MSG_REQUEST_SYNC:
3055 : : case MSG_SERIAL:
3056 : : case MSG_CAPTURE_FILTER:
3057 : : case MSG_VERSION:
3058 : : case MSG_CAPS:
3059 : : case MSG_SYNC_POINT:
3060 : : case MSG_REMOTE_PRINT:
3061 : : {
3062 : : // One further argument chunk.
3063 : 0 : parent_msgstate = ARGS;
3064 : 0 : return ProcessParentMessage();
3065 : : }
3066 : :
3067 : : default:
3068 : 0 : internal_error(fmt("unknown msg type %d", parent_msgtype));
3069 : : return true;
3070 : : }
3071 : :
3072 : : internal_error("cannot be reached");
3073 : : }
3074 : :
3075 : : case ARGS:
3076 : : {
3077 : : // Argument chunk follows.
3078 : 0 : ChunkedIO::Chunk* c = 0;
3079 [ # # ][ # # ]: 0 : READ_CHUNK(io, c, Error("parent died", true));
[ # # ]
3080 : 0 : parent_args = c;
3081 : 0 : parent_msgstate = TYPE;
3082 : 0 : bool result = DoParentMessage();
3083 : :
3084 [ # # ]: 0 : if ( parent_args )
3085 : : {
3086 [ # # ]: 0 : delete [] parent_args->data;
3087 : 0 : delete parent_args;
3088 : 0 : parent_args = 0;
3089 : : }
3090 : :
3091 : 0 : return result;
3092 : : }
3093 : :
3094 : : default:
3095 : 0 : internal_error("unknown msgstate");
3096 : : }
3097 : :
3098 : : internal_error("cannot be reached");
3099 : : }
3100 : :
3101 : 0 : bool SocketComm::DoParentMessage()
3102 : : {
3103 [ # # # # # : 0 : switch ( parent_msgtype ) {
# # # # #
# # # ]
3104 : :
3105 : : case MSG_LISTEN_STOP:
3106 : : {
3107 [ # # ]: 0 : if ( listen_fd_ssl >= 0 )
3108 : 0 : close(listen_fd_ssl);
3109 : :
3110 [ # # ]: 0 : if ( listen_fd_clear >= 0 )
3111 : 0 : close(listen_fd_clear);
3112 : :
3113 : 0 : listen_fd_clear = listen_fd_ssl = -1;
3114 : 0 : Log("stopped listening");
3115 : :
3116 : 0 : return true;
3117 : : }
3118 : :
3119 : : case MSG_CLOSE:
3120 : : {
3121 [ # # ][ # # ]: 0 : if ( parent_peer && parent_peer->connected )
3122 : 0 : CloseConnection(parent_peer, false);
3123 : 0 : return true;
3124 : : }
3125 : :
3126 : : case MSG_CLOSE_ALL:
3127 : : {
3128 [ # # ]: 0 : loop_over_list(peers, i)
3129 : : {
3130 [ # # ]: 0 : if ( peers[i]->connected )
3131 : 0 : CloseConnection(peers[i], false);
3132 : : }
3133 : 0 : return true;
3134 : : }
3135 : :
3136 : : case MSG_TERMINATE:
3137 : : {
3138 : 0 : terminating = true;
3139 : 0 : CheckFinished();
3140 : 0 : return true;
3141 : : }
3142 : :
3143 : : case MSG_DEBUG_DUMP:
3144 : : {
3145 : : #ifdef DEBUG_COMMUNICATION
3146 : : io->DumpDebugData("comm-dump.child.pipe", true);
3147 : : io->DumpDebugData("comm-dump.child.pipe", false);
3148 : :
3149 : : loop_over_list(peers, j)
3150 : : {
3151 : : RemoteSerializer::PeerID id = peers[j]->id;
3152 : : peers[j]->io->DumpDebugData(fmt("comm-dump.child.peer.%d", id), true);
3153 : : peers[j]->io->DumpDebugData(fmt("comm-dump.child.peer.%d", id), false);
3154 : : }
3155 : : #else
3156 : 0 : internal_error("DEBUG_DUMP support not compiled in");
3157 : : #endif
3158 : : return true;
3159 : : }
3160 : :
3161 : : case MSG_PHASE_DONE:
3162 : : {
3163 : : // No argument block follows.
3164 [ # # ][ # # ]: 0 : if ( parent_peer && parent_peer->connected )
3165 : : {
3166 : 0 : DEBUG_COMM("child: forwarding with MSG_PHASE_DONE to peer");
3167 [ # # ]: 0 : if ( ! SendToPeer(parent_peer, MSG_PHASE_DONE, 0) )
3168 : 0 : return false;
3169 : : }
3170 : 0 : return true;
3171 : : }
3172 : :
3173 : : case MSG_LISTEN:
3174 : 0 : return ProcessListen();
3175 : :
3176 : : case MSG_CONNECT_TO:
3177 : 0 : return ProcessConnectTo();
3178 : :
3179 : : case MSG_COMPRESS:
3180 : 0 : return ProcessParentCompress();
3181 : :
3182 : : case MSG_PING:
3183 : : {
3184 : : // Set time2.
3185 [ # # ]: 0 : assert(parent_args);
3186 : 0 : ping_args* args = (ping_args*) parent_args->data;
3187 : 0 : args->time2 = htond(current_time(true));
3188 : 0 : return ForwardChunkToPeer();
3189 : : }
3190 : :
3191 : : case MSG_PONG:
3192 : : {
3193 [ # # ]: 0 : assert(parent_args);
3194 : : // Calculate time delta.
3195 : 0 : ping_args* args = (ping_args*) parent_args->data;
3196 : 0 : args->time3 = htond(current_time(true) - ntohd(args->time3));
3197 : 0 : return ForwardChunkToPeer();
3198 : : }
3199 : :
3200 : : case MSG_REQUEST_EVENTS:
3201 : : case MSG_REQUEST_SYNC:
3202 : : case MSG_SERIAL:
3203 : : case MSG_CAPTURE_FILTER:
3204 : : case MSG_VERSION:
3205 : : case MSG_CAPS:
3206 : : case MSG_SYNC_POINT:
3207 : : case MSG_REMOTE_PRINT:
3208 [ # # ]: 0 : assert(parent_args);
3209 : 0 : return ForwardChunkToPeer();
3210 : :
3211 : : default:
3212 : 0 : internal_error("ProcessParentMessage: unexpected state");
3213 : : }
3214 : :
3215 : : internal_error("cannot be reached");
3216 : : }
3217 : :
3218 : 0 : bool SocketComm::ForwardChunkToPeer()
3219 : : {
3220 : 0 : char state = parent_msgtype;
3221 : :
3222 [ # # ][ # # ]: 0 : if ( parent_peer && parent_peer->connected )
3223 : : {
3224 : 0 : DEBUG_COMM("child: forwarding with 1 arg to peer");
3225 : :
3226 [ # # ]: 0 : if ( ! SendToPeer(parent_peer, state, 0) )
3227 : 0 : return false;
3228 : :
3229 [ # # ]: 0 : if ( ! SendToPeer(parent_peer, parent_args) )
3230 : 0 : return false;
3231 : :
3232 : 0 : parent_args = 0;
3233 : : }
3234 : : else
3235 : : {
3236 : : #ifdef DEBUG
3237 [ # # ]: 0 : if ( parent_peer )
3238 : 0 : DEBUG_COMM(fmt("child: not connected to #%d", parent_id));
3239 : : #endif
3240 : : }
3241 : :
3242 : 0 : return true;
3243 : : }
3244 : :
3245 : 0 : bool SocketComm::ProcessConnectTo()
3246 : : {
3247 [ # # ]: 0 : assert(parent_args);
3248 : 0 : uint32* args = (uint32*) parent_args->data;
3249 : :
3250 : 0 : Peer* peer = new Peer;
3251 : 0 : peer->id = ntohl(args[0]);
3252 : 0 : peer->ip = ntohl(args[1]);
3253 : 0 : peer->port = ntohl(args[2]);
3254 : 0 : peer->retry = ntohl(args[3]);
3255 : 0 : peer->ssl = ntohl(args[4]);
3256 : :
3257 : 0 : Connect(peer);
3258 : 0 : return true;
3259 : : }
3260 : :
3261 : 0 : bool SocketComm::ProcessListen()
3262 : : {
3263 [ # # ]: 0 : assert(parent_args);
3264 : 0 : uint32* args = (uint32*) parent_args->data;
3265 : :
3266 : 0 : uint32 addr = ntohl(args[0]);
3267 : 0 : uint16 port = uint16(ntohl(args[1]));
3268 : 0 : uint32 ssl = ntohl(args[2]);
3269 : :
3270 : 0 : return Listen(addr, port, ssl);
3271 : : }
3272 : :
3273 : 0 : bool SocketComm::ProcessParentCompress()
3274 : : {
3275 : : #ifndef HAVE_LIBZ
3276 : : internal_error("supposed to enable compression but don't have zlib");
3277 : : return false;
3278 : : #else
3279 : :
3280 [ # # ]: 0 : assert(parent_args);
3281 : 0 : uint32* args = (uint32*) parent_args->data;
3282 : :
3283 : 0 : uint32 level = ntohl(args[0]);
3284 : :
3285 [ # # ]: 0 : if ( ! parent_peer->compressor )
3286 : : {
3287 : 0 : parent_peer->io = new CompressedChunkedIO(parent_peer->io);
3288 : 0 : parent_peer->io->Init();
3289 : 0 : parent_peer->compressor = true;
3290 : : }
3291 : :
3292 : : // Signal compression to peer.
3293 [ # # ]: 0 : if ( ! SendToPeer(parent_peer, MSG_COMPRESS, 0) )
3294 : 0 : return false;
3295 : :
3296 : : // This cast is safe.
3297 : 0 : CompressedChunkedIO* comp_io = (CompressedChunkedIO*) parent_peer->io;
3298 : 0 : comp_io->EnableCompression(level);
3299 : :
3300 : 0 : Log(fmt("enabling compression (level %d)", level), parent_peer);
3301 : :
3302 : 0 : return true;
3303 : : #endif
3304 : : }
3305 : :
3306 : 0 : bool SocketComm::ProcessRemoteMessage(SocketComm::Peer* peer)
3307 : : {
3308 [ # # ]: 0 : assert(peer);
3309 : :
3310 : 0 : peer->io->Flush();
3311 : :
3312 [ # # # # # : 0 : switch ( peer->state ) {
# ]
3313 : : case MSG_NONE:
3314 : : { // CMsg follows
3315 : : ChunkedIO::Chunk* c;
3316 [ # # ][ # # ]: 0 : READ_CHUNK(peer->io, c,
[ # # ]
3317 : : (CloseConnection(peer, true), peer))
3318 : :
3319 : 0 : CMsg* msg = (CMsg*) c->data;
3320 : :
3321 : 0 : DEBUG_COMM(fmt("child: %s from peer #%d",
3322 : : msgToStr(msg->Type()), peer->id));
3323 : :
3324 [ # # ]: 0 : switch ( msg->Type() ) {
3325 : : case MSG_PHASE_DONE:
3326 : : // No further argument block.
3327 : 0 : DEBUG_COMM("child: forwarding with 0 args to parent");
3328 [ # # ]: 0 : if ( ! SendToParent(msg->Type(), peer, 0) )
3329 : 0 : return false;
3330 : 0 : break;
3331 : :
3332 : : default:
3333 : 0 : peer->state = msg->Type();
3334 : : }
3335 : :
3336 [ # # ]: 0 : delete [] c->data;
3337 : 0 : delete c;
3338 : :
3339 : 0 : break;
3340 : : }
3341 : :
3342 : : case MSG_COMPRESS:
3343 : 0 : ProcessPeerCompress(peer);
3344 : 0 : break;
3345 : :
3346 : : case MSG_PING:
3347 : : {
3348 : : // Messages with one further argument block which we simply
3349 : : // forward to our parent.
3350 : : ChunkedIO::Chunk* c;
3351 [ # # ][ # # ]: 0 : READ_CHUNK(peer->io, c,
[ # # ]
3352 : : (CloseConnection(peer, true), peer))
3353 : :
3354 : : // Set time3.
3355 : 0 : ping_args* args = (ping_args*) c->data;
3356 : 0 : args->time3 = htond(current_time(true));
3357 : 0 : return ForwardChunkToParent(peer, c);
3358 : : }
3359 : :
3360 : : case MSG_PONG:
3361 : : {
3362 : : // Messages with one further argument block which we simply
3363 : : // forward to our parent.
3364 : : ChunkedIO::Chunk* c;
3365 [ # # ][ # # ]: 0 : READ_CHUNK(peer->io, c,
[ # # ]
3366 : : (CloseConnection(peer, true), peer))
3367 : :
3368 : : // Calculate time delta.
3369 : 0 : ping_args* args = (ping_args*) c->data;
3370 : 0 : args->time2 = htond(current_time(true) - ntohd(args->time2));
3371 : 0 : return ForwardChunkToParent(peer, c);
3372 : : }
3373 : :
3374 : : case MSG_REQUEST_EVENTS:
3375 : : case MSG_REQUEST_SYNC:
3376 : : case MSG_SERIAL:
3377 : : case MSG_CAPTURE_FILTER:
3378 : : case MSG_VERSION:
3379 : : case MSG_CAPS:
3380 : : case MSG_SYNC_POINT:
3381 : : case MSG_REMOTE_PRINT:
3382 : : {
3383 : : // Messages with one further argument block which we simply
3384 : : // forward to our parent.
3385 : : ChunkedIO::Chunk* c;
3386 [ # # ][ # # ]: 0 : READ_CHUNK(peer->io, c,
[ # # ]
3387 : : (CloseConnection(peer, true), peer))
3388 : :
3389 : 0 : return ForwardChunkToParent(peer, c);
3390 : : }
3391 : :
3392 : : default:
3393 : 0 : internal_error("ProcessRemoteMessage: unexpected state");
3394 : : }
3395 : :
3396 : 0 : return true;
3397 : : }
3398 : :
3399 : 0 : bool SocketComm::ForwardChunkToParent(Peer* peer, ChunkedIO::Chunk* c)
3400 : : {
3401 : 0 : char state = peer->state;
3402 : 0 : peer->state = MSG_NONE;
3403 : :
3404 : 0 : DEBUG_COMM("child: forwarding message with 1 arg to parent");
3405 : :
3406 [ # # ]: 0 : if ( ! SendToParent(state, peer, 0) )
3407 : 0 : return false;
3408 : :
3409 [ # # ]: 0 : if ( ! SendToParent(c) )
3410 : 0 : return false;
3411 : :
3412 : 0 : io->Flush(); // FIXME: Needed?
3413 : 0 : return true;
3414 : : }
3415 : :
3416 : 0 : bool SocketComm::ProcessPeerCompress(Peer* peer)
3417 : : {
3418 : 0 : peer->state = MSG_NONE;
3419 : :
3420 : : #ifndef HAVE_LIBZ
3421 : : Error("peer compresses although we do not support it", peer);
3422 : : return false;
3423 : : #else
3424 [ # # ]: 0 : if ( ! parent_peer->compressor )
3425 : : {
3426 : 0 : parent_peer->io = new CompressedChunkedIO(parent_peer->io);
3427 : 0 : parent_peer->io->Init();
3428 : 0 : parent_peer->compressor = true;
3429 : : }
3430 : :
3431 : : // This cast is safe here.
3432 : 0 : ((CompressedChunkedIO*) peer->io)->EnableDecompression();
3433 : 0 : Log("enabling decompression", peer);
3434 : 0 : return true;
3435 : : #endif
3436 : : }
3437 : :
3438 : 0 : bool SocketComm::Connect(Peer* peer)
3439 : : {
3440 : : struct sockaddr_in server;
3441 : :
3442 : 0 : int sockfd = socket(PF_INET, SOCK_STREAM, 0);
3443 [ # # ]: 0 : if ( sockfd < 0 )
3444 : : {
3445 : 0 : Error(fmt("can't create socket, %s", strerror(errno)));
3446 : 0 : return false;
3447 : : }
3448 : :
3449 : 0 : bzero(&server, sizeof(server));
3450 : 0 : server.sin_family = AF_INET;
3451 : 0 : server.sin_port = htons(peer->port);
3452 : 0 : server.sin_addr.s_addr = htonl(peer->ip);
3453 : :
3454 : 0 : bool connected = true;
3455 : :
3456 [ # # ]: 0 : if ( connect(sockfd, (sockaddr*) &server, sizeof(server)) < 0 )
3457 : : {
3458 : 0 : Error(fmt("connect failed: %s", strerror(errno)), peer);
3459 : 0 : close(sockfd);
3460 : 0 : connected = false;
3461 : : }
3462 : :
3463 [ # # ][ # # ]: 0 : if ( ! (connected || peer->retry) )
3464 : : {
3465 : 0 : CloseConnection(peer, false);
3466 : 0 : return false;
3467 : : }
3468 : :
3469 : 0 : Peer* existing_peer = LookupPeer(peer->id, false);
3470 [ # # ]: 0 : if ( existing_peer )
3471 : : {
3472 : 0 : *existing_peer = *peer;
3473 : 0 : peer = existing_peer;
3474 : : }
3475 : : else
3476 : 0 : peers.append(peer);
3477 : :
3478 : 0 : peer->connected = connected;
3479 [ # # ]: 0 : peer->next_try = connected ? 0 : time(0) + peer->retry;
3480 : 0 : peer->state = MSG_NONE;
3481 : 0 : peer->io = 0;
3482 : 0 : peer->compressor = false;
3483 : :
3484 [ # # ]: 0 : if ( connected )
3485 : : {
3486 [ # # ]: 0 : if ( peer->ssl )
3487 : : {
3488 : 0 : peer->io = new ChunkedIOSSL(sockfd, false);
3489 : : }
3490 : : else
3491 : 0 : peer->io = new ChunkedIOFd(sockfd, "child->peer");
3492 : :
3493 [ # # ]: 0 : if ( ! peer->io->Init() )
3494 : : {
3495 : : Error(fmt("can't init peer io: %s",
3496 : 0 : peer->io->Error()), peer);
3497 : 0 : return 0;
3498 : : }
3499 : : }
3500 : :
3501 [ # # ]: 0 : if ( connected )
3502 : : {
3503 : 0 : Log("connected", peer);
3504 [ # # ]: 0 : if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) )
3505 : 0 : return false;
3506 : : }
3507 : :
3508 : 0 : return connected;
3509 : : }
3510 : :
3511 : 0 : bool SocketComm::CloseConnection(Peer* peer, bool reconnect)
3512 : : {
3513 [ # # ]: 0 : if ( ! SendToParent(MSG_CLOSE, peer, 0) )
3514 : 0 : return false;
3515 : :
3516 : 0 : Log("connection closed", peer);
3517 : :
3518 [ # # # # ]: 0 : if ( ! peer->retry || ! reconnect )
3519 : : {
3520 : 0 : peers.remove(peer);
3521 [ # # ]: 0 : delete peer->io; // This will close the fd.
3522 : 0 : delete peer;
3523 : : }
3524 : : else
3525 : : {
3526 [ # # ]: 0 : delete peer->io; // This will close the fd.
3527 : 0 : peer->io = 0;
3528 : 0 : peer->connected = false;
3529 : 0 : peer->next_try = time(0) + peer->retry;
3530 : : }
3531 : :
3532 [ # # ]: 0 : if ( parent_peer == peer )
3533 : : {
3534 : 0 : parent_peer = 0;
3535 : 0 : parent_id = RemoteSerializer::PEER_NONE;
3536 : : }
3537 : :
3538 : 0 : return true;
3539 : : }
3540 : :
3541 : 0 : bool SocketComm::Listen(uint32 ip, uint16 port, bool expect_ssl)
3542 : : {
3543 [ # # ]: 0 : int* listen_fd = expect_ssl ? &listen_fd_ssl : &listen_fd_clear;
3544 : :
3545 [ # # ]: 0 : if ( *listen_fd >= 0 )
3546 : 0 : close(*listen_fd);
3547 : :
3548 : : struct sockaddr_in server;
3549 : :
3550 : 0 : *listen_fd = socket(PF_INET, SOCK_STREAM, 0);
3551 [ # # ]: 0 : if ( *listen_fd < 0 )
3552 : : {
3553 : : Error(fmt("can't create listen socket, %s",
3554 : 0 : strerror(errno)));
3555 : 0 : return false;
3556 : : }
3557 : :
3558 : : // Set SO_REUSEADDR.
3559 : 0 : int turn_on = 1;
3560 [ # # ]: 0 : if ( setsockopt(*listen_fd, SOL_SOCKET, SO_REUSEADDR,
3561 : : &turn_on, sizeof(turn_on)) < 0 )
3562 : : {
3563 : : Error(fmt("can't set SO_REUSEADDR, %s",
3564 : 0 : strerror(errno)));
3565 : 0 : return false;
3566 : : }
3567 : :
3568 : 0 : bzero(&server, sizeof(server));
3569 : 0 : server.sin_family = AF_INET;
3570 : 0 : server.sin_port = htons(port);
3571 : 0 : server.sin_addr.s_addr = htonl(ip);
3572 : :
3573 [ # # ]: 0 : if ( bind(*listen_fd, (sockaddr*) &server, sizeof(server)) < 0 )
3574 : : {
3575 : 0 : Error(fmt("can't bind to port %d, %s", port, strerror(errno)));
3576 : 0 : *listen_fd = -1;
3577 : :
3578 [ # # ]: 0 : if ( errno == EADDRINUSE )
3579 : : {
3580 : 0 : listen_if = ip;
3581 : 0 : listen_port = port;
3582 : 0 : listen_ssl = expect_ssl;
3583 : : // FIXME: Make this timeout configurable.
3584 : 0 : listen_next_try = time(0) + 30;
3585 : : }
3586 : 0 : return false;
3587 : : }
3588 : :
3589 [ # # ]: 0 : if ( listen(*listen_fd, 50) < 0 )
3590 : : {
3591 : 0 : Error(fmt("can't listen, %s", strerror(errno)));
3592 : 0 : return false;
3593 : : }
3594 : :
3595 : 0 : listen_next_try = 0;
3596 : : Log(fmt("listening on %s:%d (%s)",
3597 [ # # ]: 0 : ip2a(ip), port, expect_ssl ? "ssl" : "clear"));
3598 : 0 : return true;
3599 : : }
3600 : :
3601 : 0 : bool SocketComm::AcceptConnection(int fd)
3602 : : {
3603 : : sockaddr_in client;
3604 : 0 : socklen_t len = sizeof(client);
3605 : :
3606 : 0 : int clientfd = accept(fd, (sockaddr*) &client, &len);
3607 [ # # ]: 0 : if ( clientfd < 0 )
3608 : : {
3609 : : Error(fmt("accept failed, %s %d",
3610 : 0 : strerror(errno), errno));
3611 : 0 : return false;
3612 : : }
3613 : :
3614 : 0 : Peer* peer = new Peer;
3615 : 0 : peer->id = id_counter++;
3616 : 0 : peer->ip = ntohl(client.sin_addr.s_addr);
3617 : 0 : peer->port = ntohs(client.sin_port);
3618 : 0 : peer->connected = true;
3619 : 0 : peer->ssl = (fd == listen_fd_ssl);
3620 : 0 : peer->compressor = false;
3621 : :
3622 [ # # ]: 0 : if ( peer->ssl )
3623 : 0 : peer->io = new ChunkedIOSSL(clientfd, true);
3624 : : else
3625 : 0 : peer->io = new ChunkedIOFd(clientfd, "child->peer");
3626 : :
3627 [ # # ]: 0 : if ( ! peer->io->Init() )
3628 : : {
3629 : : Error(fmt("can't init peer io: %s",
3630 : 0 : peer->io->Error()), peer);
3631 : 0 : return false;
3632 : : }
3633 : :
3634 : 0 : peers.append(peer);
3635 : :
3636 [ # # ]: 0 : Log(fmt("accepted %s connection", peer->ssl ? "SSL" : "clear"), peer);
3637 : :
3638 [ # # ]: 0 : if ( ! SendToParent(MSG_CONNECTED, peer, 2, peer->ip, peer->port) )
3639 : 0 : return false;
3640 : :
3641 : 0 : return true;
3642 : : }
3643 : :
3644 : 0 : const char* SocketComm::MakeLogString(const char* msg, Peer* peer)
3645 : : {
3646 : 0 : const int BUFSIZE = 1024;
3647 : : static char* buffer = 0;
3648 : :
3649 [ # # ]: 0 : if ( ! buffer )
3650 : 0 : buffer = new char[BUFSIZE];
3651 : :
3652 : 0 : int len = 0;
3653 : :
3654 [ # # ]: 0 : if ( peer )
3655 : : len = snprintf(buffer, BUFSIZE, "[#%d/%s:%d] ", int(peer->id),
3656 : 0 : ip2a(peer->ip), peer->port);
3657 : :
3658 : 0 : len += safe_snprintf(buffer + len, BUFSIZE - len, "%s", msg);
3659 : 0 : return buffer;
3660 : : }
3661 : :
3662 : 0 : void SocketComm::Error(const char* msg, bool kill_me)
3663 : : {
3664 [ # # ]: 0 : if ( kill_me )
3665 : : {
3666 : 0 : fprintf(stderr, "fatal error in child: %s\n", msg);
3667 : 0 : Kill();
3668 : : }
3669 : : else
3670 : : {
3671 [ # # ]: 0 : if ( io->Eof() )
3672 : : // Can't send to parent, so fall back to stderr.
3673 : 0 : fprintf(stderr, "error in child: %s", msg);
3674 : : else
3675 : 0 : SendToParent(MSG_ERROR, 0, copy_string(msg));
3676 : : }
3677 : :
3678 : 0 : DEBUG_COMM(fmt("child: %s", msg));
3679 : 0 : }
3680 : :
3681 : 0 : bool SocketComm::Error(const char* msg, Peer* peer)
3682 : : {
3683 : 0 : const char* buffer = MakeLogString(msg, peer);
3684 : 0 : Error(buffer);
3685 : :
3686 : : // If a remote peer causes an error, we shutdown the connection
3687 : : // as resynchronizing is in general not possible. But we may
3688 : : // try again later.
3689 [ # # ]: 0 : if ( peer->connected )
3690 : 0 : CloseConnection(peer, true);
3691 : :
3692 : 0 : return true;
3693 : : }
3694 : :
3695 : 0 : void SocketComm::Log(const char* msg, Peer* peer)
3696 : : {
3697 : 0 : const char* buffer = MakeLogString(msg, peer);
3698 : 0 : SendToParent(MSG_LOG, 0, copy_string(buffer));
3699 : 0 : DEBUG_COMM(fmt("child: %s", buffer));
3700 : 0 : }
3701 : :
3702 : 0 : void SocketComm::Kill()
3703 : : {
3704 [ # # ]: 0 : if ( killing )
3705 : : // Ignore recursive calls.
3706 : 0 : return;
3707 : :
3708 : 0 : killing = true;
3709 : :
3710 : 0 : LogProf();
3711 : 0 : Log("terminating");
3712 : :
3713 : 0 : close(listen_fd_clear);
3714 : 0 : close(listen_fd_ssl);
3715 : :
3716 : 0 : kill(getpid(), SIGTERM);
3717 : :
3718 : 0 : while ( 1 )
3719 : : ; // loop until killed
3720 : : }
3721 : :
3722 : : SocketComm::Peer* SocketComm::LookupPeer(RemoteSerializer::PeerID id,
3723 : 0 : bool only_if_connected)
3724 : : {
3725 [ # # ]: 0 : loop_over_list(peers, i)
3726 [ # # ]: 0 : if ( peers[i]->id == id )
3727 : : return ! only_if_connected ||
3728 [ # # ][ # # ]: 0 : peers[i]->connected ? peers[i] : 0;
3729 : 0 : return 0;
3730 : : }
3731 : :
3732 : 0 : bool SocketComm::LogStats()
3733 : : {
3734 [ # # ]: 0 : if ( ! peers.length() )
3735 : 0 : return true;
3736 : :
3737 : : // Concat stats of all peers into single buffer.
3738 : 0 : char* buffer = new char[peers.length() * 512];
3739 : 0 : int pos = 0;
3740 : :
3741 [ # # ]: 0 : loop_over_list(peers, i)
3742 : : {
3743 [ # # ]: 0 : if ( peers[i]->connected )
3744 : 0 : peers[i]->io->Stats(buffer+pos, 512);
3745 : : else
3746 : 0 : strcpy(buffer+pos, "not connected");
3747 : 0 : pos += strlen(buffer+pos) + 1;
3748 : : }
3749 : :
3750 : : // Send it.
3751 [ # # ]: 0 : if ( ! SendToParent(MSG_STATS, 0, buffer, pos) )
3752 : 0 : return false;
3753 : :
3754 : 0 : log_stats = false;
3755 : 0 : alarm(STATS_INTERVAL);
3756 : 0 : return true;
3757 : : }
3758 : :
3759 : 0 : bool SocketComm::LogProf()
3760 : : {
3761 : : static struct rusage cld_res;
3762 : 0 : getrusage(RUSAGE_SELF, &cld_res);
3763 : :
3764 : 0 : double Utime = cld_res.ru_utime.tv_sec + cld_res.ru_utime.tv_usec / 1e6;
3765 : 0 : double Stime = cld_res.ru_stime.tv_sec + cld_res.ru_stime.tv_usec / 1e6;
3766 : 0 : double Rtime = current_time(true);
3767 : :
3768 : : SocketComm::Log(fmt("CPU usage: user %.03f sys %.03f real %0.03f",
3769 : 0 : Utime, Stime, Rtime - first_rtime));
3770 : :
3771 : 0 : return true;
3772 : : }
3773 : :
3774 : 0 : void SocketComm::CheckFinished()
3775 : : {
3776 [ # # ]: 0 : assert(terminating);
3777 : :
3778 [ # # ]: 0 : loop_over_list(peers, i)
3779 : : {
3780 [ # # ]: 0 : if ( ! peers[i]->connected )
3781 : 0 : continue;
3782 [ # # ]: 0 : if ( ! peers[i]->io->IsIdle() )
3783 : 0 : return;
3784 : : }
3785 : :
3786 : 0 : LogProf();
3787 : 0 : Log("terminating");
3788 : :
3789 : : // All done.
3790 : 0 : SendToParent(MSG_TERMINATE, 0, 0);
3791 : : }
3792 : :
3793 : 0 : bool SocketComm::SendToParent(char type, Peer* peer, const char* str, int len)
3794 : : {
3795 : : #ifdef DEBUG
3796 : : // str may already by constructed with fmt()
3797 : 0 : const char* tmp = copy_string(str);
3798 [ # # ]: 0 : DEBUG_COMM(fmt("child: (->parent) %s (#%d, %s)", msgToStr(type), peer ? peer->id : RemoteSerializer::PEER_NONE, tmp));
3799 [ # # ]: 0 : delete [] tmp;
3800 : : #endif
3801 [ # # ][ # # ]: 0 : if ( sendToIO(io, type, peer ? peer->id : RemoteSerializer::PEER_NONE,
3802 : : str, len) )
3803 : 0 : return true;
3804 : :
3805 [ # # ]: 0 : if ( io->Eof() )
3806 : 0 : Error("parent died", true);
3807 : :
3808 : 0 : return false;
3809 : : }
3810 : :
3811 : 0 : bool SocketComm::SendToParent(char type, Peer* peer, int nargs, ...)
3812 : : {
3813 : : va_list ap;
3814 : :
3815 : : #ifdef DEBUG
3816 : 0 : va_start(ap,nargs);
3817 [ # # ]: 0 : DEBUG_COMM(fmt("child: (->parent) %s (#%d,%s)", msgToStr(type), peer ? peer->id : RemoteSerializer::PEER_NONE, fmt_uint32s(nargs, ap)));
3818 : 0 : va_end(ap);
3819 : : #endif
3820 : :
3821 : 0 : va_start(ap, nargs);
3822 : : bool ret = sendToIO(io, type,
3823 : : peer ? peer->id : RemoteSerializer::PEER_NONE,
3824 [ # # ]: 0 : nargs, ap);
3825 : 0 : va_end(ap);
3826 : :
3827 [ # # ]: 0 : if ( ret )
3828 : 0 : return true;
3829 : :
3830 [ # # ]: 0 : if ( io->Eof() )
3831 : 0 : Error("parent died", true);
3832 : :
3833 : 0 : return false;
3834 : : }
3835 : :
3836 : 0 : bool SocketComm::SocketComm::SendToParent(ChunkedIO::Chunk* c)
3837 : : {
3838 : 0 : DEBUG_COMM(fmt("child: (->parent) chunk of size %d", c->len));
3839 [ # # ]: 0 : if ( sendToIO(io, c) )
3840 : 0 : return true;
3841 : :
3842 [ # # ]: 0 : if ( io->Eof() )
3843 : 0 : Error("parent died", true);
3844 : :
3845 : 0 : return false;
3846 : : }
3847 : :
3848 : 0 : bool SocketComm::SendToPeer(Peer* peer, char type, const char* str, int len)
3849 : : {
3850 : : #ifdef DEBUG
3851 : : // str may already by constructed with fmt()
3852 : 0 : const char* tmp = copy_string(str);
3853 : 0 : DEBUG_COMM(fmt("child: (->peer) %s to #%d (%s)", msgToStr(type), peer->id, tmp));
3854 [ # # ]: 0 : delete [] tmp;
3855 : : #endif
3856 : :
3857 [ # # ]: 0 : if ( ! sendToIO(peer->io, type, RemoteSerializer::PEER_NONE, str, len) )
3858 : : {
3859 : 0 : Error(fmt("child: write error %s", io->Error()), peer);
3860 : 0 : return false;
3861 : : }
3862 : :
3863 : 0 : return true;
3864 : : }
3865 : :
3866 : 0 : bool SocketComm::SendToPeer(Peer* peer, char type, int nargs, ...)
3867 : : {
3868 : : va_list ap;
3869 : :
3870 : : #ifdef DEBUG
3871 : 0 : va_start(ap,nargs);
3872 : 0 : DEBUG_COMM(fmt("child: (->peer) %s to #%d (%s)",
3873 : : msgToStr(type), peer->id, fmt_uint32s(nargs, ap)));
3874 : 0 : va_end(ap);
3875 : : #endif
3876 : :
3877 : 0 : va_start(ap, nargs);
3878 : : bool ret = sendToIO(peer->io, type, RemoteSerializer::PEER_NONE,
3879 : 0 : nargs, ap);
3880 : 0 : va_end(ap);
3881 : :
3882 [ # # ]: 0 : if ( ! ret )
3883 : : {
3884 : 0 : Error(fmt("child: write error %s", io->Error()), peer);
3885 : 0 : return false;
3886 : : }
3887 : :
3888 : 0 : return true;
3889 : : }
3890 : :
3891 : 0 : bool SocketComm::SendToPeer(Peer* peer, ChunkedIO::Chunk* c)
3892 : : {
3893 : 0 : DEBUG_COMM(fmt("child: (->peer) chunk of size %d to #%d", c->len, peer->id));
3894 [ # # ]: 0 : if ( ! sendToIO(peer->io, c) )
3895 : : {
3896 : 0 : Error(fmt("child: write error %s", io->Error()), peer);
3897 : 0 : return false;
3898 : : }
3899 : :
3900 : 0 : return true;
3901 [ + - ][ + - ]: 6 : }
|