source: xplcommon/test/testlocsock.cc@ 26:46f4e06241c7

Last change on this file since 26:46f4e06241c7 was 21:eb59943050c9, checked in by István Váradi <ivaradi@…>, 12 years ago

Added the implementation of the local sockets for Win32 and it seems to work

File size: 13.4 KB
Line 
1// Copyright (c) 2012 by István Váradi
2
3// This file is part of libxplcommon, a common utility library for
4// development related to X-Plane
5
6// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are met:
8
9// 1. Redistributions of source code must retain the above copyright notice, this
10// list of conditions and the following disclaimer.
11// 2. Redistributions in binary form must reproduce the above copyright notice,
12// this list of conditions and the following disclaimer in the documentation
13// and/or other materials provided with the distribution.
14
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
19// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26// The views and conclusions contained in the software and documentation are those
27// of the authors and should not be interpreted as representing official policies,
28// either expressed or implied, of the FreeBSD Project.
29
30//------------------------------------------------------------------------------
31
32#include <xplcommon/Thread.h>
33#include <xplcommon/Waiter.h>
34#include <xplcommon/LocalServerSocket.h>
35#include <xplcommon/LocalAcceptor.h>
36#include <xplcommon/LocalSocket.h>
37#include <xplcommon/LocalClientSocket.h>
38#include <xplcommon/LocalConnector.h>
39#include <xplcommon/ReadingBuffer.h>
40#include <xplcommon/WritingBuffer.h>
41#include <xplcommon/WaitableEvent.h>
42
43#include <cstdlib>
44#include <cstdio>
45#include <cassert>
46
47#include <signal.h>
48
49#include <sys/time.h>
50
51//------------------------------------------------------------------------------
52
53using xplcommon::Thread;
54using xplcommon::Waiter;
55using xplcommon::LocalServerSocket;
56using xplcommon::LocalAcceptor;
57using xplcommon::LocalSocket;
58using xplcommon::LocalClientSocket;
59using xplcommon::LocalConnector;
60using xplcommon::ReadingBuffer;
61using xplcommon::WritingBuffer;
62using xplcommon::WaitableEvent;
63
64using std::min;
65
66//------------------------------------------------------------------------------
67
68void communicate(ReadingBuffer* readingBuffer, WritingBuffer* writingBuffer,
69 Waiter& waiter, unsigned seed, const char* prefix,
70 WaitableEvent* quitEvent = 0)
71{
72 static const size_t minToWrite = 2076;
73 static const size_t maxToWrite = 1345670;
74 static const size_t rangeToWrite = maxToWrite - minToWrite;
75
76 static const size_t reportingInterval = 10000;
77
78 static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.99);
79 static const int sleepRemainder = RAND_MAX - sleepThreshold;
80 static const unsigned minSleep = 10;
81 static const unsigned maxSleep = 250;
82 static const unsigned sleepRange = maxSleep - minSleep;
83
84 size_t numBytesRead = 0;
85 size_t nextReportRead = reportingInterval;
86 size_t numWriteBlocked = 0;
87 size_t numBytesWritten = 0;
88 size_t nextReportWritten = reportingInterval;
89 size_t numReadBlocked = 0;
90 size_t numMillisSlept = 0;
91
92 bool toQuit = false;
93
94 while(!toQuit) {
95#if TARGET_API_POSIX
96 int r = rand_r(&seed);
97#endif
98#if TARGET_API_WIN32
99 int r = rand();
100#endif
101 size_t toWrite = minToWrite +
102 static_cast<size_t>( static_cast<double>(r) *
103 rangeToWrite / RAND_MAX );
104
105 size_t lastWriteLength = 0;
106 if (writingBuffer!=0) {
107 lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
108 writingBuffer->reset();
109 writingBuffer->addLength(lastWriteLength);
110 }
111
112 while(toWrite>0) {
113 if (quitEvent!=0 && quitEvent->check()) {
114 toQuit = true;
115 break;
116 }
117
118 bool hasPending = false;
119
120 if (writingBuffer!=0) {
121 size_t loopCount = 0;
122 while(toWrite>0) {
123 ++loopCount;
124 if (writingBuffer->write()) {
125 numBytesWritten += lastWriteLength;
126 toWrite -= lastWriteLength;
127
128 if (numBytesWritten>=nextReportWritten) {
129 printf("%s: written %lu bytes with %lu blocking and %lu ms sleeping\n",
130 prefix,
131 static_cast<unsigned long>(numBytesWritten),
132 static_cast<unsigned long>(numWriteBlocked),
133 static_cast<unsigned long>(numMillisSlept));
134 nextReportWritten += reportingInterval;
135 }
136
137 if (toWrite>0) {
138 if (quitEvent!=0 && quitEvent->check()) {
139 toQuit = true;
140 break;
141 } else {
142 lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
143 writingBuffer->addLength(lastWriteLength);
144 if (loopCount>10) {
145 break;
146 }
147 }
148 }
149 } else if (writingBuffer->failed()) {
150 printf("%s: writing failed with error: %lu\n",
151 prefix,
152 static_cast<unsigned long>(writingBuffer->getErrorCode()));
153 toQuit = true;
154 break;
155 } else {
156 ++numWriteBlocked;
157 hasPending = true;
158 break;
159 }
160 }
161 if (toQuit) break;
162 }
163
164 if (readingBuffer!=0) {
165 size_t loopCount = 0;
166 while(true) {
167 ++loopCount;
168 if (readingBuffer->read()) {
169 size_t l = readingBuffer->getLength();
170 if (l==0) {
171 printf("%s: reading encountered end of file\n", prefix);
172 toQuit = true;
173 break;
174 }
175 numBytesRead += l;
176 if (numBytesRead>=nextReportRead) {
177 printf("%s: read %lu bytes with %lu blocking and %lu ms sleeping\n",
178 prefix,
179 static_cast<unsigned long>(numBytesRead),
180 static_cast<unsigned long>(numReadBlocked),
181 static_cast<unsigned long>(numMillisSlept));
182 nextReportRead += reportingInterval;
183 }
184 readingBuffer->reset();
185 if (quitEvent!=0 && quitEvent->check()) {
186 toQuit = true;
187 break;
188 } else if (loopCount>10) {
189 break;
190 }
191 } else if (readingBuffer->failed()) {
192 printf("%s: reading failed with error: %lu\n",
193 prefix,
194 static_cast<unsigned long>(readingBuffer->getErrorCode()));
195 toQuit = true;
196 break;
197 } else {
198 ++numReadBlocked;
199 hasPending = true;
200 break;
201 }
202 }
203 if (toQuit) break;
204 }
205
206#if TARGET_API_POSIX
207 r = rand_r(&seed);
208#endif
209#if TARGET_API_WIN32
210 r = rand();
211#endif
212 if (r>=sleepThreshold) {
213 unsigned toSleep = minSleep +
214 static_cast<unsigned>(static_cast<double>(r - sleepThreshold) *
215 sleepRange / sleepRemainder);
216 assert(toSleep>=minSleep);
217 assert(toSleep<maxSleep);
218 Thread::sleep(toSleep);
219 numMillisSlept += toSleep;
220 }
221
222 if (toWrite>0 && hasPending) {
223 waiter.wait();
224 if (waiter.failed()) {
225 printf("%s: waiting failed with error: %lu\n",
226 prefix,
227 static_cast<unsigned long>(waiter.getErrorCode()));
228 toQuit = true;
229 break;
230 }
231 }
232 }
233 }
234
235 printf("%s: written %lu bytes with %lu blocking\n",
236 prefix,
237 static_cast<unsigned long>(numBytesWritten),
238 static_cast<unsigned long>(numWriteBlocked));
239 printf("%s: read %lu bytes with %lu blocking\n",
240 prefix,
241 static_cast<unsigned long>(numBytesRead),
242 static_cast<unsigned long>(numReadBlocked));
243 printf("%s: slept %lu ms\n", prefix,
244 static_cast<unsigned long>(numMillisSlept));
245}
246
247//------------------------------------------------------------------------------
248
249class ServerThread : public Thread
250{
251private:
252 Waiter waiter;
253
254 WaitableEvent event;
255
256public:
257 ServerThread();
258
259 void signal();
260
261 virtual void run();
262};
263
264//------------------------------------------------------------------------------
265
266ServerThread::ServerThread() :
267 event(&waiter)
268{
269}
270
271//------------------------------------------------------------------------------
272
273void ServerThread::signal()
274{
275 event.fire();
276}
277
278//------------------------------------------------------------------------------
279
280void ServerThread::run()
281{
282 unsigned seed = static_cast<unsigned>(time(0)*1.2);
283 printf("ServerThread::run: seed=%u\n", seed);
284
285 LocalServerSocket serverSocket("test", &waiter);
286 LocalAcceptor& acceptor = serverSocket.getAcceptor();
287
288 bool eventFired = false;
289 while(!acceptor.accept()) {
290 eventFired = event.check();
291 if (eventFired) break;
292 if (acceptor.failed()) {
293 printf("ServerThread::run: acceptor failed...: %lu\n",
294 (unsigned long)acceptor.getErrorCode());
295 return;
296 }
297 printf("ServerThread::run: waiting...\n");
298 waiter.wait();
299 }
300
301 if (eventFired) {
302 printf("ServerThread::run: waiting done, quitting\n");
303 return;
304 }
305
306 printf("ServerThread::run: waiting done, received connection\n");
307 LocalSocket* socket = acceptor.getSocket();
308
309 communicate(&socket->getReadingBuffer(), &socket->getWritingBuffer(),
310 waiter, seed, "ServerThread", &event);
311
312
313 delete socket;
314}
315
316//------------------------------------------------------------------------------
317//------------------------------------------------------------------------------
318
319class ClientThread : public Thread
320{
321private:
322 Waiter waiter;
323
324 WaitableEvent event;
325
326public:
327 ClientThread();
328
329 void signal();
330
331 virtual void run();
332};
333
334//------------------------------------------------------------------------------
335
336ClientThread::ClientThread() :
337 event(&waiter)
338{
339}
340
341//------------------------------------------------------------------------------
342
343void ClientThread::signal()
344{
345 event.fire();
346}
347
348//------------------------------------------------------------------------------
349
350void ClientThread::run()
351{
352 unsigned seed = static_cast<unsigned>(time(0)*1.8);
353 printf("ClientThread::run: seed=%u\n", seed);
354
355 printf("ClientThread::run: sleeping\n");
356 sleep(500);
357 printf("ClientThread::run: connecting\n");
358
359 LocalClientSocket socket("test", &waiter);
360
361 LocalConnector& connector = socket.getConnector();
362
363 while(!connector.connect()) {
364 if (connector.failed()) {
365 printf("ClientThread::run: connector failed...\n");
366 return;
367 }
368 printf("ClientThread::run: waiting...\n");
369 waiter.wait();
370 }
371
372 printf("ClientThread::run: connected\n");
373
374 communicate(&socket.getReadingBuffer(), &socket.getWritingBuffer(),
375 waiter, seed, "ClientThread", &event);
376}
377
378//------------------------------------------------------------------------------
379//------------------------------------------------------------------------------
380
381int main()
382{
383#if TARGET_API_POSIX
384 signal(SIGPIPE, SIG_IGN);
385#endif
386
387 ServerThread serverThread;
388 ClientThread clientThread;
389
390 printf("Starting threads\n");
391 serverThread.start();
392 clientThread.start();
393
394 Thread::sleep(60000);
395 //Thread::sleep(5000);
396 printf("Signalling the server thread\n");
397 //serverThread.signal();
398 clientThread.signal();
399 printf("Signalled the server thread\n");
400
401 printf("Waiting for the client thread\n");
402 clientThread.join();
403 printf("Waiting for the server thread\n");
404 serverThread.join();
405 printf("Both threads returned\n");
406
407 return 0;
408}
409
410//------------------------------------------------------------------------------
411
412// Local Variables:
413// mode: C++
414// c-basic-offset: 4
415// indent-tabs-mode: nil
416// End:
Note: See TracBrowser for help on using the repository browser.