Changeset 31:bbd688924703 in xplcommon for test
- Timestamp:
- 01/02/13 13:58:25 (12 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
test/testblkstream.cc
r30 r31 37 37 #include <xplcommon/LocalClientSocket.h> 38 38 #include <xplcommon/LocalConnector.h> 39 #include <xplcommon/ BlockingStream.h>39 #include <xplcommon/DataStream.h> 40 40 #include <xplcommon/WaitableEvent.h> 41 41 #include <xplcommon/PseudoRandom.h> … … 59 59 using xplcommon::LocalConnector; 60 60 using xplcommon::BufferedStream; 61 using xplcommon:: BlockingStream;61 using xplcommon::DataStream; 62 62 using xplcommon::PseudoRandom; 63 63 … … 69 69 { 70 70 private: 71 BlockingStream* dataStream; 71 static const uint8_t OP_ADD = 1; 72 73 static const uint8_t OP_SUB = 2; 74 75 static const uint8_t OP_MUL = 3; 76 77 static const uint8_t OP_DIV = 4; 78 79 private: 80 static int32_t perform(int32_t value, uint8_t operation, int32_t x); 81 82 DataStream* dataStream; 72 83 73 84 protected: … … 83 94 //------------------------------------------------------------------------------ 84 95 96 inline int32_t TestThread::perform(int32_t value, uint8_t operation, int32_t x) 97 { 98 switch (operation) { 99 case OP_ADD: 100 return value + x; 101 case OP_SUB: 102 return value - x; 103 case OP_MUL: 104 return value * x; 105 case OP_DIV: 106 return value / x; 107 default: 108 return value; 109 } 110 } 111 112 //------------------------------------------------------------------------------ 113 85 114 inline TestThread::TestThread() : 86 115 dataStream(0) … … 99 128 void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient) 100 129 { 101 static const size_t bufferSize = 1024; 102 103 static const size_t minToWrite = 16; 104 static const size_t maxToWrite = 6124; 105 106 static const size_t reportingInterval = 100000000; 130 static const size_t reportInterval = 100000; 107 131 108 132 const char* prefix = isClient ? "Client" : "Server"; 109 133 110 dataStream = new BlockingStream(stream);134 dataStream = new DataStream(stream); 111 135 PseudoRandom random(seed); 112 136 113 114 size_t numBytesRead = 0;115 size_t nextReportRead = reportingInterval;116 size_t numBytesWritten = 0;117 size_t nextReportWritten = reportingInterval;118 119 137 bool first = true; 120 138 121 unsigned char* buffer = new unsigned char[bufferSize]; 122 123 while(true) { 124 if (isClient || !first) { 125 unsigned dataSize = random.nextUnsigned(maxToWrite, minToWrite); 126 *reinterpret_cast<uint32_t*>(buffer) = dataSize; 127 while(dataSize>0) { 128 unsigned toWrite = min(static_cast<size_t>(dataSize), 129 bufferSize); 130 if (!dataStream->write(buffer, toWrite)) break; 131 dataSize -= toWrite; 132 numBytesWritten += toWrite; 133 if (numBytesWritten>=nextReportWritten) { 134 printf("%s: written %lu bytes\n", prefix, 135 static_cast<unsigned long>(numBytesWritten)); 136 nextReportWritten += reportingInterval; 139 size_t numRequests = 0; 140 size_t nextReportRequests = reportInterval; 141 142 int32_t value = 0; 143 if (isClient) { 144 value = random.next(); 145 dataStream->writeS32(value); 146 } else { 147 value = dataStream->readS32(); 148 } 149 150 int32_t newValue = value; 151 152 while(*dataStream) { 153 if (!first || isClient) { 154 unsigned numOperations = random.nextUnsigned(20, 1); 155 dataStream->writeU16(numOperations); 156 //printf("%s: numOperations=%u\n", prefix, numOperations); 157 for(unsigned i = 0; i<numOperations; ++i) { 158 double r = random.nextDouble(); 159 uint8_t operation = 160 (r<0.4) ? OP_ADD : ((r<0.8) ? OP_SUB : 161 ((r<0.98) ? OP_MUL : OP_DIV)); 162 int32_t x = random.next(); 163 164 dataStream->writeU8(operation); 165 dataStream->writeS32(x); 166 167 newValue = perform(newValue, operation, x); 168 } 169 if (!dataStream->flush()) break; 170 171 if (isClient) { 172 value = newValue; 173 ++numRequests; 174 if (numRequests>=nextReportRequests) { 175 printf("%s: after %lu requests, value=%d\n", 176 prefix, static_cast<unsigned long>(numRequests), 177 value); 178 nextReportRequests += reportInterval; 137 179 } 138 180 } 139 if (!dataStream->flush()) break;140 } 141 142 uint32_t dataSize = 0;143 if (!dataStream->read(&dataSize, sizeof(dataSize))) break;144 assert(dataSize>=minToWrite);145 assert(dataSize<=maxToWrite);146 dataSize -= sizeof(dataSize);147 numBytesRead += 4; 148 while(dataSize>0) {149 unsigned toRead = min(static_cast<size_t>(dataSize),150 bufferSize);151 if (!dataStream->read(buffer, toRead)) break;152 dataSize -= toRead;153 numBytesRead += toRead;154 if (numBytesRead>=nextReportRead) {155 printf("%s: read %lu bytes\n", prefix,156 static_cast<unsigned long>(numBytesRead));157 nextReportRead += reportingInterval;181 } 182 183 unsigned numOperations = dataStream->readU16(); 184 for (unsigned i = 0; *dataStream && i<numOperations; ++i) { 185 uint8_t operation = dataStream->readU8(); 186 int32_t x = dataStream->readS32(); 187 newValue = perform(newValue, operation, x); 188 } 189 190 if (*dataStream) { 191 if (!isClient) { 192 value = newValue; 193 ++numRequests; 194 if (numRequests>=nextReportRequests) { 195 printf("%s: after %lu requests, value=%d\n", 196 prefix, static_cast<unsigned long>(numRequests), 197 value); 198 nextReportRequests += reportInterval; 199 } 158 200 } 159 201 } 160 202 203 161 204 first = false; 162 205 } 163 164 delete [] buffer;165 206 166 207 if (dataStream->failed()) { … … 171 212 } 172 213 173 printf("%s: written %lu bytes and read %lu bytes\n", prefix, 174 static_cast<unsigned long>(numBytesWritten), 175 static_cast<unsigned long>(numBytesRead)); 214 printf("%s: value=%d, %lu requests\n", prefix, value, 215 static_cast<unsigned long>(numRequests)); 176 216 } 177 217 … … 275 315 276 316 Thread::sleep(60000); 277 //Thread::sleep( 5000);278 printf("Signalling the serverthread\n");279 serverThread.interrupt();280 //clientThread.interrupt();281 printf("Signalled the serverthread\n");317 //Thread::sleep(1000); 318 printf("Signalling the client thread\n"); 319 //serverThread.interrupt(); 320 clientThread.interrupt(); 321 printf("Signalled the client thread\n"); 282 322 283 323 printf("Waiting for the client thread\n");
Note:
See TracChangeset
for help on using the changeset viewer.