Changeset 19:54c3f68d2d46 in xplcommon
- Timestamp:
- 12/29/12 18:47:52 (12 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
src/xplcommon/posix/ServerSocket.cc
r14 r19 48 48 { 49 49 if (fd<0) setErrorCodeFromErrno(); 50 else if (waiter!=0) setNonBlocking(fd); 50 51 } 51 52 -
src/xplcommon/posix/Socket.h
r16 r19 89 89 { 90 90 if (fd<0) setErrorCodeFromErrno(); 91 else if (waiter!=0) setNonBlocking(fd); 91 92 } 92 93 … … 97 98 BufferedStream(waiter, fd, readingCapacity, writingCapacity) 98 99 { 100 if (waiter!=0) setNonBlocking(fd); 99 101 } 100 102 -
test/testlocsock.cc
r18 r19 39 39 #include <xplcommon/ReadingBuffer.h> 40 40 #include <xplcommon/WritingBuffer.h> 41 41 #include <xplcommon/WaitableEvent.h> 42 43 #include <cstdlib> 42 44 #include <cstdio> 45 #include <cassert> 46 47 #include <signal.h> 48 49 #include <sys/time.h> 43 50 44 51 //------------------------------------------------------------------------------ … … 51 58 using xplcommon::LocalClientSocket; 52 59 using xplcommon::Connector; 60 using xplcommon::ReadingBuffer; 61 using xplcommon::WritingBuffer; 62 using xplcommon::WaitableEvent; 63 64 using std::min; 65 66 //------------------------------------------------------------------------------ 67 68 void communicate(ReadingBuffer& readingBuffer, WritingBuffer& writingBuffer, 69 Waiter& waiter, unsigned seed, const char* prefix, 70 WaitableEvent* quitEvent = 0) 71 { 72 static const size_t minToWrite = 2076; 73 static const size_t maxToWrite = 1345670; 74 static const size_t rangeToWrite = maxToWrite - minToWrite; 75 76 static const size_t reportingInterval = 100000; 77 78 static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.95); 79 static const int sleepRemainder = RAND_MAX - sleepThreshold; 80 static const unsigned minSleep = 10; 81 static const unsigned maxSleep = 250; 82 static const unsigned sleepRange = maxSleep - minSleep; 83 84 size_t numBytesRead = 0; 85 size_t nextReportRead = reportingInterval; 86 size_t numWriteBlocked = 0; 87 size_t numBytesWritten = 0; 88 size_t nextReportWritten = reportingInterval; 89 size_t numReadBlocked = 0; 90 size_t numMillisSlept = 0; 91 92 bool toQuit = false; 93 94 while(!toQuit) { 95 int r = rand_r(&seed); 96 size_t toWrite = minToWrite + 97 static_cast<size_t>( static_cast<double>(r) * 98 rangeToWrite / RAND_MAX ); 99 100 while(toWrite>0) { 101 writingBuffer.reset(); 102 size_t length = min(writingBuffer.getCapacity(), toWrite); 103 writingBuffer.addLength(length); 104 105 if (quitEvent!=0 && quitEvent->check()) { 106 toQuit = true; 107 break; 108 } 109 110 if (writingBuffer.write()) { 111 numBytesWritten += length; 112 toWrite -= length; 113 length = 0; 114 115 if (numBytesWritten>=nextReportWritten) { 116 printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n", 117 prefix, numBytesWritten, numWriteBlocked, 118 numMillisSlept); 119 nextReportWritten += reportingInterval; 120 } 121 } else if (writingBuffer.failed()) { 122 printf("%s: writing failed with error: %lu\n", 123 prefix, 124 static_cast<unsigned long>(writingBuffer.getErrorCode())); 125 toQuit = true; 126 break; 127 } else { 128 ++numWriteBlocked; 129 } 130 131 if (readingBuffer.read()) { 132 numBytesRead += readingBuffer.getLength(); 133 if (numBytesRead>=nextReportRead) { 134 printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n", 135 prefix, numBytesRead, numReadBlocked, numMillisSlept); 136 nextReportRead += reportingInterval; 137 } 138 readingBuffer.reset(); 139 } else if (readingBuffer.failed()) { 140 printf("%s: reading failed with error: %lu\n", 141 prefix, 142 static_cast<unsigned long>(readingBuffer.getErrorCode())); 143 toQuit = true; 144 break; 145 } else { 146 ++numReadBlocked; 147 } 148 149 r = rand_r(&seed); 150 if (r>=sleepThreshold) { 151 unsigned toSleep = minSleep + 152 static_cast<unsigned>(static_cast<double>(r - sleepThreshold) * 153 sleepRange / sleepRemainder); 154 assert(toSleep>=minSleep); 155 assert(toSleep<maxSleep); 156 Thread::sleep(toSleep); 157 numMillisSlept += toSleep; 158 } 159 160 if (length!=0) { 161 waiter.wait(); 162 if (waiter.failed()) { 163 printf("%s: waiting failed with error: %lu\n", 164 prefix, 165 static_cast<unsigned long>(waiter.getErrorCode())); 166 toQuit = true; 167 break; 168 } 169 } 170 } 171 } 172 173 printf("%s: written %zu bytes with %zu blocking\n", 174 prefix, numBytesWritten, numWriteBlocked); 175 printf("%s: read %zu bytes with %zu blocking\n", 176 prefix, numBytesRead, numReadBlocked); 177 printf("%s: slept %zu ms\n", prefix, numMillisSlept); 178 } 53 179 54 180 //------------------------------------------------------------------------------ … … 56 182 class ServerThread : public Thread 57 183 { 184 private: 185 Waiter waiter; 186 187 WaitableEvent event; 188 189 public: 190 ServerThread(); 191 192 void signal(); 193 58 194 virtual void run(); 59 195 }; … … 61 197 //------------------------------------------------------------------------------ 62 198 199 ServerThread::ServerThread() : 200 event(&waiter) 201 { 202 } 203 204 //------------------------------------------------------------------------------ 205 206 void ServerThread::signal() 207 { 208 event.fire(); 209 } 210 211 //------------------------------------------------------------------------------ 212 63 213 void ServerThread::run() 64 214 { 65 printf("ServerThread::run\n"); 66 67 Waiter waiter; 215 unsigned seed = static_cast<unsigned>(time(0)*1.2); 216 printf("ServerThread::run: seed=%u\n", seed); 68 217 69 218 LocalServerSocket serverSocket("test", &waiter); 70 219 LocalAcceptor& acceptor = serverSocket.getAcceptor(); 71 220 72 while(true) { 73 while(!acceptor.accept()) { 74 if (acceptor.failed()) { 75 printf("ServerThread::run: acceptor failed...\n"); 76 return; 77 } 78 printf("ServerThread::run: waiting...\n"); 79 waiter.wait(); 221 while(!acceptor.accept()) { 222 if (acceptor.failed()) { 223 printf("ServerThread::run: acceptor failed...\n"); 224 return; 80 225 } 81 82 printf("ServerThread::run: waiting done, received connection\n"); 83 LocalSocket* socket = acceptor.getSocket(); 84 delete socket; 85 break; 226 printf("ServerThread::run: waiting...\n"); 227 waiter.wait(); 86 228 } 229 230 printf("ServerThread::run: waiting done, received connection\n"); 231 LocalSocket* socket = acceptor.getSocket(); 232 233 communicate(socket->getReadingBuffer(), socket->getWritingBuffer(), 234 waiter, seed, "ServerThread", &event); 235 236 237 delete socket; 87 238 } 88 239 … … 100 251 void ClientThread::run() 101 252 { 253 unsigned seed = static_cast<unsigned>(time(0)*1.8); 254 printf("ClientThread::run: seed=%u\n", seed); 255 102 256 printf("ClientThread::run: sleeping\n"); 103 sleep( 2000);257 sleep(500); 104 258 printf("ClientThread::run: connecting\n"); 105 259 … … 120 274 121 275 printf("ClientThread::run: connected\n"); 276 277 communicate(socket.getReadingBuffer(), socket.getWritingBuffer(), 278 waiter, seed, "ClientThread"); 122 279 } 123 280 … … 127 284 int main() 128 285 { 286 signal(SIGPIPE, SIG_IGN); 287 129 288 ServerThread serverThread; 130 289 ClientThread clientThread; … … 133 292 serverThread.start(); 134 293 clientThread.start(); 294 295 Thread::sleep(60000); 296 serverThread.signal(); 297 printf("Signalled the server thread\n"); 135 298 136 299 printf("Waiting for the client thread\n");
Note:
See TracChangeset
for help on using the changeset viewer.