// Copyright (c) 2012 by István Váradi // This file is part of libxplcommon, a common utility library for // development related to X-Plane // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // 1. Redistributions of source code must retain the above copyright notice, this // list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // The views and conclusions contained in the software and documentation are those // of the authors and should not be interpreted as representing official policies, // either expressed or implied, of the FreeBSD Project. //------------------------------------------------------------------------------ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //------------------------------------------------------------------------------ using xplcommon::Thread; using xplcommon::Waiter; using xplcommon::LocalServerSocket; using xplcommon::LocalAcceptor; using xplcommon::LocalSocket; using xplcommon::LocalClientSocket; using xplcommon::LocalConnector; using xplcommon::BufferedStream; using xplcommon::BlockingStream; using xplcommon::PseudoRandom; using std::min; //------------------------------------------------------------------------------ class TestThread : public Thread { private: BlockingStream* dataStream; protected: TestThread(); public: void interrupt(); protected: void communicate(BufferedStream& stream, unsigned seed, bool isClient); }; //------------------------------------------------------------------------------ inline TestThread::TestThread() : dataStream(0) { } //------------------------------------------------------------------------------ inline void TestThread::interrupt() { if (dataStream!=0) dataStream->interrupt(); } //------------------------------------------------------------------------------ void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient) { static const size_t bufferSize = 1024; static const size_t minToWrite = 16; static const size_t maxToWrite = 6124; static const size_t reportingInterval = 100000000; const char* prefix = isClient ? "Client" : "Server"; dataStream = new BlockingStream(stream); PseudoRandom random(seed); size_t numBytesRead = 0; size_t nextReportRead = reportingInterval; size_t numBytesWritten = 0; size_t nextReportWritten = reportingInterval; bool first = true; unsigned char* buffer = new unsigned char[bufferSize]; while(true) { if (isClient || !first) { unsigned dataSize = random.nextUnsigned(maxToWrite, minToWrite); *reinterpret_cast(buffer) = dataSize; while(dataSize>0) { unsigned toWrite = min(static_cast(dataSize), bufferSize); if (!dataStream->write(buffer, toWrite)) break; dataSize -= toWrite; numBytesWritten += toWrite; if (numBytesWritten>=nextReportWritten) { printf("%s: written %lu bytes\n", prefix, static_cast(numBytesWritten)); nextReportWritten += reportingInterval; } } if (!dataStream->flush()) break; } uint32_t dataSize = 0; if (!dataStream->read(&dataSize, sizeof(dataSize))) break; assert(dataSize>=minToWrite); assert(dataSize<=maxToWrite); dataSize -= sizeof(dataSize); numBytesRead += 4; while(dataSize>0) { unsigned toRead = min(static_cast(dataSize), bufferSize); if (!dataStream->read(buffer, toRead)) break; dataSize -= toRead; numBytesRead += toRead; if (numBytesRead>=nextReportRead) { printf("%s: read %lu bytes\n", prefix, static_cast(numBytesRead)); nextReportRead += reportingInterval; } } first = false; } delete [] buffer; if (dataStream->failed()) { printf("%s: data stream failed with error code: %lu\n", prefix, static_cast(dataStream->getErrorCode())); } else if (dataStream->isInterrupted()) { printf("%s: data stream was interrupted\n", prefix); } printf("%s: written %lu bytes and read %lu bytes\n", prefix, static_cast(numBytesWritten), static_cast(numBytesRead)); } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ class ServerThread : public TestThread { public: virtual void run(); }; //------------------------------------------------------------------------------ void ServerThread::run() { Waiter waiter; unsigned seed = static_cast(time(0)*1.2); printf("ServerThread::run: seed=%u\n", seed); LocalServerSocket serverSocket("test", &waiter); LocalAcceptor& acceptor = serverSocket.getAcceptor(); while(!acceptor.accept()) { if (acceptor.failed()) { printf("ServerThread::run: acceptor failed...: %lu\n", (unsigned long)acceptor.getErrorCode()); return; } printf("ServerThread::run: waiting...\n"); waiter.wait(); } printf("ServerThread::run: waiting done, received connection\n"); LocalSocket* socket = acceptor.getSocket(); acceptor.accept(); communicate(*socket, seed, false); delete socket; } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ class ClientThread : public TestThread { public: virtual void run(); }; //------------------------------------------------------------------------------ void ClientThread::run() { Waiter waiter; unsigned seed = static_cast(time(0)*1.8); printf("ClientThread::run: seed=%u\n", seed); printf("ClientThread::run: sleeping\n"); sleep(500); printf("ClientThread::run: connecting\n"); LocalClientSocket socket("test", &waiter); LocalConnector& connector = socket.getConnector(); while(!connector.connect()) { if (connector.failed()) { printf("ClientThread::run: connector failed...\n"); return; } printf("ClientThread::run: waiting...\n"); waiter.wait(); } printf("ClientThread::run: connected\n"); communicate(socket, seed, true); } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ int main() { #if TARGET_API_POSIX signal(SIGPIPE, SIG_IGN); #endif ServerThread serverThread; ClientThread clientThread; printf("Starting threads\n"); serverThread.start(); clientThread.start(); Thread::sleep(60000); //Thread::sleep(5000); printf("Signalling the server thread\n"); serverThread.interrupt(); //clientThread.interrupt(); printf("Signalled the server thread\n"); printf("Waiting for the client thread\n"); clientThread.join(); printf("Waiting for the server thread\n"); serverThread.join(); printf("Both threads returned\n"); return 0; } //------------------------------------------------------------------------------ // Local Variables: // mode: C++ // c-basic-offset: 4 // indent-tabs-mode: nil // End: