source: xplcommon/test/testlocsock.cc@ 19:54c3f68d2d46

Last change on this file since 19:54c3f68d2d46 was 19:54c3f68d2d46, checked in by István Váradi <ivaradi@…>, 12 years ago

Fixed problems with non-blocking mode and extended the socket test program to actually communicate

File size: 9.9 KB
RevLine 
[18]1// Copyright (c) 2012 by István Váradi
2
3// This file is part of libxplcommon, a common utility library for
4// development related to X-Plane
5
6// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are met:
8
9// 1. Redistributions of source code must retain the above copyright notice, this
10// list of conditions and the following disclaimer.
11// 2. Redistributions in binary form must reproduce the above copyright notice,
12// this list of conditions and the following disclaimer in the documentation
13// and/or other materials provided with the distribution.
14
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
19// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26// The views and conclusions contained in the software and documentation are those
27// of the authors and should not be interpreted as representing official policies,
28// either expressed or implied, of the FreeBSD Project.
29
30//------------------------------------------------------------------------------
31
32#include <xplcommon/Thread.h>
33#include <xplcommon/Waiter.h>
34#include <xplcommon/LocalServerSocket.h>
35#include <xplcommon/LocalAcceptor.h>
36#include <xplcommon/LocalSocket.h>
37#include <xplcommon/LocalClientSocket.h>
38#include <xplcommon/Connector.h>
39#include <xplcommon/ReadingBuffer.h>
40#include <xplcommon/WritingBuffer.h>
[19]41#include <xplcommon/WaitableEvent.h>
[18]42
[19]43#include <cstdlib>
[18]44#include <cstdio>
[19]45#include <cassert>
46
47#include <signal.h>
48
49#include <sys/time.h>
[18]50
51//------------------------------------------------------------------------------
52
53using xplcommon::Thread;
54using xplcommon::Waiter;
55using xplcommon::LocalServerSocket;
56using xplcommon::LocalAcceptor;
57using xplcommon::LocalSocket;
58using xplcommon::LocalClientSocket;
59using xplcommon::Connector;
[19]60using xplcommon::ReadingBuffer;
61using xplcommon::WritingBuffer;
62using xplcommon::WaitableEvent;
63
64using std::min;
65
66//------------------------------------------------------------------------------
67
68void communicate(ReadingBuffer& readingBuffer, WritingBuffer& writingBuffer,
69 Waiter& waiter, unsigned seed, const char* prefix,
70 WaitableEvent* quitEvent = 0)
71{
72 static const size_t minToWrite = 2076;
73 static const size_t maxToWrite = 1345670;
74 static const size_t rangeToWrite = maxToWrite - minToWrite;
75
76 static const size_t reportingInterval = 100000;
77
78 static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.95);
79 static const int sleepRemainder = RAND_MAX - sleepThreshold;
80 static const unsigned minSleep = 10;
81 static const unsigned maxSleep = 250;
82 static const unsigned sleepRange = maxSleep - minSleep;
83
84 size_t numBytesRead = 0;
85 size_t nextReportRead = reportingInterval;
86 size_t numWriteBlocked = 0;
87 size_t numBytesWritten = 0;
88 size_t nextReportWritten = reportingInterval;
89 size_t numReadBlocked = 0;
90 size_t numMillisSlept = 0;
91
92 bool toQuit = false;
93
94 while(!toQuit) {
95 int r = rand_r(&seed);
96 size_t toWrite = minToWrite +
97 static_cast<size_t>( static_cast<double>(r) *
98 rangeToWrite / RAND_MAX );
99
100 while(toWrite>0) {
101 writingBuffer.reset();
102 size_t length = min(writingBuffer.getCapacity(), toWrite);
103 writingBuffer.addLength(length);
104
105 if (quitEvent!=0 && quitEvent->check()) {
106 toQuit = true;
107 break;
108 }
109
110 if (writingBuffer.write()) {
111 numBytesWritten += length;
112 toWrite -= length;
113 length = 0;
114
115 if (numBytesWritten>=nextReportWritten) {
116 printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n",
117 prefix, numBytesWritten, numWriteBlocked,
118 numMillisSlept);
119 nextReportWritten += reportingInterval;
120 }
121 } else if (writingBuffer.failed()) {
122 printf("%s: writing failed with error: %lu\n",
123 prefix,
124 static_cast<unsigned long>(writingBuffer.getErrorCode()));
125 toQuit = true;
126 break;
127 } else {
128 ++numWriteBlocked;
129 }
130
131 if (readingBuffer.read()) {
132 numBytesRead += readingBuffer.getLength();
133 if (numBytesRead>=nextReportRead) {
134 printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n",
135 prefix, numBytesRead, numReadBlocked, numMillisSlept);
136 nextReportRead += reportingInterval;
137 }
138 readingBuffer.reset();
139 } else if (readingBuffer.failed()) {
140 printf("%s: reading failed with error: %lu\n",
141 prefix,
142 static_cast<unsigned long>(readingBuffer.getErrorCode()));
143 toQuit = true;
144 break;
145 } else {
146 ++numReadBlocked;
147 }
148
149 r = rand_r(&seed);
150 if (r>=sleepThreshold) {
151 unsigned toSleep = minSleep +
152 static_cast<unsigned>(static_cast<double>(r - sleepThreshold) *
153 sleepRange / sleepRemainder);
154 assert(toSleep>=minSleep);
155 assert(toSleep<maxSleep);
156 Thread::sleep(toSleep);
157 numMillisSlept += toSleep;
158 }
159
160 if (length!=0) {
161 waiter.wait();
162 if (waiter.failed()) {
163 printf("%s: waiting failed with error: %lu\n",
164 prefix,
165 static_cast<unsigned long>(waiter.getErrorCode()));
166 toQuit = true;
167 break;
168 }
169 }
170 }
171 }
172
173 printf("%s: written %zu bytes with %zu blocking\n",
174 prefix, numBytesWritten, numWriteBlocked);
175 printf("%s: read %zu bytes with %zu blocking\n",
176 prefix, numBytesRead, numReadBlocked);
177 printf("%s: slept %zu ms\n", prefix, numMillisSlept);
178}
[18]179
180//------------------------------------------------------------------------------
181
182class ServerThread : public Thread
183{
[19]184private:
185 Waiter waiter;
186
187 WaitableEvent event;
188
189public:
190 ServerThread();
191
192 void signal();
193
[18]194 virtual void run();
195};
196
197//------------------------------------------------------------------------------
198
[19]199ServerThread::ServerThread() :
200 event(&waiter)
201{
202}
203
204//------------------------------------------------------------------------------
205
206void ServerThread::signal()
207{
208 event.fire();
209}
210
211//------------------------------------------------------------------------------
212
[18]213void ServerThread::run()
214{
[19]215 unsigned seed = static_cast<unsigned>(time(0)*1.2);
216 printf("ServerThread::run: seed=%u\n", seed);
[18]217
218 LocalServerSocket serverSocket("test", &waiter);
219 LocalAcceptor& acceptor = serverSocket.getAcceptor();
220
[19]221 while(!acceptor.accept()) {
222 if (acceptor.failed()) {
223 printf("ServerThread::run: acceptor failed...\n");
224 return;
[18]225 }
[19]226 printf("ServerThread::run: waiting...\n");
227 waiter.wait();
228 }
[18]229
[19]230 printf("ServerThread::run: waiting done, received connection\n");
231 LocalSocket* socket = acceptor.getSocket();
232
233 communicate(socket->getReadingBuffer(), socket->getWritingBuffer(),
234 waiter, seed, "ServerThread", &event);
235
236
237 delete socket;
[18]238}
239
240//------------------------------------------------------------------------------
241//------------------------------------------------------------------------------
242
243class ClientThread : public Thread
244{
245public:
246 virtual void run();
247};
248
249//------------------------------------------------------------------------------
250
251void ClientThread::run()
252{
[19]253 unsigned seed = static_cast<unsigned>(time(0)*1.8);
254 printf("ClientThread::run: seed=%u\n", seed);
255
[18]256 printf("ClientThread::run: sleeping\n");
[19]257 sleep(500);
[18]258 printf("ClientThread::run: connecting\n");
259
260 Waiter waiter;
261
262 LocalClientSocket socket("test", &waiter);
263
264 Connector& connector = socket.getConnector();
265
266 while(!connector.connect()) {
267 if (connector.failed()) {
268 printf("ClientThread::run: connector failed...\n");
269 return;
270 }
271 printf("ClientThread::run: waiting...\n");
272 waiter.wait();
273 }
274
275 printf("ClientThread::run: connected\n");
[19]276
277 communicate(socket.getReadingBuffer(), socket.getWritingBuffer(),
278 waiter, seed, "ClientThread");
[18]279}
280
281//------------------------------------------------------------------------------
282//------------------------------------------------------------------------------
283
284int main()
285{
[19]286 signal(SIGPIPE, SIG_IGN);
287
[18]288 ServerThread serverThread;
289 ClientThread clientThread;
290
291 printf("Starting threads\n");
292 serverThread.start();
293 clientThread.start();
294
[19]295 Thread::sleep(60000);
296 serverThread.signal();
297 printf("Signalled the server thread\n");
298
[18]299 printf("Waiting for the client thread\n");
300 clientThread.join();
301 printf("Waiting for the server thread\n");
302 serverThread.join();
303 printf("Both threads returned\n");
304
305 return 0;
306}
307
308//------------------------------------------------------------------------------
309
310// Local Variables:
311// mode: C++
312// c-basic-offset: 4
313// indent-tabs-mode: nil
314// End:
Note: See TracBrowser for help on using the repository browser.