summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-06-12 23:06:23 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2014-06-12 23:06:23 -0700
commit6d87c225f5d82d29243dc124f1ffcbb0e14ec358 (patch)
tree7d72e2e6a77ec0911e86911d2ddae62c1b4161cf
parent338c09a94b14c449dd53227e9bea44816668c6a5 (diff)
parent22001f619f29ddf66582d834223dcff4c0b74595 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil: "This has a mix of bug fixes and cleanups. Alex's patch fixes a rare race in RBD. Ilya's patches fix an ENOENT check when a second rbd image is mapped and a couple memory leaks. Zheng fixes several issues with fragmented directories and multiple MDSs. Josh fixes a spin/sleep issue, and Josh and Guangliang's patches fix setting and unsetting RBD images read-only. Naturally there are several other cleanups mixed in for good measure" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits) rbd: only set disk to read-only once rbd: move calls that may sleep out of spin lock range rbd: add ioctl for rbd ceph: use truncate_pagecache() instead of truncate_inode_pages() ceph: include time stamp in every MDS request rbd: fix ida/idr memory leak rbd: use reference counts for image requests rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync() rbd: make sure we have latest osdmap on 'rbd map' libceph: add ceph_monc_wait_osdmap() libceph: mon_get_version request infrastructure libceph: recognize poolop requests in debugfs ceph: refactor readpage_nounlock() to make the logic clearer mds: check cap ID when handling cap export message ceph: remember subtree root dirfrag's auth MDS ceph: introduce ceph_fill_fragtree() ceph: handle cap import atomically ceph: pre-allocate ceph_cap struct for ceph_add_cap() ceph: update inode fields according to issued caps rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO ...
-rw-r--r--drivers/block/rbd.c242
-rw-r--r--fs/ceph/acl.c6
-rw-r--r--fs/ceph/addr.c17
-rw-r--r--fs/ceph/caps.c246
-rw-r--r--fs/ceph/export.c2
-rw-r--r--fs/ceph/inode.c247
-rw-r--r--fs/ceph/mds_client.c9
-rw-r--r--fs/ceph/mds_client.h1
-rw-r--r--fs/ceph/super.h13
-rw-r--r--include/linux/ceph/ceph_fs.h2
-rw-r--r--include/linux/ceph/mon_client.h11
-rw-r--r--net/ceph/ceph_common.c2
-rw-r--r--net/ceph/debugfs.c8
-rw-r--r--net/ceph/mon_client.c150
14 files changed, 670 insertions, 286 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4c95b503b09e..bbeb404b3a07 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
return -ENOENT;
(void) get_device(&rbd_dev->dev);
- set_device_ro(bdev, rbd_dev->mapping.read_only);
return 0;
}
@@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
put_device(&rbd_dev->dev);
}
+static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
+{
+ int ret = 0;
+ int val;
+ bool ro;
+ bool ro_changed = false;
+
+ /* get_user() may sleep, so call it before taking rbd_dev->lock */
+ if (get_user(val, (int __user *)(arg)))
+ return -EFAULT;
+
+ ro = val ? true : false;
+ /* Snapshot doesn't allow to write*/
+ if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
+ return -EROFS;
+
+ spin_lock_irq(&rbd_dev->lock);
+ /* prevent others open this device */
+ if (rbd_dev->open_count > 1) {
+ ret = -EBUSY;
+ goto out;
+ }
+
+ if (rbd_dev->mapping.read_only != ro) {
+ rbd_dev->mapping.read_only = ro;
+ ro_changed = true;
+ }
+
+out:
+ spin_unlock_irq(&rbd_dev->lock);
+ /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
+ if (ret == 0 && ro_changed)
+ set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
+
+ return ret;
+}
+
+static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
+ unsigned int cmd, unsigned long arg)
+{
+ struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+ int ret = 0;
+
+ switch (cmd) {
+ case BLKROSET:
+ ret = rbd_ioctl_set_ro(rbd_dev, arg);
+ break;
+ default:
+ ret = -ENOTTY;
+ }
+
+ return ret;
+}
+
+#ifdef CONFIG_COMPAT
+static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
+ unsigned int cmd, unsigned long arg)
+{
+ return rbd_ioctl(bdev, mode, cmd, arg);
+}
+#endif /* CONFIG_COMPAT */
+
static const struct block_device_operations rbd_bd_ops = {
.owner = THIS_MODULE,
.open = rbd_open,
.release = rbd_release,
+ .ioctl = rbd_ioctl,
+#ifdef CONFIG_COMPAT
+ .compat_ioctl = rbd_compat_ioctl,
+#endif
};
/*
@@ -1382,6 +1447,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+ dout("%s: img %p (was %d)\n", __func__, img_request,
+ atomic_read(&img_request->kref.refcount));
+ kref_get(&img_request->kref);
+}
+
static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
@@ -2142,6 +2214,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
img_request->next_completion = which;
out:
spin_unlock_irq(&img_request->completion_lock);
+ rbd_img_request_put(img_request);
if (!more)
rbd_img_request_complete(img_request);
@@ -2242,6 +2315,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
goto out_unwind;
obj_request->osd_req = osd_req;
obj_request->callback = rbd_img_obj_callback;
+ rbd_img_request_get(img_request);
if (write_request) {
osd_req_op_alloc_hint_init(osd_req, which,
@@ -2872,56 +2946,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
}
/*
- * Request sync osd watch/unwatch. The value of "start" determines
- * whether a watch request is being initiated or torn down.
+ * Initiate a watch request, synchronously.
*/
-static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
int ret;
- rbd_assert(start ^ !!rbd_dev->watch_event);
- rbd_assert(start ^ !!rbd_dev->watch_request);
+ rbd_assert(!rbd_dev->watch_event);
+ rbd_assert(!rbd_dev->watch_request);
- if (start) {
- ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
- &rbd_dev->watch_event);
- if (ret < 0)
- return ret;
- rbd_assert(rbd_dev->watch_event != NULL);
- }
+ ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+ &rbd_dev->watch_event);
+ if (ret < 0)
+ return ret;
+
+ rbd_assert(rbd_dev->watch_event);
- ret = -ENOMEM;
obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
- OBJ_REQUEST_NODATA);
- if (!obj_request)
+ OBJ_REQUEST_NODATA);
+ if (!obj_request) {
+ ret = -ENOMEM;
goto out_cancel;
+ }
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
obj_request);
- if (!obj_request->osd_req)
- goto out_cancel;
+ if (!obj_request->osd_req) {
+ ret = -ENOMEM;
+ goto out_put;
+ }
- if (start)
- ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
- else
- ceph_osdc_unregister_linger_request(osdc,
- rbd_dev->watch_request->osd_req);
+ ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
- rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
+ rbd_dev->watch_event->cookie, 0, 1);
rbd_osd_req_format_write(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
- goto out_cancel;
+ goto out_linger;
+
ret = rbd_obj_request_wait(obj_request);
if (ret)
- goto out_cancel;
+ goto out_linger;
+
ret = obj_request->result;
if (ret)
- goto out_cancel;
+ goto out_linger;
/*
* A watch request is set to linger, so the underlying osd
@@ -2931,36 +3004,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
* it. We'll drop that reference (below) after we've
* unregistered it.
*/
- if (start) {
- rbd_dev->watch_request = obj_request;
+ rbd_dev->watch_request = obj_request;
- return 0;
+ return 0;
+
+out_linger:
+ ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
+out_put:
+ rbd_obj_request_put(obj_request);
+out_cancel:
+ ceph_osdc_cancel_event(rbd_dev->watch_event);
+ rbd_dev->watch_event = NULL;
+
+ return ret;
+}
+
+/*
+ * Tear down a watch request, synchronously.
+ */
+static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ struct rbd_obj_request *obj_request;
+ int ret;
+
+ rbd_assert(rbd_dev->watch_event);
+ rbd_assert(rbd_dev->watch_request);
+
+ obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+ OBJ_REQUEST_NODATA);
+ if (!obj_request) {
+ ret = -ENOMEM;
+ goto out_cancel;
+ }
+
+ obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+ obj_request);
+ if (!obj_request->osd_req) {
+ ret = -ENOMEM;
+ goto out_put;
}
+ osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
+ rbd_dev->watch_event->cookie, 0, 0);
+ rbd_osd_req_format_write(obj_request);
+
+ ret = rbd_obj_request_submit(osdc, obj_request);
+ if (ret)
+ goto out_put;
+
+ ret = rbd_obj_request_wait(obj_request);
+ if (ret)
+ goto out_put;
+
+ ret = obj_request->result;
+ if (ret)
+ goto out_put;
+
/* We have successfully torn down the watch request */
+ ceph_osdc_unregister_linger_request(osdc,
+ rbd_dev->watch_request->osd_req);
rbd_obj_request_put(rbd_dev->watch_request);
rbd_dev->watch_request = NULL;
+
+out_put:
+ rbd_obj_request_put(obj_request);
out_cancel:
- /* Cancel the event if we're tearing down, or on error */
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
- if (obj_request)
- rbd_obj_request_put(obj_request);
return ret;
}
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
-{
- return __rbd_dev_header_watch_sync(rbd_dev, true);
-}
-
static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
{
int ret;
- ret = __rbd_dev_header_watch_sync(rbd_dev, false);
+ ret = __rbd_dev_header_unwatch_sync(rbd_dev);
if (ret) {
rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
ret);
@@ -3058,7 +3179,6 @@ static void rbd_request_fn(struct request_queue *q)
__releases(q->queue_lock) __acquires(q->queue_lock)
{
struct rbd_device *rbd_dev = q->queuedata;
- bool read_only = rbd_dev->mapping.read_only;
struct request *rq;
int result;
@@ -3094,7 +3214,7 @@ static void rbd_request_fn(struct request_queue *q)
if (write_request) {
result = -EROFS;
- if (read_only)
+ if (rbd_dev->mapping.read_only)
goto end_request;
rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
}
@@ -4683,6 +4803,38 @@ out_err:
}
/*
+ * Return pool id (>= 0) or a negative error code.
+ */
+static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
+{
+ u64 newest_epoch;
+ unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
+ int tries = 0;
+ int ret;
+
+again:
+ ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
+ if (ret == -ENOENT && tries++ < 1) {
+ ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
+ &newest_epoch);
+ if (ret < 0)
+ return ret;
+
+ if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
+ ceph_monc_request_next_osdmap(&rbdc->client->monc);
+ (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
+ newest_epoch, timeout);
+ goto again;
+ } else {
+ /* the osdmap we have is new enough */
+ return -ENOENT;
+ }
+ }
+
+ return ret;
+}
+
+/*
* An rbd format 2 image has a unique identifier, distinct from the
* name given to it by the user. Internally, that identifier is
* what's used to specify the names of objects related to the image.
@@ -4752,7 +4904,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
image_id = ceph_extract_encoded_string(&p, p + ret,
NULL, GFP_NOIO);
- ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+ ret = PTR_ERR_OR_ZERO(image_id);
if (!ret)
rbd_dev->image_format = 2;
} else {
@@ -4907,6 +5059,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
if (ret)
goto err_out_disk;
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+ set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
ret = rbd_bus_add_dev(rbd_dev);
if (ret)
@@ -5053,7 +5206,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
struct rbd_options *rbd_opts = NULL;
struct rbd_spec *spec = NULL;
struct rbd_client *rbdc;
- struct ceph_osd_client *osdc;
bool read_only;
int rc = -ENOMEM;
@@ -5075,8 +5227,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
}
/* pick the pool */
- osdc = &rbdc->client->osdc;
- rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+ rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
if (rc < 0)
goto err_out_client;
spec->pool_id = (u64)rc;
@@ -5387,6 +5538,7 @@ err_out_slab:
static void __exit rbd_exit(void)
{
+ ida_destroy(&rbd_dev_id_ida);
rbd_sysfs_cleanup();
if (single_major)
unregister_blkdev(rbd_major, RBD_DRV_NAME);
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 21887d63dad5..469f2e8657e8 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -104,12 +104,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
struct dentry *dentry;
- if (acl) {
- ret = posix_acl_valid(acl);
- if (ret < 0)
- goto out;
- }
-
switch (type) {
case ACL_TYPE_ACCESS:
name = POSIX_ACL_XATTR_ACCESS;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 4f3f69079f36..90b3954d48ed 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -211,18 +211,15 @@ static int readpage_nounlock(struct file *filp, struct page *page)
SetPageError(page);
ceph_fscache_readpage_cancel(inode, page);
goto out;
- } else {
- if (err < PAGE_CACHE_SIZE) {
- /* zero fill remainder of page */
- zero_user_segment(page, err, PAGE_CACHE_SIZE);
- } else {
- flush_dcache_page(page);
- }
}
- SetPageUptodate(page);
+ if (err < PAGE_CACHE_SIZE)
+ /* zero fill remainder of page */
+ zero_user_segment(page, err, PAGE_CACHE_SIZE);
+ else
+ flush_dcache_page(page);
- if (err >= 0)
- ceph_readpage_to_fscache(inode, page);
+ SetPageUptodate(page);
+ ceph_readpage_to_fscache(inode, page);
out:
return err < 0 ? err : 0;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index c561b628ebce..1fde164b74b5 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -221,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
return 0;
}
-static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
- struct ceph_cap_reservation *ctx)
+struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx)
{
struct ceph_cap *cap = NULL;
@@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
* it is < 0. (This is so we can atomically add the cap and add an
* open file reference to it.)
*/
-int ceph_add_cap(struct inode *inode,
- struct ceph_mds_session *session, u64 cap_id,
- int fmode, unsigned issued, unsigned wanted,
- unsigned seq, unsigned mseq, u64 realmino, int flags,
- struct ceph_cap_reservation *caps_reservation)
+void ceph_add_cap(struct inode *inode,
+ struct ceph_mds_session *session, u64 cap_id,
+ int fmode, unsigned issued, unsigned wanted,
+ unsigned seq, unsigned mseq, u64 realmino, int flags,
+ struct ceph_cap **new_cap)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_cap *new_cap = NULL;
struct ceph_cap *cap;
int mds = session->s_mds;
int actual_wanted;
@@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode,
if (fmode >= 0)
wanted |= ceph_caps_for_mode(fmode);
-retry:
- spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds);
if (!cap) {
- if (new_cap) {
- cap = new_cap;
- new_cap = NULL;
- } else {
- spin_unlock(&ci->i_ceph_lock);
- new_cap = get_cap(mdsc, caps_reservation);
- if (new_cap == NULL)
- return -ENOMEM;
- goto retry;
- }
+ cap = *new_cap;
+ *new_cap = NULL;
cap->issued = 0;
cap->implemented = 0;
@@ -562,9 +551,6 @@ retry:
session->s_nr_caps++;
spin_unlock(&session->s_cap_lock);
} else {
- if (new_cap)
- ceph_put_cap(mdsc, new_cap);
-
/*
* auth mds of the inode changed. we received the cap export
* message, but still haven't received the cap import message.
@@ -626,7 +612,6 @@ retry:
ci->i_auth_cap = cap;
cap->mds_wanted = wanted;
}
- ci->i_cap_exporting_issued = 0;
} else {
WARN_ON(ci->i_auth_cap == cap);
}
@@ -648,9 +633,6 @@ retry:
if (fmode >= 0)
__ceph_get_fmode(ci, fmode);
- spin_unlock(&ci->i_ceph_lock);
- wake_up_all(&ci->i_cap_wq);
- return 0;
}
/*
@@ -685,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
*/
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
- int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
+ int have = ci->i_snap_caps;
struct ceph_cap *cap;
struct rb_node *p;
@@ -900,7 +882,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
*/
static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{
- return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
+ return !RB_EMPTY_ROOT(&ci->i_caps);
}
int ceph_is_any_caps(struct inode *inode)
@@ -2397,32 +2379,30 @@ static void invalidate_aliases(struct inode *inode)
* actually be a revocation if it specifies a smaller cap set.)
*
* caller holds s_mutex and i_ceph_lock, we drop both.
- *
- * return value:
- * 0 - ok
- * 1 - check_caps on auth cap only (writeback)
- * 2 - check_caps (ack revoke)
*/
-static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+static void handle_cap_grant(struct ceph_mds_client *mdsc,
+ struct inode *inode, struct ceph_mds_caps *grant,
+ void *snaptrace, int snaptrace_len,
+ struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session,
- struct ceph_cap *cap,
- struct ceph_buffer *xattr_buf)
- __releases(ci->i_ceph_lock)
+ struct ceph_cap *cap, int issued)
+ __releases(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
int seq = le32_to_cpu(grant->seq);
int newcaps = le32_to_cpu(grant->caps);
- int issued, implemented, used, wanted, dirty;
+ int used, wanted, dirty;
u64 size = le64_to_cpu(grant->size);
u64 max_size = le64_to_cpu(grant->max_size);
struct timespec mtime, atime, ctime;
int check_caps = 0;
- int wake = 0;
- int writeback = 0;
- int queue_invalidate = 0;
- int deleted_inode = 0;
- int queue_revalidate = 0;
+ bool wake = 0;
+ bool writeback = 0;
+ bool queue_trunc = 0;
+ bool queue_invalidate = 0;
+ bool queue_revalidate = 0;
+ bool deleted_inode = 0;
dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2466,16 +2446,13 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
}
/* side effects now are allowed */
-
- issued = __ceph_caps_issued(ci, &implemented);
- issued |= implemented | __ceph_caps_dirty(ci);
-
cap->cap_gen = session->s_cap_gen;
cap->seq = seq;
__check_cap_issue(ci, cap, newcaps);
- if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+ if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
+ (issued & CEPH_CAP_AUTH_EXCL) == 0) {
inode->i_mode = le32_to_cpu(grant->mode);
inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
@@ -2484,7 +2461,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
from_kgid(&init_user_ns, inode->i_gid));
}
- if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+ if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
+ (issued & CEPH_CAP_LINK_EXCL) == 0) {
set_nlink(inode, le32_to_cpu(grant->nlink));
if (inode->i_nlink == 0 &&
(newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
@@ -2511,30 +2489,35 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
queue_revalidate = 1;
- /* size/ctime/mtime/atime? */
- ceph_fill_file_size(inode, issued,
- le32_to_cpu(grant->truncate_seq),
- le64_to_cpu(grant->truncate_size), size);
- ceph_decode_timespec(&mtime, &grant->mtime);
- ceph_decode_timespec(&atime, &grant->atime);
- ceph_decode_timespec(&ctime, &grant->ctime);
- ceph_fill_file_time(inode, issued,
- le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
- &atime);
-
-
- /* file layout may have changed */
- ci->i_layout = grant->layout;
-
- /* max size increase? */
- if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
- dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
- ci->i_max_size = max_size;
- if (max_size >= ci->i_wanted_max_size) {
- ci->i_wanted_max_size = 0; /* reset */
- ci->i_requested_max_size = 0;
+ if (newcaps & CEPH_CAP_ANY_RD) {
+ /* ctime/mtime/atime? */
+ ceph_decode_timespec(&mtime, &grant->mtime);
+ ceph_decode_timespec(&atime, &grant->atime);
+ ceph_decode_timespec(&ctime, &grant->ctime);
+ ceph_fill_file_time(inode, issued,
+ le32_to_cpu(grant->time_warp_seq),
+ &ctime, &mtime, &atime);
+ }
+
+ if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
+ /* file layout may have changed */
+ ci->i_layout = grant->layout;
+ /* size/truncate_seq? */
+ queue_trunc = ceph_fill_file_size(inode, issued,
+ le32_to_cpu(grant->truncate_seq),
+ le64_to_cpu(grant->truncate_size),
+ size);
+ /* max size increase? */
+ if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
+ dout("max_size %lld -> %llu\n",
+ ci->i_max_size, max_size);
+ ci->i_max_size = max_size;
+ if (max_size >= ci->i_wanted_max_size) {
+ ci->i_wanted_max_size = 0; /* reset */
+ ci->i_requested_max_size = 0;
+ }
+ wake = 1;
}
- wake = 1;
}
/* check cap bits */
@@ -2595,6 +2578,23 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
spin_unlock(&ci->i_ceph_lock);
+ if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
+ down_write(&mdsc->snap_rwsem);
+ ceph_update_snap_trace(mdsc, snaptrace,
+ snaptrace + snaptrace_len, false);
+ downgrade_write(&mdsc->snap_rwsem);
+ kick_flushing_inode_caps(mdsc, session, inode);
+ up_read(&mdsc->snap_rwsem);
+ if (newcaps & ~issued)
+ wake = 1;
+ }
+
+ if (queue_trunc) {
+ ceph_queue_vmtruncate(inode);
+ ceph_queue_revalidate(inode);
+ } else if (queue_revalidate)
+ ceph_queue_revalidate(inode);
+
if (writeback)
/*
* queue inode for writeback: we can't actually call
@@ -2606,8 +2606,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
ceph_queue_invalidate(inode);
if (deleted_inode)
invalidate_aliases(inode);
- if (queue_revalidate)
- ceph_queue_revalidate(inode);
if (wake)
wake_up_all(&ci->i_cap_wq);
@@ -2784,7 +2782,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_session *tsession = NULL;
- struct ceph_cap *cap, *tcap;
+ struct ceph_cap *cap, *tcap, *new_cap = NULL;
struct ceph_inode_info *ci = ceph_inode(inode);
u64 t_cap_id;
unsigned mseq = le32_to_cpu(ex->migrate_seq);
@@ -2807,7 +2805,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
retry:
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds);
- if (!cap)
+ if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
goto out_unlock;
if (target < 0) {
@@ -2846,15 +2844,14 @@ retry:
}
__ceph_remove_cap(cap, false);
goto out_unlock;
- }
-
- if (tsession) {
- int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
- spin_unlock(&ci->i_ceph_lock);
+ } else if (tsession) {
/* add placeholder for the export tagert */
+ int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
- t_seq - 1, t_mseq, (u64)-1, flag, NULL);
- goto retry;
+ t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
+
+ __ceph_remove_cap(cap, false);
+ goto out_unlock;
}
spin_unlock(&ci->i_ceph_lock);
@@ -2873,6 +2870,7 @@ retry:
SINGLE_DEPTH_NESTING);
}
ceph_add_cap_releases(mdsc, tsession);
+ new_cap = ceph_get_cap(mdsc, NULL);
} else {
WARN_ON(1);
tsession = NULL;
@@ -2887,24 +2885,27 @@ out_unlock:
mutex_unlock(&tsession->s_mutex);
ceph_put_mds_session(tsession);
}
+ if (new_cap)
+ ceph_put_cap(mdsc, new_cap);
}
/*
- * Handle cap IMPORT. If there are temp bits from an older EXPORT,
- * clean them up.
+ * Handle cap IMPORT.
*
- * caller holds s_mutex.
+ * caller holds s_mutex. acquires i_ceph_lock
*/
static void handle_cap_import(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *im,
struct ceph_mds_cap_peer *ph,
struct ceph_mds_session *session,
- void *snaptrace, int snaptrace_len)
+ struct ceph_cap **target_cap, int *old_issued)
+ __acquires(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_cap *cap;
+ struct ceph_cap *cap, *ocap, *new_cap = NULL;
int mds = session->s_mds;
- unsigned issued = le32_to_cpu(im->caps);
+ int issued;
+ unsigned caps = le32_to_cpu(im->caps);
unsigned wanted = le32_to_cpu(im->wanted);
unsigned seq = le32_to_cpu(im->seq);
unsigned mseq = le32_to_cpu(im->migrate_seq);
@@ -2924,40 +2925,52 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
inode, ci, mds, mseq, peer);
+retry:
spin_lock(&ci->i_ceph_lock);
- cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
- if (cap && cap->cap_id == p_cap_id) {
+ cap = __get_cap_for_mds(ci, mds);
+ if (!cap) {
+ if (!new_cap) {
+ spin_unlock(&ci->i_ceph_lock);
+ new_cap = ceph_get_cap(mdsc, NULL);
+ goto retry;
+ }
+ cap = new_cap;
+ } else {
+ if (new_cap) {
+ ceph_put_cap(mdsc, new_cap);
+ new_cap = NULL;
+ }
+ }
+
+ __ceph_caps_issued(ci, &issued);
+ issued |= __ceph_caps_dirty(ci);
+
+ ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
+ realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
+
+ ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
+ if (ocap && ocap->cap_id == p_cap_id) {
dout(" remove export cap %p mds%d flags %d\n",
- cap, peer, ph->flags);
+ ocap, peer, ph->flags);
if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
- (cap->seq != le32_to_cpu(ph->seq) ||
- cap->mseq != le32_to_cpu(ph->mseq))) {
+ (ocap->seq != le32_to_cpu(ph->seq) ||
+ ocap->mseq != le32_to_cpu(ph->mseq))) {
pr_err("handle_cap_import: mismatched seq/mseq: "
"ino (%llx.%llx) mds%d seq %d mseq %d "
"importer mds%d has peer seq %d mseq %d\n",
- ceph_vinop(inode), peer, cap->seq,
- cap->mseq, mds, le32_to_cpu(ph->seq),
+ ceph_vinop(inode), peer, ocap->seq,
+ ocap->mseq, mds, le32_to_cpu(ph->seq),
le32_to_cpu(ph->mseq));
}
- ci->i_cap_exporting_issued = cap->issued;
- __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
+ __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
}
/* make sure we re-request max_size, if necessary */
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
- spin_unlock(&ci->i_ceph_lock);
-
- down_write(&mdsc->snap_rwsem);
- ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
- false);
- downgrade_write(&mdsc->snap_rwsem);
- ceph_add_cap(inode, session, cap_id, -1,
- issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
- NULL /* no caps context */);
- kick_flushing_inode_caps(mdsc, session, inode);
- up_read(&mdsc->snap_rwsem);
+ *old_issued = issued;
+ *target_cap = cap;
}
/*
@@ -2977,7 +2990,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL;
int mds = session->s_mds;
- int op;
+ int op, issued;
u32 seq, mseq;
struct ceph_vino vino;
u64 cap_id;
@@ -3069,7 +3082,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, peer, session,
- snaptrace, snaptrace_len);
+ &cap, &issued);
+ handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
+ msg->middle, session, cap, issued);
+ goto done_unlocked;
}
/* the rest require a cap */
@@ -3086,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
switch (op) {
case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT:
- case CEPH_CAP_OP_IMPORT:
- handle_cap_grant(inode, h, session, cap, msg->middle);
+ __ceph_caps_issued(ci, &issued);
+ issued |= __ceph_caps_dirty(ci);
+ handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
+ session, cap, issued);
goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK:
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 00d6af6a32ec..8d7d782f4382 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -169,7 +169,7 @@ static struct dentry *__get_parent(struct super_block *sb,
return dentry;
}
-struct dentry *ceph_get_parent(struct dentry *child)
+static struct dentry *ceph_get_parent(struct dentry *child)
{
/* don't re-export snaps */
if (ceph_snap(child->d_inode) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e4fff9ff1c27..04c89c266cec 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -10,6 +10,7 @@
#include <linux/writeback.h>
#include <linux/vmalloc.h>
#include <linux/posix_acl.h>
+#include <linux/random.h>
#include "super.h"
#include "mds_client.h"
@@ -179,9 +180,8 @@ struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
* specified, copy the frag delegation info to the caller if
* it is present.
*/
-u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
- struct ceph_inode_frag *pfrag,
- int *found)
+static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+ struct ceph_inode_frag *pfrag, int *found)
{
u32 t = ceph_frag_make(0, 0);
struct ceph_inode_frag *frag;
@@ -191,7 +191,6 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
if (found)
*found = 0;
- mutex_lock(&ci->i_fragtree_mutex);
while (1) {
WARN_ON(!ceph_frag_contains_value(t, v));
frag = __ceph_find_frag(ci, t);
@@ -220,10 +219,19 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
}
dout("choose_frag(%x) = %x\n", v, t);
- mutex_unlock(&ci->i_fragtree_mutex);
return t;
}
+u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+ struct ceph_inode_frag *pfrag, int *found)
+{
+ u32 ret;
+ mutex_lock(&ci->i_fragtree_mutex);
+ ret = __ceph_choose_frag(ci, v, pfrag, found);
+ mutex_unlock(&ci->i_fragtree_mutex);
+ return ret;
+}
+
/*
* Process dirfrag (delegation) info from the mds. Include leaf
* fragment in tree ONLY if ndist > 0. Otherwise, only
@@ -237,11 +245,17 @@ static int ceph_fill_dirfrag(struct inode *inode,
u32 id = le32_to_cpu(dirinfo->frag);
int mds = le32_to_cpu(dirinfo->aut