summaryrefslogtreecommitdiffstats
path: root/src/channel.c
diff options
context:
space:
mode:
authorBram Moolenaar <Bram@vim.org>2017-08-18 20:50:30 +0200
committerBram Moolenaar <Bram@vim.org>2017-08-18 20:50:30 +0200
commit97bd5e6527bf2b48acdd1550acba161e82a5bc99 (patch)
tree22b6437023669cf07961a5ac29490ea8d33e9340 /src/channel.c
parentcfce71710b6a2e1fb7f7f27d2a359e4b926f3af9 (diff)
patch 8.0.0957: a terminal job can deadlock when sending many keysv8.0.0957
Problem: When term_sendkeys() sends many keys it may get stuck in writing to the job. Solution: Make the write non-blocking, buffer keys to be sent.
Diffstat (limited to 'src/channel.c')
-rw-r--r--src/channel.c222
1 files changed, 185 insertions, 37 deletions
diff --git a/src/channel.c b/src/channel.c
index e8030798ca..fab35899a4 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -1373,7 +1373,7 @@ can_write_buf_line(channel_T *channel)
}
/*
- * Write any lines to the input channel.
+ * Write any buffer lines to the input channel.
*/
static void
channel_write_in(channel_T *channel)
@@ -1446,6 +1446,25 @@ channel_buffer_free(buf_T *buf)
}
/*
+ * Write any lines waiting to be written to "channel".
+ */
+ static void
+channel_write_input(channel_T *channel)
+{
+ chanpart_T *in_part = &channel->ch_part[PART_IN];
+
+ if (in_part->ch_writeque.wq_next != NULL)
+ channel_send(channel, PART_IN, (char_u *)"", 0, "channel_write_input");
+ else if (in_part->ch_bufref.br_buf != NULL)
+ {
+ if (in_part->ch_buf_append)
+ channel_write_new_lines(in_part->ch_bufref.br_buf);
+ else
+ channel_write_in(channel);
+ }
+}
+
+/*
* Write any lines waiting to be written to a channel.
*/
void
@@ -1454,17 +1473,7 @@ channel_write_any_lines(void)
channel_T *channel;
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
- {
- chanpart_T *in_part = &channel->ch_part[PART_IN];
-
- if (in_part->ch_bufref.br_buf != NULL)
- {
- if (in_part->ch_buf_append)
- channel_write_new_lines(in_part->ch_bufref.br_buf);
- else
- channel_write_in(channel);
- }
- }
+ channel_write_input(channel);
}
/*
@@ -2984,7 +2993,9 @@ channel_fill_wfds(int maxfd_arg, fd_set *wfds)
{
chanpart_T *in_part = &ch->ch_part[PART_IN];
- if (in_part->ch_fd != INVALID_FD && in_part->ch_bufref.br_buf != NULL)
+ if (in_part->ch_fd != INVALID_FD
+ && (in_part->ch_bufref.br_buf != NULL
+ || in_part->ch_writeque.wq_next != NULL))
{
FD_SET((int)in_part->ch_fd, wfds);
if ((int)in_part->ch_fd >= maxfd)
@@ -3530,6 +3541,31 @@ channel_handle_events(void)
# endif
/*
+ * Set "channel"/"part" to non-blocking.
+ */
+ void
+channel_set_nonblock(channel_T *channel, ch_part_T part)
+{
+ chanpart_T *ch_part = &channel->ch_part[part];
+ int fd = ch_part->ch_fd;
+
+ if (fd != INVALID_FD)
+ {
+#ifdef _WIN32
+ if (part == PART_SOCK)
+ {
+ u_long val = 1;
+
+ ioctlsocket(fd, FIONBIO, &val);
+ }
+ else
+#endif
+ fcntl(fd, F_SETFL, O_NONBLOCK);
+ ch_part->ch_nonblocking = TRUE;
+ }
+}
+
+/*
* Write "buf" (NUL terminated string) to "channel"/"part".
* When "fun" is not NULL an error message might be given.
* Return FAIL or OK.
@@ -3538,14 +3574,16 @@ channel_handle_events(void)
channel_send(
channel_T *channel,
ch_part_T part,
- char_u *buf,
- int len,
+ char_u *buf_arg,
+ int len_arg,
char *fun)
{
int res;
sock_T fd;
+ chanpart_T *ch_part = &channel->ch_part[part];
+ int did_use_queue = FALSE;
- fd = channel->ch_part[part].ch_fd;
+ fd = ch_part->ch_fd;
if (fd == INVALID_FD)
{
if (!channel->ch_error && fun != NULL)
@@ -3561,29 +3599,145 @@ channel_send(
{
ch_log_lead("SEND ", channel);
fprintf(log_fd, "'");
- ignored = (int)fwrite(buf, len, 1, log_fd);
+ ignored = (int)fwrite(buf_arg, len_arg, 1, log_fd);
fprintf(log_fd, "'\n");
fflush(log_fd);
did_log_msg = TRUE;
}
- if (part == PART_SOCK)
- res = sock_write(fd, (char *)buf, len);
- else
- res = fd_write(fd, (char *)buf, len);
- if (res != len)
+ for (;;)
{
- if (!channel->ch_error && fun != NULL)
+ writeq_T *wq = &ch_part->ch_writeque;
+ char_u *buf;
+ int len;
+
+ if (wq->wq_next != NULL)
{
- ch_error(channel, "%s(): write failed", fun);
- EMSG2(_("E631: %s(): write failed"), fun);
+ /* first write what was queued */
+ buf = wq->wq_next->wq_ga.ga_data;
+ len = wq->wq_next->wq_ga.ga_len;
+ did_use_queue = TRUE;
+ }
+ else
+ {
+ if (len_arg == 0)
+ /* nothing to write, called from channel_select_check() */
+ return OK;
+ buf = buf_arg;
+ len = len_arg;
}
- channel->ch_error = TRUE;
- return FAIL;
- }
- channel->ch_error = FALSE;
- return OK;
+ if (part == PART_SOCK)
+ res = sock_write(fd, (char *)buf, len);
+ else
+ res = fd_write(fd, (char *)buf, len);
+ if (res < 0 && (errno == EWOULDBLOCK
+#ifdef EAGAIN
+ || errno == EAGAIN
+#endif
+ ))
+ res = 0; /* nothing got written */
+
+ if (res >= 0 && ch_part->ch_nonblocking)
+ {
+ writeq_T *entry = wq->wq_next;
+
+ if (did_use_queue)
+ ch_log(channel, "Sent %d bytes now", res);
+ if (res == len)
+ {
+ /* Wrote all the buf[len] bytes. */
+ if (entry != NULL)
+ {
+ /* Remove the entry from the write queue. */
+ ga_clear(&entry->wq_ga);
+ wq->wq_next = entry->wq_next;
+ if (wq->wq_next == NULL)
+ wq->wq_prev = NULL;
+ else
+ wq->wq_next->wq_prev = NULL;
+ continue;
+ }
+ if (did_use_queue)
+ ch_log(channel, "Write queue empty");
+ }
+ else
+ {
+ /* Wrote only buf[res] bytes, can't write more now. */
+ if (entry != NULL)
+ {
+ if (res > 0)
+ {
+ /* Remove the bytes that were written. */
+ mch_memmove(entry->wq_ga.ga_data,
+ (char *)entry->wq_ga.ga_data + res,
+ len - res);
+ entry->wq_ga.ga_len -= res;
+ }
+ buf = buf_arg;
+ len = len_arg;
+ }
+ else
+ {
+ buf += res;
+ len -= res;
+ }
+ ch_log(channel, "Adding %d bytes to the write queue", len);
+
+ /* Append the not written bytes of the argument to the write
+ * buffer. Limit entries to 4000 bytes. */
+ if (wq->wq_prev != NULL
+ && wq->wq_prev->wq_ga.ga_len + len < 4000)
+ {
+ writeq_T *last = wq->wq_prev;
+
+ /* append to the last entry */
+ if (ga_grow(&last->wq_ga, len) == OK)
+ {
+ mch_memmove((char *)last->wq_ga.ga_data
+ + last->wq_ga.ga_len,
+ buf, len);
+ last->wq_ga.ga_len += len;
+ }
+ }
+ else
+ {
+ writeq_T *last = (writeq_T *)alloc((int)sizeof(writeq_T));
+
+ if (last != NULL)
+ {
+ ch_log(channel, "Creating new entry");
+ last->wq_prev = wq->wq_prev;
+ last->wq_next = NULL;
+ if (wq->wq_prev == NULL)
+ wq->wq_next = last;
+ else
+ wq->wq_prev->wq_next = last;
+ wq->wq_prev = last;
+ ga_init2(&last->wq_ga, 1, 1000);
+ if (ga_grow(&last->wq_ga, len) == OK)
+ {
+ mch_memmove(last->wq_ga.ga_data, buf, len);
+ last->wq_ga.ga_len = len;
+ }
+ }
+ }
+ }
+ }
+ else if (res != len)
+ {
+ if (!channel->ch_error && fun != NULL)
+ {
+ ch_error(channel, "%s(): write failed", fun);
+ EMSG2(_("E631: %s(): write failed"), fun);
+ }
+ channel->ch_error = TRUE;
+ return FAIL;
+ }
+
+ channel->ch_error = FALSE;
+ return OK;
+ }
}
/*
@@ -3873,13 +4027,7 @@ channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
if (ret > 0 && in_part->ch_fd != INVALID_FD
&& FD_ISSET(in_part->ch_fd, wfds))
{
- if (in_part->ch_buf_append)
- {
- if (in_part->ch_bufref.br_buf != NULL)
- channel_write_new_lines(in_part->ch_bufref.br_buf);
- }
- else
- channel_write_in(channel);
+ channel_write_input(channel);
--ret;
}
}