summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2019-03-12 14:58:35 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2019-03-12 14:58:35 -0700
commit2b0a80b0d0bb0a3db74588279bf851b28c6c4705 (patch)
tree1d73413afae3617422027194eda5f71f94fd2c07
parent92825b0298ca6822085ef483f914b6e0dea9bf66 (diff)
parentd11ae8e0a76afc506071831854348f2ea1f3290e (diff)
Merge tag 'ceph-for-5.1-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov: "The highlights are: - rbd will now ignore discards that aren't aligned and big enough to actually free up some space (myself). This is controlled by the new alloc_size map option and can be disabled if needed. - support for rbd deep-flatten feature (myself). Deep-flatten allows "rbd flatten" to fully disconnect the clone image and its snapshots from the parent and make the parent snapshot removable. - a new round of cap handling improvements (Zheng Yan). The kernel client should now be much more prompt about releasing its caps and it is possible to put a limit on the number of caps held. - support for getting ceph.dir.pin extended attribute (Zheng Yan)" * tag 'ceph-for-5.1-rc1' of git://github.com/ceph/ceph-client: (26 commits) Documentation: modern versions of ceph are not backed by btrfs rbd: advertise support for RBD_FEATURE_DEEP_FLATTEN rbd: whole-object write and zeroout should copyup when snapshots exist rbd: copyup with an empty snapshot context (aka deep-copyup) rbd: introduce rbd_obj_issue_copyup_ops() rbd: stop copying num_osd_ops in rbd_obj_issue_copyup() rbd: factor out __rbd_osd_req_create() rbd: clear ->xferred on error from rbd_obj_issue_copyup() rbd: remove experimental designation from kernel layering ceph: add mount option to limit caps count ceph: periodically trim stale dentries ceph: delete stale dentry when last reference is dropped ceph: remove dentry_lru file from debugfs ceph: touch existing cap when handling reply ceph: pass inclusive lend parameter to filemap_write_and_wait_range() rbd: round off and ignore discards that are too small rbd: handle DISCARD and WRITE_ZEROES separately rbd: get rid of obj_req->obj_request_count libceph: use struct_size() for kmalloc() in crush_decode() ceph: send cap releases more aggressively ...
-rw-r--r--Documentation/filesystems/ceph.txt14
-rw-r--r--drivers/block/rbd.c400
-rw-r--r--fs/ceph/caps.c72
-rw-r--r--fs/ceph/debugfs.c27
-rw-r--r--fs/ceph/dir.c455
-rw-r--r--fs/ceph/file.c13
-rw-r--r--fs/ceph/inode.c52
-rw-r--r--fs/ceph/mds_client.c698
-rw-r--r--fs/ceph/mds_client.h43
-rw-r--r--fs/ceph/snap.c159
-rw-r--r--fs/ceph/super.c21
-rw-r--r--fs/ceph/super.h43
-rw-r--r--fs/ceph/xattr.c20
-rw-r--r--include/linux/ceph/types.h1
-rw-r--r--net/ceph/osdmap.c5
15 files changed, 1597 insertions, 426 deletions
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index 1177052701e1..d2c6a5ccf0f5 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -22,9 +22,7 @@ In contrast to cluster filesystems like GFS, OCFS2, and GPFS that rely
on symmetric access by all clients to shared block devices, Ceph
separates data and metadata management into independent server
clusters, similar to Lustre. Unlike Lustre, however, metadata and
-storage nodes run entirely as user space daemons. Storage nodes
-utilize btrfs to store data objects, leveraging its advanced features
-(checksumming, metadata replication, etc.). File data is striped
+storage nodes run entirely as user space daemons. File data is striped
across storage nodes in large chunks to distribute workload and
facilitate high throughputs. When storage nodes fail, data is
re-replicated in a distributed fashion by the storage nodes themselves
@@ -118,6 +116,10 @@ Mount Options
of a non-responsive Ceph file system. The default is 30
seconds.
+ caps_max=X
+ Specify the maximum number of caps to hold. Unused caps are released
+ when number of caps exceeds the limit. The default is 0 (no limit)
+
rbytes
When stat() is called on a directory, set st_size to 'rbytes',
the summation of file sizes over all files nested beneath that
@@ -160,11 +162,11 @@ More Information
================
For more information on Ceph, see the home page at
- http://ceph.newdream.net/
+ https://ceph.com/
The Linux kernel client source tree is available at
- git://ceph.newdream.net/git/ceph-client.git
+ https://github.com/ceph/ceph-client.git
git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
and the source for the full system is at
- git://ceph.newdream.net/git/ceph.git
+ https://github.com/ceph/ceph.git
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 74088d8dbaf3..4ba967d65cf9 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -115,12 +115,14 @@ static int atomic_dec_return_safe(atomic_t *v)
#define RBD_FEATURE_LAYERING (1ULL<<0)
#define RBD_FEATURE_STRIPINGV2 (1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2)
+#define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5)
#define RBD_FEATURE_DATA_POOL (1ULL<<7)
#define RBD_FEATURE_OPERATIONS (1ULL<<8)
#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
RBD_FEATURE_STRIPINGV2 | \
RBD_FEATURE_EXCLUSIVE_LOCK | \
+ RBD_FEATURE_DEEP_FLATTEN | \
RBD_FEATURE_DATA_POOL | \
RBD_FEATURE_OPERATIONS)
@@ -214,28 +216,40 @@ enum obj_operation_type {
OBJ_OP_READ = 1,
OBJ_OP_WRITE,
OBJ_OP_DISCARD,
+ OBJ_OP_ZEROOUT,
};
/*
* Writes go through the following state machine to deal with
* layering:
*
- * need copyup
- * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
- * | ^ |
- * v \------------------------------/
- * done
- * ^
- * |
- * RBD_OBJ_WRITE_FLAT
+ * . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
+ * . | .
+ * . v .
+ * . RBD_OBJ_WRITE_READ_FROM_PARENT. . . .
+ * . | . .
+ * . v v (deep-copyup .
+ * (image . RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC . not needed) .
+ * flattened) v | . .
+ * . v . .
+ * . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . . (copyup .
+ * | not needed) v
+ * v .
+ * done . . . . . . . . . . . . . . . . . .
+ * ^
+ * |
+ * RBD_OBJ_WRITE_FLAT
*
* Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
- * there is a parent or not.
+ * assert_exists guard is needed or not (in some cases it's not needed
+ * even if there is a parent).
*/
enum rbd_obj_write_state {
RBD_OBJ_WRITE_FLAT = 1,
RBD_OBJ_WRITE_GUARD,
- RBD_OBJ_WRITE_COPYUP,
+ RBD_OBJ_WRITE_READ_FROM_PARENT,
+ RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC,
+ RBD_OBJ_WRITE_COPYUP_OPS,
};
struct rbd_obj_request {
@@ -291,7 +305,6 @@ struct rbd_img_request {
int result; /* first nonzero obj_request result */
struct list_head object_extents; /* obj_req.ex structs */
- u32 obj_request_count;
u32 pending_count;
struct kref kref;
@@ -421,6 +434,10 @@ static DEFINE_IDA(rbd_dev_id_ida);
static struct workqueue_struct *rbd_wq;
+static struct ceph_snap_context rbd_empty_snapc = {
+ .nref = REFCOUNT_INIT(1),
+};
+
/*
* single-major requires >= 0.75 version of userspace rbd utility.
*/
@@ -732,6 +749,7 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
*/
enum {
Opt_queue_depth,
+ Opt_alloc_size,
Opt_lock_timeout,
Opt_last_int,
/* int args above */
@@ -748,6 +766,7 @@ enum {
static match_table_t rbd_opts_tokens = {
{Opt_queue_depth, "queue_depth=%d"},
+ {Opt_alloc_size, "alloc_size=%d"},
{Opt_lock_timeout, "lock_timeout=%d"},
/* int args above */
{Opt_pool_ns, "_pool_ns=%s"},
@@ -764,6 +783,7 @@ static match_table_t rbd_opts_tokens = {
struct rbd_options {
int queue_depth;
+ int alloc_size;
unsigned long lock_timeout;
bool read_only;
bool lock_on_read;
@@ -772,6 +792,7 @@ struct rbd_options {
};
#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
+#define RBD_ALLOC_SIZE_DEFAULT (64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0 /* no timeout */
#define RBD_READ_ONLY_DEFAULT false
#define RBD_LOCK_ON_READ_DEFAULT false
@@ -811,6 +832,17 @@ static int parse_rbd_opts_token(char *c, void *private)
}
pctx->opts->queue_depth = intval;
break;
+ case Opt_alloc_size:
+ if (intval < 1) {
+ pr_err("alloc_size out of range\n");
+ return -EINVAL;
+ }
+ if (!is_power_of_2(intval)) {
+ pr_err("alloc_size must be a power of 2\n");
+ return -EINVAL;
+ }
+ pctx->opts->alloc_size = intval;
+ break;
case Opt_lock_timeout:
/* 0 is "wait forever" (i.e. infinite timeout) */
if (intval < 0 || intval > INT_MAX / 1000) {
@@ -857,6 +889,8 @@ static char* obj_op_name(enum obj_operation_type op_type)
return "write";
case OBJ_OP_DISCARD:
return "discard";
+ case OBJ_OP_ZEROOUT:
+ return "zeroout";
default:
return "???";
}
@@ -1344,7 +1378,6 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
/* Image request now owns object's original reference */
obj_request->img_request = img_request;
- img_request->obj_request_count++;
img_request->pending_count++;
dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}
@@ -1354,8 +1387,6 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
{
dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
list_del(&obj_request->ex.oe_item);
- rbd_assert(img_request->obj_request_count > 0);
- img_request->obj_request_count--;
rbd_assert(obj_request->img_request == img_request);
rbd_obj_request_put(obj_request);
}
@@ -1409,6 +1440,19 @@ static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
rbd_dev->layout.object_size;
}
+/*
+ * Must be called after rbd_obj_calc_img_extents().
+ */
+static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
+{
+ if (!obj_req->num_img_extents ||
+ (rbd_obj_is_entire(obj_req) &&
+ !obj_req->img_request->snapc->num_snaps))
+ return false;
+
+ return true;
+}
+
static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
return ceph_file_extents_bytes(obj_req->img_extents,
@@ -1422,6 +1466,7 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req)
return false;
case OBJ_OP_WRITE:
case OBJ_OP_DISCARD:
+ case OBJ_OP_ZEROOUT:
return true;
default:
BUG();
@@ -1470,18 +1515,16 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
}
static struct ceph_osd_request *
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+__rbd_osd_req_create(struct rbd_obj_request *obj_req,
+ struct ceph_snap_context *snapc, unsigned int num_ops)
{
- struct rbd_img_request *img_req = obj_req->img_request;
- struct rbd_device *rbd_dev = img_req->rbd_dev;
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct ceph_osd_request *req;
const char *name_format = rbd_dev->image_format == 1 ?
RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
- req = ceph_osdc_alloc_request(osdc,
- (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
- num_ops, false, GFP_NOIO);
+ req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
if (!req)
return NULL;
@@ -1506,6 +1549,13 @@ err_req:
return NULL;
}
+static struct ceph_osd_request *
+rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+{
+ return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
+ num_ops);
+}
+
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
ceph_osdc_put_request(osd_req);
@@ -1671,7 +1721,6 @@ static void rbd_img_request_destroy(struct kref *kref)
for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_img_obj_request_del(img_request, obj_request);
- rbd_assert(img_request->obj_request_count == 0);
if (img_request_layered_test(img_request)) {
img_request_layered_clear(img_request);
@@ -1754,7 +1803,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
{
- obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
+ obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
if (!obj_req->osd_req)
return -ENOMEM;
@@ -1790,6 +1839,11 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
return 0;
}
+static int count_write_ops(struct rbd_obj_request *obj_req)
+{
+ return 2; /* setallochint + write/writefull */
+}
+
static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
unsigned int which)
{
@@ -1816,6 +1870,7 @@ static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
{
unsigned int num_osd_ops, which = 0;
+ bool need_guard;
int ret;
/* reverse map the entire object onto the parent */
@@ -1823,47 +1878,112 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
if (ret)
return ret;
- if (obj_req->num_img_extents) {
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
- num_osd_ops = 3; /* stat + setallochint + write/writefull */
- } else {
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
- num_osd_ops = 2; /* setallochint + write/writefull */
- }
+ need_guard = rbd_obj_copyup_enabled(obj_req);
+ num_osd_ops = need_guard + count_write_ops(obj_req);
obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
if (!obj_req->osd_req)
return -ENOMEM;
- if (obj_req->num_img_extents) {
+ if (need_guard) {
ret = __rbd_obj_setup_stat(obj_req, which++);
if (ret)
return ret;
+
+ obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+ } else {
+ obj_req->write_state = RBD_OBJ_WRITE_FLAT;
}
__rbd_obj_setup_write(obj_req, which);
return 0;
}
-static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
+static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
+{
+ return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
+ CEPH_OSD_OP_ZERO;
+}
+
+static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+{
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+ u64 off = obj_req->ex.oe_off;
+ u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
+ int ret;
+
+ /*
+ * Align the range to alloc_size boundary and punt on discards
+ * that are too small to free up any space.
+ *
+ * alloc_size == object_size && is_tail() is a special case for
+ * filestore with filestore_punch_hole = false, needed to allow
+ * truncate (in addition to delete).
+ */
+ if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
+ !rbd_obj_is_tail(obj_req)) {
+ off = round_up(off, rbd_dev->opts->alloc_size);
+ next_off = round_down(next_off, rbd_dev->opts->alloc_size);
+ if (off >= next_off)
+ return 1;
+ }
+
+ /* reverse map the entire object onto the parent */
+ ret = rbd_obj_calc_img_extents(obj_req, true);
+ if (ret)
+ return ret;
+
+ obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
+ if (!obj_req->osd_req)
+ return -ENOMEM;
+
+ if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
+ osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
+ } else {
+ dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
+ obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
+ off, next_off - off);
+ osd_req_op_extent_init(obj_req->osd_req, 0,
+ truncate_or_zero_opcode(obj_req),
+ off, next_off - off, 0, 0);
+ }
+
+ obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+ rbd_osd_req_format_write(obj_req);
+ return 0;
+}
+
+static int count_zeroout_ops(struct rbd_obj_request *obj_req)
+{
+ int num_osd_ops;
+
+ if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
+ !rbd_obj_copyup_enabled(obj_req))
+ num_osd_ops = 2; /* create + truncate */
+ else
+ num_osd_ops = 1; /* delete/truncate/zero */
+
+ return num_osd_ops;
+}
+
+static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
unsigned int which)
{
u16 opcode;
if (rbd_obj_is_entire(obj_req)) {
if (obj_req->num_img_extents) {
- osd_req_op_init(obj_req->osd_req, which++,
- CEPH_OSD_OP_CREATE, 0);
+ if (!rbd_obj_copyup_enabled(obj_req))
+ osd_req_op_init(obj_req->osd_req, which++,
+ CEPH_OSD_OP_CREATE, 0);
opcode = CEPH_OSD_OP_TRUNCATE;
} else {
osd_req_op_init(obj_req->osd_req, which++,
CEPH_OSD_OP_DELETE, 0);
opcode = 0;
}
- } else if (rbd_obj_is_tail(obj_req)) {
- opcode = CEPH_OSD_OP_TRUNCATE;
} else {
- opcode = CEPH_OSD_OP_ZERO;
+ opcode = truncate_or_zero_opcode(obj_req);
}
if (opcode)
@@ -1875,9 +1995,10 @@ static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
rbd_osd_req_format_write(obj_req);
}
-static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
{
unsigned int num_osd_ops, which = 0;
+ bool need_guard;
int ret;
/* reverse map the entire object onto the parent */
@@ -1885,33 +2006,24 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
if (ret)
return ret;
- if (rbd_obj_is_entire(obj_req)) {
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
- if (obj_req->num_img_extents)
- num_osd_ops = 2; /* create + truncate */
- else
- num_osd_ops = 1; /* delete */
- } else {
- if (obj_req->num_img_extents) {
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
- num_osd_ops = 2; /* stat + truncate/zero */
- } else {
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
- num_osd_ops = 1; /* truncate/zero */
- }
- }
+ need_guard = rbd_obj_copyup_enabled(obj_req);
+ num_osd_ops = need_guard + count_zeroout_ops(obj_req);
obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
if (!obj_req->osd_req)
return -ENOMEM;
- if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
+ if (need_guard) {
ret = __rbd_obj_setup_stat(obj_req, which++);
if (ret)
return ret;
+
+ obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+ } else {
+ obj_req->write_state = RBD_OBJ_WRITE_FLAT;
}
- __rbd_obj_setup_discard(obj_req, which);
+ __rbd_obj_setup_zeroout(obj_req, which);
return 0;
}
@@ -1922,10 +2034,10 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
*/
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
- struct rbd_obj_request *obj_req;
+ struct rbd_obj_request *obj_req, *next_obj_req;
int ret;
- for_each_obj_request(img_req, obj_req) {
+ for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
switch (img_req->op_type) {
case OBJ_OP_READ:
ret = rbd_obj_setup_read(obj_req);
@@ -1936,11 +2048,20 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
case OBJ_OP_DISCARD:
ret = rbd_obj_setup_discard(obj_req);
break;
+ case OBJ_OP_ZEROOUT:
+ ret = rbd_obj_setup_zeroout(obj_req);
+ break;
default:
rbd_assert(0);
}
- if (ret)
+ if (ret < 0)
return ret;
+ if (ret > 0) {
+ img_req->xferred += obj_req->ex.oe_len;
+ img_req->pending_count--;
+ rbd_img_obj_request_del(img_req, obj_req);
+ continue;
+ }
ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
if (ret)
@@ -2356,21 +2477,19 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
return true;
}
-static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+#define MODS_ONLY U32_MAX
+
+static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
+ u32 bytes)
{
- unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
int ret;
dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
+ rbd_assert(bytes > 0 && bytes != MODS_ONLY);
rbd_osd_req_destroy(obj_req->osd_req);
- /*
- * Create a copyup request with the same number of OSD ops as
- * the original request. The original request was stat + op(s),
- * the new copyup request will be copyup + the same op(s).
- */
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+ obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
if (!obj_req->osd_req)
return -ENOMEM;
@@ -2378,27 +2497,65 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
if (ret)
return ret;
- /*
- * Only send non-zero copyup data to save some I/O and network
- * bandwidth -- zero copyup data is equivalent to the object not
- * existing.
- */
- if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
- dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
- bytes = 0;
- }
osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
obj_req->copyup_bvecs,
obj_req->copyup_bvec_count,
bytes);
+ rbd_osd_req_format_write(obj_req);
- switch (obj_req->img_request->op_type) {
+ ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+ if (ret)
+ return ret;
+
+ rbd_obj_request_submit(obj_req);
+ return 0;
+}
+
+static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
+{
+ struct rbd_img_request *img_req = obj_req->img_request;
+ unsigned int num_osd_ops = (bytes != MODS_ONLY);
+ unsigned int which = 0;
+ int ret;
+
+ dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
+ rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
+ obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
+ rbd_osd_req_destroy(obj_req->osd_req);
+
+ switch (img_req->op_type) {
case OBJ_OP_WRITE:
- __rbd_obj_setup_write(obj_req, 1);
+ num_osd_ops += count_write_ops(obj_req);
break;
- case OBJ_OP_DISCARD:
- rbd_assert(!rbd_obj_is_entire(obj_req));
- __rbd_obj_setup_discard(obj_req, 1);
+ case OBJ_OP_ZEROOUT:
+ num_osd_ops += count_zeroout_ops(obj_req);
+ break;
+ default:
+ rbd_assert(0);
+ }
+
+ obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
+ if (!obj_req->osd_req)
+ return -ENOMEM;
+
+ if (bytes != MODS_ONLY) {
+ ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
+ "copyup");
+ if (ret)
+ return ret;
+
+ osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
+ obj_req->copyup_bvecs,
+ obj_req->copyup_bvec_count,
+ bytes);
+ }
+
+ switch (img_req->op_type) {
+ case OBJ_OP_WRITE:
+ __rbd_obj_setup_write(obj_req, which);
+ break;
+ case OBJ_OP_ZEROOUT:
+ __rbd_obj_setup_zeroout(obj_req, which);
break;
default:
rbd_assert(0);
@@ -2412,6 +2569,33 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
return 0;
}
+static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+{
+ /*
+ * Only send non-zero copyup data to save some I/O and network
+ * bandwidth -- zero copyup data is equivalent to the object not
+ * existing.
+ */
+ if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
+ dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
+ bytes = 0;
+ }
+
+ if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
+ /*
+ * Send a copyup request with an empty snapshot context to
+ * deep-copyup the object through all existing snapshots.
+ * A second request with the current snapshot context will be
+ * sent for the actual modification.
+ */
+ obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC;
+ return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes);
+ }
+
+ obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+ return rbd_obj_issue_copyup_ops(obj_req, bytes);
+}
+
static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
{
u32 i;
@@ -2451,22 +2635,19 @@ static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
if (!obj_req->num_img_extents) {
/*
* The overlap has become 0 (most likely because the
- * image has been flattened). Use rbd_obj_issue_copyup()
- * to re-submit the original write request -- the copyup
- * operation itself will be a no-op, since someone must
- * have populated the child object while we weren't
- * looking. Move to WRITE_FLAT state as we'll be done
- * with the operation once the null copyup completes.
+ * image has been flattened). Re-submit the original write
+ * request -- pass MODS_ONLY since the copyup isn't needed
+ * anymore.
*/
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
- return rbd_obj_issue_copyup(obj_req, 0);
+ obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+ return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
}
ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
if (ret)
return ret;
- obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+ obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT;
return rbd_obj_read_from_parent(obj_req);
}
@@ -2474,7 +2655,6 @@ static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
{
int ret;
-again:
switch (obj_req->write_state) {
case RBD_OBJ_WRITE_GUARD:
rbd_assert(!obj_req->xferred);
@@ -2493,6 +2673,7 @@ again:
}
/* fall through */
case RBD_OBJ_WRITE_FLAT:
+ case RBD_OBJ_WRITE_COPYUP_OPS:
if (!obj_req->result)
/*
* There is no such thing as a successful short
@@ -2500,15 +2681,26 @@ again:
*/
obj_req->xferred = obj_req->ex.oe_len;
return true;
- case RBD_OBJ_WRITE_COPYUP:
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+ case RBD_OBJ_WRITE_READ_FROM_PARENT:
if (obj_req->result)
- goto again;
+ return true;
rbd_assert(obj_req->xferred);
ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
if (ret) {
obj_req->result = ret;
+ obj_req->xferred = 0;
+ return true;
+ }
+ return false;
+ case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC:
+ if (obj_req->result)
+ return true;
+
+ obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS;
+ ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY);
+ if (ret) {
+ obj_req->result = ret;
return true;
}
return false;
@@ -2528,6 +2720,7 @@ static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
case OBJ_OP_WRITE:
return rbd_obj_handle_write(obj_req);
case OBJ_OP_DISCARD:
+ case OBJ_OP_ZEROOUT:
if (rbd_obj_handle_write(obj_req)) {
/*
* Hide -ENOENT from delete/truncate/zero -- discarding
@@ -3640,9 +3833,11 @@ static void rbd_queue_workfn(struct work_struct *work)
switch (req_op(rq)) {
case REQ_OP_DISCARD:
- case REQ_OP_WRITE_ZEROES:
op_type = OBJ_OP_DISCARD;
break;
+ case REQ_OP_WRITE_ZEROES:
+ op_type = OBJ_OP_ZEROOUT;
+ break;
case REQ_OP_WRITE:
op_type = OBJ_OP_WRITE;
break;
@@ -3722,12 +3917,12 @@ static void rbd_queue_workfn(struct work_struct *work)
img_request->rq = rq;
snapc = NULL; /* img_request consumes a ref */
- if (op_type == OBJ_OP_DISCARD)
+ if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
result = rbd_img_fill_nodata(img_request, offset, length);
else
result = rbd_img_fill_from_bio(img_request, offset, length,
rq->bio);
- if (result)
+ if (result || !img_request->pending_count)
goto err_img_request;
rbd_img_request_submit(img_request);
@@ -5388,6 +5583,7 @@ static int rbd_add_parse_args(const char *buf,
pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
+ pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
@@ -5795,14 +5991,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
goto err_out_probe;
-
- /*
- * Need to warn users if this image is the one being
- * mapped and has a parent.
- */
- if (!depth && rbd_dev->parent_spec)
- rbd_warn(rbd_dev,
- "WARNING: kernel layering is EXPERIMENTAL!");
}
ret = rbd_dev_probe_parent(rbd_dev, depth);
@@ -5885,6 +6073,12 @@ static ssize_t do_rbd_add(struct bus_type *bus,
if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
rbd_dev->opts->read_only = true;
+ if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
+ rbd_warn(rbd_dev, "alloc_size adjusted to %u",
+ rbd_dev->layout.object_size);
+ rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
+ }
+
rc = rbd_dev_device_setup(rbd_dev);
if (rc)
goto err_out_image_probe;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index bba28a5034ba..36a8dc699448 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -148,11 +148,17 @@ void ceph_caps_finalize(struct ceph_mds_client *mdsc)
spin_unlock(&mdsc->caps_list_lock);
}
-void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
+void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+ struct ceph_mount_options *fsopt)
{
spin_lock(&mdsc->caps_list_lock);
- mdsc->caps_min_count += delta;
- BUG_ON(mdsc->caps_min_count < 0);
+ mdsc->caps_min_count = fsopt->max_readdir;
+ if (mdsc->caps_min_count < 1024)
+ mdsc->caps_min_count = 1024;
+ mdsc->caps_use_max = fsopt->caps_max;
+ if (mdsc->caps_use_max > 0 &&
+ mdsc->caps_use_max < mdsc->caps_min_count)
+ mdsc->caps_use_max = mdsc->caps_min_count;
spin_unlock(&mdsc->caps_list_lock);
}
@@ -272,6 +278,7 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
if (!err) {
BUG_ON(have + alloc != need);
ctx->count = need;
+ ctx->used = 0;
}
spin_lock(&mdsc->caps_list_lock);
@@ -295,13 +302,24 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
}
void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
- struct ceph_cap_reservation *ctx)
+ struct ceph_cap_reservation *ctx)
{
+ bool reclaim = false;
+ if (!ctx->count)
+ return;
+
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
spin_lock(&mdsc->caps_list_lock);
__ceph_unreserve_caps(mdsc, ctx->count);
ctx->count = 0;
+
+ if (mdsc->caps_use_max > 0 &&
+ mdsc->caps_use_count > mdsc->caps_use_max)
+ reclaim = true;
spin_unlock(&mdsc->caps_list_lock);
+
+ if (reclaim)
+ ceph_reclaim_caps_nr(mdsc, ctx->used);
}
struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +364,7 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
BUG_ON(list_empty(&mdsc->caps_list));
ctx->count--;
+ ctx->used++;
mdsc->caps_reserve_count--;
mdsc->caps_use_count++;
@@ -500,12 +519,12 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
{
- struct ceph_mount_options *ma = mdsc->fsc->mount_options;
+ struct ceph_mount_options *opt = mdsc->fsc->mount_options;
ci->i_hold_caps_min = round_jiffies(jiffies +
- ma->caps_wanted_delay_min * HZ);
+ opt->caps_wanted_delay_min * HZ);
ci->i_hold_caps_max = round_jiffies(jiffies +
- ma->caps_wanted_delay_max * HZ);
+ opt->caps_wanted_delay_max * HZ);
dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
}
@@ -657,6 +676,10 @@ void ceph_add_cap(struct inode *inode,
session->s_nr_caps++;
spin_unlock(&session->s_cap_lock);
} else {
+ spin_lock(&session->s_cap_lock);
+ list_move_tail(&cap->session_caps, &session->s_caps);
+ spin_unlock(&session->s_cap_lock);
+
if (cap->cap_gen < session->s_cap_gen)
cap->issued = cap->implemented = CEPH_CAP_PIN;
@@ -1081,9 +1104,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
(!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
cap->queue_release = 1;
if (removed) {
- list_add_tail(&cap->session_caps,
- &session->s_cap_releases);
- session->s_num_cap_releases++;
+ __ceph_queue_cap_release(session, cap);
removed = 0;
}
} else {
@@ -1245,7 +1266,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
* Queue cap releases when an inode is dropped from our cache. Since
* inode is about to be destroyed, there is no need for i_ceph_lock.
*/
-void ceph_queue_caps_release(struct inode *inode)
+void __ceph_remove_caps(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct rb_node *p;
@@ -2393,6 +2414,12 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
if ((cap->issued & ci->i_flushing_caps) !=
ci->i_flushing_caps) {
ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+ /* encode_caps_cb() also will reset these sequence<