Changeset 21:eb59943050c9 in xplcommon for test
- Timestamp:
- 12/31/12 14:17:32 (12 years ago)
- Branch:
- default
- Phase:
- public
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
test/testlocsock.cc
r19 r21 36 36 #include <xplcommon/LocalSocket.h> 37 37 #include <xplcommon/LocalClientSocket.h> 38 #include <xplcommon/ Connector.h>38 #include <xplcommon/LocalConnector.h> 39 39 #include <xplcommon/ReadingBuffer.h> 40 40 #include <xplcommon/WritingBuffer.h> … … 57 57 using xplcommon::LocalSocket; 58 58 using xplcommon::LocalClientSocket; 59 using xplcommon:: Connector;59 using xplcommon::LocalConnector; 60 60 using xplcommon::ReadingBuffer; 61 61 using xplcommon::WritingBuffer; … … 66 66 //------------------------------------------------------------------------------ 67 67 68 void communicate(ReadingBuffer & readingBuffer, WritingBuffer&writingBuffer,68 void communicate(ReadingBuffer* readingBuffer, WritingBuffer* writingBuffer, 69 69 Waiter& waiter, unsigned seed, const char* prefix, 70 70 WaitableEvent* quitEvent = 0) … … 74 74 static const size_t rangeToWrite = maxToWrite - minToWrite; 75 75 76 static const size_t reportingInterval = 10000 0;77 78 static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.9 5);76 static const size_t reportingInterval = 10000; 77 78 static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.99); 79 79 static const int sleepRemainder = RAND_MAX - sleepThreshold; 80 80 static const unsigned minSleep = 10; … … 93 93 94 94 while(!toQuit) { 95 #if TARGET_API_POSIX 95 96 int r = rand_r(&seed); 97 #endif 98 #if TARGET_API_WIN32 99 int r = rand(); 100 #endif 96 101 size_t toWrite = minToWrite + 97 102 static_cast<size_t>( static_cast<double>(r) * 98 103 rangeToWrite / RAND_MAX ); 99 104 105 size_t lastWriteLength = 0; 106 if (writingBuffer!=0) { 107 lastWriteLength = min(writingBuffer->getCapacity(), toWrite); 108 writingBuffer->reset(); 109 writingBuffer->addLength(lastWriteLength); 110 } 111 100 112 while(toWrite>0) { 101 writingBuffer.reset();102 size_t length = min(writingBuffer.getCapacity(), toWrite);103 writingBuffer.addLength(length);104 105 113 if (quitEvent!=0 && quitEvent->check()) { 106 114 toQuit = true; … … 108 116 } 109 117 110 if (writingBuffer.write()) { 111 numBytesWritten += length; 112 toWrite -= length; 113 length = 0; 114 115 if (numBytesWritten>=nextReportWritten) { 116 printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n", 117 prefix, numBytesWritten, numWriteBlocked, 118 numMillisSlept); 119 nextReportWritten += reportingInterval; 118 bool hasPending = false; 119 120 if (writingBuffer!=0) { 121 size_t loopCount = 0; 122 while(toWrite>0) { 123 ++loopCount; 124 if (writingBuffer->write()) { 125 numBytesWritten += lastWriteLength; 126 toWrite -= lastWriteLength; 127 128 if (numBytesWritten>=nextReportWritten) { 129 printf("%s: written %lu bytes with %lu blocking and %lu ms sleeping\n", 130 prefix, 131 static_cast<unsigned long>(numBytesWritten), 132 static_cast<unsigned long>(numWriteBlocked), 133 static_cast<unsigned long>(numMillisSlept)); 134 nextReportWritten += reportingInterval; 135 } 136 137 if (toWrite>0) { 138 if (quitEvent!=0 && quitEvent->check()) { 139 toQuit = true; 140 break; 141 } else { 142 lastWriteLength = min(writingBuffer->getCapacity(), toWrite); 143 writingBuffer->addLength(lastWriteLength); 144 if (loopCount>10) { 145 break; 146 } 147 } 148 } 149 } else if (writingBuffer->failed()) { 150 printf("%s: writing failed with error: %lu\n", 151 prefix, 152 static_cast<unsigned long>(writingBuffer->getErrorCode())); 153 toQuit = true; 154 break; 155 } else { 156 ++numWriteBlocked; 157 hasPending = true; 158 break; 159 } 120 160 } 121 } else if (writingBuffer.failed()) { 122 printf("%s: writing failed with error: %lu\n", 123 prefix, 124 static_cast<unsigned long>(writingBuffer.getErrorCode())); 125 toQuit = true; 126 break; 127 } else { 128 ++numWriteBlocked; 161 if (toQuit) break; 129 162 } 130 163 131 if (readingBuffer.read()) { 132 numBytesRead += readingBuffer.getLength(); 133 if (numBytesRead>=nextReportRead) { 134 printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n", 135 prefix, numBytesRead, numReadBlocked, numMillisSlept); 136 nextReportRead += reportingInterval; 164 if (readingBuffer!=0) { 165 size_t loopCount = 0; 166 while(true) { 167 ++loopCount; 168 if (readingBuffer->read()) { 169 size_t l = readingBuffer->getLength(); 170 if (l==0) { 171 printf("%s: reading encountered end of file\n", prefix); 172 toQuit = true; 173 break; 174 } 175 numBytesRead += l; 176 if (numBytesRead>=nextReportRead) { 177 printf("%s: read %lu bytes with %lu blocking and %lu ms sleeping\n", 178 prefix, 179 static_cast<unsigned long>(numBytesRead), 180 static_cast<unsigned long>(numReadBlocked), 181 static_cast<unsigned long>(numMillisSlept)); 182 nextReportRead += reportingInterval; 183 } 184 readingBuffer->reset(); 185 if (quitEvent!=0 && quitEvent->check()) { 186 toQuit = true; 187 break; 188 } else if (loopCount>10) { 189 break; 190 } 191 } else if (readingBuffer->failed()) { 192 printf("%s: reading failed with error: %lu\n", 193 prefix, 194 static_cast<unsigned long>(readingBuffer->getErrorCode())); 195 toQuit = true; 196 break; 197 } else { 198 ++numReadBlocked; 199 hasPending = true; 200 break; 201 } 137 202 } 138 readingBuffer.reset(); 139 } else if (readingBuffer.failed()) { 140 printf("%s: reading failed with error: %lu\n", 141 prefix, 142 static_cast<unsigned long>(readingBuffer.getErrorCode())); 143 toQuit = true; 144 break; 145 } else { 146 ++numReadBlocked; 203 if (toQuit) break; 147 204 } 148 205 206 #if TARGET_API_POSIX 149 207 r = rand_r(&seed); 208 #endif 209 #if TARGET_API_WIN32 210 r = rand(); 211 #endif 150 212 if (r>=sleepThreshold) { 151 213 unsigned toSleep = minSleep + … … 158 220 } 159 221 160 if ( length!=0) {222 if (toWrite>0 && hasPending) { 161 223 waiter.wait(); 162 224 if (waiter.failed()) { … … 171 233 } 172 234 173 printf("%s: written %zu bytes with %zu blocking\n", 174 prefix, numBytesWritten, numWriteBlocked); 175 printf("%s: read %zu bytes with %zu blocking\n", 176 prefix, numBytesRead, numReadBlocked); 177 printf("%s: slept %zu ms\n", prefix, numMillisSlept); 235 printf("%s: written %lu bytes with %lu blocking\n", 236 prefix, 237 static_cast<unsigned long>(numBytesWritten), 238 static_cast<unsigned long>(numWriteBlocked)); 239 printf("%s: read %lu bytes with %lu blocking\n", 240 prefix, 241 static_cast<unsigned long>(numBytesRead), 242 static_cast<unsigned long>(numReadBlocked)); 243 printf("%s: slept %lu ms\n", prefix, 244 static_cast<unsigned long>(numMillisSlept)); 178 245 } 179 246 … … 219 286 LocalAcceptor& acceptor = serverSocket.getAcceptor(); 220 287 288 bool eventFired = false; 221 289 while(!acceptor.accept()) { 290 eventFired = event.check(); 291 if (eventFired) break; 222 292 if (acceptor.failed()) { 223 printf("ServerThread::run: acceptor failed...\n"); 293 printf("ServerThread::run: acceptor failed...: %lu\n", 294 (unsigned long)acceptor.getErrorCode()); 224 295 return; 225 296 } … … 228 299 } 229 300 301 if (eventFired) { 302 printf("ServerThread::run: waiting done, quitting\n"); 303 return; 304 } 305 230 306 printf("ServerThread::run: waiting done, received connection\n"); 231 307 LocalSocket* socket = acceptor.getSocket(); 232 308 233 communicate( socket->getReadingBuffer(),socket->getWritingBuffer(),309 communicate(&socket->getReadingBuffer(), &socket->getWritingBuffer(), 234 310 waiter, seed, "ServerThread", &event); 235 311 … … 243 319 class ClientThread : public Thread 244 320 { 321 private: 322 Waiter waiter; 323 324 WaitableEvent event; 325 245 326 public: 327 ClientThread(); 328 329 void signal(); 330 246 331 virtual void run(); 247 332 }; 333 334 //------------------------------------------------------------------------------ 335 336 ClientThread::ClientThread() : 337 event(&waiter) 338 { 339 } 340 341 //------------------------------------------------------------------------------ 342 343 void ClientThread::signal() 344 { 345 event.fire(); 346 } 248 347 249 348 //------------------------------------------------------------------------------ … … 258 357 printf("ClientThread::run: connecting\n"); 259 358 260 Waiter waiter;261 262 359 LocalClientSocket socket("test", &waiter); 263 360 264 Connector& connector = socket.getConnector();361 LocalConnector& connector = socket.getConnector(); 265 362 266 363 while(!connector.connect()) { … … 275 372 printf("ClientThread::run: connected\n"); 276 373 277 communicate( socket.getReadingBuffer(),socket.getWritingBuffer(),278 waiter, seed, "ClientThread" );374 communicate(&socket.getReadingBuffer(), &socket.getWritingBuffer(), 375 waiter, seed, "ClientThread", &event); 279 376 } 280 377 … … 284 381 int main() 285 382 { 383 #if TARGET_API_POSIX 286 384 signal(SIGPIPE, SIG_IGN); 385 #endif 287 386 288 387 ServerThread serverThread; … … 294 393 295 394 Thread::sleep(60000); 296 serverThread.signal(); 395 //Thread::sleep(5000); 396 printf("Signalling the server thread\n"); 397 //serverThread.signal(); 398 clientThread.signal(); 297 399 printf("Signalled the server thread\n"); 298 400
Note:
See TracChangeset
for help on using the changeset viewer.