summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBram Moolenaar <Bram@vim.org>2016-03-20 16:40:37 +0100
committerBram Moolenaar <Bram@vim.org>2016-03-20 16:40:37 +0100
commitba61ac0d61f46de7d29c64bb0de6d25c2e378be0 (patch)
tree73b207b09bb0d57e5a9cf4f80cd514fbace293fd
parentac74d5e86cd16b42e81ba48f58f3d45c72758248 (diff)
patch 7.4.1617v7.4.1617
Problem: When a JSON message is split it isn't decoded. Solution: Wait a short time for the rest of the message to arrive.
-rw-r--r--src/channel.c163
-rw-r--r--src/json.c7
-rw-r--r--src/structs.h8
-rw-r--r--src/testdir/test_channel.py27
-rw-r--r--src/testdir/test_channel.vim9
-rw-r--r--src/version.c2
6 files changed, 181 insertions, 35 deletions
diff --git a/src/channel.c b/src/channel.c
index 0d3f452d5b..b3115adb71 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -1362,7 +1362,7 @@ channel_collapse(channel_T *channel, int part)
* Returns OK or FAIL.
*/
static int
-channel_save(channel_T *channel, int part, char_u *buf, int len)
+channel_save(channel_T *channel, int part, char_u *buf, int len, char *lead)
{
readq_T *node;
readq_T *head = &channel->ch_part[part].ch_head;
@@ -1403,9 +1403,9 @@ channel_save(channel_T *channel, int part, char_u *buf, int len)
head->rq_prev->rq_next = node;
head->rq_prev = node;
- if (log_fd != NULL)
+ if (log_fd != NULL && lead != NULL)
{
- ch_log_lead("RECV ", channel);
+ ch_log_lead(lead, channel);
fprintf(log_fd, "'");
if (fwrite(buf, len, 1, log_fd) != 1)
return FAIL;
@@ -1415,7 +1415,7 @@ channel_save(channel_T *channel, int part, char_u *buf, int len)
}
/*
- * Use the read buffer of "channel"/"part" and parse a JSON messages that is
+ * Use the read buffer of "channel"/"part" and parse a JSON message that is
* complete. The messages are added to the queue.
* Return TRUE if there is more to read.
*/
@@ -1425,7 +1425,9 @@ channel_parse_json(channel_T *channel, int part)
js_read_T reader;
typval_T listtv;
jsonq_T *item;
- jsonq_T *head = &channel->ch_part[part].ch_json_head;
+ chanpart_T *chanpart = &channel->ch_part[part];
+ jsonq_T *head = &chanpart->ch_json_head;
+ int status;
int ret;
if (channel_peek(channel, part) == NULL)
@@ -1438,15 +1440,23 @@ channel_parse_json(channel_T *channel, int part)
reader.js_fill = NULL;
/* reader.js_fill = channel_fill; */
reader.js_cookie = channel;
- ret = json_decode(&reader, &listtv,
- channel->ch_part[part].ch_mode == MODE_JS ? JSON_JS : 0);
- if (ret == OK)
+
+ /* When a message is incomplete we wait for a short while for more to
+ * arrive. After the delay drop the input, otherwise a truncated string
+ * or list will make us hang. */
+ status = json_decode(&reader, &listtv,
+ chanpart->ch_mode == MODE_JS ? JSON_JS : 0);
+ if (status == OK)
{
/* Only accept the response when it is a list with at least two
* items. */
if (listtv.v_type != VAR_LIST || listtv.vval.v_list->lv_len < 2)
{
- /* TODO: give error */
+ if (listtv.v_type != VAR_LIST)
+ ch_error(channel, "Did not receive a list, discarding");
+ else
+ ch_errorn(channel, "Expected list with two items, got %d",
+ listtv.vval.v_list->lv_len);
clear_tv(&listtv);
}
else
@@ -1477,22 +1487,71 @@ channel_parse_json(channel_T *channel, int part)
}
}
- /* Put the unread part back into the channel.
- * TODO: insert in front */
- if (reader.js_buf[reader.js_used] != NUL)
+ if (status == OK)
+ chanpart->ch_waiting = FALSE;
+ else if (status == MAYBE)
{
- if (ret == FAIL)
+ if (!chanpart->ch_waiting)
{
- ch_error(channel, "Decoding failed - discarding input");
- ret = FALSE;
+ /* First time encountering incomplete message, set a deadline of
+ * 100 msec. */
+ ch_log(channel, "Incomplete message - wait for more");
+ reader.js_used = 0;
+ chanpart->ch_waiting = TRUE;
+#ifdef WIN32
+ chanpart->ch_deadline = GetTickCount() + 100L;
+#else
+ gettimeofday(&chanpart->ch_deadline, NULL);
+ chanpart->ch_deadline.tv_usec += 100 * 1000;
+ if (chanpart->ch_deadline.tv_usec > 1000 * 1000)
+ {
+ chanpart->ch_deadline.tv_usec -= 1000 * 1000;
+ ++chanpart->ch_deadline.tv_sec;
+ }
+#endif
}
else
{
- channel_save(channel, part, reader.js_buf + reader.js_used,
- (int)(reader.js_end - reader.js_buf) - reader.js_used);
- ret = TRUE;
+ int timeout;
+#ifdef WIN32
+ timeout = GetTickCount() > chanpart->ch_deadline;
+#else
+ {
+ struct timeval now_tv;
+
+ gettimeofday(&now_tv, NULL);
+ timeout = now_tv.tv_sec > chanpart->ch_deadline.tv_sec
+ || (now_tv.tv_sec == chanpart->ch_deadline.tv_sec
+ && now_tv.tv_usec > chanpart->ch_deadline.tv_usec);
+ }
+#endif
+ if (timeout)
+ {
+ status = FAIL;
+ chanpart->ch_waiting = FALSE;
+ }
+ else
+ {
+ reader.js_used = 0;
+ ch_log(channel, "still waiting on incomplete message");
+ }
}
}
+
+ if (status == FAIL)
+ {
+ ch_error(channel, "Decoding failed - discarding input");
+ ret = FALSE;
+ chanpart->ch_waiting = FALSE;
+ }
+ else if (reader.js_buf[reader.js_used] != NUL)
+ {
+ /* Put the unread part back into the channel.
+ * TODO: insert in front */
+ channel_save(channel, part, reader.js_buf + reader.js_used,
+ (int)(reader.js_end - reader.js_buf) - reader.js_used, NULL);
+ ret = status == MAYBE ? FALSE: TRUE;
+ }
else
ret = FALSE;
@@ -1559,6 +1618,8 @@ channel_get_json(channel_T *channel, int part, int id, typval_T **rettv)
|| tv->vval.v_number != channel->ch_part[part].ch_block_id)))
{
*rettv = item->jq_value;
+ if (tv->v_type == VAR_NUMBER)
+ ch_logn(channel, "Getting JSON message %d", tv->vval.v_number);
remove_json_node(head, item);
return OK;
}
@@ -2289,7 +2350,7 @@ channel_read(channel_T *channel, int part, char *func)
break; /* error or nothing more to read */
/* Store the read message in the queue. */
- channel_save(channel, part, buf, len);
+ channel_save(channel, part, buf, len, "RECV ");
readlen += len;
if (len < MAXMSGSIZE)
break; /* did read everything that's available */
@@ -2316,7 +2377,7 @@ channel_read(channel_T *channel, int part, char *func)
if (channel->ch_part[part].ch_mode == MODE_RAW
|| channel->ch_part[part].ch_mode == MODE_NL)
channel_save(channel, part, (char_u *)DETACH_MSG_RAW,
- (int)STRLEN(DETACH_MSG_RAW));
+ (int)STRLEN(DETACH_MSG_RAW), "PUT ");
/* TODO: When reading from stdout is not possible, should we try to
* keep stdin and stderr open? Probably not, assume the other side
@@ -2361,9 +2422,13 @@ channel_read_block(channel_T *channel, int part, int timeout)
continue;
/* Wait for up to the channel timeout. */
- if (fd == INVALID_FD
- || channel_wait(channel, fd, timeout) == FAIL)
+ if (fd == INVALID_FD)
+ return NULL;
+ if (channel_wait(channel, fd, timeout) == FAIL)
+ {
+ ch_log(channel, "Timed out");
return NULL;
+ }
channel_read(channel, part, "channel_read_block");
}
@@ -2403,16 +2468,18 @@ channel_read_block(channel_T *channel, int part, int timeout)
channel_read_json_block(
channel_T *channel,
int part,
- int timeout,
+ int timeout_arg,
int id,
typval_T **rettv)
{
int more;
sock_T fd;
+ int timeout;
+ chanpart_T *chanpart = &channel->ch_part[part];
ch_log(channel, "Reading JSON");
if (id != -1)
- channel->ch_part[part].ch_block_id = id;
+ chanpart->ch_block_id = id;
for (;;)
{
more = channel_parse_json(channel, part);
@@ -2420,7 +2487,7 @@ channel_read_json_block(
/* search for messsage "id" */
if (channel_get_json(channel, part, id, rettv) == OK)
{
- channel->ch_part[part].ch_block_id = 0;
+ chanpart->ch_block_id = 0;
return OK;
}
@@ -2431,14 +2498,50 @@ channel_read_json_block(
if (channel_parse_messages())
continue;
- /* Wait for up to the timeout. */
- fd = channel->ch_part[part].ch_fd;
+ /* Wait for up to the timeout. If there was an incomplete message
+ * use the deadline for that. */
+ timeout = timeout_arg;
+ if (chanpart->ch_waiting)
+ {
+#ifdef WIN32
+ timeout = chanpart->ch_deadline - GetTickCount() + 1;
+#else
+ {
+ struct timeval now_tv;
+
+ gettimeofday(&now_tv, NULL);
+ timeout = (chanpart->ch_deadline.tv_sec
+ - now_tv.tv_sec) * 1000
+ + (chanpart->ch_deadline.tv_usec
+ - now_tv.tv_usec) / 1000
+ + 1;
+ }
+#endif
+ if (timeout < 0)
+ {
+ /* Something went wrong, channel_parse_json() didn't
+ * discard message. Cancel waiting. */
+ chanpart->ch_waiting = FALSE;
+ timeout = timeout_arg;
+ }
+ else if (timeout > timeout_arg)
+ timeout = timeout_arg;
+ }
+ fd = chanpart->ch_fd;
if (fd == INVALID_FD || channel_wait(channel, fd, timeout) == FAIL)
- break;
- channel_read(channel, part, "channel_read_json_block");
+ {
+ if (timeout == timeout_arg)
+ {
+ if (fd != INVALID_FD)
+ ch_log(channel, "Timed out");
+ break;
+ }
+ }
+ else
+ channel_read(channel, part, "channel_read_json_block");
}
}
- channel->ch_part[part].ch_block_id = 0;
+ chanpart->ch_block_id = 0;
return FAIL;
}
diff --git a/src/json.c b/src/json.c
index cd80a761ec..9738fc5fec 100644
--- a/src/json.c
+++ b/src/json.c
@@ -877,8 +877,9 @@ json_decode_all(js_read_T *reader, typval_T *res, int options)
/*
* Decode the JSON from "reader" and store the result in "res".
* "options" can be JSON_JS or zero;
- * Return FAIL if the message has a decoding error or the message is
- * truncated. Consumes the message anyway.
+ * Return FAIL for a decoding error.
+ * Return MAYBE for an incomplete message.
+ * Consumes the message anyway.
*/
int
json_decode(js_read_T *reader, typval_T *res, int options)
@@ -891,7 +892,7 @@ json_decode(js_read_T *reader, typval_T *res, int options)
ret = json_decode_item(reader, res, options);
json_skip_white(reader);
- return ret == OK ? OK : FAIL;
+ return ret;
}
/*
diff --git a/src/structs.h b/src/structs.h
index 50e2a68e2e..470beff3c5 100644
--- a/src/structs.h
+++ b/src/structs.h
@@ -1357,6 +1357,14 @@ typedef struct {
jsonq_T ch_json_head; /* header for circular json read queue */
int ch_block_id; /* ID that channel_read_json_block() is
waiting for */
+ /* When ch_waiting is TRUE use ch_deadline to wait for incomplete message
+ * to be complete. */
+ int ch_waiting;
+#ifdef WIN32
+ DWORD ch_deadline;
+#else
+ struct timeval ch_deadline;
+#endif
cbq_T ch_cb_head; /* dummy node for per-request callbacks */
char_u *ch_callback; /* call when a msg is not handled */
diff --git a/src/testdir/test_channel.py b/src/testdir/test_channel.py
index 47a12ea7e0..ca12007a21 100644
--- a/src/testdir/test_channel.py
+++ b/src/testdir/test_channel.py
@@ -104,11 +104,36 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
print("sending: {}".format(cmd))
self.request.sendall(cmd.encode('utf-8'))
response = "ok"
- elif decoded[1] == 'malformed':
+ elif decoded[1] == 'malformed1':
cmd = '["ex",":"]wrong!["ex","smi"]'
print("sending: {}".format(cmd))
self.request.sendall(cmd.encode('utf-8'))
response = "ok"
+ elif decoded[1] == 'malformed2':
+ cmd = '"unterminated string'
+ print("sending: {}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ # Need to wait for Vim to give up, otherwise the double
+ # quote in the "ok" response terminates the string.
+ time.sleep(0.2)
+ elif decoded[1] == 'malformed3':
+ cmd = '["ex","missing ]"'
+ print("sending: {}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ # Need to wait for Vim to give up, otherwise the ]
+ # in the "ok" response terminates the list.
+ time.sleep(0.2)
+ elif decoded[1] == 'split':
+ cmd = '["ex","let '
+ print("sending: {}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ time.sleep(0.01)
+ cmd = 'g:split = 123"]'
+ print("sending: {}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
elif decoded[1] == 'an expr':
# Send an expr request.
cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]'
diff --git a/src/testdir/test_channel.vim b/src/testdir/test_channel.vim
index c628bbe881..c4e23c97fa 100644
--- a/src/testdir/test_channel.vim
+++ b/src/testdir/test_channel.vim
@@ -127,7 +127,14 @@ func s:communicate(port)
call assert_equal('got it', ch_evalexpr(handle, 'hello!'))
" Malformed command should be ignored.
- call assert_equal('ok', ch_evalexpr(handle, 'malformed'))
+ call assert_equal('ok', ch_evalexpr(handle, 'malformed1'))
+ call assert_equal('ok', ch_evalexpr(handle, 'malformed2'))
+ call assert_equal('ok', ch_evalexpr(handle, 'malformed3'))
+
+ " split command should work
+ call assert_equal('ok', ch_evalexpr(handle, 'split'))
+ call s:waitFor('exists("g:split")')
+ call assert_equal(123, g:split)
" Request that triggers sending two ex commands. These will usually be
" handled before getting the response, but it's not guaranteed, thus wait a
diff --git a/src/version.c b/src/version.c
index c914ec479b..491ded3217 100644
--- a/src/version.c
+++ b/src/version.c
@@ -749,6 +749,8 @@ static char *(features[]) =
static int included_patches[] =
{ /* Add new patch number below this line */
/**/
+ 1617,
+/**/
1616,
/**/
1615,