diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2013-05-06 13:11:19 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2013-05-06 13:11:19 -0700 |
commit | 91f8575685e35f3bd021286bc82d26397458f5a9 (patch) | |
tree | 09de8d889758a12071adb9427ed741e27c907aa6 /drivers/block | |
parent | 2e378f3eebd28feefbb1f9953834a5a19482f053 (diff) | |
parent | b5b09be30cf99f9c699e825629f02e3bce555d44 (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Alex Elder:
"This is a big pull.
Most of it is culmination of Alex's work to implement RBD image
layering, which is now complete (yay!).
There is also some work from Yan to fix i_mutex behavior surrounding
writes in cephfs, a sync write fix, a fix for RBD images that get
resized while they are mapped, and a few patches from me that resolve
annoying auth warnings and fix several bugs in the ceph auth code."
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (254 commits)
rbd: fix image request leak on parent read
libceph: use slab cache for osd client requests
libceph: allocate ceph message data with a slab allocator
libceph: allocate ceph messages with a slab allocator
rbd: allocate image object names with a slab allocator
rbd: allocate object requests with a slab allocator
rbd: allocate name separate from obj_request
rbd: allocate image requests with a slab allocator
rbd: use binary search for snapshot lookup
rbd: clear EXISTS flag if mapped snapshot disappears
rbd: kill off the snapshot list
rbd: define rbd_snap_size() and rbd_snap_features()
rbd: use snap_id not index to look up snap info
rbd: look up snapshot name in names buffer
rbd: drop obj_request->version
rbd: drop rbd_obj_method_sync() version parameter
rbd: more version parameter removal
rbd: get rid of some version parameters
rbd: stop tracking header object version
rbd: snap names are pointer to constant data
...
Diffstat (limited to 'drivers/block')
-rw-r--r-- | drivers/block/rbd.c | 2858 |
1 files changed, 1834 insertions, 1024 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b7b7a88d9f68..c2ca1818f335 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1,3 +1,4 @@ + /* rbd.c -- Export ceph rados objects as a Linux block device @@ -32,12 +33,14 @@ #include <linux/ceph/mon_client.h> #include <linux/ceph/decode.h> #include <linux/parser.h> +#include <linux/bsearch.h> #include <linux/kernel.h> #include <linux/device.h> #include <linux/module.h> #include <linux/fs.h> #include <linux/blkdev.h> +#include <linux/slab.h> #include "rbd_types.h" @@ -52,13 +55,6 @@ #define SECTOR_SHIFT 9 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) -/* It might be useful to have these defined elsewhere */ - -#define U8_MAX ((u8) (~0U)) -#define U16_MAX ((u16) (~0U)) -#define U32_MAX ((u32) (~0U)) -#define U64_MAX ((u64) (~0ULL)) - #define RBD_DRV_NAME "rbd" #define RBD_DRV_NAME_LONG "rbd (rados block device)" @@ -72,6 +68,8 @@ #define RBD_SNAP_HEAD_NAME "-" +#define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */ + /* This allows a single page to hold an image name sent by OSD */ #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) #define RBD_IMAGE_ID_LEN_MAX 64 @@ -80,11 +78,14 @@ /* Feature bits */ -#define RBD_FEATURE_LAYERING 1 +#define RBD_FEATURE_LAYERING (1<<0) +#define RBD_FEATURE_STRIPINGV2 (1<<1) +#define RBD_FEATURES_ALL \ + (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) /* Features supported by this (client software) implementation. */ -#define RBD_FEATURES_ALL (0) +#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL) /* * An RBD device name will be "rbd#", where the "rbd" comes from @@ -112,7 +113,8 @@ struct rbd_image_header { char *snap_names; u64 *snap_sizes; - u64 obj_version; + u64 stripe_unit; + u64 stripe_count; }; /* @@ -142,13 +144,13 @@ struct rbd_image_header { */ struct rbd_spec { u64 pool_id; - char *pool_name; + const char *pool_name; - char *image_id; - char *image_name; + const char *image_id; + const char *image_name; u64 snap_id; - char *snap_name; + const char *snap_name; struct kref kref; }; @@ -174,13 +176,44 @@ enum obj_request_type { OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES }; +enum obj_req_flags { + OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */ + OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */ + OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */ + OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ +}; + struct rbd_obj_request { const char *object_name; u64 offset; /* object start byte */ u64 length; /* bytes from offset */ + unsigned long flags; - struct rbd_img_request *img_request; - struct list_head links; /* img_request->obj_requests */ + /* + * An object request associated with an image will have its + * img_data flag set; a standalone object request will not. + * + * A standalone object request will have which == BAD_WHICH + * and a null obj_request pointer. + * + * An object request initiated in support of a layered image + * object (to check for its existence before a write) will + * have which == BAD_WHICH and a non-null obj_request pointer. + * + * Finally, an object request for rbd image data will have + * which != BAD_WHICH, and will have a non-null img_request + * pointer. The value of which will be in the range + * 0..(img_request->obj_request_count-1). + */ + union { + struct rbd_obj_request *obj_request; /* STAT op */ + struct { + struct rbd_img_request *img_request; + u64 img_offset; + /* links for img_request->obj_requests list */ + struct list_head links; + }; + }; u32 which; /* posn image request list */ enum obj_request_type type; @@ -191,13 +224,12 @@ struct rbd_obj_request { u32 page_count; }; }; + struct page **copyup_pages; struct ceph_osd_request *osd_req; u64 xferred; /* bytes transferred */ - u64 version; int result; - atomic_t done; rbd_obj_callback_t callback; struct completion completion; @@ -205,19 +237,31 @@ struct rbd_obj_request { struct kref kref; }; +enum img_req_flags { + IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */ + IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ + IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ +}; + struct rbd_img_request { - struct request *rq; struct rbd_device *rbd_dev; u64 offset; /* starting image byte offset */ u64 length; /* byte count from offset */ - bool write_request; /* false for read */ + unsigned long flags; union { + u64 snap_id; /* for reads */ struct ceph_snap_context *snapc; /* for writes */ - u64 snap_id; /* for reads */ }; + union { + struct request *rq; /* block request */ + struct rbd_obj_request *obj_request; /* obj req initiator */ + }; + struct page **copyup_pages; spinlock_t completion_lock;/* protects next_completion */ u32 next_completion; rbd_img_callback_t callback; + u64 xferred;/* aggregate bytes transferred */ + int result; /* first nonzero obj_request result */ u32 obj_request_count; struct list_head obj_requests; /* rbd_obj_request structs */ @@ -232,15 +276,6 @@ struct rbd_img_request { #define for_each_obj_request_safe(ireq, oreq, n) \ list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) -struct rbd_snap { - struct device dev; - const char *name; - u64 size; - struct list_head node; - u64 id; - u64 features; -}; - struct rbd_mapping { u64 size; u64 features; @@ -276,6 +311,7 @@ struct rbd_device { struct rbd_spec *parent_spec; u64 parent_overlap; + struct rbd_device *parent; /* protects updating the header */ struct rw_semaphore header_rwsem; @@ -284,9 +320,6 @@ struct rbd_device { struct list_head node; - /* list of snapshots */ - struct list_head snaps; - /* sysfs related */ struct device dev; unsigned long open_count; /* protected by lock */ @@ -312,16 +345,21 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock); static LIST_HEAD(rbd_client_list); /* clients */ static DEFINE_SPINLOCK(rbd_client_list_lock); -static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); -static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); +/* Slab caches for frequently-allocated structures */ + +static struct kmem_cache *rbd_img_request_cache; +static struct kmem_cache *rbd_obj_request_cache; +static struct kmem_cache *rbd_segment_name_cache; -static void rbd_dev_release(struct device *dev); -static void rbd_remove_snap_dev(struct rbd_snap *snap); +static int rbd_img_request_submit(struct rbd_img_request *img_request); + +static void rbd_dev_device_release(struct device *dev); static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count); +static int rbd_dev_image_probe(struct rbd_device *rbd_dev); static struct bus_attribute rbd_bus_attrs[] = { __ATTR(add, S_IWUSR, NULL, rbd_add), @@ -383,8 +421,19 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) # define rbd_assert(expr) ((void) 0) #endif /* !RBD_DEBUG */ -static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); -static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); +static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); +static void rbd_img_parent_read(struct rbd_obj_request *obj_request); +static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); + +static int rbd_dev_refresh(struct rbd_device *rbd_dev); +static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev); +static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, + u64 snap_id); +static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, + u8 *order, u64 *snap_size); +static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, + u64 *snap_features); +static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name); static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -484,6 +533,13 @@ out_opt: return ERR_PTR(ret); } +static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) +{ + kref_get(&rbdc->kref); + + return rbdc; +} + /* * Find a ceph client with specific addr and configuration. If * found, bump its reference count. @@ -499,7 +555,8 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) spin_lock(&rbd_client_list_lock); list_for_each_entry(client_node, &rbd_client_list, node) { if (!ceph_compare_options(ceph_opts, client_node->client)) { - kref_get(&client_node->kref); + __rbd_get_client(client_node); + found = true; break; } @@ -722,7 +779,6 @@ static int rbd_header_from_disk(struct rbd_image_header *header, header->snap_sizes[i] = le64_to_cpu(ondisk->snaps[i].image_size); } else { - WARN_ON(ondisk->snap_names_len); header->snap_names = NULL; header->snap_sizes = NULL; } @@ -735,18 +791,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header, /* Allocate and fill in the snapshot context */ header->image_size = le64_to_cpu(ondisk->image_size); - size = sizeof (struct ceph_snap_context); - size += snap_count * sizeof (header->snapc->snaps[0]); - header->snapc = kzalloc(size, GFP_KERNEL); + + header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); if (!header->snapc) goto out_err; - - atomic_set(&header->snapc->nref, 1); header->snapc->seq = le64_to_cpu(ondisk->snap_seq); - header->snapc->num_snaps = snap_count; for (i = 0; i < snap_count; i++) - header->snapc->snaps[i] = - le64_to_cpu(ondisk->snaps[i].id); + header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id); return 0; @@ -761,70 +812,174 @@ out_err: return -ENOMEM; } -static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) +static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which) { - struct rbd_snap *snap; + const char *snap_name; + rbd_assert(which < rbd_dev->header.snapc->num_snaps); + + /* Skip over names until we find the one we are looking for */ + + snap_name = rbd_dev->header.snap_names; + while (which--) + snap_name += strlen(snap_name) + 1; + + return kstrdup(snap_name, GFP_KERNEL); +} + +/* + * Snapshot id comparison function for use with qsort()/bsearch(). + * Note that result is for snapshots in *descending* order. + */ +static int snapid_compare_reverse(const void *s1, const void *s2) +{ + u64 snap_id1 = *(u64 *)s1; + u64 snap_id2 = *(u64 *)s2; + + if (snap_id1 < snap_id2) + return 1; + return snap_id1 == snap_id2 ? 0 : -1; +} + +/* + * Search a snapshot context to see if the given snapshot id is + * present. + * + * Returns the position of the snapshot id in the array if it's found, + * or BAD_SNAP_INDEX otherwise. + * + * Note: The snapshot array is in kept sorted (by the osd) in + * reverse order, highest snapshot id first. + */ +static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id) +{ + struct ceph_snap_context *snapc = rbd_dev->header.snapc; + u64 *found; + + found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps, + sizeof (snap_id), snapid_compare_reverse); + + return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX; +} + +static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, + u64 snap_id) +{ + u32 which; + + which = rbd_dev_snap_index(rbd_dev, snap_id); + if (which == BAD_SNAP_INDEX) + return NULL; + + return _rbd_dev_v1_snap_name(rbd_dev, which); +} + +static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) +{ if (snap_id == CEPH_NOSNAP) return RBD_SNAP_HEAD_NAME; - list_for_each_entry(snap, &rbd_dev->snaps, node) - if (snap_id == snap->id) - return snap->name; + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + if (rbd_dev->image_format == 1) + return rbd_dev_v1_snap_name(rbd_dev, snap_id); - return NULL; + return rbd_dev_v2_snap_name(rbd_dev, snap_id); } -static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) +static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, + u64 *snap_size) { + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + if (snap_id == CEPH_NOSNAP) { + *snap_size = rbd_dev->header.image_size; + } else if (rbd_dev->image_format == 1) { + u32 which; - struct rbd_snap *snap; + which = rbd_dev_snap_index(rbd_dev, snap_id); + if (which == BAD_SNAP_INDEX) + return -ENOENT; - list_for_each_entry(snap, &rbd_dev->snaps, node) { - if (!strcmp(snap_name, snap->name)) { - rbd_dev->spec->snap_id = snap->id; - rbd_dev->mapping.size = snap->size; - rbd_dev->mapping.features = snap->features; + *snap_size = rbd_dev->header.snap_sizes[which]; + } else { + u64 size = 0; + int ret; - return 0; - } + ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); + if (ret) + return ret; + + *snap_size = size; } + return 0; +} + +static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, + u64 *snap_features) +{ + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + if (snap_id == CEPH_NOSNAP) { + *snap_features = rbd_dev->header.features; + } else if (rbd_dev->image_format == 1) { + *snap_features = 0; /* No features for format 1 */ + } else { + u64 features = 0; + int ret; + + ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); + if (ret) + return ret; - return -ENOENT; + *snap_features = features; + } + return 0; } -static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) +static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) { + const char *snap_name = rbd_dev->spec->snap_name; + u64 snap_id; + u64 size = 0; + u64 features = 0; int ret; - if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, - sizeof (RBD_SNAP_HEAD_NAME))) { - rbd_dev->spec->snap_id = CEPH_NOSNAP; - rbd_dev->mapping.size = rbd_dev->header.image_size; - rbd_dev->mapping.features = rbd_dev->header.features; - ret = 0; + if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) { + snap_id = rbd_snap_id_by_name(rbd_dev, snap_name); + if (snap_id == CEPH_NOSNAP) + return -ENOENT; } else { - ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); - if (ret < 0) - goto done; - rbd_dev->mapping.read_only = true; + snap_id = CEPH_NOSNAP; } - set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); -done: - return ret; + ret = rbd_snap_size(rbd_dev, snap_id, &size); + if (ret) + return ret; + ret = rbd_snap_features(rbd_dev, snap_id, &features); + if (ret) + return ret; + + rbd_dev->mapping.size = size; + rbd_dev->mapping.features = features; + + /* If we are mapping a snapshot it must be marked read-only */ + + if (snap_id != CEPH_NOSNAP) + rbd_dev->mapping.read_only = true; + + return 0; } -static void rbd_header_free(struct rbd_image_header *header) +static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) { - kfree(header->object_prefix); - header->object_prefix = NULL; - kfree(header->snap_sizes); - header->snap_sizes = NULL; - kfree(header->snap_names); - header->snap_names = NULL; - ceph_put_snap_context(header->snapc); - header->snapc = NULL; + rbd_dev->mapping.size = 0; + rbd_dev->mapping.features = 0; + rbd_dev->mapping.read_only = true; +} + +static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev) +{ + rbd_dev->mapping.size = 0; + rbd_dev->mapping.features = 0; + rbd_dev->mapping.read_only = true; } static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) @@ -833,7 +988,7 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) u64 segment; int ret; - name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); + name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO); if (!name) return NULL; segment = offset >> rbd_dev->header.obj_order; @@ -849,6 +1004,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) return name; } +static void rbd_segment_name_free(const char *name) +{ + /* The explicit cast here is needed to drop the const qualifier */ + + kmem_cache_free(rbd_segment_name_cache, (void *)name); +} + static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) { u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; @@ -921,6 +1083,37 @@ static void zero_bio_chain(struct bio *chain, int start_ofs) } /* + * similar to zero_bio_chain(), zeros data defined by a page array, + * starting at the given byte offset from the start of the array and + * continuing up to the given end offset. The pages array is + * assumed to be big enough to hold all bytes up to the end. + */ +static void zero_pages(struct page **pages, u64 offset, u64 end) +{ + struct page **page = &pages[offset >> PAGE_SHIFT]; + + rbd_assert(end > offset); + rbd_assert(end - offset <= (u64)SIZE_MAX); + while (offset < end) { + size_t page_offset; + size_t length; + unsigned long flags; + void *kaddr; + + page_offset = (size_t)(offset & ~PAGE_MASK); + length = min(PAGE_SIZE - page_offset, (size_t)(end - offset)); + local_irq_save(flags); + kaddr = kmap_atomic(*page); + memset(kaddr + page_offset, 0, length); + kunmap_atomic(kaddr); + local_irq_restore(flags); + + offset += length; + page++; + } +} + +/* * Clone a portion of a bio, starting at the given byte offset * and continuing for the number of bytes indicated. */ @@ -1064,6 +1257,77 @@ out_err: return NULL; } +/* + * The default/initial value for all object request flags is 0. For + * each flag, once its value is set to 1 it is never reset to 0 + * again. + */ +static void obj_request_img_data_set(struct rbd_obj_request *obj_request) +{ + if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { + struct rbd_device *rbd_dev; + + rbd_dev = obj_request->img_request->rbd_dev; + rbd_warn(rbd_dev, "obj_request %p already marked img_data\n", + obj_request); + } +} + +static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; +} + +static void obj_request_done_set(struct rbd_obj_request *obj_request) +{ + if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { + struct rbd_device *rbd_dev = NULL; + + if (obj_request_img_data_test(obj_request)) + rbd_dev = obj_request->img_request->rbd_dev; + rbd_warn(rbd_dev, "obj_request %p already marked done\n", + obj_request); + } +} + +static bool obj_request_done_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; +} + +/* + * This sets the KNOWN flag after (possibly) setting the EXISTS + * flag. The latter is set based on the "exists" value provided. + * + * Note that for our purposes once an object exists it never goes + * away again. It's possible that the response from two existence + * checks are separated by the creation of the target object, and + * the first ("doesn't exist") response arrives *after* the second + * ("does exist"). In that case we ignore the second one. + */ +static void obj_request_existence_set(struct rbd_obj_request *obj_request, + bool exists) +{ + if (exists) + set_bit(OBJ_REQ_EXISTS, &obj_request->flags); + set_bit(OBJ_REQ_KNOWN, &obj_request->flags); + smp_mb(); +} + +static bool obj_request_known_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; +} + +static bool obj_request_exists_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; +} + static void rbd_obj_request_get(struct rbd_obj_request *obj_request) { dout("%s: obj %p (was %d)\n", __func__, obj_request, @@ -1101,9 +1365,11 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, { rbd_assert(obj_request->img_request == NULL); - rbd_obj_request_get(obj_request); + /* Image request now owns object's original reference */ obj_request->img_request = img_request; obj_request->which = img_request->obj_request_count; + rbd_assert(!obj_request_img_data_test(obj_request)); + obj_request_img_data_set(obj_request); rbd_assert(obj_request->which != BAD_WHICH); img_request->obj_request_count++; list_add_tail(&obj_request->links, &img_request->obj_requests); @@ -1123,6 +1389,7 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, img_request->obj_request_count--; rbd_assert(obj_request->which == img_request->obj_request_count); obj_request->which = BAD_WHICH; + rbd_assert(obj_request_img_data_test(obj_request)); rbd_assert(obj_request->img_request == img_request); obj_request->img_request = NULL; obj_request->callback = NULL; @@ -1141,76 +1408,6 @@ static bool obj_request_type_valid(enum obj_request_type type) } } -static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...) -{ - struct ceph_osd_req_op *op; - va_list args; - size_t size; - - op = kzalloc(sizeof (*op), GFP_NOIO); - if (!op) - return NULL; - op->op = opcode; - va_start(args, opcode); - switch (opcode) { - case CEPH_OSD_OP_READ: - case CEPH_OSD_OP_WRITE: - /* rbd_osd_req_op_create(READ, offset, length) */ - /* rbd_osd_req_op_create(WRITE, offset, length) */ - op->extent.offset = va_arg(args, u64); - op->extent.length = va_arg(args, u64); - if (opcode == CEPH_OSD_OP_WRITE) - op->payload_len = op->extent.length; - break; - case CEPH_OSD_OP_STAT: - break; - case CEPH_OSD_OP_CALL: - /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */ - op->cls.class_name = va_arg(args, char *); - size = strlen(op->cls.class_name); - rbd_assert(size <= (size_t) U8_MAX); - op->cls.class_len = size; - op->payload_len = size; - - op->cls.method_name = va_arg(args, char *); - size = strlen(op->cls.method_name); - rbd_assert(size <= (size_t) U8_MAX); - op->cls.method_len = size; - op->payload_len += size; - - op->cls.argc = 0; - op->cls.indata = va_arg(args, void *); - size = va_arg(args, size_t); - rbd_assert(size <= (size_t) U32_MAX); - op->cls.indata_len = (u32) size; - op->payload_len += size; - break; - case CEPH_OSD_OP_NOTIFY_ACK: - case CEPH_OSD_OP_WATCH: - /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */ - /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */ - op->watch.cookie = va_arg(args, u64); - op->watch.ver = va_arg(args, u64); - op->watch.ver = cpu_to_le64(op->watch.ver); - if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int)) - op->watch.flag = (u8) 1; - break; - default: - rbd_warn(NULL, "unsupported opcode %hu\n", opcode); - kfree(op); - op = NULL; - break; - } - va_end(args); - - return op; -} - -static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op) -{ - kfree(op); -} - static int rbd_obj_request_submit(struct ceph_osd_client *osdc, struct rbd_obj_request *obj_request) { @@ -1221,7 +1418,24 @@ static int rbd_obj_request_submit(struct ceph_osd_client *osdc, static void rbd_img_request_complete(struct rbd_img_request *img_request) { + dout("%s: img %p\n", __func__, img_request); + + /* + * If no error occurred, compute the aggregate transfer + * count for the image request. We could instead use + * atomic64_cmpxchg() to update it as each object request + * completes; not clear which way is better off hand. + */ + if (!img_request->result) { + struct rbd_obj_request *obj_request; + u64 xferred = 0; + + for_each_obj_request(img_request, obj_request) + xferred += obj_request->xferred; + img_request->xferred = xferred; + } + if (img_request->callback) img_request->callback(img_request); else @@ -1237,39 +1451,56 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) return wait_for_completion_interruptible(&obj_request->completion); } -static void obj_request_done_init(struct rbd_obj_request *obj_request) +/* + * The default/initial value for all image request flags is 0. Each + * is conditionally set to 1 at image request initialization time + * and currently never change thereafter. + */ +static void img_request_write_set(struct rbd_img_request *img_request) { - atomic_set(&obj_request->done, 0); - smp_wmb(); + set_bit(IMG_REQ_WRITE, &img_request->flags); + smp_mb(); } -static void obj_request_done_set(struct rbd_obj_request *obj_request) +static bool img_request_write_test(struct rbd_img_request *img_request) { - int done; + smp_mb(); + return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; +} - done = atomic_inc_return(&obj_request->done); - if (done > 1) { - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev; +static void img_request_child_set(struct rbd_img_request *img_request) +{ + set_bit(IMG_REQ_CHILD, &img_request->flags); + smp_mb(); +} - rbd_dev = img_request ? img_request->rbd_dev : NULL; - rbd_warn(rbd_dev, "obj_request %p was already done\n", - obj_request); - } +static bool img_request_child_test(struct rbd_img_request *img_request) +{ + smp_mb(); + return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; } -static bool obj_request_done_test(struct rbd_obj_request *obj_request) +static void img_request_layered_set(struct rbd_img_request *img_request) +{ + set_bit(IMG_REQ_LAYERED, &img_request->flags); + smp_mb(); +} + +static bool img_request_layered_test(struct rbd_img_request *img_request) { smp_mb(); - return atomic_read(&obj_request->done) != 0; + return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; } static void rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) { + u64 xferred = obj_request->xferred; + u64 length = obj_request->length; + dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, obj_request, obj_request->img_request, obj_request->result, - obj_request->xferred, obj_request->length); + xferred, length); /* * ENOENT means a hole in the image. We zero-fill the * entire length of the request. A short read also implies @@ -1277,15 +1508,20 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) * update the xferred count to indicate the whole request * was satisfied. */ - BUG_ON(obj_request->type != OBJ_REQUEST_BIO); + rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); if (obj_request->result == -ENOENT) { - zero_bio_chain(obj_request->bio_list, 0); + if (obj_request->type == OBJ_REQUEST_BIO) + zero_bio_chain(obj_request->bio_list, 0); + else + zero_pages(obj_request->pages, 0, length); obj_request->result = 0; - obj_request->xferred = obj_request->length; - } else if (obj_request->xferred < obj_request->length && - !obj_request->result) { - zero_bio_chain(obj_request->bio_list, obj_request->xferred); - obj_request->xferred = obj_request->length; + obj_request->xferred = length; + } else if (xferred < length && !obj_request->result) { + if (obj_request->type == OBJ_REQUEST_BIO) + zero_bio_chain(obj_request->bio_list, xferred); + else + zero_pages(obj_request->pages, xferred, length); + obj_request->xferred = length; } obj_request_done_set(obj_request); } @@ -1308,9 +1544,23 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { - dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, - obj_request->result, obj_request->xferred, obj_request->length); - if (obj_request->img_request) + struct rbd_img_request *img_request = NULL; + struct rbd_device *rbd_dev = NULL; + bool layered = false; + + if (obj_request_img_data_test(obj_request)) { + img_request = obj_request->img_request; + layered = img_request && img_request_layered_test(img_request); + rbd_dev = img_request->rbd_dev; + } + + dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, + obj_request, img_request, obj_request->result, + obj_request->xferred, obj_request->length); + if (layered && obj_request->result == -ENOENT && + obj_request->img_offset < rbd_dev->parent_overlap) + rbd_img_parent_read(obj_request); + else if (img_request) rbd_img_obj_request_read_callback(obj_request); else obj_request_done_set(obj_request); @@ -1321,9 +1571,8 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) dout("%s: obj %p result %d %llu\n", __func__, obj_request, obj_request->result, obj_request->length); /* - * There is no such thing as a successful short write. - * Our xferred value is the number of bytes transferred - * back. Set it to our originally-requested length. + * There is no such thing as a successful short write. Set + * it to our originally-requested length. */ obj_request->xferred = obj_request->length; obj_request_done_set(obj_request); @@ -1347,22 +1596,25 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); rbd_assert(osd_req == obj_request->osd_req); - rbd_assert(!!obj_request->img_request ^ - (obj_request->which == BAD_WHICH)); + if (obj_request_img_data_test(obj_request)) { + rbd_assert(obj_request->img_request); + rbd_assert(obj_request->which != BAD_WHICH); + } else { + rbd_assert(obj_request->which == BAD_WHICH); + } if (osd_req->r_result < 0) obj_request->result = osd_req->r_result; - obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); - WARN_ON(osd_req->r_num_ops != 1); /* For now */ + BUG_ON(osd_req->r_num_ops > 2); /* * We support a 64-bit length, but ultimately it has to be * passed to blk_end_request(), which takes an unsigned int. */ obj_request->xferred = osd_req->r_reply_op_len[0]; - rbd_assert(obj_request->xferred < (u64) UINT_MAX); - opcode = osd_req->r_request_ops[0].op; + rbd_assert(obj_request->xferred < (u64)UINT_MAX); + opcode = osd_req->r_ops[0].op; switch (opcode) { case CEPH_OSD_OP_READ: rbd_osd_read_callback(obj_request); @@ -1388,28 +1640,49 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, rbd_obj_request_complete(obj_request); } +static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + u64 snap_id; + + rbd_assert(osd_req != NULL); + + snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP; + ceph_osdc_build_request(osd_req, obj_request->offset, + NULL, snap_id, NULL); +} + +static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + struct ceph_snap_context *snapc; + struct timespec mtime = CURRENT_TIME; + + rbd_assert(osd_req != NULL); + + snapc = img_request ? img_request->snapc : NULL; + ceph_osdc_build_request(osd_req, obj_request->offset, + snapc, CEPH_NOSNAP, &mtime); +} + static struct ceph_osd_request *rbd_osd_req_create( struct rbd_device *rbd_dev, bool write_request, - struct rbd_obj_request *obj_request, - struct ceph_osd_req_op *op) + struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = obj_request->img_request; struct ceph_snap_context *snapc = NULL; struct ceph_osd_client *osdc; struct ceph_osd_request *osd_req; - struct timespec now; - struct timespec *mtime; - u64 snap_id = CEPH_NOSNAP; - u64 offset = obj_request->offset; - u64 length = obj_request->length; - if (img_request) { - rbd_assert(img_request->write_request == write_request); - if (img_request->write_request) + if (obj_request_img_data_test(obj_request)) { + struct rbd_img_request *img_request = obj_request->img_request; + + rbd_assert(write_request == + img_request_write_test(img_request)); + if (write_request) snapc = img_request->snapc; - else - snap_id = img_request->snap_id; } /* Allocate and initialize the request, for the single op */ @@ -1419,31 +1692,10 @@ static struct ceph_osd_request *rbd_osd_req_create( if (!osd_req) return NULL; /* ENOMEM */ - rbd_assert(obj_request_type_valid(obj_request->type)); - switch (obj_request->type) { - case OBJ_REQUEST_NODATA: - break; /* Nothing to do */ - case OBJ_REQUEST_BIO: - rbd_assert(obj_request->bio_list != NULL); - osd_req->r_bio = obj_request->bio_list; - break; - case OBJ_REQUEST_PAGES: - osd_req- |