Branch data Line data Source code
1 : : // $Id: ChunkedIO.h 6888 2009-08-20 18:23:11Z vern $
2 : : //
3 : : // Implements non-blocking chunk-wise I/O.
4 : :
5 : : #ifndef CHUNKEDIO_H
6 : : #define CHUNKEDIO_H
7 : :
8 : : #include "config.h"
9 : : #include "List.h"
10 : : #include "util.h"
11 : :
12 : : #include <list>
13 : :
14 : : #ifdef NEED_KRB5_H
15 : : # include <krb5.h>
16 : : #endif
17 : :
18 : : #include <openssl/ssl.h>
19 : : #include <openssl/err.h>
20 : :
21 : : class CompressedChunkedIO;
22 : :
23 : : // #define DEBUG_COMMUNICATION 10
24 : :
25 : : // Abstract base class.
26 : : class ChunkedIO {
27 : : public:
28 : : ChunkedIO();
29 [ # # ][ # # ]: 0 : virtual ~ChunkedIO() { }
[ # # ]
30 : :
31 : : typedef struct {
32 : : char* data;
33 : : uint32 len;
34 : : } Chunk;
35 : :
36 : : // Initialization before any I/O operation is performed. Returns false
37 : : // on any form of error.
38 : 0 : virtual bool Init() { return true; }
39 : :
40 : : // Tries to read the next chunk of data. If it can be read completely,
41 : : // a pointer to it is returned in 'chunk' (ownership of chunk is
42 : : // passed). If not, 'chunk' is set to nil. Returns false if any
43 : : // I/O error occurred (use Eof() to see if it's an end-of-file).
44 : : // If 'may_block' is true, we explicitly allow blocking.
45 : : virtual bool Read(Chunk** chunk, bool may_block = false) = 0;
46 : :
47 : : // Puts the chunk into the write queue and writes as much data
48 : : // as possible (takes ownership of chunk).
49 : : // Returns false on any I/O error.
50 : : virtual bool Write(Chunk* chunk) = 0;
51 : :
52 : : // Tries to write as much as currently possible.
53 : : // Returns false on any I/O error.
54 : : virtual bool Flush() = 0;
55 : :
56 : : // If an I/O error has been encountered, returns a string describing it.
57 : : virtual const char* Error() = 0;
58 : :
59 : : // Return true if there is currently at least one chunk available
60 : : // for reading.
61 : : virtual bool CanRead() = 0;
62 : :
63 : : // Return true if there is currently at least one chunk waiting to be
64 : : // written.
65 : : virtual bool CanWrite() = 0;
66 : :
67 : : // Returns true if source believes that there won't be much data soon.
68 : : virtual bool IsIdle() = 0;
69 : :
70 : : // Returns true if internal write buffers are about to fill up.
71 : : virtual bool IsFillingUp() = 0;
72 : :
73 : : // Throws away buffered data.
74 : : virtual void Clear() = 0;
75 : :
76 : : // Returns true,if end-of-file has been reached.
77 : : virtual bool Eof() = 0;
78 : :
79 : : // Returns underlying fd if available, -1 otherwise.
80 : 0 : virtual int Fd() { return -1; }
81 : :
82 : : // Makes sure that no additional protocol data is written into
83 : : // the output stream. If this is activated, the output cannot
84 : : // be read again by any of these classes!
85 : 0 : void MakePure() { pure = true; }
86 : 3 : bool IsPure() { return pure; }
87 : :
88 : : // Writes a log message to the error_fd.
89 : : void Log(const char* str);
90 : :
91 : : struct Statistics {
92 : 2 : Statistics()
93 : : {
94 : 2 : bytes_read = 0;
95 : 2 : bytes_written = 0;
96 : 2 : chunks_read = 0;
97 : 2 : chunks_written = 0;
98 : 2 : reads = 0;
99 : 2 : writes = 0;
100 : 2 : pending = 0;
101 : 2 : }
102 : :
103 : : unsigned long bytes_read;
104 : : unsigned long bytes_written;
105 : : unsigned long chunks_read;
106 : : unsigned long chunks_written;
107 : : unsigned long reads; // # calls which transferred > 0 bytes
108 : : unsigned long writes;
109 : : unsigned long pending;
110 : : };
111 : :
112 : : // Returns raw statistics.
113 : 0 : const Statistics* Stats() const { return &stats; }
114 : :
115 : : // Puts a formatted string containing statistics into buffer.
116 : : virtual void Stats(char* buffer, int length);
117 : :
118 : : #ifdef DEBUG_COMMUNICATION
119 : : void DumpDebugData(const char* basefnname, bool want_reads);
120 : : #endif
121 : :
122 : : protected:
123 : : Statistics stats;
124 : : const char* tag;
125 : :
126 : : #ifdef DEBUG_COMMUNICATION
127 : : void AddToBuffer(char* data, bool is_read)
128 : : { AddToBuffer(strlen(data), data, is_read); }
129 : : void AddToBuffer(uint32 len, char* data, bool is_read);
130 : : void AddToBuffer(Chunk* chunk, bool is_read);
131 : : std::list<Chunk*> data_read;
132 : : std::list<Chunk*> data_written;
133 : : #endif
134 : :
135 : : private:
136 : : bool pure;
137 : : };
138 : :
139 : : // Chunked I/O using a file descriptor.
140 : : class ChunkedIOFd : public ChunkedIO {
141 : : public:
142 : : // fd is an open bidirectional file descriptor, tag is used in error
143 : : // messages, and pid gives a pid to monitor (if the process dies, we
144 : : // return EOF).
145 : : ChunkedIOFd(int fd, const char* tag, pid_t pid = 0);
146 : : virtual ~ChunkedIOFd();
147 : :
148 : : virtual bool Read(Chunk** chunk, bool may_block = false);
149 : : virtual bool Write(Chunk* chunk);
150 : : virtual bool Flush();
151 : : virtual const char* Error();
152 : : virtual bool CanRead();
153 : : virtual bool CanWrite();
154 : : virtual bool IsIdle();
155 : : virtual bool IsFillingUp();
156 : : virtual void Clear();
157 : 1 : virtual bool Eof() { return eof; }
158 : 0 : virtual int Fd() { return fd; }
159 : : virtual void Stats(char* buffer, int length);
160 : :
161 : : private:
162 : :
163 : : bool PutIntoWriteBuffer(Chunk* chunk);
164 : : bool FlushWriteBuffer();
165 : : Chunk* ExtractChunk();
166 : :
167 : : // Returns size of next chunk in buffer or 0 if none.
168 : : uint32 ChunkAvailable();
169 : :
170 : : // Flushes if it thinks it is time to.
171 : : bool OptionalFlush();
172 : :
173 : : // Concatenates the the data of the two chunks forming a new one.
174 : : // The old chunkds are deleted.
175 : : Chunk* ConcatChunks(Chunk* c1, Chunk* c2);
176 : :
177 : : // Reads/writes on chunk of upto BUFFER_SIZE bytes.
178 : : bool WriteChunk(Chunk* chunk, bool partial);
179 : : bool ReadChunk(Chunk** chunk, bool may_block);
180 : :
181 : : int fd;
182 : : bool eof;
183 : : double last_flush;
184 : : int failed_reads;
185 : :
186 : : // Optimally, this should match the file descriptor's
187 : : // buffer size (for sockets, it may be helpful to
188 : : // increase the send/receive buffers).
189 : : static const unsigned int BUFFER_SIZE = 1024 * 1024 * 1;
190 : :
191 : : // We 'or' this to the length of a data chunk to mark
192 : : // that it's part of a larger one. This has to be larger
193 : : // than BUFFER_SIZE.
194 : : static const uint32 FLAG_PARTIAL = 0x80000000;
195 : :
196 : : // We report that we're filling up when there are more than this number
197 : : // of pending chunks.
198 : : static const uint32 MAX_BUFFERED_CHUNKS_SOFT = 400000;
199 : :
200 : : // Maximum number of chunks we store in memory before rejecting writes.
201 : : static const uint32 MAX_BUFFERED_CHUNKS = 500000;
202 : :
203 : : char* read_buffer;
204 : : uint32 read_len;
205 : : uint32 read_pos;
206 : : Chunk* partial; // when we read an oversized chunk, we store it here
207 : :
208 : : char* write_buffer;
209 : : uint32 write_len;
210 : : uint32 write_pos;
211 : :
212 : : struct ChunkQueue {
213 : : Chunk* chunk;
214 : : ChunkQueue* next;
215 : : };
216 : :
217 : : // Chunks that don't fit into our write buffer.
218 : : ChunkQueue* pending_head;
219 : : ChunkQueue* pending_tail;
220 : :
221 : : pid_t pid;
222 : : };
223 : :
224 : : // Chunked I/O using an SSL connection.
225 : : class ChunkedIOSSL : public ChunkedIO {
226 : : public:
227 : : // Argument is an open socket and a flag indicating whether we are the
228 : : // server side of the connection.
229 : : ChunkedIOSSL(int socket, bool server);
230 : : virtual ~ChunkedIOSSL();
231 : :
232 : : virtual bool Init();
233 : : virtual bool Read(Chunk** chunk, bool mayblock = false);
234 : : virtual bool Write(Chunk* chunk);
235 : : virtual bool Flush();
236 : : virtual const char* Error();
237 : : virtual bool CanRead();
238 : : virtual bool CanWrite();
239 : : virtual bool IsIdle();
240 : : virtual bool IsFillingUp();
241 : : virtual void Clear();
242 : 0 : virtual bool Eof() { return eof; }
243 : 0 : virtual int Fd() { return socket; }
244 : : virtual void Stats(char* buffer, int length);
245 : :
246 : : private:
247 : : // Maximum number of chunks we store in memory before rejecting writes.
248 : : static const uint32 MAX_BUFFERED_CHUNKS = 500000;
249 : :
250 : : // Only returns true if all data has been read. If not, call
251 : : // it again with the same parameters as long as error is not
252 : : // set to true.
253 : : bool ReadData(char* p, uint32 len, bool* error);
254 : : // Same for writing.
255 : : bool WriteData(char* p, uint32 len, bool* error);
256 : :
257 : : int socket;
258 : : int last_ret; // last error code
259 : : bool eof;
260 : :
261 : : bool server; // are we the server?
262 : : bool setup; // has the connection been setup successfully?
263 : :
264 : : SSL* ssl;
265 : :
266 : : // Write queue.
267 : : struct Queue {
268 : : Chunk* chunk;
269 : : Queue* next;
270 : : };
271 : :
272 : : // The chunk part we are reading/writing
273 : : enum State { LEN, DATA };
274 : :
275 : : State write_state;
276 : : Queue* write_head;
277 : : Queue* write_tail;
278 : :
279 : : State read_state;
280 : : Chunk* read_chunk;
281 : : char* read_ptr;
282 : :
283 : : // One SSL for all connections.
284 : : static SSL_CTX* ctx;
285 : : };
286 : :
287 : : #ifdef HAVE_LIBZ
288 : :
289 : : #include <zlib.h>
290 : :
291 : : // Wrapper class around a another ChunkedIO which the (un-)compresses data.
292 : : class CompressedChunkedIO : public ChunkedIO {
293 : : public:
294 : 0 : CompressedChunkedIO(ChunkedIO* arg_io)
295 : 0 : : io(arg_io) {} // takes ownership
296 [ # # ][ # # ]: 0 : virtual ~CompressedChunkedIO() { delete io; }
[ # # ][ # # ]
297 : :
298 : : virtual bool Init(); // does *not* call arg_io->Init()
299 : : virtual bool Read(Chunk** chunk, bool may_block = false);
300 : : virtual bool Write(Chunk* chunk);
301 : 0 : virtual bool Flush() { return io->Flush(); }
302 [ # # ]: 0 : virtual const char* Error() { return error ? error : io->Error(); }
303 : 0 : virtual bool CanRead() { return io->CanRead(); }
304 : 0 : virtual bool CanWrite() { return io->CanWrite(); }
305 : 0 : virtual bool IsIdle() { return io->IsIdle(); }
306 : 0 : virtual bool IsFillingUp() { return io->IsFillingUp(); }
307 : 0 : virtual void Clear() { return io->Clear(); }
308 : :
309 : 0 : virtual bool Eof() { return io->Eof(); }
310 : 0 : virtual int Fd() { return io->Fd(); }
311 : : virtual void Stats(char* buffer, int length);
312 : :
313 : 0 : void EnableCompression(int level)
314 : 0 : { deflateInit(&zout, level); compress = true; }
315 : 0 : void EnableDecompression()
316 : 0 : { inflateInit(&zin); uncompress = true; }
317 : :
318 : : protected:
319 : : // Only compress block with size >= this.
320 : : static const unsigned int MIN_COMPRESS_SIZE = 30;
321 : :
322 : : ChunkedIO* io;
323 : : z_stream zin;
324 : : z_stream zout;
325 : : const char* error;
326 : :
327 : : bool compress;
328 : : bool uncompress;
329 : :
330 : : // Keep some statistics.
331 : : unsigned long uncompressed_bytes_read;
332 : : unsigned long uncompressed_bytes_written;
333 : : };
334 : :
335 : : #endif /* HAVE_LIBZ */
336 : :
337 : : #endif
|