Branch data Line data Source code
1 : : // $Id: RemoteSerializer.h 6951 2009-12-04 22:23:28Z vern $
2 : : //
3 : : // Communication between two Bro's.
4 : :
5 : : #ifndef REMOTE_SERIALIZER
6 : : #define REMOTE_SERIALIZER
7 : :
8 : : #include "Dict.h"
9 : : #include "List.h"
10 : : #include "Serializer.h"
11 : : #include "IOSource.h"
12 : : #include "Stats.h"
13 : : #include "File.h"
14 : :
15 : : // All IP arguments are in host byte-order.
16 : : // FIXME: Change this to network byte order
17 : :
18 : : class IncrementalSendTimer;
19 : :
20 : : // This class handles the communication done in Bro's main loop.
21 : : class RemoteSerializer : public Serializer, public IOSource {
22 : : public:
23 : : RemoteSerializer();
24 : : virtual ~RemoteSerializer();
25 : :
26 : : // Initialize the remote serializer (calling this will fork).
27 : : void Init();
28 : :
29 : : // FIXME: Use SourceID directly (or rename everything to Peer*).
30 : : typedef SourceID PeerID;
31 : : static const PeerID PEER_LOCAL = SOURCE_LOCAL;
32 : : static const PeerID PEER_NONE = SOURCE_LOCAL;
33 : :
34 : : // Connect to host (returns PEER_NONE on error).
35 : : PeerID Connect(addr_type ip, uint16 port, const char* our_class, double retry, bool use_ssl);
36 : :
37 : : // Request all events matching pattern from remote side.
38 : : bool RequestEvents(PeerID peer, RE_Matcher* pattern);
39 : :
40 : : // Request synchronization of IDs with remote side. If auth is true,
41 : : // we consider our current state to authoritative and send it to
42 : : // the peer right after the handshake.
43 : : bool RequestSync(PeerID peer, bool auth);
44 : :
45 : : // Sets flag whether we're accepting state from this peer
46 : : // (default: yes).
47 : : bool SetAcceptState(PeerID peer, bool accept);
48 : :
49 : : // Sets compression level (0-9, 0 is defaults and means no compression)
50 : : bool SetCompressionLevel(PeerID peer, int level);
51 : :
52 : : // Signal the other side that we have finished our part of
53 : : // the initial handshake.
54 : : bool CompleteHandshake(PeerID peer);
55 : :
56 : : // Start to listen.
57 : : bool Listen(addr_type ip, uint16 port, bool expect_ssl);
58 : :
59 : : // Stop it.
60 : : bool StopListening();
61 : :
62 : : // Broadcast the event/function call.
63 : : bool SendCall(SerialInfo* info, const char* name, val_list* vl);
64 : :
65 : : // Send the event/function call (only if handshake completed).
66 : : bool SendCall(SerialInfo* info, PeerID peer, const char* name, val_list* vl);
67 : :
68 : : // Broadcasts the access (only if handshake completed).
69 : : bool SendAccess(SerialInfo* info, const StateAccess& access);
70 : :
71 : : // Send the access.
72 : : bool SendAccess(SerialInfo* info, PeerID pid, const StateAccess& access);
73 : :
74 : : // Sends ID.
75 : : bool SendID(SerialInfo* info, PeerID peer, const ID& id);
76 : :
77 : : // Sends the internal connection state.
78 : : bool SendConnection(SerialInfo* info, PeerID peer, const Connection& c);
79 : :
80 : : // Send capture filter.
81 : : bool SendCaptureFilter(PeerID peer, const char* filter);
82 : :
83 : : // Send packet.
84 : : bool SendPacket(SerialInfo* info, PeerID peer, const Packet& p);
85 : :
86 : : // Broadcast packet.
87 : : bool SendPacket(SerialInfo* info, const Packet& p);
88 : :
89 : : // Broadcast ping.
90 : : bool SendPing(PeerID peer, uint32 seq);
91 : :
92 : : // Broadcast remote print.
93 : : bool SendPrintHookEvent(BroFile* f, const char* txt);
94 : :
95 : : // Synchronzizes time with all connected peers. Returns number of
96 : : // current sync-point, or -1 on error.
97 : : uint32 SendSyncPoint();
98 : : void SendFinalSyncPoint();
99 : :
100 : : // Registers the ID to be &synchronized.
101 : : void Register(ID* id);
102 : : void Unregister(ID* id);
103 : :
104 : : // Stop/restart propagating state updates.
105 : 0 : void SuspendStateUpdates() { --propagate_accesses; }
106 : 0 : void ResumeStateUpdates() { ++propagate_accesses; }
107 : :
108 : : // Check for incoming events and queue them.
109 : : bool Poll(bool may_block);
110 : :
111 : : // Returns the corresponding record (already ref'ed).
112 : : RecordVal* GetPeerVal(PeerID id);
113 : :
114 : : // Log some statistics.
115 : : void LogStats();
116 : :
117 : : // Return a 0-terminated array of built-in functions which,
118 : : // when referenced, trigger the remote serializer's initialization.
119 : : const char* const* GetBuiltins() const;
120 : :
121 : : // Tries to sent out all remaining data.
122 : : // FIXME: Do we still need this?
123 : : void Finish();
124 : :
125 : : // Overidden from IOSource:
126 : : virtual void GetFds(int* read, int* write, int* except);
127 : : virtual double NextTimestamp(double* local_network_time);
128 : : virtual void Process();
129 : : virtual TimerMgr::Tag* GetCurrentTag();
130 : 0 : virtual const char* Tag() { return "RemoteSerializer"; }
131 : :
132 : : // Gracefully finishes communication by first making sure that all
133 : : // remaining data (parent & child) has been sent out.
134 : : virtual bool Terminate();
135 : :
136 : : #ifdef DEBUG_COMMUNICATION
137 : : // Dump data recently read/written into files.
138 : : void DumpDebugData();
139 : :
140 : : // Read dump file and interpret as message block.
141 : : void ReadDumpAsMessageType(const char* file);
142 : :
143 : : // Read dump file and interpret as serialization.
144 : : void ReadDumpAsSerialization(const char* file);
145 : : #endif
146 : :
147 : : enum LogLevel { LogInfo = 1, LogError = 2, };
148 : : static void Log(LogLevel level, const char* msg);
149 : :
150 : : protected:
151 : : friend class PersistenceSerializer;
152 : : friend class IncrementalSendTimer;
153 : :
154 : : // Maximum size of serialization caches.
155 : : static const unsigned int MAX_CACHE_SIZE = 3000;
156 : :
157 : : // When syncing traces in pseudo-realtime mode, we wait this many
158 : : // seconds after the final sync-point to make sure that all
159 : : // remaining I/O gets propagated.
160 : : static const unsigned int FINAL_SYNC_POINT_DELAY = 5;
161 : :
162 : 0 : declare(PList, EventHandler);
163 : : typedef PList(EventHandler) handler_list;
164 : :
165 : 0 : struct Peer {
166 : : PeerID id; // Unique ID (non-zero) per peer.
167 : :
168 : : // ### Fix: currently, we only work for IPv4.
169 : : // addr_type ip;
170 : : uint32 ip;
171 : :
172 : : uint16 port;
173 : : handler_list handlers;
174 : : RecordVal* val; // Record of type event_source.
175 : : SerializationCache* cache_in; // One cache for each direction.
176 : : SerializationCache* cache_out;
177 : :
178 : : // TCP-level state of the connection to the peer.
179 : : // State of the connection to the peer.
180 : : enum { INIT, PENDING, CONNECTED, CLOSING, CLOSED } state;
181 : :
182 : : // Current protocol phase of the connection (see RemoteSerializer.cc)
183 : : enum { UNKNOWN, SETUP, HANDSHAKE, SYNC, RUNNING } phase;
184 : :
185 : : // Capabilities.
186 : : static const int COMPRESSION = 1;
187 : : static const int NO_CACHING = 2;
188 : : static const int PID_64BIT = 4;
189 : : static const int NEW_CACHE_STRATEGY = 8;
190 : :
191 : : // Constants to remember to who did something.
192 : : static const int NONE = 0;
193 : : static const int WE = 1;
194 : : static const int PEER = 2;
195 : : static const int BOTH = WE | PEER;
196 : :
197 : : static const int AUTH_WE = 4;
198 : : static const int AUTH_PEER = 8;
199 : :
200 : : int sent_version; // Who has sent the VERSION.
201 : : int handshake_done; // Who finished its handshake phase.
202 : : int sync_requested; // Who requested sync'ed state.
203 : :
204 : : bool orig; // True if we connected to the peer.
205 : : bool accept_state; // True if we accept state from peer.
206 : : bool send_state; // True if we're supposed to initially sent our state.
207 : : int comp_level; // Compression level.
208 : :
209 : : // True if this peer triggered a net_suspend_processing().
210 : : bool suspended_processing;
211 : :
212 : : uint32 caps; // Capabilities announced by peer.
213 : : int runtime; // Runtime we got from the peer.
214 : : int our_runtime; // Our runtime as we told it to this peer.
215 : : string peer_class; // Class from peer ("" = no class).
216 : : string our_class; // Class we send the peer.
217 : : uint32 sync_point; // Highest sync-point received so far
218 : : char* print_buffer; // Buffer for remote print or null.
219 : : int print_buffer_used; // Number of bytes used in buffer.
220 : : };
221 : :
222 : : // Shuts down remote serializer.
223 : : void FatalError(const char* msg);
224 : :
225 : : enum LogSrc { LogChild = 1, LogParent = 2, LogScript = 3, };
226 : :
227 : : static void Log(LogLevel level, const char* msg, Peer* peer, LogSrc src = LogParent);
228 : :
229 : : virtual void ReportError(const char* msg);
230 : :
231 : : virtual void GotEvent(const char* name, double time,
232 : : EventHandlerPtr event, val_list* args);
233 : : virtual void GotFunctionCall(const char* name, double time,
234 : : Func* func, val_list* args);
235 : : virtual void GotID(ID* id, Val* val);
236 : : virtual void GotStateAccess(StateAccess* s);
237 : : virtual void GotTimer(Timer* t);
238 : : virtual void GotConnection(Connection* c);
239 : : virtual void GotPacket(Packet* packet);
240 : :
241 : : void Fork();
242 : :
243 : : bool DoMessage();
244 : : bool ProcessConnected();
245 : : bool ProcessSerialization();
246 : : bool ProcessRequestEventsMsg();
247 : : bool ProcessRequestSyncMsg();
248 : : bool ProcessVersionMsg();
249 : : bool ProcessLogMsg(bool is_error);
250 : : bool ProcessStatsMsg();
251 : : bool ProcessCaptureFilterMsg();
252 : : bool ProcessPhaseDone();
253 : : bool ProcessPingMsg();
254 : : bool ProcessPongMsg();
255 : : bool ProcessCapsMsg();
256 : : bool ProcessSyncPointMsg();
257 : : bool ProcessRemotePrint();
258 : :
259 : : Peer* AddPeer(uint32 ip, uint16 port, PeerID id = PEER_NONE);
260 : : Peer* LookupPeer(PeerID id, bool only_if_connected);
261 : : void RemovePeer(Peer* peer);
262 : : bool IsConnectedPeer(PeerID id);
263 : : void PeerDisconnected(Peer* peer);
264 : : void PeerConnected(Peer* peer);
265 : : RecordVal* MakePeerVal(Peer* peer);
266 : : bool HandshakeDone(Peer* peer);
267 : : bool IsActive();
268 : : void SetupSerialInfo(SerialInfo* info, Peer* peer);
269 : : bool CheckSyncPoints();
270 : : void SendSyncPoint(uint32 syncpoint);
271 : 2 : bool PropagateAccesses()
272 : : {
273 : : return ignore_accesses ?
274 [ - + ]: 2 : propagate_accesses > 1 : propagate_accesses > 0;
275 : : }
276 : :
277 : : bool CloseConnection(Peer* peer);
278 : :
279 : : bool SendAllSynchronized(Peer* peer, SerialInfo* info);
280 : : bool SendCall(SerialInfo* info, Peer* peer, const char* name, val_list* vl);
281 : : bool SendAccess(SerialInfo* info, Peer* peer, const StateAccess& access);
282 : : bool SendID(SerialInfo* info, Peer* peer, const ID& id);
283 : : bool SendCapabilities(Peer* peer);
284 : : bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p);
285 : :
286 : : void UnregisterHandlers(Peer* peer);
287 : : void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0);
288 : : bool EnterPhaseRunning(Peer* peer);
289 : : bool FlushPrintBuffer(Peer* p);
290 : :
291 : : void ChildDied();
292 : : void InternalCommError(const char* msg);
293 : :
294 : : // Communication helpers
295 : : bool SendCMsgToChild(char msg_type, Peer* peer);
296 : : bool SendToChild(char type, Peer* peer, char* str, int len = -1);
297 : : bool SendToChild(char type, Peer* peer, int nargs, ...); // can send uints32 only
298 : : bool SendToChild(ChunkedIO::Chunk* c);
299 : :
300 : : private:
301 : : enum { TYPE, ARGS } msgstate; // current state of reading comm.
302 : : Peer* current_peer;
303 : : PeerID current_id;
304 : : char current_msgtype;
305 : : ChunkedIO::Chunk* current_args;
306 : :
307 : : id_list sync_ids;
308 : :
309 : : // FIXME: Check which of these are necessary...
310 : : bool initialized;
311 : : bool listening;
312 : : int propagate_accesses;
313 : : bool ignore_accesses;
314 : : bool terminating;
315 : : Peer* source_peer;
316 : : PeerID id_counter; // Keeps track of assigned IDs.
317 : : uint32 current_sync_point;
318 : : bool syncing_times;
319 : :
320 : 8 : declare(PList, Peer);
321 : : typedef PList(Peer) peer_list;
322 : : peer_list peers;
323 : :
324 : : Peer* in_sync; // Peer we're currently syncing state with.
325 : : peer_list sync_pending; // List of peers waiting to sync state.
326 : :
327 : : // Event buffer
328 : 0 : struct BufferedEvent {
329 : : time_t time;
330 : : PeerID src;
331 : : EventHandlerPtr handler;
332 : : val_list* args;
333 : : };
334 : :
335 : 4 : declare(PList, BufferedEvent);
336 : : typedef PList(BufferedEvent) EventQueue;
337 : : EventQueue events;
338 : :
339 : : // Packet buffer
340 : : struct BufferedPacket {
341 : : time_t time;
342 : : Packet* p;
343 : : };
344 : :
345 : 4 : declare(PList, BufferedPacket);
346 : : typedef PList(BufferedPacket) PacketQueue;
347 : : PacketQueue packets;
348 : :
349 : : // Some stats
350 : 3 : struct Statistics {
351 : : struct Pair {
352 : 15 : Pair() : in(0), out(0) {}
353 : : unsigned long in;
354 : : unsigned long out;
355 : : };
356 : :
357 : : Pair events; // actually events and function calls
358 : : Pair accesses;
359 : : Pair conns;
360 : : Pair packets;
361 : : Pair ids;
362 : : } stats;
363 : :
364 : : };
365 : :
366 : : // This class handles the communication done in the forked child.
367 : : class SocketComm {
368 : : public:
369 : : SocketComm();
370 : : ~SocketComm();
371 : :
372 : 0 : void SetParentIO(ChunkedIO* arg_io) { io = arg_io; }
373 : :
374 : : void Run(); // does not return
375 : :
376 : : // Log some statistics (via pipe to parent).
377 : : bool LogStats();
378 : :
379 : : // Log CPU usage (again via pipe to parent).
380 : : bool LogProf();
381 : :
382 : : protected:
383 : : struct Peer {
384 : 0 : Peer()
385 : : {
386 : 0 : id = 0;
387 : 0 : io = 0;
388 : 0 : ip = 0;
389 : 0 : port = 0;
390 : 0 : state = 0;
391 : 0 : connected = false;
392 : 0 : ssl = false;
393 : 0 : retry = 0;
394 : 0 : next_try = 0;
395 : 0 : compressor = false;
396 : 0 : }
397 : :
398 : : RemoteSerializer::PeerID id;
399 : : ChunkedIO* io;
400 : : uint32 ip;
401 : : uint16 port;
402 : : char state;
403 : : bool connected;
404 : : bool ssl;
405 : : // If we get disconnected, reconnect after this many seconds.
406 : : int retry;
407 : : // Time of next connection attempt (0 if none).
408 : : time_t next_try;
409 : : // True if io is a CompressedChunkedIO.
410 : : bool compressor;
411 : : };
412 : :
413 : : bool Listen(uint32 ip, uint16 port, bool expect_ssl);
414 : : bool AcceptConnection(int listen_fd);
415 : : bool Connect(Peer* peer);
416 : : bool CloseConnection(Peer* peer, bool reconnect);
417 : :
418 : : Peer* LookupPeer(RemoteSerializer::PeerID id, bool only_if_connected);
419 : :
420 : : bool ProcessRemoteMessage(Peer* peer);
421 : : bool ProcessParentMessage();
422 : : bool DoParentMessage();
423 : :
424 : : bool ProcessListen();
425 : : bool ProcessConnectTo();
426 : : bool ProcessCompress();
427 : :
428 : : void Log(const char* msg, Peer* peer = 0);
429 : :
430 : : // The connection to the peer will be closed.
431 : : bool Error(const char* msg, Peer* peer);
432 : :
433 : : // If kill is true, this is a fatal error and we kill ourselves.
434 : : void Error(const char* msg, bool kill = false);
435 : :
436 : : // Kill the current process.
437 : : void Kill();
438 : :
439 : : // Check whether everything has been sent out.
440 : : void CheckFinished();
441 : :
442 : : // Communication helpers.
443 : : bool SendToParent(char type, Peer* peer, const char* str, int len = -1);
444 : : bool SendToParent(char type, Peer* peer, int nargs, ...); // can send uints32 only
445 : : bool SendToParent(ChunkedIO::Chunk* c);
446 : : bool SendToPeer(Peer* peer, char type, const char* str, int len = -1);
447 : : bool SendToPeer(Peer* peer, char type, int nargs, ...); // can send uints32 only
448 : : bool SendToPeer(Peer* peer, ChunkedIO::Chunk* c);
449 : : bool ProcessParentCompress();
450 : : bool ProcessPeerCompress(Peer* peer);
451 : : bool ForwardChunkToParent(Peer* p, ChunkedIO::Chunk* c);
452 : : bool ForwardChunkToPeer();
453 : : const char* MakeLogString(const char* msg, Peer *peer);
454 : :
455 : : // Peers we are communicating with:
456 : 0 : declare(PList, Peer);
457 : : typedef PList(Peer) peer_list;
458 : :
459 : : RemoteSerializer::PeerID id_counter;
460 : : peer_list peers;
461 : :
462 : : ChunkedIO* io; // I/O to parent
463 : :
464 : : // Current state of reading from parent.
465 : : enum { TYPE, ARGS } parent_msgstate;
466 : : Peer* parent_peer;
467 : : RemoteSerializer::PeerID parent_id;
468 : : char parent_msgtype;
469 : : ChunkedIO::Chunk* parent_args;
470 : :
471 : : int listen_fd_clear;
472 : : int listen_fd_ssl;
473 : :
474 : : // If the port we're trying to bind to is already in use, we will retry
475 : : // it regularly.
476 : : uint32 listen_if; // Fix: only supports IPv4
477 : : uint16 listen_port;
478 : : bool listen_ssl;
479 : : time_t listen_next_try;
480 : : bool shutting_conns_down;
481 : : bool terminating;
482 : : bool killing;
483 : : };
484 : :
485 : : extern RemoteSerializer* remote_serializer;
486 : :
487 : : #endif
|