Branch data Line data Source code
1 : : // $Id: PersistenceSerializer.cc 6752 2009-06-14 04:24:52Z vern $
2 : :
3 : : #include <fcntl.h>
4 : : #include <unistd.h>
5 : : #include <errno.h>
6 : : #include <time.h>
7 : : #include <dirent.h>
8 : : #include <libgen.h>
9 : : #include <sys/time.h>
10 : : #include <sys/stat.h>
11 : :
12 : : #include "PersistenceSerializer.h"
13 : : #include "RemoteSerializer.h"
14 : : #include "Conn.h"
15 : : #include "Event.h"
16 : : #include "Logger.h"
17 : : #include "Net.h"
18 : :
19 [ # # ][ # # ]: 0 : class IncrementalWriteTimer : public Timer {
20 : : public:
21 : 0 : IncrementalWriteTimer(double t, PersistenceSerializer::SerialStatus* s)
22 : 0 : : Timer(t, TIMER_INCREMENTAL_WRITE), status(s) {}
23 : :
24 : : void Dispatch(double t, int is_expire);
25 : :
26 : : PersistenceSerializer::SerialStatus* status;
27 : : };
28 : :
29 : 0 : void IncrementalWriteTimer::Dispatch(double t, int is_expire)
30 : : {
31 : : // Never suspend when we're finishing up.
32 [ # # ]: 0 : if ( terminating )
33 : 0 : status->info.may_suspend = false;
34 : :
35 : 0 : persistence_serializer->RunSerialization(status);
36 : 0 : }
37 : :
38 : 3 : PersistenceSerializer::PersistenceSerializer()
39 : : {
40 : 3 : dir = 0;
41 : 3 : }
42 : :
43 : 1 : PersistenceSerializer::~PersistenceSerializer()
44 : : {
45 [ + - ][ # # ]: 1 : }
[ # # ]
46 : :
47 : 32 : void PersistenceSerializer::Register(ID* id)
48 : : {
49 [ - + ]: 32 : if ( id->Type()->Tag() == TYPE_FUNC )
50 : : {
51 : 0 : Error("can't register functions as persistent ID");
52 : 0 : return;
53 : : }
54 : :
55 : 32 : DBG_LOG(DBG_STATE, "&persistent %s", id->Name());
56 : :
57 : 32 : HashKey key(id->Name());
58 [ + + ]: 32 : if ( persistent_ids.Lookup(&key) )
59 : 32 : return;
60 : :
61 : 16 : Ref(id);
62 [ + + ]: 32 : persistent_ids.Insert(&key, id);
63 : : }
64 : :
65 : 0 : void PersistenceSerializer::Unregister(ID* id)
66 : : {
67 : 0 : HashKey key(id->Name());
68 : 0 : Unref((ID*) persistent_ids.Remove(&key));
69 : 0 : }
70 : :
71 : 0 : void PersistenceSerializer::Register(Connection* conn)
72 : : {
73 [ # # ]: 0 : if ( persistent_conns.Lookup(conn->Key()) )
74 : 0 : return;
75 : :
76 : 0 : Ref(conn);
77 : 0 : HashKey* k = conn->Key();
78 : 0 : HashKey* new_key = new HashKey(k->Key(), k->Size(), k->Hash());
79 : 0 : persistent_conns.Insert(new_key, conn);
80 [ # # ]: 0 : delete new_key;
81 : : }
82 : :
83 : 0 : void PersistenceSerializer::Unregister(Connection* conn)
84 : : {
85 : 0 : Unref(persistent_conns.RemoveEntry(conn->Key()));
86 : 0 : }
87 : :
88 : 3 : bool PersistenceSerializer::CheckTimestamp(const char* file)
89 : : {
90 : : struct stat s;
91 [ + + ]: 3 : if ( stat(file, &s) < 0 )
92 : 1 : return false;
93 : :
94 [ - + ]: 2 : if ( ! S_ISREG(s.st_mode) )
95 : 0 : return false;
96 : :
97 : 2 : bool changed = true;
98 : :
99 : 2 : HashKey* key = new HashKey(file, strlen(file));
100 : 2 : time_t* t = files.Lookup(key);
101 : :
102 [ + + ]: 2 : if ( ! t )
103 : : {
104 : 1 : t = (time_t*) malloc(sizeof(time_t));
105 [ - + ]: 1 : if ( ! t )
106 : 0 : out_of_memory("saving file timestamp");
107 : 1 : files.Insert(key, t);
108 : : }
109 : :
110 [ - + ]: 1 : else if ( *t >= s.st_mtime )
111 : 0 : changed = false;
112 : :
113 : 2 : *t = s.st_mtime;
114 : :
115 [ + - ]: 2 : delete key;
116 : 3 : return changed;
117 : : }
118 : :
119 : : bool PersistenceSerializer::CheckForFile(UnserialInfo* info, const char* file,
120 : 2 : bool delete_file)
121 : : {
122 : 2 : bool ret = true;
123 [ + + ]: 2 : if ( CheckTimestamp(file) )
124 : : {
125 : : // Need to copy the filename here, as it may be passed
126 : : // in via fmt().
127 : 1 : const char* f = copy_string(file);
128 : :
129 : 1 : bool ret = Read(info, f);
130 : :
131 [ + - - + ]: 1 : if ( delete_file && unlink(f) < 0 )
[ - + ]
132 : 0 : Error(fmt("can't delete file %s: %s", f, strerror(errno)));
133 : :
134 [ + - ]: 1 : delete [] f;
135 : : }
136 : :
137 : 2 : return ret;
138 : : }
139 : :
140 : 1 : bool PersistenceSerializer::ReadAll(bool is_init, bool delete_files)
141 : : {
142 : : #ifdef USE_PERFTOOLS
143 : : HeapLeakChecker::Disabler disabler;
144 : : #endif
145 : :
146 [ - + ]: 1 : assert(dir);
147 : :
148 : 1 : UnserialInfo config_info(this);
149 : : config_info.id_policy = is_init ?
150 [ + - ]: 1 : UnserialInfo::Replace : UnserialInfo::CopyCurrentToNew;
151 : :
152 [ - + ]: 1 : if ( ! CheckForFile(&config_info, fmt("%s/config.bst", dir),
153 : : delete_files) )
154 : 0 : return false;
155 : :
156 : 1 : UnserialInfo state_info(this);
157 : 1 : state_info.id_policy = UnserialInfo::CopyNewToCurrent;
158 [ - + ]: 1 : if ( ! CheckForFile(&state_info, fmt("%s/state.bst", dir),
159 : : delete_files) )
160 : 0 : return false;
161 : :
162 : 1 : return true;
163 : : }
164 : :
165 : 1 : bool PersistenceSerializer::MoveFileUp(const char* dir, const char* file)
166 : : {
167 : : char oldname[PATH_MAX];
168 : : char newname[PATH_MAX];
169 : :
170 : 1 : safe_snprintf(oldname, PATH_MAX, "%s/.tmp/%s", dir, file );
171 : 1 : safe_snprintf(newname, PATH_MAX, "%s/%s", dir, file );
172 : :
173 [ - + ]: 1 : if ( rename(oldname, newname) < 0 )
174 : : {
175 : : Error(fmt("can't move %s to %s: %s", oldname, newname,
176 : 0 : strerror(errno)));
177 : 0 : return false;
178 : : }
179 : :
180 : 1 : CheckTimestamp(newname);
181 : 1 : return true;
182 : : }
183 : :
184 : : #if 0
185 : : void PersistenceSerializer::RaiseFinishedSendState()
186 : : {
187 : : val_list* vl = new val_list;
188 : : vl->append(new AddrVal(htonl(remote_host)));
189 : : vl->append(new PortVal(remote_port));
190 : :
191 : : mgr.QueueEvent(finished_send_state, vl);
192 : : bro_logger->Log("Serialization done.");
193 : : }
194 : : #endif
195 : :
196 : : void PersistenceSerializer::GotEvent(const char* name, double time,
197 : 0 : EventHandlerPtr event, val_list* args)
198 : : {
199 : 0 : mgr.QueueEvent(event, args);
200 : 0 : }
201 : :
202 : : void PersistenceSerializer::GotFunctionCall(const char* name, double time,
203 : 0 : Func* func, val_list* args)
204 : : {
205 : 0 : func->Call(args);
206 : 0 : }
207 : :
208 : 0 : void PersistenceSerializer::GotStateAccess(StateAccess* s)
209 : : {
210 : 0 : s->Replay();
211 [ # # ]: 0 : delete s;
212 : 0 : }
213 : :
214 : 0 : void PersistenceSerializer::GotTimer(Timer* s)
215 : : {
216 : 0 : run_time("PersistenceSerializer::GotTimer not implemented");
217 : 0 : }
218 : :
219 : 0 : void PersistenceSerializer::GotConnection(Connection* c)
220 : : {
221 : 0 : Unref(c);
222 : 0 : }
223 : :
224 : 0 : void PersistenceSerializer::GotID(ID* id, Val* /* val */)
225 : : {
226 : 0 : Unref(id);
227 : 0 : }
228 : :
229 : 0 : void PersistenceSerializer::GotPacket(Packet* p)
230 : : {
231 : 0 : run_time("PersistenceSerializer::GotPacket not implemented");
232 : 0 : }
233 : :
234 : 0 : bool PersistenceSerializer::LogAccess(const StateAccess& s)
235 : : {
236 [ # # ]: 0 : if ( ! IsSerializationRunning() )
237 : 0 : return true;
238 : :
239 [ # # ]: 0 : loop_over_list(running, i)
240 : : {
241 : 0 : running[i]->accesses.append(new StateAccess(s));
242 : : }
243 : :
244 : 0 : return true;
245 : : }
246 : :
247 : 1 : bool PersistenceSerializer::WriteState(bool may_suspend)
248 : : {
249 : : SerialStatus* status =
250 : 1 : new SerialStatus(this, SerialStatus::WritingState);
251 : :
252 : 1 : status->info.may_suspend = may_suspend;
253 : :
254 : 1 : status->ids = &persistent_ids;
255 : 1 : status->conns = &persistent_conns;
256 : 1 : status->filename = "state.bst";
257 : :
258 : 1 : return RunSerialization(status);
259 : : }
260 : :
261 : 0 : bool PersistenceSerializer::WriteConfig(bool may_suspend)
262 : : {
263 [ # # ][ # # ]: 0 : if ( mgr.IsDraining() && may_suspend )
[ # # ]
264 : : // Events which trigger checkpoint are flushed. Ignore; we'll
265 : : // checkpoint at termination in any case.
266 : 0 : return true;
267 : :
268 : : SerialStatus* status =
269 : 0 : new SerialStatus(this, SerialStatus::WritingConfig);
270 : :
271 : 0 : status->info.may_suspend = may_suspend;
272 : 0 : status->info.clear_containers = true;
273 : 0 : status->ids = global_scope()->GetIDs();
274 : 0 : status->filename = "config.bst";
275 : :
276 : 0 : return RunSerialization(status);
277 : : }
278 : :
279 : 0 : bool PersistenceSerializer::SendState(SourceID peer, bool may_suspend)
280 : : {
281 : : SerialStatus* status =
282 : 0 : new SerialStatus(remote_serializer, SerialStatus::SendingState);
283 : :
284 : 0 : status->info.may_suspend = may_suspend;
285 : 0 : status->ids = &persistent_ids;
286 : 0 : status->conns = &persistent_conns;
287 : 0 : status->peer = peer;
288 : :
289 : 0 : bro_logger->Log("Sending state...");
290 : :
291 : 0 : return RunSerialization(status);
292 : : }
293 : :
294 : 0 : bool PersistenceSerializer::SendConfig(SourceID peer, bool may_suspend)
295 : : {
296 : : SerialStatus* status =
297 : 0 : new SerialStatus(remote_serializer, SerialStatus::SendingConfig);
298 : :
299 : 0 : status->info.may_suspend = may_suspend;
300 : 0 : status->info.clear_containers = true;
301 : 0 : status->ids = global_scope()->GetIDs();
302 : 0 : status->peer = peer;
303 : :
304 : 0 : bro_logger->Log("Sending config...");
305 : :
306 : 0 : return RunSerialization(status);
307 : : }
308 : :
309 : 1 : bool PersistenceSerializer::RunSerialization(SerialStatus* status)
310 : : {
311 : 1 : Continuation* cont = &status->info.cont;
312 : :
313 [ + - ]: 1 : if ( cont->NewInstance() )
314 : : {
315 : : // Serialization is starting. Initialize.
316 : :
317 : : // See if there is already a serialization of this type running.
318 [ - + ]: 1 : loop_over_list(running, i)
319 : : {
320 [ # # ]: 0 : if ( running[i]->type == status->type )
321 : : {
322 : : // ### We don't report this anymore as it would go to stderr.
323 : : // Warning(fmt("Serialization of type %d already running.", status->type));
324 : 0 : return false;
325 : : }
326 : : }
327 : :
328 : 1 : running.append(status);
329 : :
330 : : // Initialize.
331 [ + - - + ]: 1 : if ( ! (ensure_dir(dir) && ensure_dir(fmt("%s/.tmp", dir))) )
[ - + ]
332 : 0 : return false;
333 : :
334 [ - + ]: 1 : if ( ! OpenFile(fmt("%s/.tmp/%s", dir, status->filename), false) )
335 : 0 : return false;
336 : :
337 [ - + ]: 1 : if ( ! PrepareForWriting() )
338 : 0 : return false;
339 : :
340 [ + - ]: 1 : if ( status->ids )
341 : : {
342 : 1 : status->id_cookie = status->ids->InitForIteration();
343 : 1 : status->ids->MakeRobustCookie(status->id_cookie);
344 : : }
345 : :
346 [ + - ]: 1 : if ( status->conns )
347 : : {
348 : 1 : status->conn_cookie = status->conns->InitForIteration();
349 : 1 : status->conns->MakeRobustCookie(status->conn_cookie);
350 : : }
351 : :
352 [ - + ]: 1 : if ( status->info.may_suspend )
353 : 1 : bro_logger->Log("Starting incremental serialization...");
354 : : }
355 : :
356 [ # # ]: 0 : else if ( cont->ChildSuspended() )
357 : : {
358 : : // One of our former Serialize() calls suspended itself.
359 : : // We have to call it again.
360 : :
361 [ # # ]: 0 : if ( status->id_cookie )
362 : : {
363 [ # # ]: 0 : if ( ! DoIDSerialization(status, status->current.id) )
364 : 0 : return false;
365 : :
366 [ # # ]: 0 : if ( cont->ChildSuspended() )
367 : : {
368 : : // Oops, it did it again.
369 : 0 : timer_mgr->Add(new IncrementalWriteTimer(network_time + state_write_delay, status));
370 : 0 : return true;
371 : : }
372 : : }
373 : :
374 [ # # ]: 0 : else if ( status->conn_cookie )
375 : : {
376 [ # # ]: 0 : if ( ! DoConnSerialization(status, status->current.conn) )
377 : 0 : return false;
378 : :
379 [ # # ]: 0 : if ( cont->ChildSuspended() )
380 : : {
381 : : // Oops, it did it again.
382 : 0 : timer_mgr->Add(new IncrementalWriteTimer(network_time + state_write_delay, status));
383 : 0 : return true;
384 : : }
385 : : }
386 : :
387 : : else
388 : 0 : internal_error("unknown suspend state");
389 : : }
390 : :
391 [ # # ]: 0 : else if ( cont->Resuming() )
392 : 0 : cont->Resume();
393 : :
394 : : else
395 : 0 : internal_error("unknown continuation state");
396 : :
397 [ + - ]: 1 : if ( status->id_cookie )
398 : : {
399 : : ID* id;
400 : :
401 [ + + ]: 2 : while ( (id = status->ids->NextEntry(status->id_cookie)) )
402 : : {
403 : 1 : ID* g = global_scope()->Lookup(id->Name());
404 : :
405 [ - + ]: 1 : if ( ! DoIDSerialization(status, id) )
406 : 0 : return false;
407 : :
408 [ - + ]: 1 : if ( cont->ChildSuspended() )
409 : : {
410 : 0 : timer_mgr->Add(new IncrementalWriteTimer(network_time + state_write_delay, status));
411 : 0 : return true;
412 : : }
413 : :
414 [ - + ]: 1 : if ( status->info.may_suspend )
415 : : {
416 : 0 : timer_mgr->Add(new IncrementalWriteTimer(network_time + state_write_delay, status));
417 : 0 : cont->Suspend();
418 : 0 : return true;
419 : : }
420 : : }
421 : :
422 : : // Cookie has been set to 0 by NextEntry().
423 : : }
424 : :
425 [ + - ]: 1 : if ( status->conn_cookie )
426 : : {
427 : : Connection* conn;
428 [ - + ]: 1 : while ( (conn = status->conns->NextEntry(status->conn_cookie)) )
429 : : {
430 [ # # ]: 0 : if ( ! DoConnSerialization(status, conn) )
431 : 0 : return false;
432 : :
433 [ # # ]: 0 : if ( cont->ChildSuspended() )
434 : : {
435 : 0 : timer_mgr->Add(new IncrementalWriteTimer(network_time + state_write_delay, status));
436 : 0 : return true;
437 : : }
438 : :
439 [ # # ]: 0 : if ( status->info.may_suspend )
440 : : {
441 : 0 : timer_mgr->Add(new IncrementalWriteTimer(network_time + state_write_delay, status));
442 : 0 : cont->Suspend();
443 : 0 : return true;
444 : : }
445 : :
446 : : }
447 : :
448 : : // Cookie has been set to 0 by NextEntry().
449 : : }
450 : :
451 : 1 : DBG_LOG(DBG_STATE, "finished serialization; %d accesses pending",
452 : : status->accesses.length());
453 : :
454 [ - + ]: 1 : if ( status->accesses.length() )
455 : : {
456 : : // Serialize pending state accesses.
457 : : // FIXME: Does this need to suspend?
458 : : StateAccess* access;
459 [ # # ]: 0 : loop_over_list(status->accesses, i)
460 : : {
461 : : // Serializing a StateAccess will not suspend.
462 [ # # ]: 0 : if ( ! DoAccessSerialization(status, status->accesses[i]) )
463 : 0 : return false;
464 : :
465 [ # # ]: 0 : delete status->accesses[i];
466 : : }
467 : : }
468 : :
469 : : // Finalize.
470 : 1 : CloseFile();
471 : :
472 : 1 : bool ret = MoveFileUp(dir, status->filename);
473 : :
474 [ + - ]: 1 : loop_over_list(running, i)
475 : : {
476 [ + - ]: 1 : if ( running[i]->type == status->type )
477 : : {
478 : 1 : running.remove_nth(i);
479 : 1 : break;
480 : : }
481 : : }
482 : :
483 [ - + ]: 1 : if ( status->info.may_suspend )
484 : 0 : bro_logger->Log("Finished incremental serialization.");
485 : :
486 [ + - ]: 1 : delete status;
487 : 1 : return ret;
488 : : }
489 : :
490 : 1 : bool PersistenceSerializer::DoIDSerialization(SerialStatus* status, ID* id)
491 : : {
492 : 1 : bool success = false;
493 : 1 : Continuation* cont = &status->info.cont;
494 : :
495 : 1 : status->current.id = id;
496 : :
497 [ + - - ]: 1 : switch ( status->type ) {
498 : : case SerialStatus::WritingState:
499 : : case SerialStatus::WritingConfig:
500 : 1 : cont->SaveContext();
501 : 1 : success = Serialize(&status->info, *id);
502 : 1 : cont->RestoreContext();
503 : 1 : break;
504 : :
505 : : case SerialStatus::SendingState:
506 : : case SerialStatus::SendingConfig:
507 : 0 : cont->SaveContext();
508 : : success = remote_serializer->SendID(&status->info,
509 : 0 : status->peer, *id);
510 : 0 : cont->RestoreContext();
511 : 0 : break;
512 : :
513 : : default:
514 : 0 : internal_error("unknown serialization type");
515 : : }
516 : :
517 : 1 : return success;
518 : : }
519 : :
520 : : bool PersistenceSerializer::DoConnSerialization(SerialStatus* status,
521 : 0 : Connection* conn)
522 : : {
523 : 0 : bool success = false;
524 : 0 : Continuation* cont = &status->info.cont;
525 : :
526 : 0 : status->current.conn = conn;
527 : :
528 [ # # # ]: 0 : switch ( status->type ) {
529 : : case SerialStatus::WritingState:
530 : : case SerialStatus::WritingConfig:
531 : 0 : cont->SaveContext();
532 : 0 : success = Serialize(&status->info, *conn);
533 : 0 : cont->RestoreContext();
534 : 0 : break;
535 : :
536 : : case SerialStatus::SendingState:
537 : : case SerialStatus::SendingConfig:
538 : 0 : cont->SaveContext();
539 : : success = remote_serializer->SendConnection(&status->info,
540 : 0 : status->peer, *conn);
541 : 0 : cont->RestoreContext();
542 : 0 : break;
543 : :
544 : : default:
545 : 0 : internal_error("unknown serialization type");
546 : : }
547 : :
548 : 0 : return success;
549 : : }
550 : :
551 : : bool PersistenceSerializer::DoAccessSerialization(SerialStatus* status,
552 : 0 : StateAccess* access)
553 : : {
554 : 0 : bool success = false;
555 : 0 : DisableSuspend suspend(&status->info);
556 : :
557 [ # # # ]: 0 : switch ( status->type ) {
558 : : case SerialStatus::WritingState:
559 : : case SerialStatus::WritingConfig:
560 : 0 : success = Serialize(&status->info, *access);
561 : 0 : break;
562 : :
563 : : case SerialStatus::SendingState:
564 : : case SerialStatus::SendingConfig:
565 : : success = remote_serializer->SendAccess(&status->info,
566 : 0 : status->peer, *access);
567 : 0 : break;
568 : :
569 : : default:
570 : 0 : internal_error("unknown serialization type");
571 : : }
572 : :
573 : 0 : return success;
574 [ + - ][ + - ]: 6 : }
|