summaryrefslogtreecommitdiffstats
path: root/ssnet.py
diff options
context:
space:
mode:
Diffstat (limited to 'ssnet.py')
-rw-r--r--ssnet.py101
1 files changed, 84 insertions, 17 deletions
diff --git a/ssnet.py b/ssnet.py
index 55efa04..f307629 100644
--- a/ssnet.py
+++ b/ssnet.py
@@ -1,6 +1,12 @@
import struct, socket, errno, select
if not globals().get('skip_imports'):
from helpers import *
+
+# these don't exist in the socket module in python 2.3!
+SHUT_RD = 0
+SHUT_WR = 1
+SHUT_RDWR = 2
+
HDR_LEN = 8
@@ -31,13 +37,30 @@ cmd_to_name = {
+def _add(l, elem):
+ if not elem in l:
+ l.append(elem)
+
+
+def _fds(l):
+ out = []
+ for i in l:
+ try:
+ out.append(i.fileno())
+ except AttributeError:
+ out.append(i)
+ out.sort()
+ return out
+
+
def _nb_clean(func, *args):
try:
return func(*args)
except OSError, e:
- if e.errno not in (errno.EWOULDBLOCK, errno.EAGAIN, errno.EPIPE):
+ if e.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
else:
+ debug3('%s: err was: %s\n' % (func.__name__, e))
return None
@@ -69,21 +92,30 @@ class SockWrapper:
debug1('%r: error was: %r\n' % (self, self.exc))
def __repr__(self):
- return 'SW:%s' % (self.peername,)
+ if self.rsock == self.wsock:
+ fds = '#%d' % self.rsock.fileno()
+ else:
+ fds = '#%d,%d' % (self.rsock.fileno(), self.wsock.fileno())
+ return 'SW%s:%s' % (fds, self.peername)
def seterr(self, e):
if not self.exc:
self.exc = e
def try_connect(self):
+ if self.connect_to and self.shut_write:
+ self.noread()
+ self.connect_to = None
if not self.connect_to:
return # already connected
self.rsock.setblocking(False)
+ debug3('%r: trying connect to %r\n' % (self, self.connect_to))
try:
self.rsock.connect(self.connect_to)
# connected successfully (Linux)
self.connect_to = None
except socket.error, e:
+ debug3('%r: connect result: %r\n' % (self, e))
if e.args[0] in [errno.EINPROGRESS, errno.EALREADY]:
pass # not connected yet
elif e.args[0] == errno.EISCONN:
@@ -102,14 +134,14 @@ class SockWrapper:
if not self.shut_read:
debug2('%r: done reading\n' % self)
self.shut_read = True
- #self.rsock.shutdown(socket.SHUT_RD) # doesn't do anything anyway
+ #self.rsock.shutdown(SHUT_RD) # doesn't do anything anyway
def nowrite(self):
if not self.shut_write:
debug2('%r: done writing\n' % self)
self.shut_write = True
try:
- self.wsock.shutdown(socket.SHUT_WR)
+ self.wsock.shutdown(SHUT_WR)
except socket.error, e:
self.seterr(e)
@@ -159,7 +191,7 @@ class SockWrapper:
wrote = outwrap.write(self.buf[0])
self.buf[0] = self.buf[0][wrote:]
while self.buf and not self.buf[0]:
- self.buf[0:1] = []
+ self.buf.pop(0)
if not self.buf and self.shut_read:
outwrap.nowrite()
@@ -167,12 +199,13 @@ class SockWrapper:
class Handler:
def __init__(self, socks = None, callback = None):
self.ok = True
- self.socks = set(socks or [])
+ self.socks = socks or []
if callback:
self.callback = callback
def pre_select(self, r, w, x):
- r |= self.socks
+ for i in self.socks:
+ _add(r, i)
def callback(self):
log('--no callback defined-- %r\n' % self)
@@ -181,7 +214,7 @@ class Handler:
v = s.recv(4096)
if not v:
log('--closed-- %r\n' % self)
- self.socks = set()
+ self.socks = []
self.ok = False
@@ -194,20 +227,20 @@ class Proxy(Handler):
def pre_select(self, r, w, x):
if self.wrap1.connect_to:
- w.add(self.wrap1.rsock)
+ _add(w, self.wrap1.rsock)
elif self.wrap1.buf:
if not self.wrap2.too_full():
- w.add(self.wrap2.wsock)
+ _add(w, self.wrap2.wsock)
elif not self.wrap1.shut_read:
- r.add(self.wrap1.rsock)
+ _add(r, self.wrap1.rsock)
if self.wrap2.connect_to:
- w.add(self.wrap2.rsock)
+ _add(w, self.wrap2.rsock)
elif self.wrap2.buf:
if not self.wrap1.too_full():
- w.add(self.wrap1.wsock)
+ _add(w, self.wrap1.wsock)
elif not self.wrap2.shut_read:
- r.add(self.wrap2.rsock)
+ _add(r, self.wrap2.rsock)
def callback(self):
self.wrap1.try_connect()
@@ -216,6 +249,12 @@ class Proxy(Handler):
self.wrap2.fill()
self.wrap1.copy_to(self.wrap2)
self.wrap2.copy_to(self.wrap1)
+ if self.wrap1.buf and self.wrap2.shut_write:
+ self.wrap1.buf = []
+ self.wrap1.noread()
+ if self.wrap2.buf and self.wrap1.shut_write:
+ self.wrap2.buf = []
+ self.wrap2.noread()
if (self.wrap1.shut_read and self.wrap2.shut_read and
not self.wrap1.buf and not self.wrap2.buf):
self.ok = False
@@ -247,7 +286,10 @@ class Mux(Handler):
return self.chani
def amount_queued(self):
- return sum(len(b) for b in self.outbuf)
+ total = 0
+ for b in self.outbuf:
+ total += len(b)
+ return total
def check_fullness(self):
if self.fullness > 32768:
@@ -346,9 +388,9 @@ class Mux(Handler):
break
def pre_select(self, r, w, x):
- r.add(self.rsock)
+ _add(r, self.rsock)
if self.outbuf:
- w.add(self.wsock)
+ _add(w, self.wsock)
def callback(self):
(r,w,x) = select.select([self.rsock], [self.wsock], [], 0)
@@ -420,3 +462,28 @@ def connect_dst(ip, port):
return SockWrapper(outsock, outsock,
connect_to = (ip,port),
peername = '%s:%d' % (ip,port))
+
+
+def runonce(handlers, mux):
+ r = []
+ w = []
+ x = []
+ handlers = filter(lambda s: s.ok, handlers)
+ for s in handlers:
+ s.pre_select(r,w,x)
+ debug2('Waiting: %d r=%r w=%r x=%r (fullness=%d/%d)\n'
+ % (len(handlers), _fds(r), _fds(w), _fds(x),
+ mux.fullness, mux.too_full))
+ (r,w,x) = select.select(r,w,x)
+ debug2(' Ready: %d r=%r w=%r x=%r\n'
+ % (len(handlers), _fds(r), _fds(w), _fds(x)))
+ ready = r+w+x
+ did = {}
+ for h in handlers:
+ for s in h.socks:
+ if s in ready:
+ h.callback()
+ did[s] = 1
+ for s in ready:
+ if not s in did:
+ raise Fatal('socket %r was not used by any handler' % s)