source: xplcommon/test/testblkstream.cc@ 29:54c2d451f8a0

Last change on this file since 29:54c2d451f8a0 was 29:54c2d451f8a0, checked in by István Váradi <ivaradi@…>, 12 years ago

Implemented a blocking stream and a corresponding trst program

File size: 8.9 KB
RevLine 
[29]1// Copyright (c) 2012 by István Váradi
2
3// This file is part of libxplcommon, a common utility library for
4// development related to X-Plane
5
6// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are met:
8
9// 1. Redistributions of source code must retain the above copyright notice, this
10// list of conditions and the following disclaimer.
11// 2. Redistributions in binary form must reproduce the above copyright notice,
12// this list of conditions and the following disclaimer in the documentation
13// and/or other materials provided with the distribution.
14
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
19// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26// The views and conclusions contained in the software and documentation are those
27// of the authors and should not be interpreted as representing official policies,
28// either expressed or implied, of the FreeBSD Project.
29
30//------------------------------------------------------------------------------
31
32#include <xplcommon/Thread.h>
33#include <xplcommon/Waiter.h>
34#include <xplcommon/LocalServerSocket.h>
35#include <xplcommon/LocalAcceptor.h>
36#include <xplcommon/LocalSocket.h>
37#include <xplcommon/LocalClientSocket.h>
38#include <xplcommon/LocalConnector.h>
39#include <xplcommon/BlockingStream.h>
40#include <xplcommon/WaitableEvent.h>
41#include <xplcommon/PseudoRandom.h>
42
43#include <cstdlib>
44#include <cstdio>
45#include <cassert>
46
47#include <signal.h>
48
49#include <sys/time.h>
50
51//------------------------------------------------------------------------------
52
53using xplcommon::Thread;
54using xplcommon::Waiter;
55using xplcommon::LocalServerSocket;
56using xplcommon::LocalAcceptor;
57using xplcommon::LocalSocket;
58using xplcommon::LocalClientSocket;
59using xplcommon::LocalConnector;
60using xplcommon::BufferedStream;
61using xplcommon::BlockingStream;
62using xplcommon::PseudoRandom;
63
64using std::min;
65
66//------------------------------------------------------------------------------
67
68class TestThread : public Thread
69{
70private:
71 BlockingStream* dataStream;
72
73protected:
74 TestThread();
75
76public:
77 void interrupt();
78
79protected:
80 void communicate(BufferedStream& stream, unsigned seed, bool isClient);
81};
82
83//------------------------------------------------------------------------------
84
85inline TestThread::TestThread() :
86 dataStream(0)
87{
88}
89
90//------------------------------------------------------------------------------
91
92inline void TestThread::interrupt()
93{
94 if (dataStream!=0) dataStream->interrupt();
95}
96
97//------------------------------------------------------------------------------
98
99void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient)
100{
101 static const size_t bufferSize = 1024;
102
103 static const size_t minToWrite = 16;
104 static const size_t maxToWrite = 6124;
105
106 static const size_t reportingInterval = 100000000;
107
108 const char* prefix = isClient ? "Client" : "Server";
109
110 dataStream = new BlockingStream(stream);
111 PseudoRandom random(seed);
112
113
114 size_t numBytesRead = 0;
115 size_t nextReportRead = reportingInterval;
116 size_t numBytesWritten = 0;
117 size_t nextReportWritten = reportingInterval;
118
119 bool first = true;
120
121 unsigned char* buffer = new unsigned char[bufferSize];
122
123 while(true) {
124 if (isClient || !first) {
125 unsigned dataSize = random.nextUnsigned(maxToWrite, minToWrite);
126 *reinterpret_cast<uint32_t*>(buffer) = dataSize;
127 while(dataSize>0) {
128 unsigned toWrite = min(static_cast<size_t>(dataSize),
129 bufferSize);
130 if (!dataStream->write(buffer, toWrite)) break;
131 dataSize -= toWrite;
132 numBytesWritten += toWrite;
133 if (numBytesWritten>=nextReportWritten) {
134 printf("%s: written %lu bytes\n", prefix,
135 static_cast<unsigned long>(numBytesWritten));
136 nextReportWritten += reportingInterval;
137 }
138 }
139 if (!dataStream->flush()) break;
140 }
141
142 uint32_t dataSize = 0;
143 if (!dataStream->read(&dataSize, sizeof(dataSize))) break;
144 assert(dataSize>=minToWrite);
145 assert(dataSize<=maxToWrite);
146 dataSize -= sizeof(dataSize);
147 numBytesRead += 4;
148 while(dataSize>0) {
149 unsigned toRead = min(static_cast<size_t>(dataSize),
150 bufferSize);
151 if (!dataStream->read(buffer, toRead)) break;
152 dataSize -= toRead;
153 numBytesRead += toRead;
154 if (numBytesRead>=nextReportRead) {
155 printf("%s: read %lu bytes\n", prefix,
156 static_cast<unsigned long>(numBytesRead));
157 nextReportRead += reportingInterval;
158 }
159 }
160
161 first = false;
162 }
163
164 delete [] buffer;
165
166 if (dataStream->failed()) {
167 printf("%s: data stream failed with error code: %lu\n", prefix,
168 static_cast<unsigned long>(dataStream->getErrorCode()));
169 } else if (dataStream->isInterrupted()) {
170 printf("%s: data stream was interrupted\n", prefix);
171 }
172
173 printf("%s: written %lu bytes and read %lu bytes\n", prefix,
174 static_cast<unsigned long>(numBytesWritten),
175 static_cast<unsigned long>(numBytesRead));
176}
177
178
179//------------------------------------------------------------------------------
180//------------------------------------------------------------------------------
181
182class ServerThread : public TestThread
183{
184public:
185 virtual void run();
186};
187
188//------------------------------------------------------------------------------
189
190void ServerThread::run()
191{
192 Waiter waiter;
193
194 unsigned seed = static_cast<unsigned>(time(0)*1.2);
195 printf("ServerThread::run: seed=%u\n", seed);
196
197 LocalServerSocket serverSocket("test", &waiter);
198 LocalAcceptor& acceptor = serverSocket.getAcceptor();
199
200 while(!acceptor.accept()) {
201 if (acceptor.failed()) {
202 printf("ServerThread::run: acceptor failed...: %lu\n",
203 (unsigned long)acceptor.getErrorCode());
204 return;
205 }
206 printf("ServerThread::run: waiting...\n");
207 waiter.wait();
208 }
209
210 printf("ServerThread::run: waiting done, received connection\n");
211 LocalSocket* socket = acceptor.getSocket();
212
213 acceptor.accept();
214
215 communicate(*socket, seed, false);
216
217 delete socket;
218}
219
220//------------------------------------------------------------------------------
221//------------------------------------------------------------------------------
222
223class ClientThread : public TestThread
224{
225public:
226 virtual void run();
227};
228
229//------------------------------------------------------------------------------
230
231void ClientThread::run()
232{
233 Waiter waiter;
234
235 unsigned seed = static_cast<unsigned>(time(0)*1.8);
236 printf("ClientThread::run: seed=%u\n", seed);
237
238 printf("ClientThread::run: sleeping\n");
239 sleep(500);
240 printf("ClientThread::run: connecting\n");
241
242 LocalClientSocket socket("test", &waiter);
243
244 LocalConnector& connector = socket.getConnector();
245
246 while(!connector.connect()) {
247 if (connector.failed()) {
248 printf("ClientThread::run: connector failed...\n");
249 return;
250 }
251 printf("ClientThread::run: waiting...\n");
252 waiter.wait();
253 }
254
255 printf("ClientThread::run: connected\n");
256
257 communicate(socket, seed, true);
258}
259
260//------------------------------------------------------------------------------
261//------------------------------------------------------------------------------
262
263int main()
264{
265#if TARGET_API_POSIX
266 signal(SIGPIPE, SIG_IGN);
267#endif
268
269 ServerThread serverThread;
270 ClientThread clientThread;
271
272 printf("Starting threads\n");
273 serverThread.start();
274 clientThread.start();
275
276 //Thread::sleep(60000);
277 Thread::sleep(5000);
278 printf("Signalling the server thread\n");
279 serverThread.interrupt();
280 //clientThread.interrupt();
281 printf("Signalled the server thread\n");
282
283 printf("Waiting for the client thread\n");
284 clientThread.join();
285 printf("Waiting for the server thread\n");
286 serverThread.join();
287 printf("Both threads returned\n");
288
289 return 0;
290}
291
292//------------------------------------------------------------------------------
293
294// Local Variables:
295// mode: C++
296// c-basic-offset: 4
297// indent-tabs-mode: nil
298// End:
Note: See TracBrowser for help on using the repository browser.