source: vscpl/test/testblkstream.cc@ 32:fd970caf83eb

Last change on this file since 32:fd970caf83eb was 10:1ee8d0a23099, checked in by István Váradi <ivaradi@…>, 12 years ago

Imported the code of the blocking and data streams

File size: 10.0 KB
Line 
1// Copyright (c) 2012 by István Váradi
2
3// This file is part of libxplcommon, a common utility library for
4// development related to X-Plane
5
6// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are met:
8
9// 1. Redistributions of source code must retain the above copyright notice, this
10// list of conditions and the following disclaimer.
11// 2. Redistributions in binary form must reproduce the above copyright notice,
12// this list of conditions and the following disclaimer in the documentation
13// and/or other materials provided with the distribution.
14
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
19// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26// The views and conclusions contained in the software and documentation are those
27// of the authors and should not be interpreted as representing official policies,
28// either expressed or implied, of the FreeBSD Project.
29
30//------------------------------------------------------------------------------
31
32#include <hu/varadiistvan/scpl/io/Waiter.h>
33#include <hu/varadiistvan/scpl/io/LocalServerSocket.h>
34#include <hu/varadiistvan/scpl/io/LocalAcceptor.h>
35#include <hu/varadiistvan/scpl/io/LocalSocket.h>
36#include <hu/varadiistvan/scpl/io/LocalClientSocket.h>
37#include <hu/varadiistvan/scpl/io/LocalConnector.h>
38#include <hu/varadiistvan/scpl/io/DataStream.h>
39#include <hu/varadiistvan/scpl/io/WaitableEvent.h>
40
41#include <hu/varadiistvan/scpl/Thread.h>
42#include <hu/varadiistvan/scpl/PseudoRandom.h>
43
44#include <cstdlib>
45#include <cstdio>
46#include <cassert>
47
48#include <signal.h>
49
50#include <sys/time.h>
51
52//------------------------------------------------------------------------------
53
54using hu::varadiistvan::scpl::io::Waiter;
55using hu::varadiistvan::scpl::io::LocalServerSocket;
56using hu::varadiistvan::scpl::io::LocalAcceptor;
57using hu::varadiistvan::scpl::io::LocalSocket;
58using hu::varadiistvan::scpl::io::LocalClientSocket;
59using hu::varadiistvan::scpl::io::LocalConnector;
60using hu::varadiistvan::scpl::io::BufferedStream;
61using hu::varadiistvan::scpl::io::DataStream;
62
63using hu::varadiistvan::scpl::Thread;
64using hu::varadiistvan::scpl::PseudoRandom;
65
66using std::min;
67
68//------------------------------------------------------------------------------
69
70class TestThread : public Thread
71{
72private:
73 static const uint8_t OP_ADD = 1;
74
75 static const uint8_t OP_SUB = 2;
76
77 static const uint8_t OP_MUL = 3;
78
79 static const uint8_t OP_DIV = 4;
80
81private:
82 static int32_t perform(int32_t value, uint8_t operation, int32_t x);
83
84 DataStream* dataStream;
85
86protected:
87 TestThread();
88
89public:
90 void interrupt();
91
92protected:
93 void communicate(BufferedStream& stream, unsigned seed, bool isClient);
94};
95
96//------------------------------------------------------------------------------
97
98inline int32_t TestThread::perform(int32_t value, uint8_t operation, int32_t x)
99{
100 switch (operation) {
101 case OP_ADD:
102 return value + x;
103 case OP_SUB:
104 return value - x;
105 case OP_MUL:
106 return value * x;
107 case OP_DIV:
108 return value / x;
109 default:
110 return value;
111 }
112}
113
114//------------------------------------------------------------------------------
115
116inline TestThread::TestThread() :
117 dataStream(0)
118{
119}
120
121//------------------------------------------------------------------------------
122
123inline void TestThread::interrupt()
124{
125 if (dataStream!=0) dataStream->interrupt();
126}
127
128//------------------------------------------------------------------------------
129
130void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient)
131{
132 static const size_t reportInterval = 100000;
133
134 const char* prefix = isClient ? "Client" : "Server";
135
136 dataStream = new DataStream(stream);
137 PseudoRandom random(seed);
138
139 bool first = true;
140
141 size_t numRequests = 0;
142 size_t nextReportRequests = reportInterval;
143
144 int32_t value = 0;
145 if (isClient) {
146 value = random.next();
147 dataStream->writeS32(value);
148 } else {
149 value = dataStream->readS32();
150 }
151
152 int32_t newValue = value;
153
154 while(*dataStream) {
155 if (!first || isClient) {
156 unsigned numOperations = random.nextUnsigned(20, 1);
157 dataStream->writeU16(numOperations);
158 //printf("%s: numOperations=%u\n", prefix, numOperations);
159 for(unsigned i = 0; i<numOperations; ++i) {
160 double r = random.nextDouble();
161 uint8_t operation =
162 (r<0.4) ? OP_ADD : ((r<0.8) ? OP_SUB :
163 ((r<0.98) ? OP_MUL : OP_DIV));
164 int32_t x = random.next();
165
166 dataStream->writeU8(operation);
167 dataStream->writeS32(x);
168
169 newValue = perform(newValue, operation, x);
170 }
171 if (!dataStream->flush()) break;
172
173 if (isClient) {
174 value = newValue;
175 ++numRequests;
176 if (numRequests>=nextReportRequests) {
177 printf("%s: after %lu requests, value=%d\n",
178 prefix, static_cast<unsigned long>(numRequests),
179 value);
180 nextReportRequests += reportInterval;
181 }
182 }
183 }
184
185 unsigned numOperations = dataStream->readU16();
186 for (unsigned i = 0; *dataStream && i<numOperations; ++i) {
187 uint8_t operation = dataStream->readU8();
188 int32_t x = dataStream->readS32();
189 newValue = perform(newValue, operation, x);
190 }
191
192 if (*dataStream) {
193 if (!isClient) {
194 value = newValue;
195 ++numRequests;
196 if (numRequests>=nextReportRequests) {
197 printf("%s: after %lu requests, value=%d\n",
198 prefix, static_cast<unsigned long>(numRequests),
199 value);
200 nextReportRequests += reportInterval;
201 }
202 }
203 }
204
205
206 first = false;
207 }
208
209 if (dataStream->failed()) {
210 printf("%s: data stream failed with error code: %lu\n", prefix,
211 static_cast<unsigned long>(dataStream->getErrorCode()));
212 } else if (dataStream->isInterrupted()) {
213 printf("%s: data stream was interrupted\n", prefix);
214 }
215
216 printf("%s: value=%d, %lu requests\n", prefix, value,
217 static_cast<unsigned long>(numRequests));
218}
219
220
221//------------------------------------------------------------------------------
222//------------------------------------------------------------------------------
223
224class ServerThread : public TestThread
225{
226public:
227 virtual void run();
228};
229
230//------------------------------------------------------------------------------
231
232void ServerThread::run()
233{
234 Waiter waiter;
235
236 unsigned seed = static_cast<unsigned>(time(0)*1.2);
237 printf("ServerThread::run: seed=%u\n", seed);
238
239 LocalServerSocket serverSocket("test", &waiter);
240 LocalAcceptor& acceptor = serverSocket.getAcceptor();
241
242 while(!acceptor.accept()) {
243 if (acceptor.failed()) {
244 printf("ServerThread::run: acceptor failed...: %lu\n",
245 (unsigned long)acceptor.getErrorCode());
246 return;
247 }
248 printf("ServerThread::run: waiting...\n");
249 waiter.wait();
250 }
251
252 printf("ServerThread::run: waiting done, received connection\n");
253 LocalSocket* socket = acceptor.getSocket();
254
255 acceptor.accept();
256
257 communicate(*socket, seed, false);
258
259 delete socket;
260}
261
262//------------------------------------------------------------------------------
263//------------------------------------------------------------------------------
264
265class ClientThread : public TestThread
266{
267public:
268 virtual void run();
269};
270
271//------------------------------------------------------------------------------
272
273void ClientThread::run()
274{
275 Waiter waiter;
276
277 unsigned seed = static_cast<unsigned>(time(0)*1.8);
278 printf("ClientThread::run: seed=%u\n", seed);
279
280 printf("ClientThread::run: sleeping\n");
281 sleep(500);
282 printf("ClientThread::run: connecting\n");
283
284 LocalClientSocket socket("test", &waiter);
285
286 LocalConnector& connector = socket.getConnector();
287
288 while(!connector.connect()) {
289 if (connector.failed()) {
290 printf("ClientThread::run: connector failed...\n");
291 return;
292 }
293 printf("ClientThread::run: waiting...\n");
294 waiter.wait();
295 }
296
297 printf("ClientThread::run: connected\n");
298
299 communicate(socket, seed, true);
300}
301
302//------------------------------------------------------------------------------
303//------------------------------------------------------------------------------
304
305int main()
306{
307#if TARGET_API_POSIX
308 signal(SIGPIPE, SIG_IGN);
309#endif
310
311 ServerThread serverThread;
312 ClientThread clientThread;
313
314 printf("Starting threads\n");
315 serverThread.start();
316 clientThread.start();
317
318
319 Thread::sleep(60000);
320 //Thread::sleep(1000);
321 printf("Signalling the client thread\n");
322 //serverThread.interrupt();
323 clientThread.interrupt();
324 printf("Signalled the client thread\n");
325
326 printf("Waiting for the client thread\n");
327 clientThread.join();
328 printf("Waiting for the server thread\n");
329 serverThread.join();
330 printf("Both threads returned\n");
331
332 return 0;
333}
334
335//------------------------------------------------------------------------------
336
337// Local Variables:
338// mode: C++
339// c-basic-offset: 4
340// indent-tabs-mode: nil
341// End:
Note: See TracBrowser for help on using the repository browser.