// Copyright (c) 2012 by István Váradi // This file is part of libxplcommon, a common utility library for // development related to X-Plane // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // 1. Redistributions of source code must retain the above copyright notice, this // list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // The views and conclusions contained in the software and documentation are those // of the authors and should not be interpreted as representing official policies, // either expressed or implied, of the FreeBSD Project. //------------------------------------------------------------------------------ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //------------------------------------------------------------------------------ using xplcommon::Thread; using xplcommon::Waiter; using xplcommon::LocalServerSocket; using xplcommon::LocalAcceptor; using xplcommon::LocalSocket; using xplcommon::LocalClientSocket; using xplcommon::Connector; using xplcommon::ReadingBuffer; using xplcommon::WritingBuffer; using xplcommon::WaitableEvent; using std::min; //------------------------------------------------------------------------------ void communicate(ReadingBuffer& readingBuffer, WritingBuffer& writingBuffer, Waiter& waiter, unsigned seed, const char* prefix, WaitableEvent* quitEvent = 0) { static const size_t minToWrite = 2076; static const size_t maxToWrite = 1345670; static const size_t rangeToWrite = maxToWrite - minToWrite; static const size_t reportingInterval = 100000; static const int sleepThreshold = static_cast(RAND_MAX * 0.95); static const int sleepRemainder = RAND_MAX - sleepThreshold; static const unsigned minSleep = 10; static const unsigned maxSleep = 250; static const unsigned sleepRange = maxSleep - minSleep; size_t numBytesRead = 0; size_t nextReportRead = reportingInterval; size_t numWriteBlocked = 0; size_t numBytesWritten = 0; size_t nextReportWritten = reportingInterval; size_t numReadBlocked = 0; size_t numMillisSlept = 0; bool toQuit = false; while(!toQuit) { int r = rand_r(&seed); size_t toWrite = minToWrite + static_cast( static_cast(r) * rangeToWrite / RAND_MAX ); while(toWrite>0) { writingBuffer.reset(); size_t length = min(writingBuffer.getCapacity(), toWrite); writingBuffer.addLength(length); if (quitEvent!=0 && quitEvent->check()) { toQuit = true; break; } if (writingBuffer.write()) { numBytesWritten += length; toWrite -= length; length = 0; if (numBytesWritten>=nextReportWritten) { printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n", prefix, numBytesWritten, numWriteBlocked, numMillisSlept); nextReportWritten += reportingInterval; } } else if (writingBuffer.failed()) { printf("%s: writing failed with error: %lu\n", prefix, static_cast(writingBuffer.getErrorCode())); toQuit = true; break; } else { ++numWriteBlocked; } if (readingBuffer.read()) { numBytesRead += readingBuffer.getLength(); if (numBytesRead>=nextReportRead) { printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n", prefix, numBytesRead, numReadBlocked, numMillisSlept); nextReportRead += reportingInterval; } readingBuffer.reset(); } else if (readingBuffer.failed()) { printf("%s: reading failed with error: %lu\n", prefix, static_cast(readingBuffer.getErrorCode())); toQuit = true; break; } else { ++numReadBlocked; } r = rand_r(&seed); if (r>=sleepThreshold) { unsigned toSleep = minSleep + static_cast(static_cast(r - sleepThreshold) * sleepRange / sleepRemainder); assert(toSleep>=minSleep); assert(toSleep(waiter.getErrorCode())); toQuit = true; break; } } } } printf("%s: written %zu bytes with %zu blocking\n", prefix, numBytesWritten, numWriteBlocked); printf("%s: read %zu bytes with %zu blocking\n", prefix, numBytesRead, numReadBlocked); printf("%s: slept %zu ms\n", prefix, numMillisSlept); } //------------------------------------------------------------------------------ class ServerThread : public Thread { private: Waiter waiter; WaitableEvent event; public: ServerThread(); void signal(); virtual void run(); }; //------------------------------------------------------------------------------ ServerThread::ServerThread() : event(&waiter) { } //------------------------------------------------------------------------------ void ServerThread::signal() { event.fire(); } //------------------------------------------------------------------------------ void ServerThread::run() { unsigned seed = static_cast(time(0)*1.2); printf("ServerThread::run: seed=%u\n", seed); LocalServerSocket serverSocket("test", &waiter); LocalAcceptor& acceptor = serverSocket.getAcceptor(); while(!acceptor.accept()) { if (acceptor.failed()) { printf("ServerThread::run: acceptor failed...\n"); return; } printf("ServerThread::run: waiting...\n"); waiter.wait(); } printf("ServerThread::run: waiting done, received connection\n"); LocalSocket* socket = acceptor.getSocket(); communicate(socket->getReadingBuffer(), socket->getWritingBuffer(), waiter, seed, "ServerThread", &event); delete socket; } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ class ClientThread : public Thread { public: virtual void run(); }; //------------------------------------------------------------------------------ void ClientThread::run() { unsigned seed = static_cast(time(0)*1.8); printf("ClientThread::run: seed=%u\n", seed); printf("ClientThread::run: sleeping\n"); sleep(500); printf("ClientThread::run: connecting\n"); Waiter waiter; LocalClientSocket socket("test", &waiter); Connector& connector = socket.getConnector(); while(!connector.connect()) { if (connector.failed()) { printf("ClientThread::run: connector failed...\n"); return; } printf("ClientThread::run: waiting...\n"); waiter.wait(); } printf("ClientThread::run: connected\n"); communicate(socket.getReadingBuffer(), socket.getWritingBuffer(), waiter, seed, "ClientThread"); } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ int main() { signal(SIGPIPE, SIG_IGN); ServerThread serverThread; ClientThread clientThread; printf("Starting threads\n"); serverThread.start(); clientThread.start(); Thread::sleep(60000); serverThread.signal(); printf("Signalled the server thread\n"); printf("Waiting for the client thread\n"); clientThread.join(); printf("Waiting for the server thread\n"); serverThread.join(); printf("Both threads returned\n"); return 0; } //------------------------------------------------------------------------------ // Local Variables: // mode: C++ // c-basic-offset: 4 // indent-tabs-mode: nil // End: