summaryrefslogtreecommitdiffstats
path: root/net/tipc/server.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-02-15 10:40:45 +0100
committerDavid S. Miller <davem@davemloft.net>2018-02-16 15:26:33 -0500
commit414574a0af36d329f560f542e650cc4a81cc1d69 (patch)
treef4a319ea2214e6fb3f8a12521001bf0b50add3a2 /net/tipc/server.c
parentdf79d040dcd7d7e580c50edf40b82e677fe84801 (diff)
tipc: simplify interaction between subscription and topology connection
The message transmission and reception in the topology server is more generic than is currently necessary. By basing the funtionality on the fact that we only send items of type struct tipc_event and always receive items of struct tipc_subcr we can make several simplifications, and also get rid of some unnecessary dynamic memory allocations. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/server.c')
-rw-r--r--net/tipc/server.c170
1 files changed, 58 insertions, 112 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
index b8268c0882a7..7933fb92d852 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -84,9 +84,9 @@ struct tipc_conn {
/* An entry waiting to be sent */
struct outqueue_entry {
- u32 evt;
+ bool inactive;
+ struct tipc_event evt;
struct list_head list;
- struct kvec iov;
};
static void tipc_recv_work(struct work_struct *work);
@@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
return con;
}
+/* sock_data_ready - interrupt callback indicating the socket has data to read
+ * The queued job is launched in tipc_recv_from_sock()
+ */
static void sock_data_ready(struct sock *sk)
{
struct tipc_conn *con;
@@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk)
read_unlock_bh(&sk->sk_callback_lock);
}
+/* sock_write_space - interrupt callback after a sendmsg EAGAIN
+ * Indicates that there now is more is space in the send buffer
+ * The queued job is launched in tipc_send_to_sock()
+ */
static void sock_write_space(struct sock *sk)
{
struct tipc_conn *con;
@@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
return con;
}
-int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
- void *buf, size_t len)
+static int tipc_con_rcv_sub(struct tipc_server *srv,
+ struct tipc_conn *con,
+ struct tipc_subscr *s)
{
- struct tipc_subscr *s = (struct tipc_subscr *)buf;
struct tipc_subscription *sub;
bool status;
int swap;
@@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
- sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
+ sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status);
if (!sub)
return -1;
@@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
static int tipc_receive_from_sock(struct tipc_conn *con)
{
- struct tipc_server *s = con->server;
+ struct tipc_server *srv = con->server;
struct sock *sk = con->sock->sk;
struct msghdr msg = {};
+ struct tipc_subscr s;
struct kvec iov;
- void *buf;
int ret;
- buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
- if (!buf) {
- ret = -ENOMEM;
- goto out_close;
- }
-
- iov.iov_base = buf;
- iov.iov_len = s->max_rcvbuf_size;
+ iov.iov_base = &s;
+ iov.iov_len = sizeof(s);
msg.msg_name = NULL;
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
- if (ret <= 0) {
- kmem_cache_free(s->rcvbuf_cache, buf);
- goto out_close;
+ if (ret == -EWOULDBLOCK)
+ return -EWOULDBLOCK;
+ if (ret > 0) {
+ read_lock_bh(&sk->sk_callback_lock);
+ ret = tipc_con_rcv_sub(srv, con, &s);
+ read_unlock_bh(&sk->sk_callback_lock);
}
-
- read_lock_bh(&sk->sk_callback_lock);
- ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
- read_unlock_bh(&sk->sk_callback_lock);
- kmem_cache_free(s->rcvbuf_cache, buf);
if (ret < 0)
- tipc_conn_terminate(s, con->conid);
- return ret;
-
-out_close:
- if (ret != -EWOULDBLOCK)
tipc_close_conn(con);
- else if (ret == 0)
- /* Don't return success if we really got EOF */
- ret = -EAGAIN;
return ret;
}
@@ -442,33 +433,6 @@ static int tipc_open_listening_sock(struct tipc_server *s)
return 0;
}
-static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
-{
- struct outqueue_entry *entry;
- void *buf;
-
- entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
- if (!entry)
- return NULL;
-
- buf = kmemdup(data, len, GFP_ATOMIC);
- if (!buf) {
- kfree(entry);
- return NULL;
- }
-
- entry->iov.iov_base = buf;
- entry->iov.iov_len = len;
-
- return entry;
-}
-
-static void tipc_free_entry(struct outqueue_entry *e)
-{
- kfree(e->iov.iov_base);
- kfree(e);
-}
-
static void tipc_clean_outqueues(struct tipc_conn *con)
{
struct outqueue_entry *e, *safe;
@@ -476,50 +440,40 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
spin_lock_bh(&con->outqueue_lock);
list_for_each_entry_safe(e, safe, &con->outqueue, list) {
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
}
-int tipc_conn_sendmsg(struct tipc_server *s, int conid,
- u32 evt, void *data, size_t len)
+/* tipc_conn_queue_evt - interrupt level call from a subscription instance
+ * The queued job is launched in tipc_send_to_sock()
+ */
+void tipc_conn_queue_evt(struct tipc_server *s, int conid,
+ u32 event, struct tipc_event *evt)
{
struct outqueue_entry *e;
struct tipc_conn *con;
con = tipc_conn_lookup(s, conid);
if (!con)
- return -EINVAL;
+ return;
- if (!connected(con)) {
- conn_put(con);
- return 0;
- }
+ if (!connected(con))
+ goto err;
- e = tipc_alloc_entry(data, len);
- if (!e) {
- conn_put(con);
- return -ENOMEM;
- }
- e->evt = evt;
+ e = kmalloc(sizeof(*e), GFP_ATOMIC);
+ if (!e)
+ goto err;
+ e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
+ memcpy(&e->evt, evt, sizeof(*evt));
spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock);
- if (!queue_work(s->send_wq, &con->swork))
- conn_put(con);
- return 0;
-}
-
-void tipc_conn_terminate(struct tipc_server *s, int conid)
-{
- struct tipc_conn *con;
-
- con = tipc_conn_lookup(s, conid);
- if (con) {
- tipc_close_conn(con);
- conn_put(con);
- }
+ if (queue_work(s->send_wq, &con->swork))
+ return;
+err:
+ conn_put(con);
}
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
@@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
*conid = con->conid;
con->sock = NULL;
- rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
+ rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
if (rc < 0)
tipc_close_conn(con);
return !rc;
@@ -587,6 +541,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
+ struct kvec iov;
int count = 0;
int ret;
@@ -594,27 +549,28 @@ static void tipc_send_to_sock(struct tipc_conn *con)
while (!list_empty(queue)) {
e = list_first_entry(queue, struct outqueue_entry, list);
-
+ evt = &e->evt;
spin_unlock_bh(&con->outqueue_lock);
- if (e->evt == TIPC_SUBSCR_TIMEOUT) {
- evt = (struct tipc_event *)e->iov.iov_base;
+ if (e->inactive)
tipc_con_delete_sub(con, &evt->s);
- }
+
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
+ iov.iov_base = evt;
+ iov.iov_len = sizeof(*evt);
+ msg.msg_name = NULL;
if (con->sock) {
- ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
- e->iov.iov_len);
+ ret = kernel_sendmsg(con->sock, &msg, &iov,
+ 1, sizeof(*evt));
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
- goto send_err;
+ goto err;
}
} else {
- evt = e->iov.iov_base;
tipc_send_kern_top_evt(srv->net, evt);
}
@@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con)
}
spin_lock_bh(&con->outqueue_lock);
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
out:
return;
-
-send_err:
+err:
tipc_close_conn(con);
}
@@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s)
idr_init(&s->conn_idr);
s->idr_in_use = 0;
- s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
- 0, SLAB_HWCACHE_ALIGN, NULL);
- if (!s->rcvbuf_cache)
- return -ENOMEM;
-
ret = tipc_work_start(s);
- if (ret < 0) {
- kmem_cache_destroy(s->rcvbuf_cache);
+ if (ret < 0)
return ret;
- }
+
ret = tipc_open_listening_sock(s);
- if (ret < 0) {
+ if (ret < 0)
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
- return ret;
- }
+
return ret;
}
@@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s)
spin_unlock_bh(&s->idr_lock);
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
idr_destroy(&s->conn_idr);
}