Branch data Line data Source code
1 : : // $Id: ChunkedIO.cc 6888 2009-08-20 18:23:11Z vern $
2 : :
3 : : #include <unistd.h>
4 : : #include <fcntl.h>
5 : : #include <errno.h>
6 : : #include <signal.h>
7 : : #include <sys/time.h>
8 : : #include <netinet/in.h>
9 : : #include <assert.h>
10 : : #include <openssl/ssl.h>
11 : :
12 : : #include "config.h"
13 : : #include "ChunkedIO.h"
14 : : #include "NetVar.h"
15 : : #include "RemoteSerializer.h"
16 : :
17 : 2 : ChunkedIO::ChunkedIO()
18 : : {
19 : 2 : pure = false;
20 : 2 : }
21 : :
22 : 0 : void ChunkedIO::Stats(char* buffer, int length)
23 : : {
24 : : safe_snprintf(buffer, length,
25 : : "bytes=%luK/%luK chunks=%lu/%lu io=%lu/%lu bytes/io=%.2fK/%.2fK",
26 : : stats.bytes_read / 1024, stats.bytes_written / 1024,
27 : : stats.chunks_read, stats.chunks_written,
28 : : stats.reads, stats.writes,
29 : : stats.bytes_read / (1024.0 * stats.reads),
30 : 0 : stats.bytes_written / (1024.0 * stats.writes));
31 : 0 : }
32 : :
33 : : #ifdef DEBUG_COMMUNICATION
34 : :
35 : : void ChunkedIO::AddToBuffer(uint32 len, char* data, bool is_read)
36 : : {
37 : : Chunk* copy = new Chunk;
38 : : copy->len = len;
39 : : copy->data = new char[len];
40 : : memcpy(copy->data, data, len);
41 : :
42 : : std::list<Chunk*>* l = is_read ? &data_read : &data_written;
43 : : l->push_back(copy);
44 : :
45 : : if ( l->size() > DEBUG_COMMUNICATION )
46 : : {
47 : : Chunk* old = l->front();
48 : : l->pop_front();
49 : : delete [] old->data;
50 : : delete old;
51 : : }
52 : : }
53 : :
54 : : void ChunkedIO::AddToBuffer(Chunk* chunk, bool is_read)
55 : : {
56 : : AddToBuffer(chunk->len, chunk->data, is_read);
57 : : }
58 : :
59 : : void ChunkedIO::DumpDebugData(const char* basefnname, bool want_reads)
60 : : {
61 : : std::list<Chunk*>* l = want_reads ? &data_read : &data_written;
62 : :
63 : : int count = 0;
64 : :
65 : : for ( std::list<Chunk*>::iterator i = l->begin(); i != l->end(); ++i )
66 : : {
67 : : static char buffer[128];
68 : : snprintf(buffer, sizeof(buffer), "%s.%s.%d", basefnname,
69 : : want_reads ? "read" : "write", ++count);
70 : : buffer[sizeof(buffer) - 1] = '\0';
71 : :
72 : : int fd = open(buffer, O_WRONLY | O_CREAT | O_TRUNC, 0600);
73 : : if ( fd < 0 )
74 : : continue;
75 : :
76 : : ChunkedIOFd io(fd, "dump-file");
77 : : io.Write(*i);
78 : : io.Flush();
79 : : close(fd);
80 : : }
81 : :
82 : : l->clear();
83 : : }
84 : :
85 : : #endif
86 : :
87 : 2 : ChunkedIOFd::ChunkedIOFd(int arg_fd, const char* arg_tag, pid_t arg_pid)
88 : : {
89 : : int flags;
90 : :
91 : 2 : tag = arg_tag;
92 : 2 : fd = arg_fd;
93 : 2 : eof = 0;
94 : 2 : last_flush = current_time();
95 : 2 : failed_reads = 0;
96 : :
97 [ - + # # ]: 2 : if ( (flags = fcntl(fd, F_GETFL, 0)) < 0)
98 : : {
99 : 0 : Log(fmt("can't obtain socket flags: %s", strerror(errno)));
100 : 0 : exit(1);
101 : : }
102 : :
103 [ - + ][ # # ]: 2 : if ( fcntl(fd, F_SETFL, flags|O_NONBLOCK) < 0 )
104 : : {
105 : : Log(fmt("can't set fd to non-blocking: %s (%d)",
106 : 0 : strerror(errno), getpid()));
107 : 0 : exit(1);
108 : : }
109 : :
110 : 2 : read_buffer = new char[BUFFER_SIZE];
111 : 2 : read_len = 0;
112 : 2 : read_pos = 0;
113 : 2 : partial = 0;
114 : 2 : write_buffer = new char[BUFFER_SIZE];
115 : 2 : write_len = 0;
116 : 2 : write_pos = 0;
117 : :
118 : 2 : pending_head = 0;
119 : 2 : pending_tail = 0;
120 : :
121 : 2 : pid = arg_pid;
122 : 2 : }
123 : :
124 : 2 : ChunkedIOFd::~ChunkedIOFd()
125 : : {
126 : 2 : Clear();
127 : :
128 [ + - # # ]: 2 : delete [] read_buffer;
[ # # ]
129 [ + - ][ # # ]: 2 : delete [] write_buffer;
[ # # ]
130 : 2 : close(fd);
131 : :
132 [ - + # # # : 2 : if ( partial )
# ]
133 : : {
134 [ # # ][ # # ]: 0 : delete [] partial->data;
[ # # ]
135 : 0 : delete partial;
136 : : }
137 [ + - ][ # # ]: 2 : }
[ # # ]
138 : :
139 : 1 : bool ChunkedIOFd::Write(Chunk* chunk)
140 : : {
141 : : #ifdef DEBUG
142 : 1 : DBG_LOG(DBG_CHUNKEDIO, "write of size %d [%s]",
143 : : chunk->len, fmt_bytes(chunk->data, min(20, chunk->len)));
144 : : #endif
145 : :
146 : : // Reject if our queue of pending chunks is way too large. Otherwise,
147 : : // memory could fill up if the other side doesn't read.
148 [ - + ]: 1 : if ( stats.pending > MAX_BUFFERED_CHUNKS )
149 : : {
150 : 0 : DBG_LOG(DBG_CHUNKEDIO, "write queue full");
151 : :
152 : : #ifdef DEBUG_COMMUNICATION
153 : : AddToBuffer("<false:write-queue-full>", false);
154 : : #endif
155 : :
156 : 0 : errno = ENOSPC;
157 : 0 : return false;
158 : : }
159 : :
160 : : #ifdef DEBUG_COMMUNICATION
161 : : AddToBuffer(chunk, false);
162 : : #endif
163 : :
164 [ + - ]: 1 : if ( chunk->len <= BUFFER_SIZE - sizeof(uint32) )
165 : 1 : return WriteChunk(chunk, false);
166 : :
167 : : // We have to split it up.
168 : 0 : char* p = chunk->data;
169 : 0 : unsigned long left = chunk->len;
170 : :
171 [ # # ]: 0 : while ( left )
172 : : {
173 : 0 : Chunk* part = new Chunk;
174 : :
175 : 0 : part->len = min(BUFFER_SIZE - sizeof(uint32), left);
176 : 0 : part->data = new char[part->len];
177 : 0 : memcpy(part->data, p, part->len);
178 : 0 : left -= part->len;
179 : 0 : p += part->len;
180 : :
181 [ # # ]: 0 : if ( ! WriteChunk(part, left != 0) )
182 : 0 : return false;
183 : : }
184 : :
185 [ # # ]: 0 : delete [] chunk->data;
186 : 0 : delete chunk;
187 : :
188 : 1 : return true;
189 : : }
190 : :
191 : 1 : bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial)
192 : : {
193 [ - + ]: 1 : assert(chunk->len <= BUFFER_SIZE - sizeof(uint32) );
194 : :
195 [ - + ]: 1 : if ( chunk->len == 0 )
196 : 0 : internal_error( "attempt to write 0 bytes chunk");
197 : :
198 [ - + ]: 1 : if ( partial )
199 : 0 : chunk->len |= FLAG_PARTIAL;
200 : :
201 : 1 : ++stats.chunks_written;
202 : :
203 : : // If it fits into the buffer, we're done (but keep care not
204 : : // to reorder chunks).
205 [ + - ][ + - ]: 1 : if ( ! pending_head && PutIntoWriteBuffer(chunk) )
[ + - ]
206 : 1 : return true;
207 : :
208 : : // Otherwise queue it.
209 : 0 : ++stats.pending;
210 : 0 : ChunkQueue* q = new ChunkQueue;
211 : 0 : q->chunk = chunk;
212 : 0 : q->next = 0;
213 : :
214 [ # # ]: 0 : if ( pending_tail )
215 : : {
216 : 0 : pending_tail->next = q;
217 : 0 : pending_tail = q;
218 : : }
219 : : else
220 : 0 : pending_head = pending_tail = q;
221 : :
222 : 1 : return Flush();
223 : : }
224 : :
225 : :
226 : 1 : bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
227 : : {
228 : 1 : uint32 len = chunk->len & ~FLAG_PARTIAL;
229 : :
230 [ - + ][ - + ]: 1 : if ( write_len + len + (IsPure() ? 0 : sizeof(len)) > BUFFER_SIZE )
231 : 0 : return false;
232 : :
233 [ + - ]: 1 : if ( ! IsPure() )
234 : : {
235 : 1 : uint32 nlen = htonl(chunk->len);
236 : 1 : memcpy(write_buffer + write_len, &nlen, sizeof(nlen));
237 : 1 : write_len += sizeof(nlen);
238 : : }
239 : :
240 : 1 : memcpy(write_buffer + write_len, chunk->data, len);
241 : 1 : write_len += len;
242 : :
243 [ + - ]: 1 : delete [] chunk->data;
244 : 1 : delete chunk;
245 : :
246 [ - + ]: 1 : if ( network_time - last_flush > 0.005 )
247 : 0 : FlushWriteBuffer();
248 : :
249 : 1 : return true;
250 : : }
251 : :
252 : 4 : bool ChunkedIOFd::FlushWriteBuffer()
253 : : {
254 : 4 : last_flush = network_time;
255 : :
256 [ + + ]: 4 : while ( write_pos != write_len )
257 : : {
258 : 1 : uint32 len = write_len - write_pos;
259 : :
260 : 1 : int written = write(fd, write_buffer + write_pos, len);
261 : :
262 [ - + ]: 1 : if ( written < 0 )
263 : : {
264 [ # # ]: 0 : if ( errno == EPIPE )
265 : 0 : eof = true;
266 : :
267 [ # # ]: 0 : if ( errno != EINTR )
268 : : // These errnos are equal on POSIX.
269 [ # # ][ # # ]: 0 : return errno == EWOULDBLOCK || errno == EAGAIN;
270 : :
271 : : else
272 : 0 : written = 0;
273 : : }
274 : :
275 : 1 : stats.bytes_written += written;
276 [ + - ]: 1 : if ( written > 0 )
277 : 1 : ++stats.writes;
278 : :
279 [ + - ]: 1 : if ( unsigned(written) == len )
280 : : {
281 : 1 : write_pos = write_len = 0;
282 : 1 : return true;
283 : : }
284 : :
285 [ # # ]: 0 : if ( written == 0 )
286 : 0 : internal_error("written==0");
287 : :
288 : : // Short write.
289 : 0 : write_pos += written;
290 : : }
291 : :
292 : 4 : return true;
293 : : }
294 : :
295 : 2 : bool ChunkedIOFd::OptionalFlush()
296 : : {
297 : : // This threshhold is quite arbitrary.
298 : : // if ( current_time() - last_flush > 0.01 )
299 : 2 : return Flush();
300 : : }
301 : :
302 : 4 : bool ChunkedIOFd::Flush()
303 : : {
304 : : // Try to write data out.
305 [ - + ]: 4 : while ( pending_head )
306 : : {
307 [ # # ]: 0 : if ( ! FlushWriteBuffer() )
308 : 0 : return false;
309 : :
310 : : // If we couldn't write the whole buffer, we stop here
311 : : // and try again next time.
312 [ # # ]: 0 : if ( write_len > 0 )
313 : 0 : return true;
314 : :
315 : : // Put as many pending chunks into the buffer as possible.
316 [ # # ]: 0 : while ( pending_head )
317 : : {
318 [ # # ]: 0 : if ( ! PutIntoWriteBuffer(pending_head->chunk) )
319 : 0 : break;
320 : :
321 : 0 : ChunkQueue* q = pending_head;
322 : 0 : pending_head = pending_head->next;
323 [ # # ]: 0 : if ( ! pending_head )
324 : 0 : pending_tail = 0;
325 : :
326 : 0 : --stats.pending;
327 : 0 : delete q;
328 : : }
329 : : }
330 : :
331 : 4 : return FlushWriteBuffer();
332 : : }
333 : :
334 : 2 : uint32 ChunkedIOFd::ChunkAvailable()
335 : : {
336 : 2 : int bytes_left = read_len - read_pos;
337 : :
338 [ + - ]: 2 : if ( bytes_left < int(sizeof(uint32)) )
339 : 2 : return 0;
340 : :
341 : 0 : bytes_left -= sizeof(uint32);
342 : :
343 : : // We have to copy the value here as it may not be
344 : : // aligned correctly in the data.
345 : : uint32 len;
346 : 0 : memcpy(&len, read_buffer + read_pos, sizeof(len));
347 : 0 : len = ntohl(len);
348 : :
349 [ # # ]: 0 : if ( uint32(bytes_left) < (len & ~FLAG_PARTIAL) )
350 : 0 : return 0;
351 : :
352 [ # # ]: 0 : assert(len & ~FLAG_PARTIAL);
353 : :
354 : 2 : return len;
355 : : }
356 : :
357 : 2 : ChunkedIO::Chunk* ChunkedIOFd::ExtractChunk()
358 : : {
359 : 2 : uint32 len = ChunkAvailable();
360 : 2 : uint32 real_len = len & ~FLAG_PARTIAL;
361 [ + - ]: 2 : if ( ! real_len )
362 : 2 : return 0;
363 : :
364 : 0 : read_pos += sizeof(uint32);
365 : :
366 : 0 : Chunk* chunk = new Chunk;
367 : 0 : chunk->len = len;
368 : 0 : chunk->data = new char[real_len];
369 : 0 : memcpy(chunk->data, read_buffer + read_pos, real_len);
370 : 0 : read_pos += real_len;
371 : :
372 : 0 : ++stats.chunks_read;
373 : :
374 : 2 : return chunk;
375 : : }
376 : :
377 : 0 : ChunkedIO::Chunk* ChunkedIOFd::ConcatChunks(Chunk* c1, Chunk* c2)
378 : : {
379 : 0 : Chunk* c = new Chunk;
380 : :
381 : 0 : c->len = c1->len + c2->len;
382 : 0 : c->data = new char[c->len];
383 : :
384 : 0 : memcpy(c->data, c1->data, c1->len);
385 : 0 : memcpy(c->data + c1->len, c2->data, c2->len);
386 : :
387 [ # # ]: 0 : delete [] c1->data;
388 : 0 : delete c1;
389 [ # # ]: 0 : delete [] c2->data;
390 : 0 : delete c2;
391 : :
392 : 0 : return c;
393 : : }
394 : :
395 : 0 : void ChunkedIO::Log(const char* str)
396 : : {
397 : 0 : RemoteSerializer::Log(RemoteSerializer::LogError, str);
398 : 0 : }
399 : :
400 : 1 : bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
401 : : {
402 : 1 : *chunk = 0;
403 : :
404 : : // We will be called regularly. So take the opportunity
405 : : // to flush the write buffer once in a while.
406 : 1 : OptionalFlush();
407 : :
408 [ + - ]: 1 : if ( ! ReadChunk(chunk, may_block) )
409 : : {
410 : : #ifdef DEBUG_COMMUNICATION
411 : : AddToBuffer("<false:read-chunk>", true);
412 : : #endif
413 : 1 : return false;
414 : : }
415 : :
416 [ # # ]: 0 : if ( ! *chunk )
417 : : {
418 : : #ifdef DEBUG_COMMUNICATION
419 : : AddToBuffer("<null:no-data>", true);
420 : : #endif
421 : 0 : return true;
422 : : }
423 : :
424 : : #ifdef DEBUG
425 [ # # ]: 0 : if ( *chunk )
426 [ # # ]: 0 : DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
427 : : (*chunk)->len & ~FLAG_PARTIAL,
428 : : (*chunk)->len & FLAG_PARTIAL ? "(P) " : "",
429 : : fmt_bytes((*chunk)->data,
430 : : min(20, (*chunk)->len)));
431 : : #endif
432 : :
433 [ # # ]: 0 : if ( ! ((*chunk)->len & FLAG_PARTIAL) )
434 : : {
435 [ # # ]: 0 : if ( ! partial )
436 : : {
437 : : #ifdef DEBUG_COMMUNICATION
438 : : AddToBuffer(*chunk, true);
439 : : #endif
440 : 0 : return true;
441 : : }
442 : : else
443 : : {
444 : : // This is the last chunk of an oversized one.
445 : 0 : *chunk = ConcatChunks(partial, *chunk);
446 : 0 : partial = 0;
447 : :
448 : : #ifdef DEBUG
449 [ # # ]: 0 : if ( *chunk )
450 : 0 : DBG_LOG(DBG_CHUNKEDIO,
451 : : "built virtual chunk of size %d [%s]",
452 : : (*chunk)->len,
453 : : fmt_bytes((*chunk)->data, 20));
454 : : #endif
455 : :
456 : : #ifdef DEBUG_COMMUNICATION
457 : : AddToBuffer(*chunk, true);
458 : : #endif
459 : 0 : return true;
460 : : }
461 : : }
462 : :
463 : : // This chunk is the non-last part of an oversized.
464 : 0 : (*chunk)->len &= ~FLAG_PARTIAL;
465 : :
466 [ # # ]: 0 : if ( ! partial )
467 : : // First part of oversized chunk.
468 : 0 : partial = *chunk;
469 : : else
470 : 0 : partial = ConcatChunks(partial, *chunk);
471 : :
472 : : #ifdef DEBUG_COMMUNICATION
473 : : AddToBuffer("<null:partial>", true);
474 : : #endif
475 : :
476 : 0 : *chunk = 0;
477 : 1 : return true; // Read following part next time.
478 : : }
479 : :
480 : 1 : bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block)
481 : : {
482 : : // We will be called regularly. So take the opportunity
483 : : // to flush the write buffer once in a while.
484 : 1 : OptionalFlush();
485 : :
486 : 1 : *chunk = ExtractChunk();
487 [ - + ]: 1 : if ( *chunk )
488 : 0 : return true;
489 : :
490 : 1 : int bytes_left = read_len - read_pos;
491 : :
492 : : // If we have a partial chunk left, move this to the head of
493 : : // the buffer.
494 [ - + ]: 1 : if ( bytes_left )
495 : 0 : memmove(read_buffer, read_buffer + read_pos, bytes_left);
496 : :
497 : 1 : read_pos = 0;
498 : 1 : read_len = bytes_left;
499 : :
500 : : // If allowed, wait a bit for something to read.
501 [ - + ]: 1 : if ( may_block )
502 : : {
503 : : fd_set fd_read, fd_write, fd_except;
504 : :
505 : 0 : FD_ZERO(&fd_read);
506 : 0 : FD_ZERO(&fd_write);
507 : 0 : FD_ZERO(&fd_except);
508 : 0 : FD_SET(fd, &fd_read);
509 : :
510 : : struct timeval small_timeout;
511 : 0 : small_timeout.tv_sec = 0;
512 : 0 : small_timeout.tv_usec = 50;
513 : :
514 : 0 : select(fd + 1, &fd_read, &fd_write, &fd_except, &small_timeout);
515 : : }
516 : :
517 : : // Make sure the process is still runnning
518 : : // (only checking for EPIPE after a read doesn't
519 : : // seem to be sufficient).
520 [ - + ][ # # ]: 1 : if ( pid && kill(pid, 0) < 0 && errno != EPERM )
[ # # ][ - + ]
521 : : {
522 : 0 : eof = true;
523 : 0 : errno = EPIPE;
524 : 0 : return false;
525 : : }
526 : :
527 : : // Try to fill the buffer.
528 : 0 : while ( true )
529 : : {
530 : 1 : int len = BUFFER_SIZE - read_len;
531 : 1 : int read = ::read(fd, read_buffer + read_len, len);
532 : :
533 [ - + ]: 1 : if ( read < 0 )
534 : : {
535 [ # # ]: 0 : if ( errno != EINTR )
536 : : {
537 : : // These errnos are equal on POSIX.
538 [ # # ][ # # ]: 0 : if ( errno == EWOULDBLOCK || errno == EAGAIN )
[ # # ]
539 : : {
540 : : // Let's see if we have a chunk now --
541 : : // even if we time out, we may have read
542 : : // just enough in previous iterations!
543 : 0 : *chunk = ExtractChunk();
544 : 0 : ++failed_reads;
545 : 0 : return true;
546 : : }
547 : :
548 [ # # ]: 0 : if ( errno == EPIPE )
549 : 0 : eof = true;
550 : :
551 : 0 : return false;
552 : : }
553 : :
554 : : else
555 : 0 : read = 0;
556 : : }
557 : :
558 : 1 : failed_reads = 0;
559 : :
560 [ + - ][ + - ]: 1 : if ( read == 0 && len != 0 )
561 : : {
562 : 1 : *chunk = ExtractChunk();
563 [ - + ]: 1 : if ( *chunk )
564 : 0 : return true;
565 : :
566 : 1 : eof = true;
567 : 1 : return false;
568 : : }
569 : :
570 : 0 : read_len += read;
571 : :
572 : 0 : ++stats.reads;
573 : 0 : stats.bytes_read += read;
574 : :
575 [ # # ]: 0 : if ( read == len )
576 : 0 : break;
577 : : }
578 : :
579 : : // Let's see if we have a chunk now.
580 : 0 : *chunk = ExtractChunk();
581 : :
582 : 1 : return true;
583 : : }
584 : :
585 : 0 : bool ChunkedIOFd::CanRead()
586 : : {
587 : : // We will be called regularly. So take the opportunity
588 : : // to flush the write buffer once in a while.
589 : 0 : OptionalFlush();
590 : :
591 [ # # ]: 0 : if ( ChunkAvailable() )
592 : 0 : return true;
593 : :
594 : : fd_set fd_read;
595 : 0 : FD_ZERO(&fd_read);
596 : 0 : FD_SET(fd, &fd_read);
597 : :
598 : : struct timeval no_timeout;
599 : 0 : no_timeout.tv_sec = 0;
600 : 0 : no_timeout.tv_usec = 0;
601 : :
602 : 0 : return select(fd + 1, &fd_read, 0, 0, &no_timeout) > 0;
603 : : }
604 : :
605 : 0 : bool ChunkedIOFd::CanWrite()
606 : : {
607 : 0 : return pending_head != 0;
608 : : }
609 : :
610 : 0 : bool ChunkedIOFd::IsIdle()
611 : : {
612 [ # # ][ # # ]: 0 : if ( pending_head || ChunkAvailable() )
[ # # ]
613 : 0 : return false;
614 : :
615 [ # # ]: 0 : if ( failed_reads > 0 )
616 : 0 : return true;
617 : :
618 : 0 : return false;
619 : : }
620 : :
621 : 0 : bool ChunkedIOFd::IsFillingUp()
622 : : {
623 : 0 : return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
624 : : }
625 : :
626 : 2 : void ChunkedIOFd::Clear()
627 : : {
628 [ - + ]: 2 : while ( pending_head )
629 : : {
630 : 0 : ChunkQueue* next = pending_head->next;
631 [ # # ]: 0 : delete [] pending_head->chunk->data;
632 : 0 : delete pending_head->chunk;
633 : 0 : delete pending_head;
634 : 0 : pending_head = next;
635 : : }
636 : :
637 : 2 : pending_head = pending_tail = 0;
638 : 2 : }
639 : :
640 : 0 : const char* ChunkedIOFd::Error()
641 : : {
642 : : static char buffer[1024];
643 : 0 : safe_snprintf(buffer, sizeof(buffer), "%s [%d]", strerror(errno), errno);
644 : :
645 : 0 : return buffer;
646 : : }
647 : :
648 : 0 : void ChunkedIOFd::Stats(char* buffer, int length)
649 : : {
650 : 0 : int i = safe_snprintf(buffer, length, "pending=%d ", stats.pending);
651 : 0 : ChunkedIO::Stats(buffer + i, length - i);
652 : 0 : }
653 : :
654 : : SSL_CTX* ChunkedIOSSL::ctx;
655 : :
656 : 0 : ChunkedIOSSL::ChunkedIOSSL(int arg_socket, bool arg_server)
657 : : {
658 : 0 : socket = arg_socket;
659 : 0 : eof = false;
660 : 0 : setup = false;
661 : 0 : server = arg_server;
662 : 0 : ssl = 0;
663 : :
664 : 0 : write_state = LEN;
665 : 0 : write_head = 0;
666 : 0 : write_tail = 0;
667 : :
668 : 0 : read_state = LEN;
669 : 0 : read_chunk = 0;
670 : 0 : read_ptr = 0;
671 : 0 : }
672 : :
673 : 0 : ChunkedIOSSL::~ChunkedIOSSL()
674 : : {
675 [ # # ][ # # ]: 0 : if ( setup )
[ # # ]
676 : : {
677 : 0 : SSL_shutdown(ssl);
678 : :
679 : : // We don't care if the other side closes properly.
680 : 0 : setup = false;
681 : : }
682 : :
683 [ # # ][ # # ]: 0 : if ( ssl )
[ # # ]
684 : : {
685 : 0 : SSL_free(ssl);
686 : 0 : ssl = 0;
687 : : }
688 : :
689 : 0 : close(socket);
690 [ # # ][ # # ]: 0 : }
[ # # ]
691 : :
692 : :
693 : 0 : static int pem_passwd_cb(char* buf, int size, int rwflag, void* passphrase)
694 : : {
695 : 0 : safe_strncpy(buf, (char*) passphrase, size);
696 : 0 : buf[size - 1] = '\0';
697 : 0 : return strlen(buf);
698 : : }
699 : :
700 : 0 : bool ChunkedIOSSL::Init()
701 : : {
702 : : // If the handshake doesn't succeed immediately we will
703 : : // be called multiple times.
704 [ # # ]: 0 : if ( ! ctx )
705 : : {
706 : 0 : SSL_load_error_strings();
707 : :
708 : 0 : ctx = SSL_CTX_new(SSLv3_method());
709 [ # # ]: 0 : if ( ! ctx )
710 : : {
711 : 0 : Log("can't create SSL context");
712 : 0 : return false;
713 : : }
714 : :
715 : : // We access global variables here. But as they are
716 : : // declared const and we don't modify them this should
717 : : // be fine.
718 : 0 : const char* key = ssl_private_key->AsString()->CheckString();
719 : :
720 [ # # # # ]: 0 : if ( ! (key && *key &&
[ # # ][ # # ]
721 : : SSL_CTX_use_certificate_chain_file(ctx, key)) )
722 : : {
723 : 0 : Log(fmt("can't read certificate from file %s", key));
724 : 0 : return false;
725 : : }
726 : :
727 : : const char* passphrase =
728 : 0 : ssl_passphrase->AsString()->CheckString();
729 : :
730 [ # # # # ]: 0 : if ( passphrase && ! streq(passphrase, "<undefined>") )
[ # # ]
731 : : {
732 : 0 : SSL_CTX_set_default_passwd_cb(ctx, pem_passwd_cb);
733 : : SSL_CTX_set_default_passwd_cb_userdata(ctx,
734 : 0 : (void*) passphrase);
735 : : }
736 : :
737 [ # # ][ # # ]: 0 : if ( ! (key && *key &&
[ # # ][ # # ]
738 : : SSL_CTX_use_PrivateKey_file(ctx, key, SSL_FILETYPE_PEM)) )
739 : : {
740 : 0 : Log(fmt("can't read private key from file %s", key));
741 : 0 : return false;
742 : : }
743 : :
744 : 0 : const char* ca = ssl_ca_certificate->AsString()->CheckString();
745 [ # # # # ]: 0 : if ( ! (ca && *ca && SSL_CTX_load_verify_locations(ctx, ca, 0)) )
[ # # ][ # # ]
746 : : {
747 : 0 : Log(fmt("can't read CA certificate from file %s", ca));
748 : 0 : return false;
749 : : }
750 : :
751 : : // Only use real ciphers.
752 [ # # ]: 0 : if ( ! SSL_CTX_set_cipher_list(ctx, "HIGH") )
753 : : {
754 : 0 : Log("can't set cipher list");
755 : 0 : return false;
756 : : }
757 : :
758 : : // Require client certificate.
759 : : SSL_CTX_set_verify(ctx,
760 : 0 : SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0);
761 : : }
762 : :
763 : : int flags;
764 : :
765 [ # # ]: 0 : if ( (flags = fcntl(socket, F_GETFL, 0)) < 0)
766 : : {
767 : 0 : Log(fmt("can't obtain socket flags: %s", strerror(errno)));
768 : 0 : return false;
769 : : }
770 : :
771 [ # # ]: 0 : if ( fcntl(socket, F_SETFL, flags|O_NONBLOCK) < 0 )
772 : : {
773 : : Log(fmt("can't set socket to non-blocking: %s",
774 : 0 : strerror(errno)));
775 : 0 : return false;
776 : : }
777 : :
778 [ # # ]: 0 : if ( ! ssl )
779 : : {
780 : 0 : ssl = SSL_new(ctx);
781 [ # # ]: 0 : if ( ! ssl )
782 : : {
783 : 0 : Log("can't create SSL object");
784 : 0 : return false;
785 : : }
786 : :
787 : 0 : BIO* bio = BIO_new_socket(socket, BIO_NOCLOSE);
788 : 0 : BIO_set_nbio(bio, 1);
789 : 0 : SSL_set_bio(ssl, bio, bio);
790 : : }
791 : :
792 : : int success;
793 [ # # ]: 0 : if ( server )
794 : 0 : success = last_ret = SSL_accept(ssl);
795 : : else
796 : 0 : success = last_ret = SSL_connect(ssl);
797 : :
798 [ # # ]: 0 : if ( success > 0 )
799 : : { // handshake done
800 : 0 : setup = true;
801 : 0 : return true;
802 : : }
803 : :
804 : 0 : int error = SSL_get_error(ssl, success);
805 : :
806 [ # # # # ]: 0 : if ( success <= 0 &&
[ # # ]
807 : : (error == SSL_ERROR_WANT_WRITE || error == SSL_ERROR_WANT_READ) )
808 : : // Handshake not finished yet, but that's ok for now.
809 : 0 : return true;
810 : :
811 : : // Some error.
812 : 0 : eof = true;
813 : 0 : return false;
814 : : }
815 : :
816 : 0 : bool ChunkedIOSSL::Write(Chunk* chunk)
817 : : {
818 : : #ifdef DEBUG
819 : 0 : DBG_LOG(DBG_CHUNKEDIO, "ssl write of size %d [%s]",
820 : : chunk->len, fmt_bytes(chunk->data, 20));
821 : : #endif
822 : :
823 : : // Reject if our queue of pending chunks is way too large. Otherwise,
824 : : // memory could fill up if the other side doesn't read.
825 [ # # ]: 0 : if ( stats.pending > MAX_BUFFERED_CHUNKS )
826 : : {
827 : 0 : DBG_LOG(DBG_CHUNKEDIO, "write queue full");
828 : 0 : errno = ENOSPC;
829 : 0 : return false;
830 : : }
831 : :
832 : : // Queue it.
833 : 0 : ++stats.pending;
834 : 0 : Queue* q = new Queue;
835 : 0 : q->chunk = chunk;
836 : 0 : q->next = 0;
837 : :
838 : : // Temporarily convert len into network byte order.
839 : 0 : chunk->len = htonl(chunk->len);
840 : :
841 [ # # ]: 0 : if ( write_tail )
842 : : {
843 : 0 : write_tail->next = q;
844 : 0 : write_tail = q;
845 : : }
846 : : else
847 : 0 : write_head = write_tail = q;
848 : :
849 : 0 : Flush();
850 : 0 : return true;
851 : : }
852 : :
853 : 0 : bool ChunkedIOSSL::WriteData(char* p, uint32 len, bool* error)
854 : : {
855 : 0 : *error = false;
856 : :
857 : 0 : double t = current_time();
858 : :
859 : 0 : int written = last_ret = SSL_write(ssl, p, len);
860 : :
861 [ # # # # : 0 : switch ( SSL_get_error(ssl, written) ) {
# ]
862 : : case SSL_ERROR_NONE:
863 : : // SSL guarantees us that all bytes have been written.
864 : : // That's nice. :-)
865 : 0 : return true;
866 : :
867 : : case SSL_ERROR_WANT_READ:
868 : : case SSL_ERROR_WANT_WRITE:
869 : : // Would block.
870 : 0 : DBG_LOG(DBG_CHUNKEDIO,
871 : : "SSL_write: SSL_ERROR_WANT_READ [%d,%d]",
872 : : written, SSL_get_error(ssl, written));
873 : 0 : *error = false;
874 : 0 : return false;
875 : :
876 : : case SSL_ERROR_ZERO_RETURN:
877 : : // Regular remote connection shutdown.
878 : 0 : DBG_LOG(DBG_CHUNKEDIO,
879 : : "SSL_write: SSL_ZERO_RETURN [%d,%d]",
880 : : written, SSL_get_error(ssl, written));
881 : 0 : *error = eof = true;
882 : 0 : return false;
883 : :
884 : : case SSL_ERROR_SYSCALL:
885 : 0 : DBG_LOG(DBG_CHUNKEDIO,
886 : : "SSL_write: SSL_SYS_CALL [%d,%d]",
887 : : written, SSL_get_error(ssl, written));
888 : :
889 [ # # ]: 0 : if ( written == 0 )
890 : : {
891 : : // Socket connection closed.
892 : 0 : *error = eof = true;
893 : 0 : return false;
894 : : }
895 : :
896 : : // Fall through.
897 : :
898 : : default:
899 : 0 : DBG_LOG(DBG_CHUNKEDIO,
900 : : "SSL_write: fatal error [%d,%d]",
901 : : written, SSL_get_error(ssl, written));
902 : : // Fatal SSL error.
903 : 0 : *error = true;
904 : 0 : return false;
905 : : }
906 : :
907 : : internal_error("can't be reached");
908 : : return false;
909 : : }
910 : :
911 : 0 : bool ChunkedIOSSL::Flush()
912 : : {
913 [ # # ]: 0 : if ( ! setup )
914 : : {
915 : : // We may need to finish the handshake.
916 [ # # ]: 0 : if ( ! Init() )
917 : 0 : return false;
918 [ # # ]: 0 : if ( ! setup )
919 : 0 : return true;
920 : : }
921 : :
922 [ # # ]: 0 : while ( write_head )
923 : : {
924 : : bool error;
925 : :
926 : 0 : Chunk* c = write_head->chunk;
927 : :
928 [ # # ]: 0 : if ( write_state == LEN )
929 : : {
930 [ # # ]: 0 : if ( ! WriteData((char*)&c->len, sizeof(c->len), &error) )
931 : 0 : return ! error;
932 : 0 : write_state = DATA;
933 : :
934 : : // Convert back from network byte order.
935 : 0 : c->len = ntohl(c->len);
936 : : }
937 : :
938 [ # # ]: 0 : if ( ! WriteData(c->data, c->len, &error) )
939 : 0 : return ! error;
940 : :
941 : : // Chunk written, throw away.
942 : 0 : Queue* q = write_head;
943 : 0 : write_head = write_head->next;
944 [ # # ]: 0 : if ( ! write_head )
945 : 0 : write_tail = 0;
946 : 0 : --stats.pending;
947 : 0 : delete q;
948 : :
949 [ # # ]: 0 : delete [] c->data;
950 : 0 : delete c;
951 : :
952 : 0 : write_state = LEN;
953 : : }
954 : :
955 : 0 : return true;
956 : : }
957 : :
958 : 0 : bool ChunkedIOSSL::ReadData(char* p, uint32 len, bool* error)
959 : : {
960 [ # # ]: 0 : if ( ! read_ptr )
961 : 0 : read_ptr = p;
962 : :
963 : 0 : while ( true )
964 : : {
965 : 0 : double t = current_time();
966 : :
967 : : int read = last_ret =
968 : 0 : SSL_read(ssl, read_ptr, len - (read_ptr - p));
969 : :
970 [ # # # # : 0 : switch ( SSL_get_error(ssl, read) ) {
# ]
971 : : case SSL_ERROR_NONE:
972 : : // We're fine.
973 : 0 : read_ptr += read;
974 : :
975 [ # # ]: 0 : if ( unsigned(read_ptr - p) == len )
976 : : {
977 : : // We have read as much as requested..
978 : 0 : read_ptr = 0;
979 : 0 : *error = false;
980 : 0 : return true;
981 : : }
982 : :
983 : : break;
984 : :
985 : : case SSL_ERROR_WANT_READ:
986 : : case SSL_ERROR_WANT_WRITE:
987 : : // Would block.
988 : 0 : DBG_LOG(DBG_CHUNKEDIO,
989 : : "SSL_read: SSL_ERROR_WANT_READ [%d,%d]",
990 : : read, SSL_get_error(ssl, read));
991 : 0 : *error = false;
992 : 0 : return false;
993 : :
994 : : case SSL_ERROR_ZERO_RETURN:
995 : : // Regular remote connection shutdown.
996 : 0 : DBG_LOG(DBG_CHUNKEDIO,
997 : : "SSL_read: SSL_ZERO_RETURN [%d,%d]",
998 : : read, SSL_get_error(ssl, read));
999 : 0 : *error = eof = true;
1000 : 0 : return false;
1001 : :
1002 : : case SSL_ERROR_SYSCALL:
1003 : 0 : DBG_LOG(DBG_CHUNKEDIO, "SSL_read: SSL_SYS_CALL [%d,%d]",
1004 : : read, SSL_get_error(ssl, read));
1005 : :
1006 [ # # ]: 0 : if ( read == 0 )
1007 : : {
1008 : : // Socket connection closed.
1009 : 0 : *error = eof = true;
1010 : 0 : return false;
1011 : : }
1012 : :
1013 : : // Fall through.
1014 : :
1015 : : default:
1016 : 0 : DBG_LOG(DBG_CHUNKEDIO,
1017 : : "SSL_read: fatal error [%d,%d]",
1018 : : read, SSL_get_error(ssl, read));
1019 : :
1020 : : // Fatal SSL error.
1021 : 0 : *error = true;
1022 : 0 : return false;
1023 : : }
1024 : : }
1025 : :
1026 : : // Can't be reached.
1027 : : internal_error("can't be reached");
1028 : : return false;
1029 : : }
1030 : :
1031 : 0 : bool ChunkedIOSSL::Read(Chunk** chunk, bool mayblock)
1032 : : {
1033 : 0 : *chunk = 0;
1034 : :
1035 [ # # ]: 0 : if ( ! setup )
1036 : : {
1037 : : // We may need to finish the handshake.
1038 [ # # ]: 0 : if ( ! Init() )
1039 : 0 : return false;
1040 [ # # ]: 0 : if ( ! setup )
1041 : 0 : return true;
1042 : : }
1043 : :
1044 : : bool error;
1045 : :
1046 : 0 : Flush();
1047 : :
1048 [ # # ]: 0 : if ( read_state == LEN )
1049 : : {
1050 [ # # ]: 0 : if ( ! read_chunk )
1051 : : {
1052 : 0 : read_chunk = new Chunk;
1053 : 0 : read_chunk->data = 0;
1054 : : }
1055 : :
1056 [ # # ]: 0 : if ( ! ReadData((char*)&read_chunk->len,
1057 : : sizeof(read_chunk->len),
1058 : : &error) )
1059 : 0 : return ! error;
1060 : :
1061 : 0 : read_state = DATA;
1062 : 0 : read_chunk->len = ntohl(read_chunk->len);
1063 : : }
1064 : :
1065 [ # # ]: 0 : if ( ! read_chunk->data )
1066 : 0 : read_chunk->data = new char[read_chunk->len];
1067 : :
1068 [ # # ]: 0 : if ( ! ReadData(read_chunk->data, read_chunk->len, &error) )
1069 : 0 : return ! error;
1070 : :
1071 : : // Chunk fully read. Pass it on.
1072 : 0 : *chunk = read_chunk;
1073 : 0 : read_chunk = 0;
1074 : 0 : read_state = LEN;
1075 : :
1076 : : #ifdef DEBUG
1077 [ # # ]: 0 : if ( *chunk )
1078 : 0 : DBG_LOG(DBG_CHUNKEDIO, "ssl read of size %d [%s]",
1079 : : (*chunk)->len, fmt_bytes((*chunk)->data, 20));
1080 : : #endif
1081 : :
1082 : 0 : return true;
1083 : : }
1084 : :
1085 : 0 : bool ChunkedIOSSL::CanRead()
1086 : : {
1087 : : // We will be called regularly. So take the opportunity
1088 : : // to flush the write buffer.
1089 : 0 : Flush();
1090 : :
1091 [ # # ]: 0 : if ( SSL_pending(ssl) )
1092 : 0 : return true;
1093 : :
1094 : : fd_set fd_read;
1095 : 0 : FD_ZERO(&fd_read);
1096 : 0 : FD_SET(socket, &fd_read);
1097 : :
1098 : : struct timeval notimeout;
1099 : 0 : notimeout.tv_sec = 0;
1100 : 0 : notimeout.tv_usec = 0;
1101 : :
1102 : 0 : return select(socket + 1, &fd_read, NULL, NULL, ¬imeout) > 0;
1103 : : }
1104 : :
1105 : 0 : bool ChunkedIOSSL::CanWrite()
1106 : : {
1107 : 0 : return write_head != 0;
1108 : : }
1109 : :
1110 : 0 : bool ChunkedIOSSL::IsIdle()
1111 : : {
1112 [ # # ][ # # ]: 0 : return ! (CanRead() || CanWrite());
1113 : : }
1114 : :
1115 : 0 : bool ChunkedIOSSL::IsFillingUp()
1116 : : {
1117 : : // We don't really need this at the moment (since SSL is only used for
1118 : : // peer-to-peer communication). Thus, we always return false for now.
1119 : 0 : return false;
1120 : : }
1121 : :
1122 : 0 : void ChunkedIOSSL::Clear()
1123 : : {
1124 [ # # ]: 0 : while ( write_head )
1125 : : {
1126 : 0 : Queue* next = write_head->next;
1127 [ # # ]: 0 : delete [] write_head->chunk->data;
1128 : 0 : delete write_head->chunk;
1129 : 0 : delete write_head;
1130 : 0 : write_head = next;
1131 : : }
1132 : 0 : write_head = write_tail = 0;
1133 : 0 : }
1134 : :
1135 : 0 : const char* ChunkedIOSSL::Error()
1136 : : {
1137 : 0 : const int BUFLEN = 512;
1138 : : static char buffer[BUFLEN];
1139 : :
1140 : 0 : int sslcode = SSL_get_error(ssl, last_ret);
1141 : 0 : int errcode = ERR_get_error();
1142 : :
1143 : : int count = safe_snprintf(buffer, BUFLEN, "[%d,%d,%d] SSL error: ",
1144 : 0 : errcode, sslcode, last_ret);
1145 : :
1146 [ # # ]: 0 : if ( errcode )
1147 : 0 : ERR_error_string_n(errcode, buffer + count, BUFLEN - count);
1148 : :
1149 [ # # ]: 0 : else if ( sslcode == SSL_ERROR_SYSCALL )
1150 : : {
1151 [ # # ]: 0 : if ( last_ret )
1152 : : // Look at errno.
1153 : : safe_snprintf(buffer + count, BUFLEN - count,
1154 : 0 : "syscall: %s", strerror(errno));
1155 : : else
1156 : : // Errno is not valid in this case.
1157 : : safe_strncpy(buffer + count,
1158 : : "syscall: unexpected end-of-file",
1159 : 0 : BUFLEN - count);
1160 : : }
1161 : : else
1162 : 0 : safe_strncpy(buffer + count, "unknown error", BUFLEN - count);
1163 : :
1164 : 0 : return buffer;
1165 : : }
1166 : :
1167 : 0 : void ChunkedIOSSL::Stats(char* buffer, int length)
1168 : : {
1169 : 0 : int i = safe_snprintf(buffer, length, "pending=%ld ", stats.pending);
1170 : 0 : ChunkedIO::Stats(buffer + i, length - i);
1171 : 0 : }
1172 : :
1173 : : #ifdef HAVE_LIBZ
1174 : :
1175 : 0 : bool CompressedChunkedIO::Init()
1176 : : {
1177 : 0 : zin.zalloc = 0;
1178 : 0 : zin.zfree = 0;
1179 : 0 : zin.opaque = 0;
1180 : :
1181 : 0 : zout.zalloc = 0;
1182 : 0 : zout.zfree = 0;
1183 : 0 : zout.opaque = 0;
1184 : :
1185 : 0 : compress = uncompress = false;
1186 : 0 : error = 0;
1187 : 0 : uncompressed_bytes_read = 0;
1188 : 0 : uncompressed_bytes_written = 0;
1189 : :
1190 : 0 : return true;
1191 : : }
1192 : :
1193 : 0 : bool CompressedChunkedIO::Read(Chunk** chunk, bool may_block)
1194 : : {
1195 [ # # ]: 0 : if ( ! io->Read(chunk, may_block) )
1196 : 0 : return false;
1197 : :
1198 [ # # ]: 0 : if ( ! uncompress )
1199 : 0 : return true;
1200 : :
1201 [ # # ]: 0 : if ( ! *chunk )
1202 : 0 : return true;
1203 : :
1204 : : uint32 uncompressed_len =
1205 : 0 : *(uint32*)((*chunk)->data + (*chunk)->len - sizeof(uint32));
1206 : :
1207 [ # # ]: 0 : if ( uncompressed_len == 0 )
1208 : : {
1209 : : // Not compressed.
1210 : 0 : DBG_LOG(DBG_CHUNKEDIO, "zlib read pass-through: size=%d",
1211 : : (*chunk)->len);
1212 : 0 : return true;
1213 : : }
1214 : :
1215 : 0 : char* uncompressed = new char[uncompressed_len];
1216 : :
1217 : 0 : DBG_LOG(DBG_CHUNKEDIO, "zlib read: size=%d uncompressed=%d",
1218 : : (*chunk)->len, uncompressed_len);
1219 : :
1220 : 0 : zin.next_in = (Bytef*) (*chunk)->data;
1221 : 0 : zin.avail_in = (*chunk)->len - sizeof(uint32);
1222 : 0 : zin.next_out = (Bytef*) uncompressed;
1223 : 0 : zin.avail_out = uncompressed_len;
1224 : :
1225 [ # # ]: 0 : if ( inflate(&zin, Z_SYNC_FLUSH) != Z_OK )
1226 : : {
1227 : 0 : error = zin.msg;
1228 : 0 : return false;
1229 : : }
1230 : :
1231 [ # # ]: 0 : if ( zin.avail_in > 0 )
1232 : : {
1233 : 0 : error = "compressed data longer than expected";
1234 : 0 : return false;
1235 : : }
1236 : :
1237 [ # # ]: 0 : delete [] (*chunk)->data;
1238 : :
1239 : 0 : uncompressed_bytes_read += uncompressed_len;
1240 : :
1241 : 0 : (*chunk)->len = uncompressed_len;
1242 : 0 : (*chunk)->data = uncompressed;
1243 : :
1244 : 0 : return true;
1245 : : }
1246 : :
1247 : 0 : bool CompressedChunkedIO::Write(Chunk* chunk)
1248 : : {
1249 [ # # ][ # # ]: 0 : if ( (! compress) || IsPure() )
[ # # ]
1250 : : // No compression.
1251 : 0 : return io->Write(chunk);
1252 : :
1253 : : // We compress block-wise (rather than stream-wise) because:
1254 : : //
1255 : : // (1) it's significantly easier to implement due to our block-oriented
1256 : : // communication model (with a stream compression, we'd need to chop
1257 : : // the stream into blocks during decompression which would require
1258 : : // additional buffering and copying).
1259 : : //
1260 : : // (2) it ensures that we do not introduce any additional latencies (a
1261 : : // stream compression may decide to wait for the next chunk of data
1262 : : // before writing anything out).
1263 : : //
1264 : : // The block-wise compression comes at the cost of a smaller compression
1265 : : // factor.
1266 : : //
1267 : : // A compressed chunk's data looks like this:
1268 : : // char[] compressed data
1269 : : // uint32 uncompressed_length
1270 : : //
1271 : : // By including uncompressed_length, we again trade easier
1272 : : // decompression for a smaller reduction factor. If uncompressed_length
1273 : : // is zero, the data is *not* compressed.
1274 : :
1275 : 0 : uncompressed_bytes_written += chunk->len;
1276 : 0 : uint32 original_size = chunk->len;
1277 : :
1278 : 0 : char* compressed = new char[chunk->len + sizeof(uint32)];
1279 : :
1280 [ # # ]: 0 : if ( chunk->len < MIN_COMPRESS_SIZE )
1281 : : {
1282 : : // Too small; not worth any compression.
1283 : 0 : memcpy(compressed, chunk->data, chunk->len);
1284 : 0 : *(uint32*) (compressed + chunk->len) = 0; // uncompressed_length
1285 : :
1286 [ # # ]: 0 : delete [] chunk->data;
1287 : 0 : chunk->data = compressed;
1288 : 0 : chunk->len += 4;
1289 : :
1290 : 0 : DBG_LOG(DBG_CHUNKEDIO, "zlib write pass-through: size=%d", chunk->len);
1291 : : }
1292 : : else
1293 : : {
1294 : 0 : zout.next_in = (Bytef*) chunk->data;
1295 : 0 : zout.avail_in = chunk->len;
1296 : 0 : zout.next_out = (Bytef*) compressed;
1297 : 0 : zout.avail_out = chunk->len;
1298 : :
1299 [ # # ]: 0 : if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
1300 : : {
1301 : 0 : error = zout.msg;
1302 : 0 : return false;
1303 : : }
1304 : :
1305 [ # # ]: 0 : while ( zout.avail_out == 0 )
1306 : : {
1307 : : // D'oh! Not enough space, i.e., it hasn't got smaller.
1308 : 0 : char* old = compressed;
1309 : 0 : int old_size = (char*) zout.next_out - compressed;
1310 : 0 : int new_size = old_size * 2 + sizeof(uint32);
1311 : :
1312 : 0 : compressed = new char[new_size];
1313 : 0 : memcpy(compressed, old, old_size);
1314 [ # # ]: 0 : delete [] old;
1315 : :
1316 : 0 : zout.next_out = (Bytef*) (compressed + old_size);
1317 : 0 : zout.avail_out = old_size; // Sic! We doubled.
1318 : :
1319 [ # # ]: 0 : if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
1320 : : {
1321 : 0 : error = zout.msg;
1322 : 0 : return false;
1323 : : }
1324 : : }
1325 : :
1326 : 0 : *(uint32*) zout.next_out = original_size; // uncompressed_length
1327 : :
1328 [ # # ]: 0 : delete [] chunk->data;
1329 : 0 : chunk->data = compressed;
1330 : : chunk->len =
1331 : 0 : ((char*) zout.next_out - compressed) + sizeof(uint32);
1332 : :
1333 : 0 : DBG_LOG(DBG_CHUNKEDIO, "zlib write: size=%d compressed=%d",
1334 : : original_size, chunk->len);
1335 : : }
1336 : :
1337 : 0 : return io->Write(chunk);
1338 : : }
1339 : :
1340 : 0 : void CompressedChunkedIO::Stats(char* buffer, int length)
1341 : : {
1342 : 0 : const Statistics* stats = io->Stats();
1343 : :
1344 : : int i = snprintf(buffer, length, "compression=%.2f/%.2f ",
1345 : : uncompressed_bytes_read ? double(stats->bytes_read) / uncompressed_bytes_read : -1,
1346 [ # # ][ # # ]: 0 : uncompressed_bytes_written ? double(stats->bytes_written) / uncompressed_bytes_written : -1 );
1347 : :
1348 : 0 : io->Stats(buffer + i, length - i);
1349 : 0 : buffer[length-1] = '\0';
1350 [ + - ][ + - ]: 6 : }
1351 : 3 :
1352 : : #endif /* HAVE_LIBZ */
|