From 3b5f929b18492fec291d1ec95a91f54e5912c03b Mon Sep 17 00:00:00 2001 From: Bram Moolenaar Date: Thu, 28 Jan 2016 22:37:01 +0100 Subject: patch 7.4.1191 Problem: The channel feature isn't working yet. Solution: Add the connect(), disconnect(), sendexpr() and sendraw() functions. Add initial documentation. Add a demo server. --- src/channel.c | 284 +++++++++++++++++++++++++++++++++++++++++++------- src/eval.c | 243 +++++++++++++++++++++++++++++++++++++++++- src/proto/channel.pro | 9 +- src/proto/eval.pro | 1 + src/version.c | 2 + 5 files changed, 501 insertions(+), 38 deletions(-) (limited to 'src') diff --git a/src/channel.c b/src/channel.c index fc738c522d..952123c539 100644 --- a/src/channel.c +++ b/src/channel.c @@ -77,11 +77,11 @@ struct readqueue typedef struct readqueue queue_T; typedef struct { - sock_T ch_fd; /* the socket, -1 for a closed channel */ - int ch_idx; /* used by channel_poll_setup() */ - queue_T ch_head; /* dummy node, header for circular queue */ + sock_T ch_fd; /* the socket, -1 for a closed channel */ + int ch_idx; /* used by channel_poll_setup() */ + queue_T ch_head; /* dummy node, header for circular queue */ - int ch_error; /* When TRUE an error was reported. Avoids giving + int ch_error; /* When TRUE an error was reported. Avoids giving * pages full of error messages when the other side * has exited, only mention the first error until the * connection works again. */ @@ -89,13 +89,19 @@ typedef struct { XtInputId ch_inputHandler; /* Cookie for input */ #endif #ifdef FEAT_GUI_GTK - gint ch_inputHandler; /* Cookie for input */ + gint ch_inputHandler; /* Cookie for input */ #endif #ifdef FEAT_GUI_W32 - int ch_inputHandler; /* simply ret.value of WSAAsyncSelect() */ + int ch_inputHandler; /* simply ret.value of WSAAsyncSelect() */ #endif - void (*ch_close_cb)(void); /* callback invoked when channel is closed */ + void (*ch_close_cb)(void); /* callback for when channel is closed */ + + char_u *ch_callback; /* function to call when a msg is not handled */ + char_u *ch_req_callback; /* function to call for current request */ + int ch_will_block; /* do not use callback right now */ + + int ch_json_mode; } channel_T; /* @@ -190,7 +196,7 @@ channel_gui_register(int idx) channel->ch_inputHandler = XtAppAddInput((XtAppContext)app_context, channel->ch_fd, (XtPointer)(XtInputReadMask + XtInputExceptMask), - messageFromNetbeans, (XtPointer)idx); + messageFromNetbeans, (XtPointer)(long)idx); # else # ifdef FEAT_GUI_GTK /* @@ -382,13 +388,153 @@ channel_open(char *hostname, int port_in, void (*close_cb)(void)) return idx; } +/* + * Set the json mode of channel "idx" to TRUE or FALSE. + */ + void +channel_set_json_mode(int idx, int json_mode) +{ + channels[idx].ch_json_mode = json_mode; +} + +/* + * Set the callback for channel "idx". + */ + void +channel_set_callback(int idx, char_u *callback) +{ + vim_free(channels[idx].ch_callback); + channels[idx].ch_callback = vim_strsave(callback); +} + +/* + * Set the callback for channel "idx" for the next response. + */ + void +channel_set_req_callback(int idx, char_u *callback) +{ + vim_free(channels[idx].ch_req_callback); + channels[idx].ch_req_callback = callback == NULL + ? NULL : vim_strsave(callback); +} + +/* + * Set the flag that the callback for channel "idx" should not be used now. + */ + void +channel_will_block(int idx) +{ + channels[idx].ch_will_block = TRUE; +} + +/* + * Decode JSON "msg", which must have the form "[nr, expr]". + * Put "expr" in "tv". + * Return OK or FAIL. + */ + int +channel_decode_json(char_u *msg, typval_T *tv) +{ + js_read_T reader; + typval_T listtv; + + reader.js_buf = msg; + reader.js_eof = TRUE; + reader.js_used = 0; + json_decode(&reader, &listtv); + /* TODO: use the sequence number */ + if (listtv.v_type == VAR_LIST + && listtv.vval.v_list->lv_len == 2 + && listtv.vval.v_list->lv_first->li_tv.v_type == VAR_NUMBER) + { + /* Move the item from the list and then change the type to avoid the + * item being freed. */ + *tv = listtv.vval.v_list->lv_last->li_tv; + listtv.vval.v_list->lv_last->li_tv.v_type = VAR_NUMBER; + list_unref(listtv.vval.v_list); + return OK; + } + + /* give error message? */ + clear_tv(&listtv); + return FAIL; +} + +/* + * Invoke the "callback" on channel "idx". + */ + static void +invoke_callback(int idx, char_u *callback) +{ + typval_T argv[3]; + typval_T rettv; + int dummy; + char_u *msg; + int ret = OK; + + argv[0].v_type = VAR_NUMBER; + argv[0].vval.v_number = idx; + + /* Concatenate everything into one buffer. + * TODO: only read what the callback will use. + * TODO: avoid multiple allocations. */ + while (channel_collapse(idx) == OK) + ; + msg = channel_get(idx); + + if (channels[idx].ch_json_mode) + ret = channel_decode_json(msg, &argv[1]); + else + { + argv[1].v_type = VAR_STRING; + argv[1].vval.v_string = msg; + } + + if (ret == OK) + { + call_func(callback, (int)STRLEN(callback), + &rettv, 2, argv, 0L, 0L, &dummy, TRUE, NULL); + /* If an echo command was used the cursor needs to be put back where + * it belongs. */ + setcursor(); + cursor_on(); + out_flush(); + } + vim_free(msg); +} + +/* + * Invoke a callback for channel "idx" if needed. + */ + static void +may_invoke_callback(int idx) +{ + if (channels[idx].ch_will_block) + return; + if (channel_peek(idx) == NULL) + return; + + if (channels[idx].ch_req_callback != NULL) + { + /* invoke the one-time callback */ + invoke_callback(idx, channels[idx].ch_req_callback); + channels[idx].ch_req_callback = NULL; + return; + } + + if (channels[idx].ch_callback != NULL) + /* invoke the channel callback */ + invoke_callback(idx, channels[idx].ch_callback); +} + /* * Return TRUE when channel "idx" is open. + * Also returns FALSE or invalid "idx". */ int channel_is_open(int idx) { - return channels[idx].ch_fd >= 0; + return idx >= 0 && idx < channel_count && channels[idx].ch_fd >= 0; } /* @@ -407,6 +553,8 @@ channel_close(int idx) #ifdef FEAT_GUI channel_gui_unregister(idx); #endif + vim_free(channel->ch_callback); + channel->ch_callback = NULL; } } @@ -551,22 +699,64 @@ channel_clear(int idx) #define MAXMSGSIZE 4096 /* - * Read from channel "idx". The data is put in the read queue. + * Check for reading from "fd" with "timeout" msec. + * Return FAIL when there is nothing to read. */ - void -channel_read(int idx) + static int +channel_wait(int fd, int timeout) { - static char_u *buf = NULL; - int len = 0; - int readlen = 0; #ifdef HAVE_SELECT struct timeval tval; fd_set rfds; + int ret; + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + tval.tv_sec = timeout / 1000; + tval.tv_usec = (timeout % 1000) * 1000; + for (;;) + { + ret = select(fd + 1, &rfds, NULL, NULL, &tval); +# ifdef EINTR + if (ret == -1 && errno == EINTR) + continue; +# endif + if (ret <= 0) + return FAIL; + break; + } #else -# ifdef HAVE_POLL struct pollfd fds; -# endif + + fds.fd = fd; + fds.events = POLLIN; + if (poll(&fds, 1, timeout) <= 0) + return FAIL; #endif + return OK; +} + +/* + * Return a unique ID to be used in a message. + */ + int +channel_get_id() +{ + static int next_id = 1; + + return next_id++; +} + +/* + * Read from channel "idx" for as long as there is something to read. + * The data is put in the read queue. + */ + void +channel_read(int idx) +{ + static char_u *buf = NULL; + int len = 0; + int readlen = 0; channel_T *channel = &channels[idx]; if (channel->ch_fd < 0) @@ -588,21 +778,8 @@ channel_read(int idx) * MAXMSGSIZE long. */ for (;;) { -#ifdef HAVE_SELECT - FD_ZERO(&rfds); - FD_SET(channel->ch_fd, &rfds); - tval.tv_sec = 0; - tval.tv_usec = 0; - if (select(channel->ch_fd + 1, &rfds, NULL, NULL, &tval) <= 0) + if (channel_wait(channel->ch_fd, 0) == FAIL) break; -#else -# ifdef HAVE_POLL - fds.fd = channel->ch_fd; - fds.events = POLLIN; - if (poll(&fds, 1, 0) <= 0) - break; -# endif -#endif len = sock_read(channel->ch_fd, buf, MAXMSGSIZE); if (len <= 0) break; /* error or nothing more to read */ @@ -641,12 +818,44 @@ channel_read(int idx) } } + may_invoke_callback(idx); + #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK) if (CH_HAS_GUI && gtk_main_level() > 0) gtk_main_quit(); #endif } +/* + * Read from channel "idx". Blocks until there is something to read or the + * timeout expires. + * Returns what was read in allocated memory. + * Returns NULL in case of error or timeout. + */ + char_u * +channel_read_block(int idx) +{ + if (channel_peek(idx) == NULL) + { + /* Wait for up to 2 seconds. + * TODO: use timeout set on the channel. */ + if (channel_wait(channels[idx].ch_fd, 2000) == FAIL) + { + channels[idx].ch_will_block = FALSE; + return NULL; + } + channel_read(idx); + } + + /* Concatenate everything into one buffer. + * TODO: avoid multiple allocations. */ + while (channel_collapse(idx) == OK) + ; + + channels[idx].ch_will_block = FALSE; + return channel_get(idx); +} + # if defined(FEAT_GUI_W32) || defined(PROTO) /* * Lookup the channel index from the socket. @@ -668,8 +877,9 @@ channel_socket2idx(sock_T fd) /* * Write "buf" (NUL terminated string) to channel "idx". * When "fun" is not NULL an error message might be given. + * Return FAIL or OK. */ - void + int channel_send(int idx, char_u *buf, char *fun) { channel_T *channel = &channels[idx]; @@ -683,8 +893,10 @@ channel_send(int idx, char_u *buf, char *fun) EMSG2("E630: %s(): write while not connected", fun); } channel->ch_error = TRUE; + return FAIL; } - else if (sock_write(channel->ch_fd, buf, len) != len) + + if (sock_write(channel->ch_fd, buf, len) != len) { if (!channel->ch_error && fun != NULL) { @@ -692,9 +904,11 @@ channel_send(int idx, char_u *buf, char *fun) EMSG2("E631: %s(): write failed", fun); } channel->ch_error = TRUE; + return FAIL; } - else - channel->ch_error = FALSE; + + channel->ch_error = FALSE; + return OK; } # if (defined(UNIX) && !defined(HAVE_SELECT)) || defined(PROTO) diff --git a/src/eval.c b/src/eval.c index 5f05c3de90..9a8201eb5b 100644 --- a/src/eval.c +++ b/src/eval.c @@ -458,7 +458,6 @@ static int get_env_tv(char_u **arg, typval_T *rettv, int evaluate); static int find_internal_func(char_u *name); static char_u *deref_func_name(char_u *name, int *lenp, int no_autoload); static int get_func_tv(char_u *name, int len, typval_T *rettv, char_u **arg, linenr_T firstline, linenr_T lastline, int *doesrange, int evaluate, dict_T *selfdict); -static int call_func(char_u *funcname, int len, typval_T *rettv, int argcount, typval_T *argvars, linenr_T firstline, linenr_T lastline, int *doesrange, int evaluate, dict_T *selfdict); static void emsg_funcname(char *ermsg, char_u *name); static int non_zero_arg(typval_T *argvars); @@ -516,6 +515,9 @@ static void f_copy(typval_T *argvars, typval_T *rettv); static void f_cos(typval_T *argvars, typval_T *rettv); static void f_cosh(typval_T *argvars, typval_T *rettv); #endif +#ifdef FEAT_CHANNEL +static void f_connect(typval_T *argvars, typval_T *rettv); +#endif static void f_count(typval_T *argvars, typval_T *rettv); static void f_cscope_connection(typval_T *argvars, typval_T *rettv); static void f_cursor(typval_T *argsvars, typval_T *rettv); @@ -524,6 +526,9 @@ static void f_delete(typval_T *argvars, typval_T *rettv); static void f_did_filetype(typval_T *argvars, typval_T *rettv); static void f_diff_filler(typval_T *argvars, typval_T *rettv); static void f_diff_hlID(typval_T *argvars, typval_T *rettv); +#ifdef FEAT_CHANNEL +static void f_disconnect(typval_T *argvars, typval_T *rettv); +#endif static void f_empty(typval_T *argvars, typval_T *rettv); static void f_escape(typval_T *argvars, typval_T *rettv); static void f_eval(typval_T *argvars, typval_T *rettv); @@ -698,6 +703,10 @@ static void f_searchdecl(typval_T *argvars, typval_T *rettv); static void f_searchpair(typval_T *argvars, typval_T *rettv); static void f_searchpairpos(typval_T *argvars, typval_T *rettv); static void f_searchpos(typval_T *argvars, typval_T *rettv); +#ifdef FEAT_CHANNEL +static void f_sendexpr(typval_T *argvars, typval_T *rettv); +static void f_sendraw(typval_T *argvars, typval_T *rettv); +#endif static void f_server2client(typval_T *argvars, typval_T *rettv); static void f_serverlist(typval_T *argvars, typval_T *rettv); static void f_setbufvar(typval_T *argvars, typval_T *rettv); @@ -8170,6 +8179,9 @@ static struct fst {"complete_check", 0, 0, f_complete_check}, #endif {"confirm", 1, 4, f_confirm}, +#ifdef FEAT_CHANNEL + {"connect", 2, 3, f_connect}, +#endif {"copy", 1, 1, f_copy}, #ifdef FEAT_FLOAT {"cos", 1, 1, f_cos}, @@ -8183,6 +8195,9 @@ static struct fst {"did_filetype", 0, 0, f_did_filetype}, {"diff_filler", 1, 1, f_diff_filler}, {"diff_hlID", 2, 2, f_diff_hlID}, +#ifdef FEAT_CHANNEL + {"disconnect", 1, 1, f_disconnect}, +#endif {"empty", 1, 1, f_empty}, {"escape", 2, 2, f_escape}, {"eval", 1, 1, f_eval}, @@ -8361,6 +8376,10 @@ static struct fst {"searchpair", 3, 7, f_searchpair}, {"searchpairpos", 3, 7, f_searchpairpos}, {"searchpos", 1, 4, f_searchpos}, +#ifdef FEAT_CHANNEL + {"sendexpr", 2, 3, f_sendexpr}, + {"sendraw", 2, 3, f_sendraw}, +#endif {"server2client", 2, 2, f_server2client}, {"serverlist", 0, 0, f_serverlist}, {"setbufvar", 3, 3, f_setbufvar}, @@ -8674,7 +8693,7 @@ get_func_tv(name, len, rettv, arg, firstline, lastline, doesrange, * Return FAIL when the function can't be called, OK otherwise. * Also returns OK when an error was encountered while executing the function. */ - static int + int call_func(funcname, len, rettv, argcount, argvars, firstline, lastline, doesrange, evaluate, selfdict) char_u *funcname; /* name of the function */ @@ -10293,6 +10312,83 @@ f_count(argvars, rettv) rettv->vval.v_number = n; } +#ifdef FEAT_CHANNEL +/* + * Get a callback from "arg". It can be a Funcref or a function name. + * When "arg" is zero return an empty string. + * Return NULL for an invalid argument. + */ + static char_u * +get_callback(typval_T *arg) +{ + if (arg->v_type == VAR_FUNC || arg->v_type == VAR_STRING) + return arg->vval.v_string; + if (arg->v_type == VAR_NUMBER && arg->vval.v_number == 0) + return (char_u *)""; + EMSG(_("E999: Invalid callback argument")); + return NULL; +} + +/* + * "connect()" function + */ + static void +f_connect(argvars, rettv) + typval_T *argvars; + typval_T *rettv; +{ + char_u *address; + char_u *mode; + char_u *callback = NULL; + char_u buf1[NUMBUFLEN]; + char_u *p; + int port; + int json_mode = FALSE; + + address = get_tv_string(&argvars[0]); + mode = get_tv_string_buf(&argvars[1], buf1); + if (argvars[2].v_type != VAR_UNKNOWN) + { + callback = get_callback(&argvars[2]); + if (callback == NULL) + return; + } + + /* parse address */ + p = vim_strchr(address, ':'); + if (p == NULL) + { + EMSG2(_(e_invarg2), address); + return; + } + *p++ = NUL; + port = atoi((char *)p); + if (*address == NUL || port <= 0) + { + p[-1] = ':'; + EMSG2(_(e_invarg2), address); + return; + } + + /* parse mode */ + if (STRCMP(mode, "json") == 0) + json_mode = TRUE; + else if (STRCMP(mode, "raw") != 0) + { + EMSG2(_(e_invarg2), mode); + return; + } + + rettv->vval.v_number = channel_open((char *)address, port, NULL); + if (rettv->vval.v_number >= 0) + { + channel_set_json_mode(rettv->vval.v_number, json_mode); + if (callback != NULL && *callback != NUL) + channel_set_callback(rettv->vval.v_number, callback); + } +} +#endif + /* * "cscope_connection([{num} , {dbpath} [, {prepend}]])" function * @@ -10545,6 +10641,46 @@ f_diff_hlID(argvars, rettv) #endif } +#ifdef FEAT_CHANNEL +/* + * Get the channel index from the handle argument. + * Returns -1 if the handle is invalid or the channel is closed. + */ + static int +get_channel_arg(typval_T *tv) +{ + int ch_idx; + + if (tv->v_type != VAR_NUMBER) + { + EMSG2(_(e_invarg2), get_tv_string(tv)); + return -1; + } + ch_idx = tv->vval.v_number; + + if (!channel_is_open(ch_idx)) + { + EMSGN(_("E999: not an open channel"), ch_idx); + return -1; + } + return ch_idx; +} + +/* + * "disconnect()" function + */ + static void +f_disconnect(argvars, rettv) + typval_T *argvars; + typval_T *rettv UNUSED; +{ + int ch_idx = get_channel_arg(&argvars[0]); + + if (ch_idx >= 0) + channel_close(ch_idx); +} +#endif + /* * "empty({expr})" function */ @@ -17378,6 +17514,109 @@ f_searchpos(argvars, rettv) list_append_number(rettv->vval.v_list, (varnumber_T)n); } +#ifdef FEAT_CHANNEL +/* + * common for "sendexpr()" and "sendraw()" + * Returns the channel index if the caller should read the response. + * Otherwise returns -1. + */ + static int +send_common(typval_T *argvars, char_u *text, char *fun) +{ + int ch_idx; + char_u *callback = NULL; + + ch_idx = get_channel_arg(&argvars[0]); + if (ch_idx < 0) + return -1; + + if (argvars[2].v_type != VAR_UNKNOWN) + { + callback = get_callback(&argvars[2]); + if (callback == NULL) + return -1; + } + /* Set the callback or clear it. An empty callback means no callback and + * not reading the response. */ + channel_set_req_callback(ch_idx, + callback != NULL && *callback == NUL ? NULL : callback); + if (callback == NULL) + channel_will_block(ch_idx); + + if (channel_send(ch_idx, text, fun) == OK && callback == NULL) + return ch_idx; + return -1; +} + +/* + * "sendexpr()" function + */ + static void +f_sendexpr(argvars, rettv) + typval_T *argvars; + typval_T *rettv; +{ + char_u *text; + char_u *resp; + typval_T nrtv; + typval_T listtv; + int ch_idx; + + /* return an empty string by default */ + rettv->v_type = VAR_STRING; + rettv->vval.v_string = NULL; + + nrtv.v_type = VAR_NUMBER; + nrtv.vval.v_number = channel_get_id(); + if (rettv_list_alloc(&listtv) == FAIL) + return; + if (list_append_tv(listtv.vval.v_list, &nrtv) == FAIL + || list_append_tv(listtv.vval.v_list, &argvars[1]) == FAIL) + { + list_unref(listtv.vval.v_list); + return; + } + + text = json_encode(&listtv); + list_unref(listtv.vval.v_list); + + ch_idx = send_common(argvars, text, "sendexpr"); + if (ch_idx >= 0) + { + /* TODO: read until the whole JSON message is received */ + /* TODO: only use the message with the right message ID */ + resp = channel_read_block(ch_idx); + if (resp != NULL) + { + channel_decode_json(resp, rettv); + vim_free(resp); + } + } +} + +/* + * "sendraw()" function + */ + static void +f_sendraw(argvars, rettv) + typval_T *argvars; + typval_T *rettv; +{ + char_u buf[NUMBUFLEN]; + char_u *text; + int ch_idx; + + /* return an empty string by default */ + rettv->v_type = VAR_STRING; + rettv->vval.v_string = NULL; + + text = get_tv_string_buf(&argvars[1], buf); + ch_idx = send_common(argvars, text, "sendraw"); + if (ch_idx >= 0) + rettv->vval.v_string = channel_read_block(ch_idx); +} +#endif + static void f_server2client(argvars, rettv) diff --git a/src/proto/channel.pro b/src/proto/channel.pro index 1cdef5e58f..110bb1da31 100644 --- a/src/proto/channel.pro +++ b/src/proto/channel.pro @@ -1,6 +1,11 @@ /* channel.c */ void channel_gui_register_all(void); int channel_open(char *hostname, int port_in, void (*close_cb)(void)); +void channel_set_json_mode(int idx, int json_mode); +void channel_set_callback(int idx, char_u *callback); +void channel_set_req_callback(int idx, char_u *callback); +void channel_will_block(int idx); +int channel_decode_json(char_u *msg, typval_T *tv); int channel_is_open(int idx); void channel_close(int idx); void channel_save(int idx, char_u *buf, int len); @@ -8,9 +13,11 @@ char_u *channel_peek(int idx); char_u *channel_get(int idx); int channel_collapse(int idx); void channel_clear(int idx); +int channel_get_id(void); void channel_read(int idx); +char_u *channel_read_block(int idx); int channel_socket2idx(sock_T fd); -void channel_send(int idx, char_u *buf, char *fun); +int channel_send(int idx, char_u *buf, char *fun); int channel_poll_setup(int nfd_in, void *fds_in); int channel_poll_check(int ret_in, void *fds_in); int channel_select_setup(int maxfd_in, void *rfds_in); diff --git a/src/proto/eval.pro b/src/proto/eval.pro index f6ad4b49e8..ea2096a5d5 100644 --- a/src/proto/eval.pro +++ b/src/proto/eval.pro @@ -82,6 +82,7 @@ long get_dict_number(dict_T *d, char_u *key); int string2float(char_u *text, float_T *value); char_u *get_function_name(expand_T *xp, int idx); char_u *get_expr_name(expand_T *xp, int idx); +int call_func(char_u *funcname, int len, typval_T *rettv, int argcount, typval_T *argvars, linenr_T firstline, linenr_T lastline, int *doesrange, int evaluate, dict_T *selfdict); int func_call(char_u *name, typval_T *args, dict_T *selfdict, typval_T *rettv); void dict_extend(dict_T *d1, dict_T *d2, char_u *action); void mzscheme_call_vim(char_u *name, typval_T *args, typval_T *rettv); diff --git a/src/version.c b/src/version.c index 287bc93f91..e464a4252d 100644 --- a/src/version.c +++ b/src/version.c @@ -746,6 +746,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ +/**/ + 1191, /**/ 1190, /**/ -- cgit v1.2.3