Changeset 21:eb59943050c9 in xplcommon


Ignore:
Timestamp:
12/31/12 14:17:32 (11 years ago)
Author:
István Váradi <ivaradi@…>
Branch:
default
hg-Phase:
(<MercurialRepository 2 'hg:/home/ivaradi/xplane/hg/xplcommon' '/'>, 'public')
Message:

Added the implementation of the local sockets for Win32 and it seems to work

Files:
22 added
3 deleted
20 edited
1 moved

Legend:

Unmodified
Added
Removed
  • src/xplcommon/BufferedStream.h

    r17 r21  
    5858public:
    5959    /**
    60      * Destroy the buffered waitable.
     60     * Destroy the buffered stream.
    6161     */
    6262    virtual ~BufferedStream();
     
    6464    /**
    6565     * Get the reference of the reading buffer. Note, that it can be a
    66      * 0 reference, if the waitable has no reading buffer.
     66     * 0 reference, if the stream has no reading buffer.
    6767     */
    6868    ReadingBuffer& getReadingBuffer();
     
    7070    /**
    7171     * Get the reference of the writing buffer. Note, that it can be a
    72      * 0 reference, if the waitable has no writing buffer.
     72     * 0 reference, if the stream has no writing buffer.
    7373     */
    7474    WritingBuffer& getWritingBuffer();
  • src/xplcommon/LocalAcceptor.h

    r17 r21  
    3232//------------------------------------------------------------------------------
    3333
    34 #include "Acceptor.h"
     34#include "Failable.h"
    3535
    3636//------------------------------------------------------------------------------
     
    4848 * connection.
    4949 */
    50 class LocalAcceptor : public Acceptor
     50class LocalAcceptor : public FailableReference<LocalAcceptor>
    5151{
    5252public:
     53
     54    /**
     55     * Start accepting a connection.
     56     *
     57     * If a connection is already accepted, but not retrieved yet, it
     58     * returns true.
     59     *
     60     * If a new socket becomes immediately available, it returns true.
     61     *
     62     * If some error occurs, the acceptor is marked as failed.
     63     */
     64    bool accept();
     65
    5366    /**
    5467     * Get the local socket accepted last. If no socket was accepted,
    55      * return 0. The local socket's waiter will be the given one
    56      * (which can be 0).
     68     * return 0. The local socket's waiter will be the given one. If
     69     * the server socket has no waiter, this one should be 0. If the
     70     * server socket has a waiter, this one should not be 0. In other
     71     * words, if the server socket is non-blocking, the accepted one
     72     * should also be non-blocking, if the server socket is blocking,
     73     * the accepted one should also be blocking.
    5774     */
    5875    LocalSocket* getSocket(Waiter* waiter,
  • src/xplcommon/LocalClientSocket.h

    r17 r21  
    3232//------------------------------------------------------------------------------
    3333
    34 #include "ClientSocket.h"
     34#include "BufferedStream.h"
    3535
    3636//------------------------------------------------------------------------------
     
    4444//------------------------------------------------------------------------------
    4545
     46class LocalConnector;
     47
     48//------------------------------------------------------------------------------
     49
    4650/**
    4751 * Client socket that connects to a server socket locally. On Linux it
     
    5054 * The server socket's name should be given.
    5155 */
    52 class LocalClientSocket : public ClientSocket
     56class LocalClientSocket : public BufferedStream
    5357{
    5458public:
     
    6569                      size_t readingCapacity = DEFAULT_CAPACITY,
    6670                      size_t writingCapacity = DEFAULT_CAPACITY);
     71
     72    /**
     73     * Get the connector. Use this object to connect to another socket.
     74     */
     75    LocalConnector& getConnector();
    6776};
    6877
  • src/xplcommon/LocalConnector.h

    r17 r21  
    2828// either expressed or implied, of the FreeBSD Project.
    2929
    30 #ifndef XPLCOMMON_CONNECTOR_H
    31 #define XPLCOMMON_CONNECTOR_H
     30#ifndef XPLCOMMON_LOCALCONNECTOR_H
     31#define XPLCOMMON_LOCALCONNECTOR_H
    3232//------------------------------------------------------------------------------
    3333
     
    3636//------------------------------------------------------------------------------
    3737
    38 #if 0 // The public interface of a connector for a client socket
     38#if 0 // The public interface of a connector for a local client socket
    3939
    4040//------------------------------------------------------------------------------
     
    4949 * only acquired from a client socket.
    5050 */
    51 class Connector : public FailableReference<Connector>
     51class LocalConnector : public FailableReference<LocalConnector>
    5252{
    5353public:
     
    7373//------------------------------------------------------------------------------
    7474
    75 #endif // 0, The public interface of a connector for a client socket
     75#endif // 0, The public interface of a connector for a local client socket
    7676
    7777//------------------------------------------------------------------------------
     
    8585//------------------------------------------------------------------------------
    8686
    87 #include "win32/Connector.h"
     87#include "win32/LocalConnector.h"
    8888
    89 namespace xplcommon { typedef win32::Connector Connector; }
     89namespace xplcommon { typedef win32::LocalConnector LocalConnector; }
    9090
    9191//------------------------------------------------------------------------------
     
    9595//------------------------------------------------------------------------------
    9696
    97 #include "posix/Connector.h"
     97#include "posix/LocalConnector.h"
    9898
    99 namespace xplcommon { typedef posix::Connector Connector; }
     99namespace xplcommon { typedef posix::LocalConnector LocalConnector; }
    100100
    101101//------------------------------------------------------------------------------
     
    104104
    105105//------------------------------------------------------------------------------
    106 #endif // XPLCOMMON_CONNECTOR_H
     106#endif // XPLCOMMON_LOCALCONNECTOR_H
    107107
    108108// Local Variables:
  • src/xplcommon/Makefile.am

    r17 r21  
    3232        ReadingBuffer.h         \
    3333        WritingBuffer.h         \
    34         Acceptor.h              \
    3534        LocalAcceptor.h         \
    3635        LocalSocket.h           \
    3736        LocalServerSocket.h     \
    38         Connector.h             \
    39         ClientSocket.h          \
     37        LocalConnector.h        \
    4038        LocalClientSocket.h
  • src/xplcommon/ReadingBuffer.h

    r17 r21  
    9595//------------------------------------------------------------------------------
    9696
    97 // #include "win32/ReadingBuffer.h"
     97#include "win32/ReadingBuffer.h"
    9898
    99 // namespace xplcommon { typedef win32::ReadingBuffer ReadingBuffer; }
     99namespace xplcommon { typedef win32::ReadingBuffer ReadingBuffer; }
    100100
    101101//------------------------------------------------------------------------------
  • src/xplcommon/WritingBuffer.h

    r17 r21  
    101101//------------------------------------------------------------------------------
    102102
    103 // #include "win32/WritingBuffer.h"
     103#include "win32/WritingBuffer.h"
    104104
    105 // namespace xplcommon { typedef win32::WritingBuffer WritingBuffer; }
     105namespace xplcommon { typedef win32::WritingBuffer WritingBuffer; }
    106106
    107107//------------------------------------------------------------------------------
  • src/xplcommon/posix/ClientSocket.cc

    r16 r21  
    4141
    4242using xplcommon::posix::ClientSocket;
    43 using xplcommon::posix::Connector;
    44 
    45 //------------------------------------------------------------------------------
    46 
    47 ClientSocket::~ClientSocket()
    48 {
    49     delete connector;
    50 }
    51 
    52 //------------------------------------------------------------------------------
    53 
    54 Connector& ClientSocket::getConnector()
    55 {
    56     if (connector==0) {
    57         connector = createConnector();
    58     }
    59     return *connector;
    60 }
    6143
    6244//------------------------------------------------------------------------------
     
    6446void ClientSocket::handleEvents(short events)
    6547{
    66     if (connector!=0 && connector->connecting && (events&POLLOUT)==POLLOUT) {
    67         connector->handleWritable();
     48    if (connector.connecting && (events&POLLOUT)==POLLOUT) {
     49        connector.handleWritable();
    6850        events &= ~POLLOUT;
    6951    }
  • src/xplcommon/posix/ClientSocket.h

    r14 r21  
    5151private:
    5252    /**
    53      * The connector, if any. It is created only, if requested.
     53     * The connector.
    5454     */
    55     Connector* connector;
     55    Connector& connector;
    5656
    5757protected:
     
    5959     * Construct the socket with the given parameters.
    6060     */
    61     ClientSocket(int domain, int type, Waiter* waiter = 0, int protocol = 0,
     61    ClientSocket(int domain, int type, Connector& connector,
     62                 Waiter* waiter = 0, int protocol = 0,
    6263                 size_t readingCapacity = DEFAULT_CAPACITY,
    6364                 size_t writingCapacity = DEFAULT_CAPACITY);
    6465
    65 public:
    66     /**
    67      * Destroy the socket.
    68      */
    69     virtual ~ClientSocket();
    70 
    71     /**
    72      * Get the connector. Use this object to connect to another socket.
    73      */
    74     Connector& getConnector();
    75 
    7666protected:
    77     /**
    78      * Create a new connector of the correct type.
    79      */
    80     virtual Connector* createConnector() = 0;
    81 
    8267    /**
    8368     * Handle any events on the file descriptor. If a connection is in
     
    10590//------------------------------------------------------------------------------
    10691
    107 inline ClientSocket::ClientSocket(int domain, int type, Waiter* waiter,
    108                                   int protocol, size_t readingCapacity,
     92inline ClientSocket::ClientSocket(int domain, int type, Connector& connector,
     93                                  Waiter* waiter, int protocol,
     94                                  size_t readingCapacity,
    10995                                  size_t writingCapacity) :
    11096    Socket(domain, type, waiter, protocol, readingCapacity, writingCapacity),
    111     connector(0)
     97    connector(connector)
    11298{
    11399}
  • src/xplcommon/posix/LocalClientSocket.cc

    r15 r21  
    3232#include "LocalClientSocket.h"
    3333
    34 #include "Connector.h"
     34#include "LocalConnector.h"
    3535#include "LocalServerSocket.h"
    3636
     
    4040
    4141using xplcommon::posix::LocalClientSocket;
    42 using xplcommon::posix::Connector;
    43 
    44 //------------------------------------------------------------------------------
    45 
    46 /**
    47  * The connector class used by the local client.
    48  */
    49 class LocalClientSocket::LocalConnector : public Connector
    50 {
    51 private:
    52     /**
    53      * The reference to the corresponding socket's address.
    54      */
    55     const struct sockaddr_un& address;
    56 
    57 public:
    58     /**
    59      * Create the connector for the given local client socket.
    60      */
    61     LocalConnector(LocalClientSocket* socket);
    62 
    63 protected:
    64     /**
    65      * Get the address.
    66      */
    67     virtual const struct sockaddr* getAddress(size_t& addrlen);
    68 };
    69 
    70 //------------------------------------------------------------------------------
    71 
    72 inline
    73 LocalClientSocket::LocalConnector::LocalConnector(LocalClientSocket* socket) :
    74     Connector(socket),
    75     address(socket->sun)
    76 {
    77 }
    78 
    79 //------------------------------------------------------------------------------
    80 
    81 const struct sockaddr*
    82 LocalClientSocket::LocalConnector::getAddress(size_t& addrlen)
    83 {
    84     addrlen = sizeof(address);
    85     return reinterpret_cast<const struct sockaddr*>(&address);
    86 }
    8742
    8843//------------------------------------------------------------------------------
     
    9247                                     size_t readingCapacity,
    9348                                     size_t writingCapacity) :
    94     ClientSocket(AF_UNIX, SOCK_STREAM, waiter, 0,
    95                  readingCapacity, writingCapacity)
     49    ClientSocket(AF_UNIX, SOCK_STREAM, connector, waiter, 0,
     50                 readingCapacity, writingCapacity),
     51    connector(this, name)
    9652{
    97     sun.sun_family = AF_UNIX;
    98     LocalServerSocket::setupPath(sun, name);
    99 }
    100 
    101 //------------------------------------------------------------------------------
    102 
    103 Connector* LocalClientSocket::createConnector()
    104 {
    105     return new LocalConnector(this);
    10653}
    10754
  • src/xplcommon/posix/LocalClientSocket.h

    r15 r21  
    3434#include "ClientSocket.h"
    3535
    36 #include <sys/un.h>
     36#include "LocalConnector.h"
    3737
    3838//------------------------------------------------------------------------------
     
    5353private:
    5454    /**
    55      * The connector class.
    56      */
    57     class LocalConnector;
    58 
    59     /**
    6055     * The address to use for connecting.
    6156     */
    62     struct sockaddr_un sun;
     57    LocalConnector connector;
    6358
    6459public:
     
    7671                      size_t writingCapacity = DEFAULT_CAPACITY);
    7772
    78 protected:
    7973    /**
    80      * Create a new connector of the correct type.
     74     * Get the connector for this client.
    8175     */
    82     virtual Connector* createConnector();
     76    LocalConnector& getConnector();
    8377
    8478    friend class LocalConnector;
    8579};
     80
     81//------------------------------------------------------------------------------
     82// Inline definitions
     83//------------------------------------------------------------------------------
     84
     85inline LocalConnector& LocalClientSocket::getConnector()
     86{
     87    return connector;
     88}
    8689
    8790//------------------------------------------------------------------------------
  • src/xplcommon/posix/Makefile.am

    r16 r21  
    1515        ClientSocket.cc         \
    1616        LocalServerSocket.cc    \
     17        LocalConnector.cc       \
    1718        LocalClientSocket.cc
    1819
     
    3536        LocalAcceptor.h         \
    3637        LocalServerSocket.h     \
     38        LocalConnector.h        \
    3739        LocalClientSocket.h
  • src/xplcommon/posix/ReadingBuffer.cc

    r16 r21  
    3434#include "BufferedStream.h"
    3535
     36#include <cassert>
     37
    3638#include <poll.h>
    3739
     
    6062    } else {
    6163        stream.events &= ~POLLIN;
    62         addLength(result);
     64        size_t added = addLength(result);
     65        assert(added==result);
     66        assert(getLength()==result);
    6367    }
    6468    return result>=0;
  • src/xplcommon/posix/Waitable.h

    r14 r21  
    9393
    9494    /**
    95      * Indicate if this waitable is valid, i.e. the file descriptor is
    96      * non-negative.
    97      */
    98     bool isValid() const;
    99 
    100     /**
    10195     * Determine if the waitable is ready or not, i.e. if there is no
    10296     * need to wait for it.
     
    143137//------------------------------------------------------------------------------
    144138
    145 inline bool Waitable::isValid() const
    146 {
    147     return fd>=0;
    148 }
    149 
    150 //------------------------------------------------------------------------------
    151 
    152139inline bool Waitable::ready()
    153140{
  • src/xplcommon/win32/Event.cc

    r20 r21  
    4040//------------------------------------------------------------------------------
    4141
     42bool Event::isFired() const
     43{
     44    DWORD result = WaitForSingleObject(handle, (waiter==0) ? INFINITE : 0);
     45    if (result==WAIT_FAILED) {
     46        eventFailable.setErrorCode(GetLastError());
     47        return false;
     48    } else if (result==WAIT_OBJECT_0) {
     49        return true;
     50    } else {
     51        return false;
     52    }
     53}
     54
     55//------------------------------------------------------------------------------
     56
    4257void Event::addTo(Waiter& w)
    4358{
  • src/xplcommon/win32/Event.h

    r20 r21  
    3232//------------------------------------------------------------------------------
    3333
    34 #include "../Failable.h"
     34#include "EventFailable.h"
    3535
    3636#include <windows.h>
     
    4949 * Wrapper for an event.
    5050 */
    51 class Event : public ::xplcommon::Failable
     51class Event
    5252{
    5353private:
    5454    /**
     55     * The object receiving the failure codes.
     56     */
     57    EventFailable& eventFailable;
     58
     59    /**
    5560     * The handle of the event.
    5661     */
     
    6671     * Construct the event.
    6772     */
    68     Event();
     73    Event(EventFailable& eventFailable);
    6974
    7075    /**
     
    96101
    97102    /**
     103     * Determine if the event is being waited for, i.e. it is
     104     * associated with a waiter.
     105     */
     106    bool isWaited() const;
     107
     108    /**
    98109     * Set the event.
    99110     */
    100111    void fire();
     112
     113    /**
     114     * Determine if the event is fired or not.
     115     */
     116    bool isFired() const;
    101117
    102118    /**
     
    106122     */
    107123    bool clear();
     124
     125protected:
     126    /**
     127     * Set the error code on the failable object.
     128     */
     129    void setErrorCode(errorCode_t errorCode);
    108130};
    109131
     
    112134//------------------------------------------------------------------------------
    113135
    114 inline Event::Event() :
     136inline Event::Event(EventFailable& eventFailable) :
     137    eventFailable(eventFailable),
    115138    handle(CreateEvent(0, true, false, 0)),
    116139    waiter(0)
    117140{
    118     if (handle==0) setErrorCode(GetLastError());
     141    if (handle==0) eventFailable.setErrorCode(GetLastError());
    119142}
    120143
     
    143166//------------------------------------------------------------------------------
    144167
     168inline bool Event::isWaited() const
     169{
     170    return waiter!=0;
     171}
     172
     173//------------------------------------------------------------------------------
     174
    145175inline void Event::fire()
    146176{
    147177    if (!SetEvent(handle)) {
    148         setErrorCode(GetLastError());
     178        eventFailable.setErrorCode(GetLastError());
    149179    }
    150180}
     
    155185{
    156186    if (!ResetEvent(handle)) {
    157         setErrorCode(GetLastError());
     187        eventFailable.setErrorCode(GetLastError());
    158188        return false;
    159189    } else {
    160190        return true;
    161191    }
     192}
     193
     194//------------------------------------------------------------------------------
     195
     196inline void Event::setErrorCode(errorCode_t errorCode)
     197{
     198    eventFailable.setErrorCode(errorCode);
    162199}
    163200
  • src/xplcommon/win32/Makefile.am

    r20 r21  
    66        Overlapped.cc           \
    77        Waiter.cc               \
    8         WaitableEvent.cc
     8        BufferedStream.cc       \
     9        ReadingBuffer.cc        \
     10        WritingBuffer.cc        \
     11        LocalAcceptor.cc        \
     12        LocalServerSocketBase.cc\
     13        LocalConnector.cc       \
     14        LocalClientSocket.cc
    915
    1016include_xplcommon_win32dir=$(includedir)/xplcommon/win32
    1117include_xplcommon_win32_HEADERS=\
    1218        Thread.h                \
     19        EventFailable.h         \
    1320        Event.h                 \
    1421        Overlapped.h            \
    1522        Waiter.h                \
    16         WaitableEvent.h
     23        WaitableEvent.h         \
     24        Overlappable.h          \
     25        Completer.h             \
     26        StreamBuffer.h          \
     27        BufferedStream.h        \
     28        ReadingBuffer.h         \
     29        WritingBuffer.h         \
     30        LocalSocket.h           \
     31        LocalAcceptor.h         \
     32        LocalServerSocketBase.h \
     33        LocalServerSocket.h     \
     34        LocalConnector.h        \
     35        LocalClientSocket.h
  • src/xplcommon/win32/Overlapped.cc

    r20 r21  
    4646//------------------------------------------------------------------------------
    4747
     48bool Overlapped::getResult(DWORD& size, HANDLE file)
     49{
     50    bool result = GetOverlappedResult(file, &overlapped, &size, FALSE);
     51    if (!result) {
     52        DWORD error = GetLastError();
     53        if (error!=ERROR_IO_PENDING && error!=ERROR_IO_INCOMPLETE) {
     54            reset();
     55            setErrorCode(error);
     56        }
     57    } else {
     58        reset();
     59    }
     60    return result;
     61}
     62
     63//------------------------------------------------------------------------------
     64
    4865// Local Variables:
    4966// mode: C++
  • src/xplcommon/win32/Overlapped.h

    r20 r21  
    5959     * 0, and the event handle will be set.
    6060     */
    61     Overlapped();
     61    Overlapped(EventFailable& eventFailable);
    6262
    6363    /**
     
    7575     */
    7676    const OVERLAPPED* get() const;
     77
     78    /**
     79     * Get the result of the overlapped operation.
     80     *
     81     * If the operation has completed, the event will be cleared.
     82     *
     83     * @param size on successful return, it will contain the number of
     84     * bytes read or written, which may be 0 for an end-of-file
     85     * condition
     86     * @param file the handle of the file on which the operation in
     87     * question was performed
     88     *
     89     * @return true if the operation has completed, or false if it has
     90     * not, or a failure has occured.
     91     */
     92    bool getResult(DWORD& size, HANDLE file);
    7793};
    7894
     
    8197//------------------------------------------------------------------------------
    8298
    83 inline Overlapped::Overlapped()
     99inline Overlapped::Overlapped(EventFailable& eventFailable) :
     100    Event(eventFailable)
    84101{
    85102    reset();
  • src/xplcommon/win32/WaitableEvent.h

    r20 r21  
    3333
    3434#include "Event.h"
     35#include "EventFailable.h"
    3536
    3637//------------------------------------------------------------------------------
     
    4344 * An event which can be waited for.
    4445 */
    45 class WaitableEvent : public Event
     46class WaitableEvent : public Event, public EventFailable
    4647{
    4748public:
     
    6263//------------------------------------------------------------------------------
    6364
    64 inline WaitableEvent::WaitableEvent(Waiter* waiter)
     65inline WaitableEvent::WaitableEvent(Waiter* waiter) :
     66    Event(static_cast<EventFailable&>(*this))
    6567{
    6668    if (waiter!=0) addTo(*waiter);
     69}
     70
     71//------------------------------------------------------------------------------
     72
     73inline bool WaitableEvent::check()
     74{
     75    if (isFired()) {
     76        clear();
     77        return true;
     78    } else {
     79        return false;
     80    }
    6781}
    6882
  • test/testlocsock.cc

    r19 r21  
    3636#include <xplcommon/LocalSocket.h>
    3737#include <xplcommon/LocalClientSocket.h>
    38 #include <xplcommon/Connector.h>
     38#include <xplcommon/LocalConnector.h>
    3939#include <xplcommon/ReadingBuffer.h>
    4040#include <xplcommon/WritingBuffer.h>
     
    5757using xplcommon::LocalSocket;
    5858using xplcommon::LocalClientSocket;
    59 using xplcommon::Connector;
     59using xplcommon::LocalConnector;
    6060using xplcommon::ReadingBuffer;
    6161using xplcommon::WritingBuffer;
     
    6666//------------------------------------------------------------------------------
    6767
    68 void communicate(ReadingBuffer& readingBuffer, WritingBuffer& writingBuffer,
     68void communicate(ReadingBuffer* readingBuffer, WritingBuffer* writingBuffer,
    6969                 Waiter& waiter, unsigned seed, const char* prefix,
    7070                 WaitableEvent* quitEvent = 0)
     
    7474    static const size_t rangeToWrite = maxToWrite - minToWrite;
    7575
    76     static const size_t reportingInterval = 100000;
    77 
    78     static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.95);
     76    static const size_t reportingInterval = 10000;
     77
     78    static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.99);
    7979    static const int sleepRemainder = RAND_MAX - sleepThreshold;
    8080    static const unsigned minSleep = 10;
     
    9393
    9494    while(!toQuit) {
     95#if TARGET_API_POSIX
    9596        int r = rand_r(&seed);
     97#endif
     98#if TARGET_API_WIN32
     99        int r = rand();
     100#endif
    96101        size_t toWrite = minToWrite +
    97102            static_cast<size_t>( static_cast<double>(r) *
    98103                                 rangeToWrite / RAND_MAX );
    99104
     105        size_t lastWriteLength = 0;
     106        if (writingBuffer!=0) {
     107            lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
     108            writingBuffer->reset();
     109            writingBuffer->addLength(lastWriteLength);
     110        }
     111
    100112        while(toWrite>0) {
    101             writingBuffer.reset();
    102             size_t length = min(writingBuffer.getCapacity(), toWrite);
    103             writingBuffer.addLength(length);
    104 
    105113            if (quitEvent!=0 && quitEvent->check()) {
    106114                toQuit = true;
     
    108116            }
    109117
    110             if (writingBuffer.write()) {
    111                 numBytesWritten += length;
    112                 toWrite -= length;
    113                 length = 0;
    114 
    115                 if (numBytesWritten>=nextReportWritten) {
    116                     printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n",
    117                            prefix, numBytesWritten, numWriteBlocked,
    118                            numMillisSlept);
    119                     nextReportWritten += reportingInterval;
     118            bool hasPending = false;
     119
     120            if (writingBuffer!=0) {
     121                size_t loopCount = 0;
     122                while(toWrite>0) {
     123                    ++loopCount;
     124                    if (writingBuffer->write()) {
     125                        numBytesWritten += lastWriteLength;
     126                        toWrite -= lastWriteLength;
     127
     128                        if (numBytesWritten>=nextReportWritten) {
     129                            printf("%s: written %lu bytes with %lu blocking and %lu ms sleeping\n",
     130                                   prefix,
     131                                   static_cast<unsigned long>(numBytesWritten),
     132                                   static_cast<unsigned long>(numWriteBlocked),
     133                                   static_cast<unsigned long>(numMillisSlept));
     134                            nextReportWritten += reportingInterval;
     135                        }
     136
     137                        if (toWrite>0) {
     138                            if (quitEvent!=0 && quitEvent->check()) {
     139                                toQuit = true;
     140                                break;
     141                            } else {
     142                                lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
     143                                writingBuffer->addLength(lastWriteLength);
     144                                if (loopCount>10) {
     145                                    break;
     146                                }
     147                            }
     148                        }
     149                    } else if (writingBuffer->failed()) {
     150                        printf("%s: writing failed with error: %lu\n",
     151                               prefix,
     152                               static_cast<unsigned long>(writingBuffer->getErrorCode()));
     153                        toQuit = true;
     154                        break;
     155                    } else {
     156                        ++numWriteBlocked;
     157                        hasPending = true;
     158                        break;
     159                    }
    120160                }
    121             } else if (writingBuffer.failed()) {
    122                 printf("%s: writing failed with error: %lu\n",
    123                        prefix,
    124                        static_cast<unsigned long>(writingBuffer.getErrorCode()));
    125                 toQuit = true;
    126                 break;
    127             } else {
    128                 ++numWriteBlocked;
     161                if (toQuit) break;
    129162            }
    130163
    131             if (readingBuffer.read()) {
    132                 numBytesRead += readingBuffer.getLength();
    133                 if (numBytesRead>=nextReportRead) {
    134                     printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n",
    135                            prefix, numBytesRead, numReadBlocked, numMillisSlept);
    136                     nextReportRead += reportingInterval;
     164            if (readingBuffer!=0) {
     165                size_t loopCount = 0;
     166                while(true) {
     167                    ++loopCount;
     168                    if (readingBuffer->read()) {
     169                        size_t l = readingBuffer->getLength();
     170                        if (l==0) {
     171                            printf("%s: reading encountered end of file\n", prefix);
     172                            toQuit = true;
     173                            break;
     174                        }
     175                        numBytesRead += l;
     176                        if (numBytesRead>=nextReportRead) {
     177                            printf("%s: read %lu bytes with %lu blocking and %lu ms sleeping\n",
     178                                   prefix,
     179                                   static_cast<unsigned long>(numBytesRead),
     180                                   static_cast<unsigned long>(numReadBlocked),
     181                                   static_cast<unsigned long>(numMillisSlept));
     182                            nextReportRead += reportingInterval;
     183                        }
     184                        readingBuffer->reset();
     185                        if (quitEvent!=0 && quitEvent->check()) {
     186                            toQuit = true;
     187                            break;
     188                        } else if (loopCount>10) {
     189                            break;
     190                        }
     191                    } else if (readingBuffer->failed()) {
     192                        printf("%s: reading failed with error: %lu\n",
     193                               prefix,
     194                               static_cast<unsigned long>(readingBuffer->getErrorCode()));
     195                        toQuit = true;
     196                        break;
     197                    } else {
     198                        ++numReadBlocked;
     199                        hasPending = true;
     200                        break;
     201                    }
    137202                }
    138                 readingBuffer.reset();
    139             } else if (readingBuffer.failed()) {
    140                 printf("%s: reading failed with error: %lu\n",
    141                        prefix,
    142                        static_cast<unsigned long>(readingBuffer.getErrorCode()));
    143                 toQuit = true;
    144                 break;
    145             } else {
    146                 ++numReadBlocked;
     203                if (toQuit) break;
    147204            }
    148205
     206#if TARGET_API_POSIX
    149207            r = rand_r(&seed);
     208#endif
     209#if TARGET_API_WIN32
     210            r = rand();
     211#endif
    150212            if (r>=sleepThreshold) {
    151213                unsigned toSleep = minSleep +
     
    158220            }
    159221
    160             if (length!=0) {
     222            if (toWrite>0 && hasPending) {
    161223                waiter.wait();
    162224                if (waiter.failed()) {
     
    171233    }
    172234
    173     printf("%s: written %zu bytes with %zu blocking\n",
    174            prefix, numBytesWritten, numWriteBlocked);
    175     printf("%s: read %zu bytes with %zu blocking\n",
    176            prefix, numBytesRead, numReadBlocked);
    177     printf("%s: slept %zu ms\n", prefix, numMillisSlept);
     235    printf("%s: written %lu bytes with %lu blocking\n",
     236           prefix,
     237           static_cast<unsigned long>(numBytesWritten),
     238           static_cast<unsigned long>(numWriteBlocked));
     239    printf("%s: read %lu bytes with %lu blocking\n",
     240           prefix,
     241           static_cast<unsigned long>(numBytesRead),
     242           static_cast<unsigned long>(numReadBlocked));
     243    printf("%s: slept %lu ms\n", prefix,
     244           static_cast<unsigned long>(numMillisSlept));
    178245}
    179246
     
    219286    LocalAcceptor& acceptor = serverSocket.getAcceptor();
    220287
     288    bool eventFired = false;
    221289    while(!acceptor.accept()) {
     290        eventFired = event.check();
     291        if (eventFired) break;
    222292        if (acceptor.failed()) {
    223             printf("ServerThread::run: acceptor failed...\n");
     293            printf("ServerThread::run: acceptor failed...: %lu\n",
     294                   (unsigned long)acceptor.getErrorCode());
    224295            return;
    225296        }
     
    228299    }
    229300
     301    if (eventFired) {
     302        printf("ServerThread::run: waiting done, quitting\n");
     303        return;
     304    }
     305
    230306    printf("ServerThread::run: waiting done, received connection\n");
    231307    LocalSocket* socket = acceptor.getSocket();
    232308
    233     communicate(socket->getReadingBuffer(), socket->getWritingBuffer(),
     309    communicate(&socket->getReadingBuffer(), &socket->getWritingBuffer(),
    234310                waiter, seed, "ServerThread", &event);
    235311
     
    243319class ClientThread : public Thread
    244320{
     321private:
     322    Waiter waiter;
     323
     324    WaitableEvent event;
     325
    245326public:
     327    ClientThread();
     328
     329    void signal();
     330
    246331    virtual void run();
    247332};
     333
     334//------------------------------------------------------------------------------
     335
     336ClientThread::ClientThread() :
     337    event(&waiter)
     338{
     339}
     340
     341//------------------------------------------------------------------------------
     342
     343void ClientThread::signal()
     344{
     345    event.fire();
     346}
    248347
    249348//------------------------------------------------------------------------------
     
    258357    printf("ClientThread::run: connecting\n");
    259358
    260     Waiter waiter;
    261 
    262359    LocalClientSocket socket("test", &waiter);
    263360
    264     Connector& connector = socket.getConnector();
     361    LocalConnector& connector = socket.getConnector();
    265362
    266363    while(!connector.connect()) {
     
    275372    printf("ClientThread::run: connected\n");
    276373
    277     communicate(socket.getReadingBuffer(), socket.getWritingBuffer(),
    278                 waiter, seed, "ClientThread");
     374    communicate(&socket.getReadingBuffer(), &socket.getWritingBuffer(),
     375                waiter, seed, "ClientThread", &event);
    279376}
    280377
     
    284381int main()
    285382{
     383#if TARGET_API_POSIX
    286384    signal(SIGPIPE, SIG_IGN);
     385#endif
    287386
    288387    ServerThread serverThread;
     
    294393
    295394    Thread::sleep(60000);
    296     serverThread.signal();
     395    //Thread::sleep(5000);
     396    printf("Signalling the server thread\n");
     397    //serverThread.signal();
     398    clientThread.signal();
    297399    printf("Signalled the server thread\n");
    298400
Note: See TracChangeset for help on using the changeset viewer.