Changeset 19:54c3f68d2d46 in xplcommon


Ignore:
Timestamp:
12/29/12 18:47:52 (11 years ago)
Author:
István Váradi <ivaradi@…>
Branch:
default
Phase:
public
Message:

Fixed problems with non-blocking mode and extended the socket test program to actually communicate

Files:
3 edited

Legend:

Unmodified
Added
Removed
  • src/xplcommon/posix/ServerSocket.cc

    r14 r19  
    4848{
    4949    if (fd<0) setErrorCodeFromErrno();
     50    else if (waiter!=0) setNonBlocking(fd);
    5051}
    5152
  • src/xplcommon/posix/Socket.h

    r16 r19  
    8989{
    9090    if (fd<0) setErrorCodeFromErrno();
     91    else if (waiter!=0) setNonBlocking(fd);
    9192}
    9293
     
    9798    BufferedStream(waiter, fd, readingCapacity, writingCapacity)
    9899{
     100    if (waiter!=0) setNonBlocking(fd);
    99101}
    100102
  • test/testlocsock.cc

    r18 r19  
    3939#include <xplcommon/ReadingBuffer.h>
    4040#include <xplcommon/WritingBuffer.h>
    41 
     41#include <xplcommon/WaitableEvent.h>
     42
     43#include <cstdlib>
    4244#include <cstdio>
     45#include <cassert>
     46
     47#include <signal.h>
     48
     49#include <sys/time.h>
    4350
    4451//------------------------------------------------------------------------------
     
    5158using xplcommon::LocalClientSocket;
    5259using xplcommon::Connector;
     60using xplcommon::ReadingBuffer;
     61using xplcommon::WritingBuffer;
     62using xplcommon::WaitableEvent;
     63
     64using std::min;
     65
     66//------------------------------------------------------------------------------
     67
     68void communicate(ReadingBuffer& readingBuffer, WritingBuffer& writingBuffer,
     69                 Waiter& waiter, unsigned seed, const char* prefix,
     70                 WaitableEvent* quitEvent = 0)
     71{
     72    static const size_t minToWrite = 2076;
     73    static const size_t maxToWrite = 1345670;
     74    static const size_t rangeToWrite = maxToWrite - minToWrite;
     75
     76    static const size_t reportingInterval = 100000;
     77
     78    static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.95);
     79    static const int sleepRemainder = RAND_MAX - sleepThreshold;
     80    static const unsigned minSleep = 10;
     81    static const unsigned maxSleep = 250;
     82    static const unsigned sleepRange = maxSleep - minSleep;
     83
     84    size_t numBytesRead = 0;
     85    size_t nextReportRead = reportingInterval;
     86    size_t numWriteBlocked = 0;
     87    size_t numBytesWritten = 0;
     88    size_t nextReportWritten = reportingInterval;
     89    size_t numReadBlocked = 0;
     90    size_t numMillisSlept = 0;
     91
     92    bool toQuit = false;
     93
     94    while(!toQuit) {
     95        int r = rand_r(&seed);
     96        size_t toWrite = minToWrite +
     97            static_cast<size_t>( static_cast<double>(r) *
     98                                 rangeToWrite / RAND_MAX );
     99
     100        while(toWrite>0) {
     101            writingBuffer.reset();
     102            size_t length = min(writingBuffer.getCapacity(), toWrite);
     103            writingBuffer.addLength(length);
     104
     105            if (quitEvent!=0 && quitEvent->check()) {
     106                toQuit = true;
     107                break;
     108            }
     109
     110            if (writingBuffer.write()) {
     111                numBytesWritten += length;
     112                toWrite -= length;
     113                length = 0;
     114
     115                if (numBytesWritten>=nextReportWritten) {
     116                    printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n",
     117                           prefix, numBytesWritten, numWriteBlocked,
     118                           numMillisSlept);
     119                    nextReportWritten += reportingInterval;
     120                }
     121            } else if (writingBuffer.failed()) {
     122                printf("%s: writing failed with error: %lu\n",
     123                       prefix,
     124                       static_cast<unsigned long>(writingBuffer.getErrorCode()));
     125                toQuit = true;
     126                break;
     127            } else {
     128                ++numWriteBlocked;
     129            }
     130
     131            if (readingBuffer.read()) {
     132                numBytesRead += readingBuffer.getLength();
     133                if (numBytesRead>=nextReportRead) {
     134                    printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n",
     135                           prefix, numBytesRead, numReadBlocked, numMillisSlept);
     136                    nextReportRead += reportingInterval;
     137                }
     138                readingBuffer.reset();
     139            } else if (readingBuffer.failed()) {
     140                printf("%s: reading failed with error: %lu\n",
     141                       prefix,
     142                       static_cast<unsigned long>(readingBuffer.getErrorCode()));
     143                toQuit = true;
     144                break;
     145            } else {
     146                ++numReadBlocked;
     147            }
     148
     149            r = rand_r(&seed);
     150            if (r>=sleepThreshold) {
     151                unsigned toSleep = minSleep +
     152                    static_cast<unsigned>(static_cast<double>(r - sleepThreshold) *
     153                                          sleepRange / sleepRemainder);
     154                assert(toSleep>=minSleep);
     155                assert(toSleep<maxSleep);
     156                Thread::sleep(toSleep);
     157                numMillisSlept += toSleep;
     158            }
     159
     160            if (length!=0) {
     161                waiter.wait();
     162                if (waiter.failed()) {
     163                    printf("%s: waiting failed with error: %lu\n",
     164                           prefix,
     165                           static_cast<unsigned long>(waiter.getErrorCode()));
     166                    toQuit = true;
     167                    break;
     168                }
     169            }
     170        }
     171    }
     172
     173    printf("%s: written %zu bytes with %zu blocking\n",
     174           prefix, numBytesWritten, numWriteBlocked);
     175    printf("%s: read %zu bytes with %zu blocking\n",
     176           prefix, numBytesRead, numReadBlocked);
     177    printf("%s: slept %zu ms\n", prefix, numMillisSlept);
     178}
    53179
    54180//------------------------------------------------------------------------------
     
    56182class ServerThread : public Thread
    57183{
     184private:
     185    Waiter waiter;
     186
     187    WaitableEvent event;
     188
     189public:
     190    ServerThread();
     191
     192    void signal();
     193
    58194    virtual void run();
    59195};
     
    61197//------------------------------------------------------------------------------
    62198
     199ServerThread::ServerThread() :
     200    event(&waiter)
     201{
     202}
     203
     204//------------------------------------------------------------------------------
     205
     206void ServerThread::signal()
     207{
     208    event.fire();
     209}
     210
     211//------------------------------------------------------------------------------
     212
    63213void ServerThread::run()
    64214{
    65     printf("ServerThread::run\n");
    66 
    67     Waiter waiter;
     215    unsigned seed = static_cast<unsigned>(time(0)*1.2);
     216    printf("ServerThread::run: seed=%u\n", seed);
    68217
    69218    LocalServerSocket serverSocket("test", &waiter);
    70219    LocalAcceptor& acceptor = serverSocket.getAcceptor();
    71220
    72     while(true) {
    73         while(!acceptor.accept()) {
    74             if (acceptor.failed()) {
    75                 printf("ServerThread::run: acceptor failed...\n");
    76                 return;
    77             }
    78             printf("ServerThread::run: waiting...\n");
    79             waiter.wait();
     221    while(!acceptor.accept()) {
     222        if (acceptor.failed()) {
     223            printf("ServerThread::run: acceptor failed...\n");
     224            return;
    80225        }
    81 
    82         printf("ServerThread::run: waiting done, received connection\n");
    83         LocalSocket* socket = acceptor.getSocket();
    84         delete socket;
    85         break;
     226        printf("ServerThread::run: waiting...\n");
     227        waiter.wait();
    86228    }
     229
     230    printf("ServerThread::run: waiting done, received connection\n");
     231    LocalSocket* socket = acceptor.getSocket();
     232
     233    communicate(socket->getReadingBuffer(), socket->getWritingBuffer(),
     234                waiter, seed, "ServerThread", &event);
     235
     236
     237    delete socket;
    87238}
    88239
     
    100251void ClientThread::run()
    101252{
     253    unsigned seed = static_cast<unsigned>(time(0)*1.8);
     254    printf("ClientThread::run: seed=%u\n", seed);
     255
    102256    printf("ClientThread::run: sleeping\n");
    103     sleep(2000);
     257    sleep(500);
    104258    printf("ClientThread::run: connecting\n");
    105259
     
    120274
    121275    printf("ClientThread::run: connected\n");
     276
     277    communicate(socket.getReadingBuffer(), socket.getWritingBuffer(),
     278                waiter, seed, "ClientThread");
    122279}
    123280
     
    127284int main()
    128285{
     286    signal(SIGPIPE, SIG_IGN);
     287
    129288    ServerThread serverThread;
    130289    ClientThread clientThread;
     
    133292    serverThread.start();
    134293    clientThread.start();
     294
     295    Thread::sleep(60000);
     296    serverThread.signal();
     297    printf("Signalled the server thread\n");
    135298
    136299    printf("Waiting for the client thread\n");
Note: See TracChangeset for help on using the changeset viewer.