summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Mazieres <dm@uun.org>2014-02-01 21:53:17 -0800
committerDavid Mazieres <dm@uun.org>2014-02-01 21:53:17 -0800
commitee173ba581dd31e0f3915bccaa9436b507c47e42 (patch)
treef08a27ac36362220d3dfd251b3b5c4516377eed0
parent653595169eee8c07da396d66f077806d3ac33163 (diff)
use lock guards
-rw-r--r--infinibuf.cc95
1 files changed, 40 insertions, 55 deletions
diff --git a/infinibuf.cc b/infinibuf.cc
index 70f95b7..0c238db 100644
--- a/infinibuf.cc
+++ b/infinibuf.cc
@@ -26,8 +26,7 @@ protected:
int errno_{0};
const int startpos_; // For putback
- long icount=0;
- long ocount=0;
+ virtual void notempty() {}
public:
explicit infinibuf(int sp = default_startpos_)
@@ -38,12 +37,7 @@ public:
virtual ~infinibuf() { for (char *p : data_) delete[] p; }
infinibuf &operator= (const infinibuf &) = delete;
- // These functions are not thread safe
- size_t size() {
- return data_.size() <= 1 ? ppos_ - gpos_
- : ((chunksize_ - gpos_) + (ppos_ - startpos_)
- + chunksize_ * (data_.size() - 2));
- }
+ // These functions are never thread safe
bool empty() { return data_.size() == 1 && gpos_ == ppos_; }
bool eof() { return eof_; }
int err() { return errno_; }
@@ -54,6 +48,7 @@ public:
int gsize() { return (data_.size() > 1 ? chunksize_ : ppos_) - gpos_; }
char *egptr() { return gptr() + gsize(); }
void gbump(int n);
+ virtual void gwait() {}
char *pbase() { return data_.back(); }
char *pptr() { return pbase() + ppos_; }
@@ -62,13 +57,14 @@ public:
void pbump(int n);
void peof() { eof_ = true; if (empty()) notempty(); }
- // These functions may be thread safe in some derived classes
+ // These functions are thread safe for some subtypes
virtual void lock() {}
virtual void unlock() {}
- virtual void notempty() {}
- virtual void gwait() {}
bool output(int fd);
bool input(int fd);
+
+ static void output_loop(infinibuf *ib, int fd);
+ static void input_loop(infinibuf *ib, int fd);
};
class infinibuf_infd : public infinibuf {
@@ -107,7 +103,6 @@ public:
void
infinibuf::gbump(int n)
{
- icount += n;
gpos_ += n;
assert (gpos_ > 0 && gpos_ <= chunksize_);
if (gpos_ == chunksize_) {
@@ -126,7 +121,6 @@ infinibuf::pbump(int n)
assert (!eof_);
bool wasempty (empty());
ppos_ += n;
- ocount += n;
if (ppos_ == chunksize_) {
char *chunk = new char[chunksize_];
memcpy(chunk, data_.back() + chunksize_ - startpos_, startpos_);
@@ -140,14 +134,12 @@ infinibuf::pbump(int n)
bool
infinibuf::output(int fd)
{
+ unique_lock<infinibuf> lk (*this);
for (;;) {
- lock();
char *p = gptr();
size_t nmax = gsize();
bool iseof = eof();
int error = err();
- unlock();
-
if (error)
throw runtime_error (string("infinibuf::output: ") + strerror(error));
else if (!nmax && iseof) {
@@ -157,18 +149,17 @@ infinibuf::output(int fd)
}
if (!nmax)
return true;
+
+ lk.unlock();
ssize_t n = write(fd, p, nmax);
- if (n > 0) {
- lock();
+ lk.lock();
+
+ if (n > 0)
gbump(n);
- unlock();
- }
else {
if (errno == EAGAIN)
return true;
- lock();
err(error = errno);
- unlock();
}
}
}
@@ -176,33 +167,47 @@ infinibuf::output(int fd)
bool
infinibuf::input (int fd)
{
- lock();
+ unique_lock<infinibuf> lk (*this);
char *p = pptr();
size_t nmax = psize();
int error = err();
- unlock();
-
if (error)
throw runtime_error (string("infinibuf::input: ") + strerror(error));
+
+ lk.unlock();
ssize_t n = read(fd, p, nmax);
+ lk.lock();
+
if (n < 0) {
if (errno == EAGAIN)
return true;
- lock();
err(errno);
- unlock();
throw runtime_error (string("infinibuf::input: ") + strerror(errno));
}
- lock();
if (n > 0)
pbump(n);
else
peof();
- unlock();
return n > 0;
}
+void
+infinibuf::output_loop (infinibuf *ib, int fd)
+{
+ while (ib->output(fd)) {
+ lock_guard<infinibuf> _lk (*ib);
+ ib->gwait();
+ }
+}
+
+void
+infinibuf::input_loop (infinibuf *ib, int fd)
+{
+ while (ib->input(fd))
+ ;
+}
+
class infinistreambuf : public streambuf {
protected:
infinibuf *ib_;
@@ -217,13 +222,12 @@ public:
infinistreambuf::int_type
infinistreambuf::underflow()
{
- ib_->lock();
+ lock_guard<infinibuf> _lk (*ib_);
ib_->gbump(gptr() - ib_->gptr());
while (ib_->gsize() == 0 && !ib_->eof())
ib_->gwait();
setg(ib_->eback(), ib_->gptr(), ib_->egptr());
bool eof = ib_->eof() && ib_->gsize() == 0;
- ib_->unlock();
return eof ? traits_type::eof() : traits_type::to_int_type (*gptr());
}
@@ -240,54 +244,35 @@ infinistreambuf::overflow(int_type ch)
int
infinistreambuf::sync()
{
- ib_->lock();
+ lock_guard<infinibuf> _lk (*ib_);
ib_->pbump(pptr() - ib_->pptr());
setp(ib_->pptr(), ib_->epptr());
int err = ib_->err();
- ib_->unlock();
return err ? -1 : 0;
}
infinistreambuf::infinistreambuf (infinibuf *ib)
: ib_(ib)
{
- ib->lock();
+ lock_guard<infinibuf> _lk (*ib_);
setg(ib_->eback(), ib_->gptr(), ib_->egptr());
setp(ib_->pptr(), ib_->epptr());
- ib->unlock();
}
-#if 1
-void
-reader(infinibuf_mt *_ib, int fd)
-{
- while (_ib->input(fd))
- ;
-}
-
-void
-writer(infinibuf_mt *_ib, int fd)
-{
- while (_ib->output(fd)) {
- _ib->lock();
- _ib->gwait();
- _ib->unlock();
- }
-}
-
+#if 0
int
main (int argc, char **argv)
{
infinibuf_mt iib;
infinistreambuf inb (&iib);
istream xin (&inb);
- thread it (reader, &iib, 0);
+ thread it (infinibuf::input_loop, &iib, 0);
infinibuf_mt oib;
infinistreambuf outb (&oib);
ostream xout (&outb);
- thread ot (writer, &oib, 1);
+ thread ot (infinibuf::output_loop, &oib, 1);
xin.tie (&xout);
#if 0