Changeset 31:bbd688924703 in xplcommon
- Timestamp:
- 01/02/13 13:58:25 (12 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 2 added
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
src/xplcommon/BlockingStream.cc
r30 r31 46 46 //------------------------------------------------------------------------------ 47 47 48 bool BlockingStream::isInterrupted()48 inline bool BlockingStream::checkInterrupted() 49 49 { 50 if (failed()) return false;51 52 50 if (!interrupted) { 53 51 interrupted = event.check(); … … 83 81 //------------------------------------------------------------------------------ 84 82 83 bool BlockingStream::skip(size_t length) 84 { 85 ReadingBuffer& readingBuffer = stream.getReadingBuffer(); 86 while (length>0) { 87 size_t toSkip = min(length, readingBuffer.getLength() - readingOffset); 88 readingOffset += toSkip; 89 length -= toSkip; 90 91 if (length!=0) { 92 if (!fill()) return false; 93 } 94 } 95 96 return true; 97 } 98 99 //------------------------------------------------------------------------------ 100 85 101 bool BlockingStream::write(const void* src, size_t length) 86 102 { … … 105 121 { 106 122 WritingBuffer& writingBuffer = stream.getWritingBuffer(); 107 while ( !failed()) {108 if ( isInterrupted()) return false;123 while (*this) { 124 if (checkInterrupted()) return false; 109 125 110 126 if (writingBuffer.write()) { … … 112 128 } else if (writingBuffer.failed()) { 113 129 setErrorCode(writingBuffer.getErrorCode()); 114 return false;115 130 } else { 116 131 Waiter* waiter = stream.getWaiter(); … … 132 147 readingBuffer.reset(); 133 148 readingOffset = 0; 134 while ( true) {135 if ( isInterrupted()) return false;149 while (*this) { 150 if (checkInterrupted()) return false; 136 151 137 152 if (readingBuffer.read()) { 138 return !readingBuffer.isEmpty(); 153 eof = readingBuffer.isEmpty(); 154 return !eof; 139 155 } else if (readingBuffer.failed()) { 140 156 setErrorCode(readingBuffer.getErrorCode()); 141 return false;142 157 } else { 143 158 Waiter* waiter = stream.getWaiter(); … … 149 164 } 150 165 } 166 return false; 151 167 } 152 168 -
src/xplcommon/BlockingStream.h
r30 r31 35 35 36 36 #include "BufferedStream.h" 37 37 38 #include "Waiter.h" 38 39 #include "WaitableEvent.h" … … 76 77 size_t readingOffset; 77 78 79 /** 80 * Indicate if the end-of-file has been reached while reading. 81 */ 82 bool eof; 83 78 84 public: 79 85 /** … … 83 89 */ 84 90 BlockingStream(BufferedStream& stream); 91 92 /** 93 * Determine if the stream has neither failed nor been 94 * interrupted. 95 */ 96 operator bool() const; 85 97 86 98 /** … … 102 114 */ 103 115 bool read(void* dest, size_t length); 116 117 /** 118 * Skip the given number of bytes. 119 */ 120 bool skip(size_t length); 104 121 105 122 /** … … 125 142 */ 126 143 bool fill(); 144 145 /** 146 * Check for the stream being interrupted. 147 * 148 * @return if the stream is interrupted, false otherwise 149 */ 150 bool checkInterrupted(); 127 151 }; 128 152 … … 135 159 event(stream.getWaiter()), 136 160 interrupted(false), 137 readingOffset(0) 161 readingOffset(0), 162 eof(false) 138 163 { 164 } 165 166 //------------------------------------------------------------------------------ 167 168 inline BlockingStream::operator bool() const 169 { 170 return !failed() && !interrupted && !eof; 139 171 } 140 172 … … 144 176 { 145 177 event.fire(); 178 } 179 180 //------------------------------------------------------------------------------ 181 182 inline bool BlockingStream::isInterrupted() 183 { 184 return interrupted; 146 185 } 147 186 -
src/xplcommon/Makefile.am
r30 r31 11 11 libxplcommon_la_SOURCES= \ 12 12 PseudoRandom.cc \ 13 BlockingStream.cc 13 BlockingStream.cc \ 14 DataStream.cc 14 15 15 16 if TARGET_API_POSIX … … 39 40 LocalConnector.h \ 40 41 LocalClientSocket.h \ 41 BlockingStream.h 42 BlockingStream.h \ 43 DataStream.h -
test/testblkstream.cc
r30 r31 37 37 #include <xplcommon/LocalClientSocket.h> 38 38 #include <xplcommon/LocalConnector.h> 39 #include <xplcommon/ BlockingStream.h>39 #include <xplcommon/DataStream.h> 40 40 #include <xplcommon/WaitableEvent.h> 41 41 #include <xplcommon/PseudoRandom.h> … … 59 59 using xplcommon::LocalConnector; 60 60 using xplcommon::BufferedStream; 61 using xplcommon:: BlockingStream;61 using xplcommon::DataStream; 62 62 using xplcommon::PseudoRandom; 63 63 … … 69 69 { 70 70 private: 71 BlockingStream* dataStream; 71 static const uint8_t OP_ADD = 1; 72 73 static const uint8_t OP_SUB = 2; 74 75 static const uint8_t OP_MUL = 3; 76 77 static const uint8_t OP_DIV = 4; 78 79 private: 80 static int32_t perform(int32_t value, uint8_t operation, int32_t x); 81 82 DataStream* dataStream; 72 83 73 84 protected: … … 83 94 //------------------------------------------------------------------------------ 84 95 96 inline int32_t TestThread::perform(int32_t value, uint8_t operation, int32_t x) 97 { 98 switch (operation) { 99 case OP_ADD: 100 return value + x; 101 case OP_SUB: 102 return value - x; 103 case OP_MUL: 104 return value * x; 105 case OP_DIV: 106 return value / x; 107 default: 108 return value; 109 } 110 } 111 112 //------------------------------------------------------------------------------ 113 85 114 inline TestThread::TestThread() : 86 115 dataStream(0) … … 99 128 void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient) 100 129 { 101 static const size_t bufferSize = 1024; 102 103 static const size_t minToWrite = 16; 104 static const size_t maxToWrite = 6124; 105 106 static const size_t reportingInterval = 100000000; 130 static const size_t reportInterval = 100000; 107 131 108 132 const char* prefix = isClient ? "Client" : "Server"; 109 133 110 dataStream = new BlockingStream(stream);134 dataStream = new DataStream(stream); 111 135 PseudoRandom random(seed); 112 136 113 114 size_t numBytesRead = 0;115 size_t nextReportRead = reportingInterval;116 size_t numBytesWritten = 0;117 size_t nextReportWritten = reportingInterval;118 119 137 bool first = true; 120 138 121 unsigned char* buffer = new unsigned char[bufferSize]; 122 123 while(true) { 124 if (isClient || !first) { 125 unsigned dataSize = random.nextUnsigned(maxToWrite, minToWrite); 126 *reinterpret_cast<uint32_t*>(buffer) = dataSize; 127 while(dataSize>0) { 128 unsigned toWrite = min(static_cast<size_t>(dataSize), 129 bufferSize); 130 if (!dataStream->write(buffer, toWrite)) break; 131 dataSize -= toWrite; 132 numBytesWritten += toWrite; 133 if (numBytesWritten>=nextReportWritten) { 134 printf("%s: written %lu bytes\n", prefix, 135 static_cast<unsigned long>(numBytesWritten)); 136 nextReportWritten += reportingInterval; 139 size_t numRequests = 0; 140 size_t nextReportRequests = reportInterval; 141 142 int32_t value = 0; 143 if (isClient) { 144 value = random.next(); 145 dataStream->writeS32(value); 146 } else { 147 value = dataStream->readS32(); 148 } 149 150 int32_t newValue = value; 151 152 while(*dataStream) { 153 if (!first || isClient) { 154 unsigned numOperations = random.nextUnsigned(20, 1); 155 dataStream->writeU16(numOperations); 156 //printf("%s: numOperations=%u\n", prefix, numOperations); 157 for(unsigned i = 0; i<numOperations; ++i) { 158 double r = random.nextDouble(); 159 uint8_t operation = 160 (r<0.4) ? OP_ADD : ((r<0.8) ? OP_SUB : 161 ((r<0.98) ? OP_MUL : OP_DIV)); 162 int32_t x = random.next(); 163 164 dataStream->writeU8(operation); 165 dataStream->writeS32(x); 166 167 newValue = perform(newValue, operation, x); 168 } 169 if (!dataStream->flush()) break; 170 171 if (isClient) { 172 value = newValue; 173 ++numRequests; 174 if (numRequests>=nextReportRequests) { 175 printf("%s: after %lu requests, value=%d\n", 176 prefix, static_cast<unsigned long>(numRequests), 177 value); 178 nextReportRequests += reportInterval; 137 179 } 138 180 } 139 if (!dataStream->flush()) break;140 } 141 142 uint32_t dataSize = 0;143 if (!dataStream->read(&dataSize, sizeof(dataSize))) break;144 assert(dataSize>=minToWrite);145 assert(dataSize<=maxToWrite);146 dataSize -= sizeof(dataSize);147 numBytesRead += 4; 148 while(dataSize>0) {149 unsigned toRead = min(static_cast<size_t>(dataSize),150 bufferSize);151 if (!dataStream->read(buffer, toRead)) break;152 dataSize -= toRead;153 numBytesRead += toRead;154 if (numBytesRead>=nextReportRead) {155 printf("%s: read %lu bytes\n", prefix,156 static_cast<unsigned long>(numBytesRead));157 nextReportRead += reportingInterval;181 } 182 183 unsigned numOperations = dataStream->readU16(); 184 for (unsigned i = 0; *dataStream && i<numOperations; ++i) { 185 uint8_t operation = dataStream->readU8(); 186 int32_t x = dataStream->readS32(); 187 newValue = perform(newValue, operation, x); 188 } 189 190 if (*dataStream) { 191 if (!isClient) { 192 value = newValue; 193 ++numRequests; 194 if (numRequests>=nextReportRequests) { 195 printf("%s: after %lu requests, value=%d\n", 196 prefix, static_cast<unsigned long>(numRequests), 197 value); 198 nextReportRequests += reportInterval; 199 } 158 200 } 159 201 } 160 202 203 161 204 first = false; 162 205 } 163 164 delete [] buffer;165 206 166 207 if (dataStream->failed()) { … … 171 212 } 172 213 173 printf("%s: written %lu bytes and read %lu bytes\n", prefix, 174 static_cast<unsigned long>(numBytesWritten), 175 static_cast<unsigned long>(numBytesRead)); 214 printf("%s: value=%d, %lu requests\n", prefix, value, 215 static_cast<unsigned long>(numRequests)); 176 216 } 177 217 … … 275 315 276 316 Thread::sleep(60000); 277 //Thread::sleep( 5000);278 printf("Signalling the serverthread\n");279 serverThread.interrupt();280 //clientThread.interrupt();281 printf("Signalled the serverthread\n");317 //Thread::sleep(1000); 318 printf("Signalling the client thread\n"); 319 //serverThread.interrupt(); 320 clientThread.interrupt(); 321 printf("Signalled the client thread\n"); 282 322 283 323 printf("Waiting for the client thread\n");
Note:
See TracChangeset
for help on using the changeset viewer.