summaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2016-04-28 16:07:26 +0200
committerIlya Dryomov <idryomov@gmail.com>2016-05-26 01:14:03 +0200
commit5aea3dcd50215fa9563270251ad7323e2f2490ee (patch)
tree1bcaaf4d5ff7b443c3776af771b228a5fad903b0 /net/ceph
parent9dd2845ccb40452d4ac943231ea34aade4a02c68 (diff)
libceph: a major OSD client update
This is a major sync up, up to ~Jewel. The highlights are: - per-session request trees (vs a global per-client tree) - per-session locking (vs a global per-client rwlock) - homeless OSD session - no ad-hoc global per-client lists - support for pool quotas - foundation for watch/notify v2 support - foundation for map check (pool deletion detection) support The switchover is incomplete: lingering requests can be setup and teared down but aren't ever reestablished. This functionality is restored with the introduction of the new lingering infrastructure (ceph_osd_linger_request, linger_work, etc) in a later commit. Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/debugfs.c34
-rw-r--r--net/ceph/osd_client.c1164
2 files changed, 587 insertions, 611 deletions
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 6d3ff713edeb..61dbd9de4650 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -182,21 +182,39 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
seq_putc(s, '\n');
}
+static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
+{
+ struct rb_node *n;
+
+ mutex_lock(&osd->lock);
+ for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ dump_request(s, req);
+ }
+
+ mutex_unlock(&osd->lock);
+}
+
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
struct ceph_osd_client *osdc = &client->osdc;
- struct rb_node *p;
-
- mutex_lock(&osdc->request_mutex);
- for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
- struct ceph_osd_request *req;
+ struct rb_node *n;
- req = rb_entry(p, struct ceph_osd_request, r_node);
+ down_read(&osdc->lock);
+ seq_printf(s, "REQUESTS %d homeless %d\n",
+ atomic_read(&osdc->num_requests),
+ atomic_read(&osdc->num_homeless));
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
- dump_request(s, req);
+ dump_requests(s, osd);
}
- mutex_unlock(&osdc->request_mutex);
+ dump_requests(s, &osdc->homeless_osd);
+
+ up_read(&osdc->lock);
return 0;
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d1c8e06f1261..4c856c87b1a9 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -25,16 +25,6 @@ static struct kmem_cache *ceph_osd_request_cache;
static const struct ceph_connection_operations osd_con_ops;
-static void __send_queued(struct ceph_osd_client *osdc);
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
-static void __register_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
-static void __unregister_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
-static void __unregister_linger_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
-static void __enqueue_request(struct ceph_osd_request *req);
-
/*
* Implement client access to distributed object storage cluster.
*
@@ -53,6 +43,43 @@ static void __enqueue_request(struct ceph_osd_request *req);
* channel with an OSD is reset.
*/
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+
+#if 1
+static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
+{
+ bool wrlocked = true;
+
+ if (unlikely(down_read_trylock(sem))) {
+ wrlocked = false;
+ up_read(sem);
+ }
+
+ return wrlocked;
+}
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
+{
+ WARN_ON(!rwsem_is_locked(&osdc->lock));
+}
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
+{
+ WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
+}
+static inline void verify_osd_locked(struct ceph_osd *osd)
+{
+ struct ceph_osd_client *osdc = osd->o_osdc;
+
+ WARN_ON(!(mutex_is_locked(&osd->lock) &&
+ rwsem_is_locked(&osdc->lock)) &&
+ !rwsem_is_wrlocked(&osdc->lock));
+}
+#else
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
+static inline void verify_osd_locked(struct ceph_osd *osd) { }
+#endif
+
/*
* calculate the mapping of a file extent onto an object, and fill out the
* request accordingly. shorten extent as necessary if it crosses an
@@ -336,18 +363,14 @@ static void ceph_osdc_release_request(struct kref *kref)
dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
req->r_request, req->r_reply);
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
- WARN_ON(!list_empty(&req->r_req_lru_item));
- WARN_ON(!list_empty(&req->r_osd_item));
WARN_ON(!list_empty(&req->r_linger_item));
WARN_ON(!list_empty(&req->r_linger_osd_item));
WARN_ON(req->r_osd);
if (req->r_request)
ceph_msg_put(req->r_request);
- if (req->r_reply) {
- ceph_msg_revoke_incoming(req->r_reply);
+ if (req->r_reply)
ceph_msg_put(req->r_reply);
- }
for (which = 0; which < req->r_num_ops; which++)
osd_req_op_data_release(req, which);
@@ -418,8 +441,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_unsafe_item);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd_item);
- INIT_LIST_HEAD(&req->r_req_lru_item);
- INIT_LIST_HEAD(&req->r_osd_item);
target_init(&req->r_t);
@@ -869,141 +890,11 @@ static bool osd_homeless(struct ceph_osd *osd)
return osd->o_osd == CEPH_HOMELESS_OSD;
}
-static struct ceph_osd_request *
-__lookup_request_ge(struct ceph_osd_client *osdc,
- u64 tid)
-{
- struct ceph_osd_request *req;
- struct rb_node *n = osdc->requests.rb_node;
-
- while (n) {
- req = rb_entry(n, struct ceph_osd_request, r_node);
- if (tid < req->r_tid) {
- if (!n->rb_left)
- return req;
- n = n->rb_left;
- } else if (tid > req->r_tid) {
- n = n->rb_right;
- } else {
- return req;
- }
- }
- return NULL;
-}
-
-static void __kick_linger_request(struct ceph_osd_request *req)
-{
- struct ceph_osd_client *osdc = req->r_osdc;
- struct ceph_osd *osd = req->r_osd;
-
- /*
- * Linger requests need to be resent with a new tid to avoid
- * the dup op detection logic on the OSDs. Achieve this with
- * a re-register dance instead of open-coding.
- */
- ceph_osdc_get_request(req);
- if (!list_empty(&req->r_linger_item))
- __unregister_linger_request(osdc, req);
- else
- __unregister_request(osdc, req);
- __register_request(osdc, req);
- ceph_osdc_put_request(req);
-
- /*
- * Unless request has been registered as both normal and
- * lingering, __unregister{,_linger}_request clears r_osd.
- * However, here we need to preserve r_osd to make sure we
- * requeue on the same OSD.
- */
- WARN_ON(req->r_osd || !osd);
- req->r_osd = osd;
-
- dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
- __enqueue_request(req);
-}
-
-/*
- * Resubmit requests pending on the given osd.
- */
-static void __kick_osd_requests(struct ceph_osd_client *osdc,
- struct ceph_osd *osd)
-{
- struct ceph_osd_request *req, *nreq;
- LIST_HEAD(resend);
- LIST_HEAD(resend_linger);
- int err;
-
- dout("%s osd%d\n", __func__, osd->o_osd);
- err = __reset_osd(osdc, osd);
- if (err)
- return;
-
- /*
- * Build up a list of requests to resend by traversing the
- * osd's list of requests. Requests for a given object are
- * sent in tid order, and that is also the order they're
- * kept on this list. Therefore all requests that are in
- * flight will be found first, followed by all requests that
- * have not yet been sent. And to resend requests while
- * preserving this order we will want to put any sent
- * requests back on the front of the osd client's unsent
- * list.
- *
- * So we build a separate ordered list of already-sent
- * requests for the affected osd and splice it onto the
- * front of the osd client's unsent list. Once we've seen a
- * request that has not yet been sent we're done. Those
- * requests are already sitting right where they belong.
- */
- list_for_each_entry(req, &osd->o_requests, r_osd_item) {
- if (!req->r_sent)
- break;
-
- if (!req->r_linger) {
- dout("%s requeueing %p tid %llu\n", __func__, req,
- req->r_tid);
- list_move_tail(&req->r_req_lru_item, &resend);
- req->r_flags |= CEPH_OSD_FLAG_RETRY;
- } else {
- list_move_tail(&req->r_req_lru_item, &resend_linger);
- }
- }
- list_splice(&resend, &osdc->req_unsent);
-
- /*
- * Both registered and not yet registered linger requests are
- * enqueued with a new tid on the same OSD. We add/move them
- * to req_unsent/o_requests at the end to keep things in tid
- * order.
- */
- list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
- r_linger_osd_item) {
- WARN_ON(!list_empty(&req->r_req_lru_item));
- __kick_linger_request(req);
- }
-
- list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
- __kick_linger_request(req);
-}
-
-/*
- * If the osd connection drops, we need to resubmit all requests.
- */
-static void osd_reset(struct ceph_connection *con)
+static bool osd_registered(struct ceph_osd *osd)
{
- struct ceph_osd *osd = con->private;
- struct ceph_osd_client *osdc;
+ verify_osdc_locked(osd->o_osdc);
- if (!osd)
- return;
- dout("osd_reset osd%d\n", osd->o_osd);
- osdc = osd->o_osdc;
- down_read(&osdc->map_sem);
- mutex_lock(&osdc->request_mutex);
- __kick_osd_requests(osdc, osd);
- __send_queued(osdc);
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+ return !RB_EMPTY_NODE(&osd->o_node);
}
/*
@@ -1013,17 +904,18 @@ static void osd_init(struct ceph_osd *osd)
{
atomic_set(&osd->o_ref, 1);
RB_CLEAR_NODE(&osd->o_node);
- INIT_LIST_HEAD(&osd->o_requests);
+ osd->o_requests = RB_ROOT;
INIT_LIST_HEAD(&osd->o_linger_requests);
INIT_LIST_HEAD(&osd->o_osd_lru);
INIT_LIST_HEAD(&osd->o_keepalive_item);
osd->o_incarnation = 1;
+ mutex_init(&osd->lock);
}
static void osd_cleanup(struct ceph_osd *osd)
{
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
- WARN_ON(!list_empty(&osd->o_requests));
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests));
WARN_ON(!list_empty(&osd->o_osd_lru));
WARN_ON(!list_empty(&osd->o_keepalive_item));
@@ -1077,30 +969,6 @@ static void put_osd(struct ceph_osd *osd)
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
-/*
- * remove an osd from our map
- */
-static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
- dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
- WARN_ON(!list_empty(&osd->o_requests));
- WARN_ON(!list_empty(&osd->o_linger_requests));
-
- list_del_init(&osd->o_osd_lru);
- erase_osd(&osdc->osds, osd);
-}
-
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
- dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
-
- if (!RB_EMPTY_NODE(&osd->o_node)) {
- ceph_con_close(&osd->o_con);
- __remove_osd(osdc, osd);
- put_osd(osd);
- }
-}
-
static void __move_osd_to_lru(struct ceph_osd *osd)
{
struct ceph_osd_client *osdc = osd->o_osdc;
@@ -1117,7 +985,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd)
static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
- if (list_empty(&osd->o_requests) &&
+ if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests))
__move_osd_to_lru(osd);
}
@@ -1135,29 +1003,63 @@ static void __remove_osd_from_lru(struct ceph_osd *osd)
}
/*
+ * Close the connection and assign any leftover requests to the
+ * homeless session.
+ */
+static void close_osd(struct ceph_osd *osd)
+{
+ struct ceph_osd_client *osdc = osd->o_osdc;
+ struct rb_node *n;
+
+ verify_osdc_wrlocked(osdc);
+ dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+ ceph_con_close(&osd->o_con);
+
+ for (n = rb_first(&osd->o_requests); n; ) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ n = rb_next(n); /* unlink_request() */
+
+ dout(" reassigning req %p tid %llu\n", req, req->r_tid);
+ unlink_request(osd, req);
+ link_request(&osdc->homeless_osd, req);
+ }
+
+ __remove_osd_from_lru(osd);
+ erase_osd(&osdc->osds, osd);
+ put_osd(osd);
+}
+
+/*
* reset osd connect
*/
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int reopen_osd(struct ceph_osd *osd)
{
struct ceph_entity_addr *peer_addr;
- dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
- if (list_empty(&osd->o_requests) &&
+ dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+ if (RB_EMPTY_ROOT(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
- remove_osd(osdc, osd);
+ close_osd(osd);
return -ENODEV;
}
- peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+ peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
!ceph_con_opened(&osd->o_con)) {
- struct ceph_osd_request *req;
+ struct rb_node *n;
dout("osd addr hasn't changed and connection never opened, "
"letting msgr retry\n");
/* touch each r_stamp for handle_timeout()'s benfit */
- list_for_each_entry(req, &osd->o_requests, r_osd_item)
+ for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
req->r_stamp = jiffies;
+ }
return -EAGAIN;
}
@@ -1169,73 +1071,84 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
return 0;
}
-/*
- * Register request, assign tid. If this is the first request, set up
- * the timeout event.
- */
-static void __register_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
-{
- req->r_tid = ++osdc->last_tid;
- req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
- dout("__register_request %p tid %lld\n", req, req->r_tid);
- insert_request(&osdc->requests, req);
- ceph_osdc_get_request(req);
- osdc->num_requests++;
-}
-
-/*
- * called under osdc->request_mutex
- */
-static void __unregister_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
+ bool wrlocked)
{
- if (RB_EMPTY_NODE(&req->r_node)) {
- dout("__unregister_request %p tid %lld not registered\n",
- req, req->r_tid);
- return;
- }
+ struct ceph_osd *osd;
- dout("__unregister_request %p tid %lld\n", req, req->r_tid);
- erase_request(&osdc->requests, req);
- osdc->num_requests--;
+ if (wrlocked)
+ verify_osdc_wrlocked(osdc);
+ else
+ verify_osdc_locked(osdc);
- if (req->r_osd) {
- /* make sure the original request isn't in flight. */
- ceph_msg_revoke(req->r_request);
+ if (o != CEPH_HOMELESS_OSD)
+ osd = lookup_osd(&osdc->osds, o);
+ else
+ osd = &osdc->homeless_osd;
+ if (!osd) {
+ if (!wrlocked)
+ return ERR_PTR(-EAGAIN);
- list_del_init(&req->r_osd_item);
- maybe_move_osd_to_lru(req->r_osd);
- if (list_empty(&req->r_linger_osd_item))
- req->r_osd = NULL;
+ osd = create_osd(osdc, o);
+ insert_osd(&osdc->osds, osd);
+ ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
+ &osdc->osdmap->osd_addr[osd->o_osd]);
}
- list_del_init(&req->r_req_lru_item);
- ceph_osdc_put_request(req);
+ dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
+ return osd;
}
/*
- * Cancel a previously queued request message
+ * Create request <-> OSD session relation.
+ *
+ * @req has to be assigned a tid, @osd may be homeless.
*/
-static void __cancel_request(struct ceph_osd_request *req)
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
- if (req->r_sent && req->r_osd) {
- ceph_msg_revoke(req->r_request);
- req->r_sent = 0;
- }
+ verify_osd_locked(osd);
+ WARN_ON(!req->r_tid || req->r_osd);
+ dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+ req, req->r_tid);
+
+ if (!osd_homeless(osd))
+ __remove_osd_from_lru(osd);
+ else
+ atomic_inc(&osd->o_osdc->num_homeless);
+
+ get_osd(osd);
+ insert_request(&osd->o_requests, req);
+ req->r_osd = osd;
}
-static void __register_linger_request(struct ceph_osd_client *osdc,
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
+{
+ verify_osd_locked(osd);
+ WARN_ON(req->r_osd != osd);
+ dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+ req, req->r_tid);
+
+ req->r_osd = NULL;
+ erase_request(&osd->o_requests, req);
+ put_osd(osd);
+
+ if (!osd_homeless(osd))
+ maybe_move_osd_to_lru(osd);
+ else
+ atomic_dec(&osd->o_osdc->num_homeless);
+}
+
+static void __register_linger_request(struct ceph_osd *osd,
struct ceph_osd_request *req)
{
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
WARN_ON(!req->r_linger);
ceph_osdc_get_request(req);
- list_add_tail(&req->r_linger_item, &osdc->req_linger);
- if (req->r_osd)
- list_add_tail(&req->r_linger_osd_item,
- &req->r_osd->o_linger_requests);
+ list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
+ list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
+ __remove_osd_from_lru(osd);
+ req->r_osd = osd;
}
static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -1255,7 +1168,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
if (req->r_osd) {
list_del_init(&req->r_linger_osd_item);
maybe_move_osd_to_lru(req->r_osd);
- if (list_empty(&req->r_osd_item))
+ if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
req->r_osd = NULL;
}
ceph_osdc_put_request(req);
@@ -1291,11 +1204,20 @@ static bool have_pool_full(struct ceph_osd_client *osdc)
return false;
}
+static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
+{
+ struct ceph_pg_pool_info *pi;
+
+ pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
+ if (!pi)
+ return false;
+
+ return __pool_full(pi);
+}
+
/*
* Returns whether a request should be blocked from being sent
* based on the current osdmap and osd_client settings.
- *
- * Caller should hold map_sem for read.
*/
static bool target_should_be_paused(struct ceph_osd_client *osdc,
const struct ceph_osd_request_target *t,
@@ -1421,87 +1343,6 @@ out:
return ct_res;
}
-static void __enqueue_request(struct ceph_osd_request *req)
-{
- struct ceph_osd_client *osdc = req->r_osdc;
-
- dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
- req->r_osd ? req->r_osd->o_osd : -1);
-
- if (req->r_osd) {
- __remove_osd_from_lru(req->r_osd);
- list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
- list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
- } else {
- list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
- }
-}
-
-/*
- * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
- * (as needed), and set the request r_osd appropriately. If there is
- * no up osd, set r_osd to NULL. Move the request to the appropriate list
- * (unsent, homeless) or leave on in-flight lru.
- *
- * Return 0 if unchanged, 1 if changed, or negative on error.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __map_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req, int force_resend)
-{
- enum calc_target_result ct_res;
- int err;
-
- dout("map_request %p tid %lld\n", req, req->r_tid);
-
- ct_res = calc_target(osdc, &req->r_t, NULL, force_resend);
- switch (ct_res) {
- case CALC_TARGET_POOL_DNE:
- list_move(&req->r_req_lru_item, &osdc->req_notarget);
- return -EIO;
- case CALC_TARGET_NO_ACTION:
- return 0; /* no change */
- default:
- BUG_ON(ct_res != CALC_TARGET_NEED_RESEND);
- }
-
- dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
- req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd,
- req->r_osd ? req->r_osd->o_osd : -1);
-
- if (req->r_osd) {
- __cancel_request(req);
- list_del_init(&req->r_osd_item);
- list_del_init(&req->r_linger_osd_item);
- req->r_osd = NULL;
- }
-
- req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd);
- if (!req->r_osd && req->r_t.osd >= 0) {
- err = -ENOMEM;
- req->r_osd = create_osd(osdc, req->r_t.osd);
- if (!req->r_osd) {
- list_move(&req->r_req_lru_item, &osdc->req_notarget);
- goto out;
- }
-
- dout("map_request osd %p is osd%d\n", req->r_osd,
- req->r_osd->o_osd);
- insert_osd(&osdc->osds, req->r_osd);
-
- ceph_con_open(&req->r_osd->o_con,
- CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd,
- &osdc->osdmap->osd_addr[req->r_osd->o_osd]);
- }
-
- __enqueue_request(req);
- err = 1; /* osd or pg changed */
-
-out:
- return err;
-}
-
static void setup_request_data(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
@@ -1648,8 +1489,16 @@ static void send_request(struct ceph_osd_request *req)
{
struct ceph_osd *osd = req->r_osd;
+ verify_osd_locked(osd);
WARN_ON(osd->o_osd != req->r_t.osd);
+ /*
+ * We may have a previously queued request message hanging
+ * around. Cancel it to avoid corrupting the msgr.
+ */
+ if (req->r_sent)
+ ceph_msg_revoke(req->r_request);
+
req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
if (req->r_attempts)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
@@ -1671,24 +1520,11 @@ static void send_request(struct ceph_osd_request *req)
ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}
-/*
- * Send any requests in the queue (req_unsent).
- */
-static void __send_queued(struct ceph_osd_client *osdc)
-{
- struct ceph_osd_request *req, *tmp;
-
- dout("__send_queued\n");
- list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
- list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
- send_request(req);
- }
-}
-
static void maybe_request_map(struct ceph_osd_client *osdc)
{
bool continuous = false;
+ verify_osdc_locked(osdc);
WARN_ON(!osdc->osdmap->epoch);
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
@@ -1705,38 +1541,121 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
ceph_monc_renew_subs(&osdc->client->monc);
}
-/*
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req,
- bool nofail)
+static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
- int rc;
+ struct ceph_osd_client *osdc = req->r_osdc;
+ struct ceph_osd *osd;
+ bool need_send = false;
+ bool promoted = false;
- __register_request(osdc, req);
- req->r_sent = 0;
- req->r_got_reply = 0;
- rc = __map_request(osdc, req, 0);
- if (rc < 0) {
- if (nofail) {
- dout("osdc_start_request failed map, "
- " will retry %lld\n", req->r_tid);
- rc = 0;
- } else {
- __unregister_request(osdc, req);
- }
- return rc;
+ WARN_ON(req->r_tid || req->r_got_reply);
+ dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
+
+again:
+ calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+ osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
+ if (IS_ERR(osd)) {
+ WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
+ goto promote;
}
- if (req->r_osd == NULL) {
- dout("send_request %p no up osds in pg\n", req);
- ceph_monc_request_next_osdmap(&osdc->client->monc);
+ if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
+ dout("req %p pausewr\n", req);
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
+ dout("req %p pauserd\n", req);
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+ !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
+ CEPH_OSD_FLAG_FULL_FORCE)) &&
+ (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+ pool_full(osdc, req->r_t.base_oloc.pool))) {
+ dout("req %p full/pool_full\n", req);
+ pr_warn_ratelimited("FULL or reached pool quota\n");
+ req->r_t.paused = true;
+ maybe_request_map(osdc);
+ } else if (!osd_homeless(osd)) {
+ need_send = true;
} else {
- __send_queued(osdc);
+ maybe_request_map(osdc);
}
- return 0;
+ mutex_lock(&osd->lock);
+ /*
+ * Assign the tid atomically with send_request() to protect
+ * multiple writes to the same object from racing with each
+ * other, resulting in out of order ops on the OSDs.
+ */
+ req->r_tid = atomic64_inc_return(&osdc->last_tid);
+ link_request(osd, req);
+ if (need_send)
+ send_request(req);
+ mutex_unlock(&osd->lock);
+
+ if (promoted)
+ downgrade_write(&osdc->lock);
+ return;
+
+promote:
+ up_read(&osdc->lock);
+ down_write(&osdc->lock);
+ wrlocked = true;
+ promoted = true;
+ goto again;
+}
+
+static void account_request(struct ceph_osd_request *req)
+{
+ unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+
+ if (req->r_flags & CEPH_OSD_FLAG_READ) {
+ WARN_ON(req->r_flags & mask);
+ req->r_flags |= CEPH_OSD_FLAG_ACK;
+ } else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+ WARN_ON(!(req->r_flags & mask));
+ else
+ WARN_ON(1);
+
+ WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
+ atomic_inc(&req->r_osdc->num_requests);
+}
+
+static void submit_request(struct ceph_osd_request *req, bool wrlocked)
+{
+ ceph_osdc_get_request(req);
+ account_request(req);
+ __submit_request(req, wrlocked);
+}
+
+static void __finish_request(struct ceph_osd_request *req)
+{
+ struct ceph_osd_client *osdc = req->r_osdc;
+ struct ceph_osd *osd = req->r_osd;
+
+ verify_osd_locked(osd);
+ dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+ unlink_request(osd, req);
+ atomic_dec(&osdc->num_requests);
+
+ /*
+ * If an OSD has failed or returned and a request has been sent
+ * twice, it's possible to get a reply and end up here while the
+ * request message is queued for delivery. We will ignore the
+ * reply, so not a big deal, but better to try and catch it.
+ */
+ ceph_msg_revoke(req->r_request);
+ ceph_msg_revoke_incoming(req->r_reply);
+}
+
+static void finish_request(struct ceph_osd_request *req)
+{
+ __finish_request(req);
+ ceph_osdc_put_request(req);
}
static void __complete_request(struct ceph_osd_request *req)
@@ -1747,6 +1666,13 @@ static void __complete_request(struct ceph_osd_request *req)
complete_all(&req->r_completion);
}
+static void cancel_request(struct ceph_osd_request *req)
+{
+ dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+ finish_request(req);
+}
+
/*
* Timeout callback, called every N seconds. When 1 or more OSD
* requests has been active for more than N seconds, we send a keepalive
@@ -1758,44 +1684,49 @@ static void handle_timeout(struct work_struct *work)
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
struct ceph_options *opts = osdc->client->options;
- struct ceph_osd_request *req;
- struct ceph_osd *osd;
- struct list_head slow_osds;
- dout("timeout\n");
- down_read(&osdc->map_sem);
-
- ceph_monc_request_next_osdmap(&osdc->client->monc);
+ unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
+ LIST_HEAD(slow_osds);
+ struct rb_node *n, *p;
- mutex_lock(&osdc->request_mutex);
+ dout("%s osdc %p\n", __func__, osdc);
+ down_write(&osdc->lock);
/*
* ping osds that are a bit slow. this ensures that if there
* is a break in the TCP connection we will notice, and reopen
* a connection with that osd (from the fault callback).
*/
- INIT_LIST_HEAD(&slow_osds);
- list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
- if (time_before(jiffies,
- req->r_stamp + opts->osd_keepalive_timeout))
- break;
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+ bool found = false;
+
+ for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
+ struct ceph_osd_request *req =
+ rb_entry(p, struct ceph_osd_request, r_node);
+
+ if (time_before(req->r_stamp, cutoff)) {
+ dout(" req %p tid %llu on osd%d is laggy\n",
+ req, req->r_tid, osd->o_osd);
+ found = true;
+ }
+ }
- osd = req->r_osd;
- BUG_ON(!osd);
- dout(" tid %llu is slow, will send keepalive on osd%d\n",
- req->r_tid, osd->o_osd);
- list_move_tail(&osd->o_keepalive_item, &slow_osds);
+ if (found)
+ list_move_tail(&osd->o_keepalive_item, &slow_osds);
}
+
+ if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
+ maybe_request_map(osdc);
+
while (!list_empty(&slow_osds)) {
- osd = list_entry(slow_osds.next, struct ceph_osd,
- o_keepalive_item);
+ struct ceph_osd *osd = list_first_entry(&slow_osds,
+ struct ceph_osd,
+ o_keepalive_item);
list_del_init(&osd->o_keepalive_item);
ceph_con_keepalive(&osd->o_con);
}
- __send_queued(osdc);
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
-
+ up_write(&osdc->lock);
schedule_delayed_work(&osdc->timeout_work,
osdc->client->options->osd_keepalive_timeout);
}
@@ -1809,18 +1740,17 @@ static void handle_osds_timeout(struct work_struct *work)
struct ceph_osd *osd, *nosd;
dout("%s osdc %p\n", __func__, osdc);
- down_read(&osdc->map_sem);
- mutex_lock(&osdc->request_mutex);
-
+ down_write(&osdc->lock);
list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
if (time_before(jiffies, osd->lru_ttl))
break;
- remove_osd(osdc, osd);
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
+ WARN_ON(!list_empty(&osd->o_linger_requests));
+ close_osd(osd);
}
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+ up_write(&osdc->lock);
schedule_delayed_work(&osdc->osds_timeout_work,
round_jiffies_relative(delay));
}
@@ -2045,8 +1975,9 @@ static bool done_request(const struct ceph_osd_request *req,
* when we get the safe reply r_unsafe_cb(false), r_cb/r_completion,
* r_safe_completion r_safe_completion
*/
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
{
+ struct ceph_osd_client *osdc = osd->o_osdc;
struct ceph_osd_request *req;
struct MOSDOpReply m;
u64 tid = le64_to_cpu(msg->hdr.tid);
@@ -2057,14 +1988,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
dout("%s msg %p tid %llu\n", __func__, msg, tid);
- down_read(&osdc->map_sem);
- mutex_lock(&osdc->request_mutex);
- req = lookup_request(&osdc->requests, tid);
+ down_read(&osdc->lock);
+ if (!osd_registered(osd)) {
+ dout("%s osd%d unknown\n", __func__, osd->o_osd);
+ goto out_unlock_osdc;
+ }
+ WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
+
+ mutex_lock(&osd->lock);
+ req = lookup_request(&osd->o_requests, tid);
if (!req) {
- dout("%s no tid %llu\n", __func__, tid);
- goto out_unlock;
+ dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
+ goto out_unlock_session;
}
- ceph_osdc_get_request(req);
ret = decode_MOSDOpReply(msg, &m);
if (ret) {
@@ -2083,7 +2019,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
req, req->r_tid, m.retry_attempt,
req->r_attempts - 1);
- goto out_put;
+ goto out_unlock_session;
}
} else {
WARN_ON(1); /* MOSDOpReply v4 is assumed */
@@ -2092,22 +2028,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
if (!ceph_oloc_empty(&m.redirect.oloc)) {
dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
m.redirect.oloc.pool);
- __unregister_request(osdc, req);
+ unlink_request(osd, req);
+ mutex_unlock(&osd->lock);
ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
-
- /*
- * Start redirect requests with nofail=true. If
- * mapping fails, request will end up on the notarget
- * list, waiting for the new osdmap (which can take
- * a while), even though the original request mapped
- * successfully. In the future we might want to follow
- * original request's nofail setting here.
- */
- ret = __ceph_osdc_start_request(osdc, req, true);
- BUG_ON(ret);
-
- goto out_put;
+ req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
+ req->r_tid = 0;
+ __submit_request(req, false);
+ goto out_unlock_osdc;
}
if (m.num_ops != req->r_num_ops) {
@@ -2137,19 +2065,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
req->r_got_reply = true;
} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
dout("req %p tid %llu dup ack\n", req, req->r_tid);
- goto out_put;
+ goto out_unlock_session;
}
if (done_request(req, &m)) {
- __unregister_request(osdc, req);
+ __finish_request(req);
if (req->r_linger) {
WARN_ON(req->r_unsafe_callback);
- __register_linger_request(osdc, req);
+ __register_linger_request(osd, req);
}
}
- mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+ mutex_unlock(&osd->lock);
+ up_read(&osdc->lock);
if (done_request(req, &m)) {
if