// Copyright (c) 2012 by István Váradi // This file is part of libxplcommon, a common utility library for // development related to X-Plane // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // 1. Redistributions of source code must retain the above copyright notice, this // list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // The views and conclusions contained in the software and documentation are those // of the authors and should not be interpreted as representing official policies, // either expressed or implied, of the FreeBSD Project. //------------------------------------------------------------------------------ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //------------------------------------------------------------------------------ using xplcommon::Thread; using xplcommon::Waiter; using xplcommon::LocalServerSocket; using xplcommon::LocalAcceptor; using xplcommon::LocalSocket; using xplcommon::LocalClientSocket; using xplcommon::LocalConnector; using xplcommon::BufferedStream; using xplcommon::DataStream; using xplcommon::PseudoRandom; using std::min; //------------------------------------------------------------------------------ class TestThread : public Thread { private: static const uint8_t OP_ADD = 1; static const uint8_t OP_SUB = 2; static const uint8_t OP_MUL = 3; static const uint8_t OP_DIV = 4; private: static int32_t perform(int32_t value, uint8_t operation, int32_t x); DataStream* dataStream; protected: TestThread(); public: void interrupt(); protected: void communicate(BufferedStream& stream, unsigned seed, bool isClient); }; //------------------------------------------------------------------------------ inline int32_t TestThread::perform(int32_t value, uint8_t operation, int32_t x) { switch (operation) { case OP_ADD: return value + x; case OP_SUB: return value - x; case OP_MUL: return value * x; case OP_DIV: return value / x; default: return value; } } //------------------------------------------------------------------------------ inline TestThread::TestThread() : dataStream(0) { } //------------------------------------------------------------------------------ inline void TestThread::interrupt() { if (dataStream!=0) dataStream->interrupt(); } //------------------------------------------------------------------------------ void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient) { static const size_t reportInterval = 100000; const char* prefix = isClient ? "Client" : "Server"; dataStream = new DataStream(stream); PseudoRandom random(seed); bool first = true; size_t numRequests = 0; size_t nextReportRequests = reportInterval; int32_t value = 0; if (isClient) { value = random.next(); dataStream->writeS32(value); } else { value = dataStream->readS32(); } int32_t newValue = value; while(*dataStream) { if (!first || isClient) { unsigned numOperations = random.nextUnsigned(20, 1); dataStream->writeU16(numOperations); //printf("%s: numOperations=%u\n", prefix, numOperations); for(unsigned i = 0; iwriteU8(operation); dataStream->writeS32(x); newValue = perform(newValue, operation, x); } if (!dataStream->flush()) break; if (isClient) { value = newValue; ++numRequests; if (numRequests>=nextReportRequests) { printf("%s: after %lu requests, value=%d\n", prefix, static_cast(numRequests), value); nextReportRequests += reportInterval; } } } unsigned numOperations = dataStream->readU16(); for (unsigned i = 0; *dataStream && ireadU8(); int32_t x = dataStream->readS32(); newValue = perform(newValue, operation, x); } if (*dataStream) { if (!isClient) { value = newValue; ++numRequests; if (numRequests>=nextReportRequests) { printf("%s: after %lu requests, value=%d\n", prefix, static_cast(numRequests), value); nextReportRequests += reportInterval; } } } first = false; } if (dataStream->failed()) { printf("%s: data stream failed with error code: %lu\n", prefix, static_cast(dataStream->getErrorCode())); } else if (dataStream->isInterrupted()) { printf("%s: data stream was interrupted\n", prefix); } printf("%s: value=%d, %lu requests\n", prefix, value, static_cast(numRequests)); } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ class ServerThread : public TestThread { public: virtual void run(); }; //------------------------------------------------------------------------------ void ServerThread::run() { Waiter waiter; unsigned seed = static_cast(time(0)*1.2); printf("ServerThread::run: seed=%u\n", seed); LocalServerSocket serverSocket("test", &waiter); LocalAcceptor& acceptor = serverSocket.getAcceptor(); while(!acceptor.accept()) { if (acceptor.failed()) { printf("ServerThread::run: acceptor failed...: %lu\n", (unsigned long)acceptor.getErrorCode()); return; } printf("ServerThread::run: waiting...\n"); waiter.wait(); } printf("ServerThread::run: waiting done, received connection\n"); LocalSocket* socket = acceptor.getSocket(); acceptor.accept(); communicate(*socket, seed, false); delete socket; } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ class ClientThread : public TestThread { public: virtual void run(); }; //------------------------------------------------------------------------------ void ClientThread::run() { Waiter waiter; unsigned seed = static_cast(time(0)*1.8); printf("ClientThread::run: seed=%u\n", seed); printf("ClientThread::run: sleeping\n"); sleep(500); printf("ClientThread::run: connecting\n"); LocalClientSocket socket("test", &waiter); LocalConnector& connector = socket.getConnector(); while(!connector.connect()) { if (connector.failed()) { printf("ClientThread::run: connector failed...\n"); return; } printf("ClientThread::run: waiting...\n"); waiter.wait(); } printf("ClientThread::run: connected\n"); communicate(socket, seed, true); } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ int main() { #if TARGET_API_POSIX signal(SIGPIPE, SIG_IGN); #endif ServerThread serverThread; ClientThread clientThread; printf("Starting threads\n"); serverThread.start(); clientThread.start(); Thread::sleep(60000); //Thread::sleep(1000); printf("Signalling the client thread\n"); //serverThread.interrupt(); clientThread.interrupt(); printf("Signalled the client thread\n"); printf("Waiting for the client thread\n"); clientThread.join(); printf("Waiting for the server thread\n"); serverThread.join(); printf("Both threads returned\n"); return 0; } //------------------------------------------------------------------------------ // Local Variables: // mode: C++ // c-basic-offset: 4 // indent-tabs-mode: nil // End: