source: xplcommon/test/testblkstream.cc@ 32:4bef4944955f

Last change on this file since 32:4bef4944955f was 31:bbd688924703, checked in by István Váradi <ivaradi@…>, 12 years ago

Implemented the data stream

File size: 9.7 KB
Line 
1// Copyright (c) 2012 by István Váradi
2
3// This file is part of libxplcommon, a common utility library for
4// development related to X-Plane
5
6// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are met:
8
9// 1. Redistributions of source code must retain the above copyright notice, this
10// list of conditions and the following disclaimer.
11// 2. Redistributions in binary form must reproduce the above copyright notice,
12// this list of conditions and the following disclaimer in the documentation
13// and/or other materials provided with the distribution.
14
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
19// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26// The views and conclusions contained in the software and documentation are those
27// of the authors and should not be interpreted as representing official policies,
28// either expressed or implied, of the FreeBSD Project.
29
30//------------------------------------------------------------------------------
31
32#include <xplcommon/Thread.h>
33#include <xplcommon/Waiter.h>
34#include <xplcommon/LocalServerSocket.h>
35#include <xplcommon/LocalAcceptor.h>
36#include <xplcommon/LocalSocket.h>
37#include <xplcommon/LocalClientSocket.h>
38#include <xplcommon/LocalConnector.h>
39#include <xplcommon/DataStream.h>
40#include <xplcommon/WaitableEvent.h>
41#include <xplcommon/PseudoRandom.h>
42
43#include <cstdlib>
44#include <cstdio>
45#include <cassert>
46
47#include <signal.h>
48
49#include <sys/time.h>
50
51//------------------------------------------------------------------------------
52
53using xplcommon::Thread;
54using xplcommon::Waiter;
55using xplcommon::LocalServerSocket;
56using xplcommon::LocalAcceptor;
57using xplcommon::LocalSocket;
58using xplcommon::LocalClientSocket;
59using xplcommon::LocalConnector;
60using xplcommon::BufferedStream;
61using xplcommon::DataStream;
62using xplcommon::PseudoRandom;
63
64using std::min;
65
66//------------------------------------------------------------------------------
67
68class TestThread : public Thread
69{
70private:
71 static const uint8_t OP_ADD = 1;
72
73 static const uint8_t OP_SUB = 2;
74
75 static const uint8_t OP_MUL = 3;
76
77 static const uint8_t OP_DIV = 4;
78
79private:
80 static int32_t perform(int32_t value, uint8_t operation, int32_t x);
81
82 DataStream* dataStream;
83
84protected:
85 TestThread();
86
87public:
88 void interrupt();
89
90protected:
91 void communicate(BufferedStream& stream, unsigned seed, bool isClient);
92};
93
94//------------------------------------------------------------------------------
95
96inline int32_t TestThread::perform(int32_t value, uint8_t operation, int32_t x)
97{
98 switch (operation) {
99 case OP_ADD:
100 return value + x;
101 case OP_SUB:
102 return value - x;
103 case OP_MUL:
104 return value * x;
105 case OP_DIV:
106 return value / x;
107 default:
108 return value;
109 }
110}
111
112//------------------------------------------------------------------------------
113
114inline TestThread::TestThread() :
115 dataStream(0)
116{
117}
118
119//------------------------------------------------------------------------------
120
121inline void TestThread::interrupt()
122{
123 if (dataStream!=0) dataStream->interrupt();
124}
125
126//------------------------------------------------------------------------------
127
128void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient)
129{
130 static const size_t reportInterval = 100000;
131
132 const char* prefix = isClient ? "Client" : "Server";
133
134 dataStream = new DataStream(stream);
135 PseudoRandom random(seed);
136
137 bool first = true;
138
139 size_t numRequests = 0;
140 size_t nextReportRequests = reportInterval;
141
142 int32_t value = 0;
143 if (isClient) {
144 value = random.next();
145 dataStream->writeS32(value);
146 } else {
147 value = dataStream->readS32();
148 }
149
150 int32_t newValue = value;
151
152 while(*dataStream) {
153 if (!first || isClient) {
154 unsigned numOperations = random.nextUnsigned(20, 1);
155 dataStream->writeU16(numOperations);
156 //printf("%s: numOperations=%u\n", prefix, numOperations);
157 for(unsigned i = 0; i<numOperations; ++i) {
158 double r = random.nextDouble();
159 uint8_t operation =
160 (r<0.4) ? OP_ADD : ((r<0.8) ? OP_SUB :
161 ((r<0.98) ? OP_MUL : OP_DIV));
162 int32_t x = random.next();
163
164 dataStream->writeU8(operation);
165 dataStream->writeS32(x);
166
167 newValue = perform(newValue, operation, x);
168 }
169 if (!dataStream->flush()) break;
170
171 if (isClient) {
172 value = newValue;
173 ++numRequests;
174 if (numRequests>=nextReportRequests) {
175 printf("%s: after %lu requests, value=%d\n",
176 prefix, static_cast<unsigned long>(numRequests),
177 value);
178 nextReportRequests += reportInterval;
179 }
180 }
181 }
182
183 unsigned numOperations = dataStream->readU16();
184 for (unsigned i = 0; *dataStream && i<numOperations; ++i) {
185 uint8_t operation = dataStream->readU8();
186 int32_t x = dataStream->readS32();
187 newValue = perform(newValue, operation, x);
188 }
189
190 if (*dataStream) {
191 if (!isClient) {
192 value = newValue;
193 ++numRequests;
194 if (numRequests>=nextReportRequests) {
195 printf("%s: after %lu requests, value=%d\n",
196 prefix, static_cast<unsigned long>(numRequests),
197 value);
198 nextReportRequests += reportInterval;
199 }
200 }
201 }
202
203
204 first = false;
205 }
206
207 if (dataStream->failed()) {
208 printf("%s: data stream failed with error code: %lu\n", prefix,
209 static_cast<unsigned long>(dataStream->getErrorCode()));
210 } else if (dataStream->isInterrupted()) {
211 printf("%s: data stream was interrupted\n", prefix);
212 }
213
214 printf("%s: value=%d, %lu requests\n", prefix, value,
215 static_cast<unsigned long>(numRequests));
216}
217
218
219//------------------------------------------------------------------------------
220//------------------------------------------------------------------------------
221
222class ServerThread : public TestThread
223{
224public:
225 virtual void run();
226};
227
228//------------------------------------------------------------------------------
229
230void ServerThread::run()
231{
232 Waiter waiter;
233
234 unsigned seed = static_cast<unsigned>(time(0)*1.2);
235 printf("ServerThread::run: seed=%u\n", seed);
236
237 LocalServerSocket serverSocket("test", &waiter);
238 LocalAcceptor& acceptor = serverSocket.getAcceptor();
239
240 while(!acceptor.accept()) {
241 if (acceptor.failed()) {
242 printf("ServerThread::run: acceptor failed...: %lu\n",
243 (unsigned long)acceptor.getErrorCode());
244 return;
245 }
246 printf("ServerThread::run: waiting...\n");
247 waiter.wait();
248 }
249
250 printf("ServerThread::run: waiting done, received connection\n");
251 LocalSocket* socket = acceptor.getSocket();
252
253 acceptor.accept();
254
255 communicate(*socket, seed, false);
256
257 delete socket;
258}
259
260//------------------------------------------------------------------------------
261//------------------------------------------------------------------------------
262
263class ClientThread : public TestThread
264{
265public:
266 virtual void run();
267};
268
269//------------------------------------------------------------------------------
270
271void ClientThread::run()
272{
273 Waiter waiter;
274
275 unsigned seed = static_cast<unsigned>(time(0)*1.8);
276 printf("ClientThread::run: seed=%u\n", seed);
277
278 printf("ClientThread::run: sleeping\n");
279 sleep(500);
280 printf("ClientThread::run: connecting\n");
281
282 LocalClientSocket socket("test", &waiter);
283
284 LocalConnector& connector = socket.getConnector();
285
286 while(!connector.connect()) {
287 if (connector.failed()) {
288 printf("ClientThread::run: connector failed...\n");
289 return;
290 }
291 printf("ClientThread::run: waiting...\n");
292 waiter.wait();
293 }
294
295 printf("ClientThread::run: connected\n");
296
297 communicate(socket, seed, true);
298}
299
300//------------------------------------------------------------------------------
301//------------------------------------------------------------------------------
302
303int main()
304{
305#if TARGET_API_POSIX
306 signal(SIGPIPE, SIG_IGN);
307#endif
308
309 ServerThread serverThread;
310 ClientThread clientThread;
311
312 printf("Starting threads\n");
313 serverThread.start();
314 clientThread.start();
315
316 Thread::sleep(60000);
317 //Thread::sleep(1000);
318 printf("Signalling the client thread\n");
319 //serverThread.interrupt();
320 clientThread.interrupt();
321 printf("Signalled the client thread\n");
322
323 printf("Waiting for the client thread\n");
324 clientThread.join();
325 printf("Waiting for the server thread\n");
326 serverThread.join();
327 printf("Both threads returned\n");
328
329 return 0;
330}
331
332//------------------------------------------------------------------------------
333
334// Local Variables:
335// mode: C++
336// c-basic-offset: 4
337// indent-tabs-mode: nil
338// End:
Note: See TracBrowser for help on using the repository browser.