summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-01-30 07:54:34 -0600
committerAlex Elder <elder@inktank.com>2013-01-30 07:54:34 -0600
commit969e5aa3b0162a02c4f287d48ff58ca2145acf1b (patch)
tree1af8e8e47e7352c6d3b4abfdb4aea6bd9458666f
parent949db153b6466c6f7cad5a427ecea94985927311 (diff)
parent1ec3911dbd19076bcdfe5540096ff67f91a6ec02 (diff)
Merge branch 'testing' of github.com:ceph/ceph-client into v3.8-rc5-testing
-rw-r--r--drivers/block/rbd.c855
-rw-r--r--fs/ceph/caps.c32
-rw-r--r--fs/ceph/file.c6
-rw-r--r--fs/ceph/ioctl.c2
-rw-r--r--fs/ceph/mds_client.c33
-rw-r--r--fs/ceph/mds_client.h6
-rw-r--r--include/linux/ceph/ceph_features.h8
-rw-r--r--include/linux/ceph/decode.h29
-rw-r--r--include/linux/ceph/osd_client.h24
-rw-r--r--include/linux/ceph/osdmap.h2
-rw-r--r--include/linux/crush/crush.h2
-rw-r--r--net/ceph/crush/mapper.c15
-rw-r--r--net/ceph/osd_client.c206
-rw-r--r--net/ceph/osdmap.c43
14 files changed, 652 insertions, 611 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 89576a0b3f2e..668936381ab0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -52,9 +52,12 @@
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
-/* It might be useful to have this defined elsewhere too */
+/* It might be useful to have these defined elsewhere */
-#define U64_MAX ((u64) (~0ULL))
+#define U8_MAX ((u8) (~0U))
+#define U16_MAX ((u16) (~0U))
+#define U32_MAX ((u32) (~0U))
+#define U64_MAX ((u64) (~0ULL))
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"
@@ -66,7 +69,6 @@
(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
-#define RBD_MAX_OPT_LEN 1024
#define RBD_SNAP_HEAD_NAME "-"
@@ -93,8 +95,6 @@
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
-#define RBD_READ_ONLY_DEFAULT false
-
/*
* block device image metadata (in-memory version)
*/
@@ -119,16 +119,33 @@ struct rbd_image_header {
* An rbd image specification.
*
* The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
- * identify an image.
+ * identify an image. Each rbd_dev structure includes a pointer to
+ * an rbd_spec structure that encapsulates this identity.
+ *
+ * Each of the id's in an rbd_spec has an associated name. For a
+ * user-mapped image, the names are supplied and the id's associated
+ * with them are looked up. For a layered image, a parent image is
+ * defined by the tuple, and the names are looked up.
+ *
+ * An rbd_dev structure contains a parent_spec pointer which is
+ * non-null if the image it represents is a child in a layered
+ * image. This pointer will refer to the rbd_spec structure used
+ * by the parent rbd_dev for its own identity (i.e., the structure
+ * is shared between the parent and child).
+ *
+ * Since these structures are populated once, during the discovery
+ * phase of image construction, they are effectively immutable so
+ * we make no effort to synchronize access to them.
+ *
+ * Note that code herein does not assume the image name is known (it
+ * could be a null pointer).
*/
struct rbd_spec {
u64 pool_id;
char *pool_name;
char *image_id;
- size_t image_id_len;
char *image_name;
- size_t image_name_len;
u64 snap_id;
char *snap_name;
@@ -136,10 +153,6 @@ struct rbd_spec {
struct kref kref;
};
-struct rbd_options {
- bool read_only;
-};
-
/*
* an instance of the client. multiple devices may share an rbd client.
*/
@@ -154,7 +167,7 @@ struct rbd_client {
*/
struct rbd_req_status {
int done;
- int rc;
+ s32 rc;
u64 bytes;
};
@@ -212,11 +225,13 @@ struct rbd_device {
spinlock_t lock; /* queue lock */
struct rbd_image_header header;
- bool exists;
+ atomic_t exists;
struct rbd_spec *spec;
char *header_name;
+ struct ceph_file_layout layout;
+
struct ceph_osd_event *watch_event;
struct ceph_osd_request *watch_request;
@@ -277,6 +292,33 @@ static struct device rbd_root_dev = {
.release = rbd_root_dev_release,
};
+static __printf(2, 3)
+void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
+{
+ struct va_format vaf;
+ va_list args;
+
+ va_start(args, fmt);
+ vaf.fmt = fmt;
+ vaf.va = &args;
+
+ if (!rbd_dev)
+ printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
+ else if (rbd_dev->disk)
+ printk(KERN_WARNING "%s: %s: %pV\n",
+ RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
+ else if (rbd_dev->spec && rbd_dev->spec->image_name)
+ printk(KERN_WARNING "%s: image %s: %pV\n",
+ RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
+ else if (rbd_dev->spec && rbd_dev->spec->image_id)
+ printk(KERN_WARNING "%s: id %s: %pV\n",
+ RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
+ else /* punt */
+ printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
+ RBD_DRV_NAME, rbd_dev, &vaf);
+ va_end(args);
+}
+
#ifdef RBD_DEBUG
#define rbd_assert(expr) \
if (unlikely(!(expr))) { \
@@ -426,6 +468,12 @@ static match_table_t rbd_opts_tokens = {
{-1, NULL}
};
+struct rbd_options {
+ bool read_only;
+};
+
+#define RBD_READ_ONLY_DEFAULT false
+
static int parse_rbd_opts_token(char *c, void *private)
{
struct rbd_options *rbd_opts = private;
@@ -707,7 +755,7 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
goto done;
rbd_dev->mapping.read_only = true;
}
- rbd_dev->exists = true;
+ atomic_set(&rbd_dev->exists, 1);
done:
return ret;
}
@@ -724,7 +772,7 @@ static void rbd_header_free(struct rbd_image_header *header)
header->snapc = NULL;
}
-static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
+static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
char *name;
u64 segment;
@@ -772,6 +820,7 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
{
u64 start_seg;
u64 end_seg;
+ u64 result;
if (!len)
return 0;
@@ -781,7 +830,11 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
start_seg = ofs >> header->obj_order;
end_seg = (ofs + len - 1) >> header->obj_order;
- return end_seg - start_seg + 1;
+ result = end_seg - start_seg + 1;
+ if (result > (u64) INT_MAX)
+ return -ERANGE;
+
+ return (int) result;
}
/*
@@ -949,8 +1002,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
unsigned int bi_size;
struct bio *bio;
- if (!bi)
+ if (!bi) {
+ rbd_warn(NULL, "bio_chain exhausted with %u left", len);
goto out_err; /* EINVAL; ran out of bio's */
+ }
bi_size = min_t(unsigned int, bi->bi_size - off, len);
bio = bio_clone_range(bi, off, bi_size, gfpmask);
if (!bio)
@@ -976,44 +1031,84 @@ out_err:
return NULL;
}
-/*
- * helpers for osd request op vectors.
- */
-static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
- int opcode, u32 payload_len)
+struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
- struct ceph_osd_req_op *ops;
+ struct ceph_osd_req_op *op;
+ va_list args;
+ size_t size;
- ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
- if (!ops)
+ op = kzalloc(sizeof (*op), GFP_NOIO);
+ if (!op)
return NULL;
+ op->op = opcode;
+ va_start(args, opcode);
+ switch (opcode) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_WRITE:
+ /* rbd_osd_req_op_create(READ, offset, length) */
+ /* rbd_osd_req_op_create(WRITE, offset, length) */
+ op->extent.offset = va_arg(args, u64);
+ op->extent.length = va_arg(args, u64);
+ if (opcode == CEPH_OSD_OP_WRITE)
+ op->payload_len = op->extent.length;
+ break;
+ case CEPH_OSD_OP_CALL:
+ /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
+ op->cls.class_name = va_arg(args, char *);
+ size = strlen(op->cls.class_name);
+ rbd_assert(size <= (size_t) U8_MAX);
+ op->cls.class_len = size;
+ op->payload_len = size;
+
+ op->cls.method_name = va_arg(args, char *);
+ size = strlen(op->cls.method_name);
+ rbd_assert(size <= (size_t) U8_MAX);
+ op->cls.method_len = size;
+ op->payload_len += size;
+
+ op->cls.argc = 0;
+ op->cls.indata = va_arg(args, void *);
+ size = va_arg(args, size_t);
+ rbd_assert(size <= (size_t) U32_MAX);
+ op->cls.indata_len = (u32) size;
+ op->payload_len += size;
+ break;
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ case CEPH_OSD_OP_WATCH:
+ /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
+ /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
+ op->watch.cookie = va_arg(args, u64);
+ op->watch.ver = va_arg(args, u64);
+ op->watch.ver = cpu_to_le64(op->watch.ver);
+ if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
+ op->watch.flag = (u8) 1;
+ break;
+ default:
+ rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
+ kfree(op);
+ op = NULL;
+ break;
+ }
+ va_end(args);
- ops[0].op = opcode;
-
- /*
- * op extent offset and length will be set later on
- * in calc_raw_layout()
- */
- ops[0].payload_len = payload_len;
-
- return ops;
+ return op;
}
-static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
- kfree(ops);
+ kfree(op);
}
static void rbd_coll_end_req_index(struct request *rq,
struct rbd_req_coll *coll,
int index,
- int ret, u64 len)
+ s32 ret, u64 len)
{
struct request_queue *q;
int min, max, i;
dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
- coll, index, ret, (unsigned long long) len);
+ coll, index, (int)ret, (unsigned long long)len);
if (!rq)
return;
@@ -1034,7 +1129,7 @@ static void rbd_coll_end_req_index(struct request *rq,
max++;
for (i = min; i<max; i++) {
- __blk_end_request(rq, coll->status[i].rc,
+ __blk_end_request(rq, (int)coll->status[i].rc,
coll->status[i].bytes);
coll->num_done++;
kref_put(&coll->kref, rbd_coll_release);
@@ -1042,10 +1137,12 @@ static void rbd_coll_end_req_index(struct request *rq,
spin_unlock_irq(q->queue_lock);
}
-static void rbd_coll_end_req(struct rbd_request *req,
- int ret, u64 len)
+static void rbd_coll_end_req(struct rbd_request *rbd_req,
+ s32 ret, u64 len)
{
- rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
+ rbd_coll_end_req_index(rbd_req->rq,
+ rbd_req->coll, rbd_req->coll_index,
+ ret, len);
}
/*
@@ -1060,117 +1157,102 @@ static int rbd_do_request(struct request *rq,
struct page **pages,
int num_pages,
int flags,
- struct ceph_osd_req_op *ops,
+ struct ceph_osd_req_op *op,
struct rbd_req_coll *coll,
int coll_index,
- void (*rbd_cb)(struct ceph_osd_request *req,
- struct ceph_msg *msg),
- struct ceph_osd_request **linger_req,
+ void (*rbd_cb)(struct ceph_osd_request *,
+ struct ceph_msg *),
u64 *ver)
{
- struct ceph_osd_request *req;
- struct ceph_file_layout *layout;
- int ret;
- u64 bno;
- struct timespec mtime = CURRENT_TIME;
- struct rbd_request *req_data;
- struct ceph_osd_request_head *reqhead;
struct ceph_osd_client *osdc;
-
- req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
- if (!req_data) {
- if (coll)
- rbd_coll_end_req_index(rq, coll, coll_index,
- -ENOMEM, len);
- return -ENOMEM;
- }
-
- if (coll) {
- req_data->coll = coll;
- req_data->coll_index = coll_index;
- }
+ struct ceph_osd_request *osd_req;
+ struct rbd_request *rbd_req = NULL;
+ struct timespec mtime = CURRENT_TIME;
+ int ret;
dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
object_name, (unsigned long long) ofs,
(unsigned long long) len, coll, coll_index);
osdc = &rbd_dev->rbd_client->client->osdc;
- req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
- false, GFP_NOIO, pages, bio);
- if (!req) {
- ret = -ENOMEM;
- goto done_pages;
- }
-
- req->r_callback = rbd_cb;
+ osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
+ if (!osd_req)
+ return -ENOMEM;
- req_data->rq = rq;
- req_data->bio = bio;
- req_data->pages = pages;
- req_data->len = len;
+ osd_req->r_flags = flags;
+ osd_req->r_pages = pages;
+ if (bio) {
+ osd_req->r_bio = bio;
+ bio_get(osd_req->r_bio);
+ }
- req->r_priv = req_data;
+ if (coll) {
+ ret = -ENOMEM;
+ rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
+ if (!rbd_req)
+ goto done_osd_req;
+
+ rbd_req->rq = rq;
+ rbd_req->bio = bio;
+ rbd_req->pages = pages;
+ rbd_req->len = len;
+ rbd_req->coll = coll;
+ rbd_req->coll_index = coll_index;
+ }
- reqhead = req->r_request->front.iov_base;
- reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+ osd_req->r_callback = rbd_cb;
+ osd_req->r_priv = rbd_req;
- strncpy(req->r_oid, object_name, sizeof(req->r_oid));
- req->r_oid_len = strlen(req->r_oid);
+ strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
+ osd_req->r_oid_len = strlen(osd_req->r_oid);
- layout = &req->r_file_layout;
- memset(layout, 0, sizeof(*layout));
- layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
- layout->fl_stripe_count = cpu_to_le32(1);
- layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
- layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
- ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
- req, ops);
- rbd_assert(ret == 0);
+ osd_req->r_file_layout = rbd_dev->layout; /* struct */
+ osd_req->r_num_pages = calc_pages_for(ofs, len);
+ osd_req->r_page_alignment = ofs & ~PAGE_MASK;
- ceph_osdc_build_request(req, ofs, &len,
- ops,
- snapc,
- &mtime,
- req->r_oid, req->r_oid_len);
+ ceph_osdc_build_request(osd_req, ofs, len, 1, op,
+ snapc, snapid, &mtime);
- if (linger_req) {
- ceph_osdc_set_request_linger(osdc, req);
- *linger_req = req;
+ if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
+ ceph_osdc_set_request_linger(osdc, osd_req);
+ rbd_dev->watch_request = osd_req;
}
- ret = ceph_osdc_start_request(osdc, req, false);
+ ret = ceph_osdc_start_request(osdc, osd_req, false);
if (ret < 0)
goto done_err;
if (!rbd_cb) {
- ret = ceph_osdc_wait_request(osdc, req);
+ u64 version;
+
+ ret = ceph_osdc_wait_request(osdc, osd_req);
+ version = le64_to_cpu(osd_req->r_reassert_version.version);
if (ver)
- *ver = le64_to_cpu(req->r_reassert_version.version);
- dout("reassert_ver=%llu\n",
- (unsigned long long)
- le64_to_cpu(req->r_reassert_version.version));
- ceph_osdc_put_request(req);
+ *ver = version;
+ dout("reassert_ver=%llu\n", (unsigned long long) version);
+ ceph_osdc_put_request(osd_req);
}
return ret;
done_err:
- bio_chain_put(req_data->bio);
- ceph_osdc_put_request(req);
-done_pages:
- rbd_coll_end_req(req_data, ret, len);
- kfree(req_data);
+ if (bio)
+ bio_chain_put(osd_req->r_bio);
+ kfree(rbd_req);
+done_osd_req:
+ ceph_osdc_put_request(osd_req);
+
return ret;
}
/*
* Ceph osd op callback
*/
-static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
{
- struct rbd_request *req_data = req->r_priv;
+ struct rbd_request *rbd_req = osd_req->r_priv;
struct ceph_osd_reply_head *replyhead;
struct ceph_osd_op *op;
- __s32 rc;
+ s32 rc;
u64 bytes;
int read_op;
@@ -1178,68 +1260,66 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
replyhead = msg->front.iov_base;
WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
op = (void *)(replyhead + 1);
- rc = le32_to_cpu(replyhead->result);
+ rc = (s32)le32_to_cpu(replyhead->result);
bytes = le64_to_cpu(op->extent.length);
read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
(unsigned long long) bytes, read_op, (int) rc);
- if (rc == -ENOENT && read_op) {
- zero_bio_chain(req_data->bio, 0);
+ if (rc == (s32)-ENOENT && read_op) {
+ zero_bio_chain(rbd_req->bio, 0);
rc = 0;
- } else if (rc == 0 && read_op && bytes < req_data->len) {
- zero_bio_chain(req_data->bio, bytes);
- bytes = req_data->len;
+ } else if (rc == 0 && read_op && bytes < rbd_req->len) {
+ zero_bio_chain(rbd_req->bio, bytes);
+ bytes = rbd_req->len;
}
- rbd_coll_end_req(req_data, rc, bytes);
+ rbd_coll_end_req(rbd_req, rc, bytes);
- if (req_data->bio)
- bio_chain_put(req_data->bio);
+ if (rbd_req->bio)
+ bio_chain_put(rbd_req->bio);
- ceph_osdc_put_request(req);
- kfree(req_data);
+ ceph_osdc_put_request(osd_req);
+ kfree(rbd_req);
}
-static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
+ struct ceph_msg *msg)
{
- ceph_osdc_put_request(req);
+ ceph_osdc_put_request(osd_req);
}
/*
* Do a synchronous ceph osd operation
*/
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
- struct ceph_snap_context *snapc,
- u64 snapid,
int flags,
- struct ceph_osd_req_op *ops,
+ struct ceph_osd_req_op *op,
const char *object_name,
u64 ofs, u64 inbound_size,
char *inbound,
- struct ceph_osd_request **linger_req,
u64 *ver)
{
int ret;
struct page **pages;
int num_pages;
- rbd_assert(ops != NULL);
+ rbd_assert(op != NULL);
num_pages = calc_pages_for(ofs, inbound_size);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
- ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
+ ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
object_name, ofs, inbound_size, NULL,
pages, num_pages,
flags,
- ops,
+ op,
NULL, 0,
NULL,
- linger_req, ver);
+ ver);
if (ret < 0)
goto done;
@@ -1262,12 +1342,11 @@ static int rbd_do_op(struct request *rq,
struct rbd_req_coll *coll,
int coll_index)
{
- char *seg_name;
+ const char *seg_name;
u64 seg_ofs;
u64 seg_len;
int ret;
- struct ceph_osd_req_op *ops;
- u32 payload_len;
+ struct ceph_osd_req_op *op;
int opcode;
int flags;
u64 snapid;
@@ -1282,18 +1361,16 @@ static int rbd_do_op(struct request *rq,
opcode = CEPH_OSD_OP_WRITE;
flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
snapid = CEPH_NOSNAP;
- payload_len = seg_len;
} else {
opcode = CEPH_OSD_OP_READ;
flags = CEPH_OSD_FLAG_READ;
- snapc = NULL;
+ rbd_assert(!snapc);
snapid = rbd_dev->spec->snap_id;
- payload_len = 0;
}
ret = -ENOMEM;
- ops = rbd_create_rw_ops(1, opcode, payload_len);
- if (!ops)
+ op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
+ if (!op)
goto done;
/* we've taken care of segment sizes earlier when we
@@ -1306,11 +1383,13 @@ static int rbd_do_op(struct request *rq,
bio,
NULL, 0,
flags,
- ops,
+ op,
coll, coll_index,
- rbd_req_cb, 0, NULL);
-
- rbd_destroy_ops(ops);
+ rbd_req_cb, NULL);
+ if (ret < 0)
+ rbd_coll_end_req_index(rq, coll, coll_index,
+ (s32)ret, seg_len);
+ rbd_osd_req_op_destroy(op);
done:
kfree(seg_name);
return ret;
@@ -1320,24 +1399,21 @@ done:
* Request sync osd read
*/
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
- u64 snapid,
const char *object_name,
u64 ofs, u64 len,
char *buf,
u64 *ver)
{
- struct ceph_osd_req_op *ops;
+ struct ceph_osd_req_op *op;
int ret;
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
- if (!ops)
+ op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
+ if (!op)
return -ENOMEM;
- ret = rbd_req_sync_op(rbd_dev, NULL,
- snapid,
- CEPH_OSD_FLAG_READ,
- ops, object_name, ofs, len, buf, NULL, ver);
- rbd_destroy_ops(ops);
+ ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
+ op, object_name, ofs, len, buf, ver);
+ rbd_osd_req_op_destroy(op);
return ret;
}
@@ -1349,26 +1425,23 @@ static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
u64 ver,
u64 notify_id)
{
- struct ceph_osd_req_op *ops;
+ struct ceph_osd_req_op *op;
int ret;
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
- if (!ops)
+ op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
+ if (!op)
return -ENOMEM;
- ops[0].watch.ver = cpu_to_le64(ver);
- ops[0].watch.cookie = notify_id;
- ops[0].watch.flag = 0;
-
ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
rbd_dev->header_name, 0, 0, NULL,
NULL, 0,
CEPH_OSD_FLAG_READ,
- ops,
+ op,
NULL, 0,
- rbd_simple_req_cb, 0, NULL);
+ rbd_simple_req_cb, NULL);
+
+ rbd_osd_req_op_destroy(op);
- rbd_destroy_ops(ops);
return ret;
}
@@ -1386,83 +1459,51 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
(unsigned int) opcode);
rc = rbd_dev_refresh(rbd_dev, &hver);
if (rc)
- pr_warning(RBD_DRV_NAME "%d got notification but failed to "
- " update snaps: %d\n", rbd_dev->major, rc);
+ rbd_warn(rbd_dev, "got notification but failed to "
+ " update snaps: %d\n", rc);
rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}
/*
- * Request sync osd watch
+ * Request sync osd watch/unwatch. The value of "start" determines
+ * whether a watch request is being initiated or torn down.
*/
-static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
+static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
{
- struct ceph_osd_req_op *ops;
- struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
- int ret;
+ struct ceph_osd_req_op *op;
+ int ret = 0;
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
- if (!ops)
- return -ENOMEM;
+ rbd_assert(start ^ !!rbd_dev->watch_event);
+ rbd_assert(start ^ !!rbd_dev->watch_request);
- ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
- (void *)rbd_dev, &rbd_dev->watch_event);
- if (ret < 0)
- goto fail;
+ if (start) {
+ struct ceph_osd_client *osdc;
- ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
- ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
- ops[0].watch.flag = 1;
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
+ &rbd_dev->watch_event);
+ if (ret < 0)
+ return ret;
+ }
- ret = rbd_req_sync_op(rbd_dev, NULL,
- CEPH_NOSNAP,
+ op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
+ rbd_dev->watch_event->cookie,
+ rbd_dev->header.obj_version, start);
+ if (op)
+ ret = rbd_req_sync_op(rbd_dev,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ops,
- rbd_dev->header_name,
- 0, 0, NULL,
- &rbd_dev->watch_request, NULL);
+ op, rbd_dev->header_name,
+ 0, 0, NULL, NULL);
- if (ret < 0)
- goto fail_event;
-
- rbd_destroy_ops(ops);
- return 0;
-
-fail_event:
- ceph_osdc_cancel_event(rbd_dev->watch_event);
- rbd_dev->watch_event = NULL;
-fail:
- rbd_destroy_ops(ops);
- return ret;
-}
-
-/*
- * Request sync osd unwatch
- */
-static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
-{
- struct ceph_osd_req_op *ops;
- int ret;
-
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
- if (!ops)
- return -ENOMEM;
-
- ops[0].watch.ver = 0;
- ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
- ops[0].watch.flag = 0;
-
- ret = rbd_req_sync_op(rbd_dev, NULL,
- CEPH_NOSNAP,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ops,
- rbd_dev->header_name,
- 0, 0, NULL, NULL, NULL);
+ /* Cancel the event if we're tearing down, or on error */
+ if (!start || !op || ret < 0) {
+ ceph_osdc_cancel_event(rbd_dev->watch_event);
+ rbd_dev->watch_event = NULL;
+ }
+ rbd_osd_req_op_destroy(op);
- rbd_destroy_ops(ops);
- ceph_osdc_cancel_event(rbd_dev->watch_event);
- rbd_dev->watch_event = NULL;
return ret;
}
@@ -1477,13 +1518,9 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
size_t outbound_size,
char *inbound,
size_t inbound_size,
- int flags,
u64 *ver)
{
- struct ceph_osd_req_op *ops;
- int class_name_len = strlen(class_name);
- int method_name_len = strlen(method_name);
- int payload_size;
+ struct ceph_osd_req_op *op;
int ret;
/*
@@ -1494,26 +1531,16 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
* the perspective of the server side) in the OSD request
* operation.
*/
- payload_size = class_name_len + method_name_len + outbound_size;
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
- if (!ops)
+ op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
+ method_name, outbound, outbound_size);
+ if (!op)
return -ENOMEM;
- ops[0].cls.class_name = class_name;
- ops[0].cls.class_len = (__u8) class_name_len;
- ops[0].cls.method_name = method_name;
- ops[0].cls.method_len = (__u8) method_name_len;
- ops[0].cls.argc = 0;
- ops[0].cls.indata = outbound;
- ops[0].cls.indata_len = outbound_size;
-
- ret = rbd_req_sync_op(rbd_dev, NULL,
- CEPH_NOSNAP,
- flags, ops,
+ ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
object_name, 0, inbound_size, inbound,
- NULL, ver);
+ ver);
- rbd_destroy_ops(ops);
+ rbd_osd_req_op_destroy(op);
dout("cls_exec returned %d\n", ret);
return ret;
@@ -1533,113 +1560,123 @@ static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
return coll;
}
+static int rbd_dev_do_request(struct request *rq,
+ struct rbd_device *rbd_dev,
+ struct ceph_snap_context *snapc,
+ u64 ofs, unsigned int size,
+ struct bio *bio_chain)
+{
+ int num_segs;
+ struct rbd_req_coll *coll;
+ unsigned int bio_offset;
+ int cur_seg = 0;
+
+ dout("%s 0x%x bytes at 0x%llx\n",
+ rq_data_dir(rq) == WRITE ? "write" : "read",
+ size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
+
+ num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
+ if (num_segs <= 0)
+ return num_segs;
+
+ coll = rbd_alloc_coll(num_segs);
+ if (!coll)
+ return -ENOMEM;
+
+ bio_offset = 0;
+ do {
+ u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+ unsigned int clone_size;
+ struct bio *bio_clone;
+
+ BUG_ON(limit > (u64)UINT_MAX);
+ clone_size = (unsigned int)limit;
+ dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
+
+ kref_get(&coll->kref);
+
+ /* Pass a cloned bio chain via an osd request */
+
+ bio_clone = bio_chain_clone_range(&bio_chain,
+ &bio_offset, clone_size,
+ GFP_ATOMIC);
+ if (bio_clone)
+ (void)rbd_do_op(rq, rbd_dev, snapc,
+ ofs, clone_size,
+ bio_clone, coll, cur_seg);
+ else
+ rbd_coll_end_req_index(rq, coll, cur_seg,
+ (s32)-ENOMEM,
+ clone_size);
+ size -= clone_size;
+ ofs += clone_size;
+
+ cur_seg++;
+ } while (size > 0);
+ kref_put(&coll->kref, rbd_coll_release);
+
+ return 0;
+}
+
/*
* block device queue callback
*/
static void rbd_rq_fn(struct request_queue *q)
{
struct rbd_device *rbd_dev = q->queuedata;
+ bool read_only = rbd_dev->mapping.read_only;
struct request *rq;
while ((rq = blk_fetch_request(q))) {
- struct bio *bio;
- bool do_write;
- unsigned int size;
- u64 ofs;
- int num_segs, cur_seg = 0;
- struct rbd_req_coll *coll;
- struct ceph_snap_context *snapc;
- unsigned int bio_offset;
+ struct ceph_snap_context *snapc = NULL;
+ unsigned int size = 0;
+ int result;
dout("fetched request\n");
- /* filter out block requests we don't understand */
+ /* Filter out block requests we don't understand */
+
if ((rq->cmd_type != REQ_TYPE_FS)) {
__blk_end_request_all(rq, 0);
continue;
}
-
- /* deduce our operation (read, write) */
- do_write = (rq_data_dir(rq) == WRITE);
- if (do_write && rbd_dev->mapping.read_only) {
- __blk_end_request_all(rq, -EROFS);
- continue;
- }
-
spin_unlock_irq(q->queue_lock);
- down_read(&rbd_dev->header_rwsem);
-
- if (!rbd_dev->exists) {
- rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+ /* Write requests need a reference to the snapshot context */
+
+ if (rq_data_dir(rq) == WRITE) {
+ result = -EROFS;
+ if (read_only) /* Can't write to a read-only device */
+ goto out_end_request;
+
+ /*
+ * Note that each osd request will take its
+ * own reference to the snapshot context
+ * supplied. The reference we take here
+ * just guarantees the one we provide stays
+ * valid.
+ */
+ down_read(&rbd_dev->header_rwsem);
+ snapc = ceph_get_snap_context(rbd_dev->header.snapc);
up_read(&rbd_dev->header_rwsem);
+ rbd_assert(snapc != NULL);
+ } else if (!atomic_read(&rbd_dev->exists)) {
+ rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
dout("request for non-existent snapshot");
- spin_lock_irq(q->queue_lock);
- __blk_end_request_all(rq, -ENXIO);
- continue;
+ result = -ENXIO;
+ goto out_end_request;
}
- snapc = ceph_get_snap_context(rbd_dev->header.snapc);
-
- up_read(&rbd_dev->header_rwsem);
-
size = blk_rq_bytes(rq);
- ofs = blk_rq_pos(rq) * SECTOR_SIZE;
- bio = rq->bio;
-
- dout("%s 0x%x bytes at 0x%llx\n",
- do_write ? "write" : "read",
- size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
-
- num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
- if (num_segs <= 0) {
- spin_lock_irq(q->queue_lock);
- __blk_end_request_all(rq, num_segs);
+ result = rbd_dev_do_request(rq, rbd_dev, snapc,
+ blk_rq_pos(rq) * SECTOR_SIZE,
+ size, rq->bio);
+out_end_request:
+ if (snapc)
ceph_put_snap_context(snapc);
- continue;
- }
- coll = rbd_alloc_coll(num_segs);
- if (!coll) {
- spin_lock_irq(q->queue_lock);
- __blk_end_request_all(rq, -ENOMEM);
- ceph_put_snap_context(snapc);
- continue;
- }
-
- bio_offset = 0;
- do {
- u64 limit = rbd_segment_length(rbd_dev, ofs, size);
- unsigned int chain_size;
- struct bio *bio_chain;
-
- BUG_ON(limit > (u64) UINT_MAX);
- chain_size = (unsigned int) limit;
- dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-
- kref_get(&coll->kref);
-
- /* Pass a cloned bio chain via an osd request */
-
- bio_chain = bio_chain_clone_range(&bio,
- &bio_offset, chain_size,
- GFP_ATOMIC);
- if (bio_chain)
- (void) rbd_do_op(rq, rbd_dev, snapc,
- ofs, chain_size,
- bio_chain, coll, cur_seg);
- else
- rbd_coll_end_req_index(rq, coll, cur_seg,
- -ENOMEM, chain_size);
- size -= chain_size;
- ofs += chain_size;
-
- cur_seg++;
- } while (size > 0);
- kref_put(&coll->kref, rbd_coll_release);
-
spin_lock_irq(q->queue_lock);
-
- cep