diff options
author | David Mazieres <dm@uun.org> | 2014-02-01 21:53:17 -0800 |
---|---|---|
committer | David Mazieres <dm@uun.org> | 2014-02-01 21:53:17 -0800 |
commit | ee173ba581dd31e0f3915bccaa9436b507c47e42 (patch) | |
tree | f08a27ac36362220d3dfd251b3b5c4516377eed0 | |
parent | 653595169eee8c07da396d66f077806d3ac33163 (diff) |
use lock guards
-rw-r--r-- | infinibuf.cc | 95 |
1 files changed, 40 insertions, 55 deletions
diff --git a/infinibuf.cc b/infinibuf.cc index 70f95b7..0c238db 100644 --- a/infinibuf.cc +++ b/infinibuf.cc @@ -26,8 +26,7 @@ protected: int errno_{0}; const int startpos_; // For putback - long icount=0; - long ocount=0; + virtual void notempty() {} public: explicit infinibuf(int sp = default_startpos_) @@ -38,12 +37,7 @@ public: virtual ~infinibuf() { for (char *p : data_) delete[] p; } infinibuf &operator= (const infinibuf &) = delete; - // These functions are not thread safe - size_t size() { - return data_.size() <= 1 ? ppos_ - gpos_ - : ((chunksize_ - gpos_) + (ppos_ - startpos_) - + chunksize_ * (data_.size() - 2)); - } + // These functions are never thread safe bool empty() { return data_.size() == 1 && gpos_ == ppos_; } bool eof() { return eof_; } int err() { return errno_; } @@ -54,6 +48,7 @@ public: int gsize() { return (data_.size() > 1 ? chunksize_ : ppos_) - gpos_; } char *egptr() { return gptr() + gsize(); } void gbump(int n); + virtual void gwait() {} char *pbase() { return data_.back(); } char *pptr() { return pbase() + ppos_; } @@ -62,13 +57,14 @@ public: void pbump(int n); void peof() { eof_ = true; if (empty()) notempty(); } - // These functions may be thread safe in some derived classes + // These functions are thread safe for some subtypes virtual void lock() {} virtual void unlock() {} - virtual void notempty() {} - virtual void gwait() {} bool output(int fd); bool input(int fd); + + static void output_loop(infinibuf *ib, int fd); + static void input_loop(infinibuf *ib, int fd); }; class infinibuf_infd : public infinibuf { @@ -107,7 +103,6 @@ public: void infinibuf::gbump(int n) { - icount += n; gpos_ += n; assert (gpos_ > 0 && gpos_ <= chunksize_); if (gpos_ == chunksize_) { @@ -126,7 +121,6 @@ infinibuf::pbump(int n) assert (!eof_); bool wasempty (empty()); ppos_ += n; - ocount += n; if (ppos_ == chunksize_) { char *chunk = new char[chunksize_]; memcpy(chunk, data_.back() + chunksize_ - startpos_, startpos_); @@ -140,14 +134,12 @@ infinibuf::pbump(int n) bool infinibuf::output(int fd) { + unique_lock<infinibuf> lk (*this); for (;;) { - lock(); char *p = gptr(); size_t nmax = gsize(); bool iseof = eof(); int error = err(); - unlock(); - if (error) throw runtime_error (string("infinibuf::output: ") + strerror(error)); else if (!nmax && iseof) { @@ -157,18 +149,17 @@ infinibuf::output(int fd) } if (!nmax) return true; + + lk.unlock(); ssize_t n = write(fd, p, nmax); - if (n > 0) { - lock(); + lk.lock(); + + if (n > 0) gbump(n); - unlock(); - } else { if (errno == EAGAIN) return true; - lock(); err(error = errno); - unlock(); } } } @@ -176,33 +167,47 @@ infinibuf::output(int fd) bool infinibuf::input (int fd) { - lock(); + unique_lock<infinibuf> lk (*this); char *p = pptr(); size_t nmax = psize(); int error = err(); - unlock(); - if (error) throw runtime_error (string("infinibuf::input: ") + strerror(error)); + + lk.unlock(); ssize_t n = read(fd, p, nmax); + lk.lock(); + if (n < 0) { if (errno == EAGAIN) return true; - lock(); err(errno); - unlock(); throw runtime_error (string("infinibuf::input: ") + strerror(errno)); } - lock(); if (n > 0) pbump(n); else peof(); - unlock(); return n > 0; } +void +infinibuf::output_loop (infinibuf *ib, int fd) +{ + while (ib->output(fd)) { + lock_guard<infinibuf> _lk (*ib); + ib->gwait(); + } +} + +void +infinibuf::input_loop (infinibuf *ib, int fd) +{ + while (ib->input(fd)) + ; +} + class infinistreambuf : public streambuf { protected: infinibuf *ib_; @@ -217,13 +222,12 @@ public: infinistreambuf::int_type infinistreambuf::underflow() { - ib_->lock(); + lock_guard<infinibuf> _lk (*ib_); ib_->gbump(gptr() - ib_->gptr()); while (ib_->gsize() == 0 && !ib_->eof()) ib_->gwait(); setg(ib_->eback(), ib_->gptr(), ib_->egptr()); bool eof = ib_->eof() && ib_->gsize() == 0; - ib_->unlock(); return eof ? traits_type::eof() : traits_type::to_int_type (*gptr()); } @@ -240,54 +244,35 @@ infinistreambuf::overflow(int_type ch) int infinistreambuf::sync() { - ib_->lock(); + lock_guard<infinibuf> _lk (*ib_); ib_->pbump(pptr() - ib_->pptr()); setp(ib_->pptr(), ib_->epptr()); int err = ib_->err(); - ib_->unlock(); return err ? -1 : 0; } infinistreambuf::infinistreambuf (infinibuf *ib) : ib_(ib) { - ib->lock(); + lock_guard<infinibuf> _lk (*ib_); setg(ib_->eback(), ib_->gptr(), ib_->egptr()); setp(ib_->pptr(), ib_->epptr()); - ib->unlock(); } -#if 1 -void -reader(infinibuf_mt *_ib, int fd) -{ - while (_ib->input(fd)) - ; -} - -void -writer(infinibuf_mt *_ib, int fd) -{ - while (_ib->output(fd)) { - _ib->lock(); - _ib->gwait(); - _ib->unlock(); - } -} - +#if 0 int main (int argc, char **argv) { infinibuf_mt iib; infinistreambuf inb (&iib); istream xin (&inb); - thread it (reader, &iib, 0); + thread it (infinibuf::input_loop, &iib, 0); infinibuf_mt oib; infinistreambuf outb (&oib); ostream xout (&outb); - thread ot (writer, &oib, 1); + thread ot (infinibuf::output_loop, &oib, 1); xin.tie (&xout); #if 0 |