diff options
Diffstat (limited to 'ssnet.py')
-rw-r--r-- | ssnet.py | 101 |
1 files changed, 84 insertions, 17 deletions
@@ -1,6 +1,12 @@ import struct, socket, errno, select if not globals().get('skip_imports'): from helpers import * + +# these don't exist in the socket module in python 2.3! +SHUT_RD = 0 +SHUT_WR = 1 +SHUT_RDWR = 2 + HDR_LEN = 8 @@ -31,13 +37,30 @@ cmd_to_name = { +def _add(l, elem): + if not elem in l: + l.append(elem) + + +def _fds(l): + out = [] + for i in l: + try: + out.append(i.fileno()) + except AttributeError: + out.append(i) + out.sort() + return out + + def _nb_clean(func, *args): try: return func(*args) except OSError, e: - if e.errno not in (errno.EWOULDBLOCK, errno.EAGAIN, errno.EPIPE): + if e.errno not in (errno.EWOULDBLOCK, errno.EAGAIN): raise else: + debug3('%s: err was: %s\n' % (func.__name__, e)) return None @@ -69,21 +92,30 @@ class SockWrapper: debug1('%r: error was: %r\n' % (self, self.exc)) def __repr__(self): - return 'SW:%s' % (self.peername,) + if self.rsock == self.wsock: + fds = '#%d' % self.rsock.fileno() + else: + fds = '#%d,%d' % (self.rsock.fileno(), self.wsock.fileno()) + return 'SW%s:%s' % (fds, self.peername) def seterr(self, e): if not self.exc: self.exc = e def try_connect(self): + if self.connect_to and self.shut_write: + self.noread() + self.connect_to = None if not self.connect_to: return # already connected self.rsock.setblocking(False) + debug3('%r: trying connect to %r\n' % (self, self.connect_to)) try: self.rsock.connect(self.connect_to) # connected successfully (Linux) self.connect_to = None except socket.error, e: + debug3('%r: connect result: %r\n' % (self, e)) if e.args[0] in [errno.EINPROGRESS, errno.EALREADY]: pass # not connected yet elif e.args[0] == errno.EISCONN: @@ -102,14 +134,14 @@ class SockWrapper: if not self.shut_read: debug2('%r: done reading\n' % self) self.shut_read = True - #self.rsock.shutdown(socket.SHUT_RD) # doesn't do anything anyway + #self.rsock.shutdown(SHUT_RD) # doesn't do anything anyway def nowrite(self): if not self.shut_write: debug2('%r: done writing\n' % self) self.shut_write = True try: - self.wsock.shutdown(socket.SHUT_WR) + self.wsock.shutdown(SHUT_WR) except socket.error, e: self.seterr(e) @@ -159,7 +191,7 @@ class SockWrapper: wrote = outwrap.write(self.buf[0]) self.buf[0] = self.buf[0][wrote:] while self.buf and not self.buf[0]: - self.buf[0:1] = [] + self.buf.pop(0) if not self.buf and self.shut_read: outwrap.nowrite() @@ -167,12 +199,13 @@ class SockWrapper: class Handler: def __init__(self, socks = None, callback = None): self.ok = True - self.socks = set(socks or []) + self.socks = socks or [] if callback: self.callback = callback def pre_select(self, r, w, x): - r |= self.socks + for i in self.socks: + _add(r, i) def callback(self): log('--no callback defined-- %r\n' % self) @@ -181,7 +214,7 @@ class Handler: v = s.recv(4096) if not v: log('--closed-- %r\n' % self) - self.socks = set() + self.socks = [] self.ok = False @@ -194,20 +227,20 @@ class Proxy(Handler): def pre_select(self, r, w, x): if self.wrap1.connect_to: - w.add(self.wrap1.rsock) + _add(w, self.wrap1.rsock) elif self.wrap1.buf: if not self.wrap2.too_full(): - w.add(self.wrap2.wsock) + _add(w, self.wrap2.wsock) elif not self.wrap1.shut_read: - r.add(self.wrap1.rsock) + _add(r, self.wrap1.rsock) if self.wrap2.connect_to: - w.add(self.wrap2.rsock) + _add(w, self.wrap2.rsock) elif self.wrap2.buf: if not self.wrap1.too_full(): - w.add(self.wrap1.wsock) + _add(w, self.wrap1.wsock) elif not self.wrap2.shut_read: - r.add(self.wrap2.rsock) + _add(r, self.wrap2.rsock) def callback(self): self.wrap1.try_connect() @@ -216,6 +249,12 @@ class Proxy(Handler): self.wrap2.fill() self.wrap1.copy_to(self.wrap2) self.wrap2.copy_to(self.wrap1) + if self.wrap1.buf and self.wrap2.shut_write: + self.wrap1.buf = [] + self.wrap1.noread() + if self.wrap2.buf and self.wrap1.shut_write: + self.wrap2.buf = [] + self.wrap2.noread() if (self.wrap1.shut_read and self.wrap2.shut_read and not self.wrap1.buf and not self.wrap2.buf): self.ok = False @@ -247,7 +286,10 @@ class Mux(Handler): return self.chani def amount_queued(self): - return sum(len(b) for b in self.outbuf) + total = 0 + for b in self.outbuf: + total += len(b) + return total def check_fullness(self): if self.fullness > 32768: @@ -346,9 +388,9 @@ class Mux(Handler): break def pre_select(self, r, w, x): - r.add(self.rsock) + _add(r, self.rsock) if self.outbuf: - w.add(self.wsock) + _add(w, self.wsock) def callback(self): (r,w,x) = select.select([self.rsock], [self.wsock], [], 0) @@ -420,3 +462,28 @@ def connect_dst(ip, port): return SockWrapper(outsock, outsock, connect_to = (ip,port), peername = '%s:%d' % (ip,port)) + + +def runonce(handlers, mux): + r = [] + w = [] + x = [] + handlers = filter(lambda s: s.ok, handlers) + for s in handlers: + s.pre_select(r,w,x) + debug2('Waiting: %d r=%r w=%r x=%r (fullness=%d/%d)\n' + % (len(handlers), _fds(r), _fds(w), _fds(x), + mux.fullness, mux.too_full)) + (r,w,x) = select.select(r,w,x) + debug2(' Ready: %d r=%r w=%r x=%r\n' + % (len(handlers), _fds(r), _fds(w), _fds(x))) + ready = r+w+x + did = {} + for h in handlers: + for s in h.socks: + if s in ready: + h.callback() + did[s] = 1 + for s in ready: + if not s in did: + raise Fatal('socket %r was not used by any handler' % s) |