Very Simple Cross-Platform Library
BlockingStream.cc
1 // Copyright (c) 2013 by István Váradi
2 
3 // This file is part of VSCPL, a simple cross-platform utility library
4 
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are met:
7 
8 // 1. Redistributions of source code must retain the above copyright notice, this
9 // list of conditions and the following disclaimer.
10 // 2. Redistributions in binary form must reproduce the above copyright notice,
11 // this list of conditions and the following disclaimer in the documentation
12 // and/or other materials provided with the distribution.
13 
14 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
18 // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
20 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
21 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
23 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24 
25 // The views and conclusions contained in the software and documentation are those
26 // of the authors and should not be interpreted as representing official policies,
27 // either expressed or implied, of the FreeBSD Project.
28 
29 //------------------------------------------------------------------------------
30 
31 #include "BlockingStream.h"
32 
33 #include "ReadingBuffer.h"
34 #include "WritingBuffer.h"
35 #include "Waiter.h"
36 
37 #include <cstring>
38 #include <cstdio>
39 
40 //------------------------------------------------------------------------------
41 
43 
44 using std::min;
45 
46 //------------------------------------------------------------------------------
47 
48 inline bool BlockingStream::checkInterrupted()
49 {
50  if (!interrupted) {
51  interrupted = event.check();
52  if (!interrupted && event.failed()) {
54  }
55  }
56 
57  return interrupted;
58 }
59 
60 //------------------------------------------------------------------------------
61 
62 bool BlockingStream::read(void* dest, size_t length)
63 {
64  unsigned char* d = reinterpret_cast<unsigned char*>(dest);
65 
66  ReadingBuffer& readingBuffer = stream.getReadingBuffer();
67  while (length>0) {
68  size_t copied = readingBuffer.extract(d, length, readingOffset);
69  readingOffset += copied;
70  length -= copied;
71  d += copied;
72 
73  if (length!=0) {
74  if (!fill()) return false;
75  }
76  }
77 
78  return true;
79 }
80 
81 //------------------------------------------------------------------------------
82 
83 bool BlockingStream::skip(size_t length)
84 {
85  ReadingBuffer& readingBuffer = stream.getReadingBuffer();
86  while (length>0) {
87  size_t toSkip = min(length, readingBuffer.getLength() - readingOffset);
88  readingOffset += toSkip;
89  length -= toSkip;
90 
91  if (length!=0) {
92  if (!fill()) return false;
93  }
94  }
95 
96  return true;
97 }
98 
99 //------------------------------------------------------------------------------
100 
101 bool BlockingStream::write(const void* src, size_t length)
102 {
103  const unsigned char* s = reinterpret_cast<const unsigned char*>(src);
104 
105  WritingBuffer& writingBuffer = stream.getWritingBuffer();
106  while(length>0) {
107  size_t copied = writingBuffer.append(s, length);
108  length -= copied;
109  s += copied;
110 
111  if (length==0) break;
112  if (!flush()) return false;
113  }
114 
115  return true;
116 }
117 
118 //------------------------------------------------------------------------------
119 
121 {
122  WritingBuffer& writingBuffer = stream.getWritingBuffer();
123  while (*this) {
124  if (checkInterrupted()) return false;
125 
126  if (writingBuffer.write()) {
127  return true;
128  } else if (writingBuffer.failed()) {
129  setErrorCode(writingBuffer.getErrorCode());
130  } else {
131  Waiter* waiter = stream.getWaiter();
132  waiter->wait();
133  if (waiter->failed()) {
134  setErrorCode(waiter->getErrorCode());
135  }
136  }
137  }
138  return false;
139 }
140 
141 //------------------------------------------------------------------------------
142 
144 {
145  ReadingBuffer& readingBuffer = stream.getReadingBuffer();
146 
147  readingBuffer.reset();
148  readingOffset = 0;
149  while (*this) {
150  if (checkInterrupted()) return false;
151 
152  if (readingBuffer.read()) {
153  eof = readingBuffer.isEmpty();
154  return !eof;
155  } else if (readingBuffer.failed()) {
156  setErrorCode(readingBuffer.getErrorCode());
157  } else {
158  Waiter* waiter = stream.getWaiter();
159  waiter->wait();
160  if (waiter->failed()) {
161  setErrorCode(waiter->getErrorCode());
162  return false;
163  }
164  }
165  }
166  return false;
167 }
168 
169 //------------------------------------------------------------------------------
170 
171 // Local Variables:
172 // mode: C++
173 // c-basic-offset: 4
174 // indent-tabs-mode: nil
175 // End:
bool write(const void *src, size_t length)
bool read(void *dest, size_t length)
size_t extract(void *dest, size_t size, size_t offset=0) const
Definition: Buffer.h:223
size_t append(const void *src, size_t size)
Definition: Buffer.h:213
errorCode_t getErrorCode() const
Definition: Failable.h:129
void setErrorCode(errorCode_t ec)
Definition: Failable.h:143