#!/usr/bin/env python # # Server that will accept connections from a Vim channel. # Used by test_channel.vim. # # This requires Python 2.6 or later. from __future__ import print_function import json import socket import sys import time import threading try: # Python 3 import socketserver except ImportError: # Python 2 import SocketServer as socketserver class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): def setup(self): self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) def handle(self): print("=== socket opened ===") while True: try: received = self.request.recv(4096).decode('utf-8') except socket.error: print("=== socket error ===") break except IOError: print("=== socket closed ===") break if received == '': print("=== socket closed ===") break print("received: {0}".format(received)) # We may receive two messages at once. Take the part up to the # newline, which should be after the matching "]". todo = received while todo != '': splitidx = todo.find('\n') if splitidx < 0: used = todo todo = '' else: used = todo[:splitidx] todo = todo[splitidx + 1:] if used != received: print("using: {0}".format(used)) try: decoded = json.loads(used) except ValueError: print("json decoding failed") decoded = [-1, ''] # Send a response if the sequence number is positive. if decoded[0] >= 0: if decoded[1] == 'hello!': # simply send back a string response = "got it" elif decoded[1] == 'malformed1': cmd = '["ex",":"]wrong!["ex","smi"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" # Need to wait for Vim to give up, otherwise it # sometimes fails on OS X. time.sleep(0.2) elif decoded[1] == 'malformed2': cmd = '"unterminated string' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" # Need to wait for Vim to give up, otherwise the double # quote in the "ok" response terminates the string. time.sleep(0.2) elif decoded[1] == 'malformed3': cmd = '["ex","missing ]"' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" # Need to wait for Vim to give up, otherwise the ] # in the "ok" response terminates the list. time.sleep(0.2) elif decoded[1] == 'split': cmd = '["ex","let ' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) time.sleep(0.01) cmd = 'g:split = 123"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1].startswith("echo "): # send back the argument response = decoded[1][5:] elif decoded[1] == 'make change': # Send two ex commands at the same time, before # replying to the request. cmd = '["ex","call append(\\"$\\",\\"added1\\")"]' cmd += '["ex","call append(\\"$\\",\\"added2\\")"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'echoerr': cmd = '["ex","echoerr \\\"this is an error\\\""]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'bad command': cmd = '["ex","foo bar"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'do normal': # Send a normal command. cmd = '["normal","G$s more\u001b"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'eval-works': # Send an eval request. We ignore the response. cmd = '["expr","\\"foo\\" . 123", -1]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'eval-special': # Send an eval request. We ignore the response. cmd = '["expr","\\"foo\x7f\x10\x01bar\\"", -2]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'eval-getline': # Send an eval request. We ignore the response. cmd = '["expr","getline(3)", -3]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'eval-fails': # Send an eval request that will fail. cmd = '["expr","xxx", -4]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'eval-error': # Send an eval request that works but the result can't # be encoded. cmd = '["expr","function(\\"tr\\")", -5]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'eval-bad': # Send an eval request missing the third argument. cmd = '["expr","xxx"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'an expr': # Send an expr request. cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'call-func': cmd = '["call","MyFunction",[1,2,3], 0]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'redraw': cmd = '["redraw",""]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'redraw!': cmd = '["redraw","force"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'empty-request': cmd = '[]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'eval-result': # Send back the last received eval result. response = last_eval elif decoded[1] == 'call me': cmd = '[0,"we called you"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" elif decoded[1] == 'call me again': cmd = '[0,"we did call you"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "" elif decoded[1] == 'send zero': cmd = '[0,"zero index"]' print("sending: {0}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "sent zero" elif decoded[1] == 'close me': print("closing") self.request.close() response = "" elif decoded[1] == 'wait a bit': time.sleep(0.2) response = "waited" elif decoded[1] == '!quit!': # we're done self.server.shutdown() return elif decoded[1] == '!crash!': # Crash! 42 / 0 else: response = "what?" if response == "": print("no response") else: encoded = json.dumps([decoded[0], response]) print("sending: {0}".format(encoded)) self.request.sendall(encoded.encode('utf-8')) # Negative numbers are used for "eval" responses. elif decoded[0] < 0: last_eval = decoded class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass def writePortInFile(port): # Write the port number in Xportnr, so that the test knows it. f = open("Xportnr", "w") f.write("{0}".format(port)) f.close() def main(host, port, server_class=ThreadedTCPServer): # Wait half a second before opening the port to test waittime in ch_open(). # We do want to get the port number, get that first. We cannot open the # socket, guess a port is free. if len(sys.argv) >= 2 and sys.argv[1] == 'delay': port = 13684 writePortInFile(port) print("Wait for it...") time.sleep(0.5) server = server_class((host, port), ThreadedTCPRequestHandler) ip, port = server.server_address[0:2] # Start a thread with the server. That thread will then start a new thread # for each connection. server_thread = threading.Thread(target=server.serve_forever) server_thread.start() writePortInFile(port) print("Listening on port {0}".format(port)) # Main thread terminates, but the server continues running # until server.shutdown() is called. try: while server_thread.is_alive(): server_thread.join(1) except (KeyboardInterrupt, SystemExit): server.shutdown() if __name__ == "__main__": main("localhost", 0)