source: vscpl/test/testlocsock.cc@ 36:f9699d5dafe2

Last change on this file since 36:f9699d5dafe2 was 8:7b2598d1ea55, checked in by István Váradi <ivaradi@…>, 12 years ago

Imported the I/O library for POSIX and the pseudo-random generator

File size: 13.5 KB
Line 
1// Copyright (c) 2012 by István Váradi
2
3// This file is part of libxplcommon, a common utility library for
4// development related to X-Plane
5
6// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are met:
8
9// 1. Redistributions of source code must retain the above copyright notice, this
10// list of conditions and the following disclaimer.
11// 2. Redistributions in binary form must reproduce the above copyright notice,
12// this list of conditions and the following disclaimer in the documentation
13// and/or other materials provided with the distribution.
14
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
19// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26// The views and conclusions contained in the software and documentation are those
27// of the authors and should not be interpreted as representing official policies,
28// either expressed or implied, of the FreeBSD Project.
29
30//------------------------------------------------------------------------------
31
32#include <hu/varadiistvan/scpl/io/Waiter.h>
33#include <hu/varadiistvan/scpl/io/LocalServerSocket.h>
34#include <hu/varadiistvan/scpl/io/LocalAcceptor.h>
35#include <hu/varadiistvan/scpl/io/LocalSocket.h>
36#include <hu/varadiistvan/scpl/io/LocalClientSocket.h>
37#include <hu/varadiistvan/scpl/io/LocalConnector.h>
38#include <hu/varadiistvan/scpl/io/ReadingBuffer.h>
39#include <hu/varadiistvan/scpl/io/WritingBuffer.h>
40#include <hu/varadiistvan/scpl/io/WaitableEvent.h>
41
42#include <hu/varadiistvan/scpl/Thread.h>
43#include <hu/varadiistvan/scpl/PseudoRandom.h>
44
45#include <cstdlib>
46#include <cstdio>
47#include <cassert>
48
49#include <signal.h>
50
51#include <sys/time.h>
52
53//------------------------------------------------------------------------------
54
55using hu::varadiistvan::scpl::io::Waiter;
56using hu::varadiistvan::scpl::io::LocalServerSocket;
57using hu::varadiistvan::scpl::io::LocalAcceptor;
58using hu::varadiistvan::scpl::io::LocalSocket;
59using hu::varadiistvan::scpl::io::LocalClientSocket;
60using hu::varadiistvan::scpl::io::LocalConnector;
61using hu::varadiistvan::scpl::io::ReadingBuffer;
62using hu::varadiistvan::scpl::io::WritingBuffer;
63using hu::varadiistvan::scpl::io::WaitableEvent;
64
65using hu::varadiistvan::scpl::Thread;
66using hu::varadiistvan::scpl::PseudoRandom;
67
68using std::min;
69
70//------------------------------------------------------------------------------
71
72void communicate(ReadingBuffer* readingBuffer, WritingBuffer* writingBuffer,
73 Waiter& waiter, unsigned seed, const char* prefix,
74 WaitableEvent* quitEvent = 0)
75{
76 static const size_t minToWrite = 2076;
77 static const size_t maxToWrite = 1345670;
78
79 static const size_t reportingInterval = 10000;
80
81 static const int sleepThreshold = static_cast<int>(PseudoRandom::MAX * 0.99);
82 static const int sleepRemainder = PseudoRandom::MAX - sleepThreshold;
83 static const unsigned minSleep = 10;
84 static const unsigned maxSleep = 250;
85 static const unsigned sleepRange = maxSleep - minSleep;
86
87 size_t numBytesRead = 0;
88 size_t nextReportRead = reportingInterval;
89 size_t numWriteBlocked = 0;
90 size_t numBytesWritten = 0;
91 size_t nextReportWritten = reportingInterval;
92 size_t numReadBlocked = 0;
93 size_t numMillisSlept = 0;
94
95 bool toQuit = false;
96
97 PseudoRandom random(seed);
98 while(!toQuit) {
99 size_t toWrite = random.nextUnsigned(maxToWrite, minToWrite);
100
101 size_t lastWriteLength = 0;
102 if (writingBuffer!=0) {
103 lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
104 writingBuffer->reset();
105 writingBuffer->addLength(lastWriteLength);
106 }
107
108 while(toWrite>0) {
109 if (quitEvent!=0 && quitEvent->check()) {
110 toQuit = true;
111 break;
112 }
113
114 bool hasPending = false;
115
116 if (writingBuffer!=0) {
117 size_t loopCount = 0;
118 while(toWrite>0) {
119 ++loopCount;
120 if (writingBuffer->write()) {
121 numBytesWritten += lastWriteLength;
122 toWrite -= lastWriteLength;
123
124 if (numBytesWritten>=nextReportWritten) {
125 printf("%s: written %lu bytes with %lu blocking and %lu ms sleeping\n",
126 prefix,
127 static_cast<unsigned long>(numBytesWritten),
128 static_cast<unsigned long>(numWriteBlocked),
129 static_cast<unsigned long>(numMillisSlept));
130 nextReportWritten += reportingInterval;
131 }
132
133 if (toWrite>0) {
134 if (quitEvent!=0 && quitEvent->check()) {
135 toQuit = true;
136 break;
137 } else {
138 lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
139 writingBuffer->addLength(lastWriteLength);
140 if (loopCount>10) {
141 break;
142 }
143 }
144 }
145 } else if (writingBuffer->failed()) {
146 printf("%s: writing failed with error: %lu\n",
147 prefix,
148 static_cast<unsigned long>(writingBuffer->getErrorCode()));
149 toQuit = true;
150 break;
151 } else {
152 ++numWriteBlocked;
153 hasPending = true;
154 break;
155 }
156 }
157 if (toQuit) break;
158 }
159
160 if (readingBuffer!=0) {
161 size_t loopCount = 0;
162 while(true) {
163 ++loopCount;
164 if (readingBuffer->read()) {
165 size_t l = readingBuffer->getLength();
166 if (l==0) {
167 printf("%s: reading encountered end of file\n", prefix);
168 toQuit = true;
169 break;
170 }
171 numBytesRead += l;
172 if (numBytesRead>=nextReportRead) {
173 printf("%s: read %lu bytes with %lu blocking and %lu ms sleeping\n",
174 prefix,
175 static_cast<unsigned long>(numBytesRead),
176 static_cast<unsigned long>(numReadBlocked),
177 static_cast<unsigned long>(numMillisSlept));
178 nextReportRead += reportingInterval;
179 }
180 readingBuffer->reset();
181 if (quitEvent!=0 && quitEvent->check()) {
182 toQuit = true;
183 break;
184 } else if (loopCount>10) {
185 break;
186 }
187 } else if (readingBuffer->failed()) {
188 printf("%s: reading failed with error: %lu\n",
189 prefix,
190 static_cast<unsigned long>(readingBuffer->getErrorCode()));
191 toQuit = true;
192 break;
193 } else {
194 ++numReadBlocked;
195 hasPending = true;
196 break;
197 }
198 }
199 if (toQuit) break;
200 }
201
202 int r = random.next();
203 if (r>=sleepThreshold) {
204 unsigned toSleep = minSleep +
205 static_cast<unsigned>(static_cast<double>(r - sleepThreshold) *
206 sleepRange / sleepRemainder);
207 assert(toSleep>=minSleep);
208 assert(toSleep<maxSleep);
209 Thread::sleep(toSleep);
210 numMillisSlept += toSleep;
211 }
212
213 if (toWrite>0 && hasPending) {
214 waiter.wait();
215 if (waiter.failed()) {
216 printf("%s: waiting failed with error: %lu\n",
217 prefix,
218 static_cast<unsigned long>(waiter.getErrorCode()));
219 toQuit = true;
220 break;
221 }
222 }
223 }
224 }
225
226 printf("%s: written %lu bytes with %lu blocking\n",
227 prefix,
228 static_cast<unsigned long>(numBytesWritten),
229 static_cast<unsigned long>(numWriteBlocked));
230 printf("%s: read %lu bytes with %lu blocking\n",
231 prefix,
232 static_cast<unsigned long>(numBytesRead),
233 static_cast<unsigned long>(numReadBlocked));
234 printf("%s: slept %lu ms\n", prefix,
235 static_cast<unsigned long>(numMillisSlept));
236}
237
238//------------------------------------------------------------------------------
239
240class ServerThread : public Thread
241{
242private:
243 Waiter waiter;
244
245 WaitableEvent event;
246
247public:
248 ServerThread();
249
250 void signal();
251
252 virtual void run();
253};
254
255//------------------------------------------------------------------------------
256
257ServerThread::ServerThread() :
258 event(&waiter)
259{
260}
261
262//------------------------------------------------------------------------------
263
264void ServerThread::signal()
265{
266 event.fire();
267}
268
269//------------------------------------------------------------------------------
270
271void ServerThread::run()
272{
273 unsigned seed = static_cast<unsigned>(time(0)*1.2);
274 printf("ServerThread::run: seed=%u\n", seed);
275
276 LocalServerSocket serverSocket("test", &waiter);
277 LocalAcceptor& acceptor = serverSocket.getAcceptor();
278
279 bool eventFired = false;
280 while(!acceptor.accept()) {
281 eventFired = event.check();
282 if (eventFired) break;
283 if (acceptor.failed()) {
284 printf("ServerThread::run: acceptor failed...: %lu\n",
285 (unsigned long)acceptor.getErrorCode());
286 return;
287 }
288 printf("ServerThread::run: waiting...\n");
289 waiter.wait();
290 }
291
292 if (eventFired) {
293 printf("ServerThread::run: waiting done, quitting\n");
294 return;
295 }
296
297 printf("ServerThread::run: waiting done, received connection\n");
298 LocalSocket* socket = acceptor.getSocket();
299
300 communicate(&socket->getReadingBuffer(), &socket->getWritingBuffer(),
301 waiter, seed, "ServerThread", &event);
302
303
304 delete socket;
305}
306
307//------------------------------------------------------------------------------
308//------------------------------------------------------------------------------
309
310class ClientThread : public Thread
311{
312private:
313 Waiter waiter;
314
315 WaitableEvent event;
316
317public:
318 ClientThread();
319
320 void signal();
321
322 virtual void run();
323};
324
325//------------------------------------------------------------------------------
326
327ClientThread::ClientThread() :
328 event(&waiter)
329{
330}
331
332//------------------------------------------------------------------------------
333
334void ClientThread::signal()
335{
336 event.fire();
337}
338
339//------------------------------------------------------------------------------
340
341void ClientThread::run()
342{
343 unsigned seed = static_cast<unsigned>(time(0)*1.8);
344 printf("ClientThread::run: seed=%u\n", seed);
345
346 printf("ClientThread::run: sleeping\n");
347 sleep(500);
348 printf("ClientThread::run: connecting\n");
349
350 LocalClientSocket socket("test", &waiter);
351
352 LocalConnector& connector = socket.getConnector();
353
354 while(!connector.connect()) {
355 if (connector.failed()) {
356 printf("ClientThread::run: connector failed...\n");
357 return;
358 }
359 printf("ClientThread::run: waiting...\n");
360 waiter.wait();
361 }
362
363 printf("ClientThread::run: connected\n");
364
365 communicate(&socket.getReadingBuffer(), &socket.getWritingBuffer(),
366 waiter, seed, "ClientThread", &event);
367}
368
369//------------------------------------------------------------------------------
370//------------------------------------------------------------------------------
371
372int main()
373{
374#if TARGET_API_POSIX
375 signal(SIGPIPE, SIG_IGN);
376#endif
377
378 ServerThread serverThread;
379 ClientThread clientThread;
380
381 printf("Starting threads\n");
382 serverThread.start();
383 clientThread.start();
384
385 Thread::sleep(60000);
386 //Thread::sleep(5000);
387 printf("Signalling the server thread\n");
388 //serverThread.signal();
389 clientThread.signal();
390 printf("Signalled the server thread\n");
391
392 printf("Waiting for the client thread\n");
393 clientThread.join();
394 printf("Waiting for the server thread\n");
395 serverThread.join();
396 printf("Both threads returned\n");
397
398 return 0;
399}
400
401//------------------------------------------------------------------------------
402
403// Local Variables:
404// mode: C++
405// c-basic-offset: 4
406// indent-tabs-mode: nil
407// End:
Note: See TracBrowser for help on using the repository browser.