Changeset 30:1dde7e03353f in xplcommon
- Timestamp:
- 01/02/13 11:54:56 (12 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 1 deleted
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
src/xplcommon/BlockingStream.cc
r29 r30 68 68 ReadingBuffer& readingBuffer = stream.getReadingBuffer(); 69 69 while (length>0) { 70 size_t toCopy = min(readingBuffer.getLength() - readingOffset, length); 71 if (toCopy>0) { 72 memcpy(d, readingBuffer.getData() + readingOffset, toCopy); 73 readingOffset += toCopy; 74 length -= toCopy; 75 d += toCopy; 70 size_t copied = readingBuffer.extract(d, length, readingOffset); 71 readingOffset += copied; 72 length -= copied; 73 d += copied; 74 75 if (length!=0) { 76 if (!fill()) return false; 76 77 } 77 78 if (length==0) break;79 80 readingBuffer.reset();81 readingOffset = 0;82 while (!failed()) {83 if (isInterrupted()) return false;84 85 if (readingBuffer.read()) {86 if (readingBuffer.isEmpty()) return false;87 break;88 } else if (readingBuffer.failed()) {89 setErrorCode(readingBuffer.getErrorCode());90 } else {91 Waiter* waiter = stream.getWaiter();92 waiter->wait();93 if (waiter->failed()) {94 setErrorCode(waiter->getErrorCode());95 }96 }97 }98 if (failed()) return false;99 78 } 100 79 … … 110 89 WritingBuffer& writingBuffer = stream.getWritingBuffer(); 111 90 while(length>0) { 112 size_t toCopy = min(length, writingBuffer.getAvailable()); 113 if (toCopy>0) { 114 memcpy(writingBuffer.getData() + writingBuffer.getLength(), 115 s, toCopy); 116 writingBuffer.addLength(toCopy); 117 length -= toCopy; 118 s += toCopy; 119 } 91 size_t copied = writingBuffer.append(s, length); 92 length -= copied; 93 s += copied; 120 94 121 95 if (length==0) break; … … 152 126 //------------------------------------------------------------------------------ 153 127 128 bool BlockingStream::fill() 129 { 130 ReadingBuffer& readingBuffer = stream.getReadingBuffer(); 131 132 readingBuffer.reset(); 133 readingOffset = 0; 134 while (true) { 135 if (isInterrupted()) return false; 136 137 if (readingBuffer.read()) { 138 return !readingBuffer.isEmpty(); 139 } else if (readingBuffer.failed()) { 140 setErrorCode(readingBuffer.getErrorCode()); 141 return false; 142 } else { 143 Waiter* waiter = stream.getWaiter(); 144 waiter->wait(); 145 if (waiter->failed()) { 146 setErrorCode(waiter->getErrorCode()); 147 return false; 148 } 149 } 150 } 151 } 152 153 //------------------------------------------------------------------------------ 154 154 155 // Local Variables: 155 156 // mode: C++ -
src/xplcommon/BlockingStream.h
r29 r30 119 119 */ 120 120 bool flush(); 121 122 private: 123 /** 124 * Fill the buffer with data from the stream. 125 */ 126 bool fill(); 121 127 }; 122 128 -
src/xplcommon/Buffer.h
r29 r30 35 35 36 36 #include <cstdlib> 37 #include <cstring> 37 38 38 39 //------------------------------------------------------------------------------ … … 130 131 * @return the number of bytes actually extracted. 131 132 */ 132 size_t extract(void* dest, size_t size, size_t offset = 0) ;133 size_t extract(void* dest, size_t size, size_t offset = 0) const; 133 134 }; 134 135 … … 207 208 length += toAdd; 208 209 return toAdd; 210 } 211 212 //------------------------------------------------------------------------------ 213 214 inline size_t Buffer::append(const void* src, size_t size) 215 { 216 size_t toCopy = std::min(size, getAvailable()); 217 memcpy(data + length, src, toCopy); 218 length += toCopy; 219 return toCopy; 220 } 221 222 //------------------------------------------------------------------------------ 223 224 inline size_t Buffer::extract(void* dest, size_t size, size_t offset) const 225 { 226 size_t toCopy = (offset>=length) ? 0 : std::min(size, length - offset); 227 memcpy(dest, data + offset, toCopy); 228 return toCopy; 209 229 } 210 230 -
src/xplcommon/Makefile.am
r29 r30 11 11 libxplcommon_la_SOURCES= \ 12 12 PseudoRandom.cc \ 13 Buffer.cc \14 13 BlockingStream.cc 15 14 -
test/testblkstream.cc
r29 r30 274 274 clientThread.start(); 275 275 276 //Thread::sleep(60000);277 Thread::sleep(5000);276 Thread::sleep(60000); 277 //Thread::sleep(5000); 278 278 printf("Signalling the server thread\n"); 279 279 serverThread.interrupt();
Note:
See TracChangeset
for help on using the changeset viewer.