summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2015-02-19 14:14:42 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2015-02-19 14:14:42 -0800
commit4533f6e27a366ecc3da4876074ebfe0cc0ea4f0f (patch)
tree8b6f1aeeda991e6a1ce98702d7cc35d2d2a444b1
parent89d3fa45b4add00cd0056361a2498e978cb1e119 (diff)
parent0f5417cea6cfeafd5cdec4223df63ca79918fdea (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil: "On the RBD side, there is a conversion to blk-mq from Christoph, several long-standing bug fixes from Ilya, and some cleanup from Rickard Strandqvist. On the CephFS side there is a long list of fixes from Zheng, including improved session handling, a few IO path fixes, some dcache management correctness fixes, and several blocking while !TASK_RUNNING fixes. The core code gets a few cleanups and Chaitanya has added support for TCP_NODELAY (which has been used on the server side for ages but we somehow missed on the kernel client). There is also an update to MAINTAINERS to fix up some email addresses and reflect that Ilya and Zheng are doing most of the maintenance for RBD and CephFS these days. Do not be surprised to see a pull request come from one of them in the future if I am unavailable for some reason" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits) MAINTAINERS: update Ceph and RBD maintainers libceph: kfree() in put_osd() shouldn't depend on authorizer libceph: fix double __remove_osd() problem rbd: convert to blk-mq ceph: return error for traceless reply race ceph: fix dentry leaks ceph: re-send requests when MDS enters reconnecting stage ceph: show nocephx_require_signatures and notcp_nodelay options libceph: tcp_nodelay support rbd: do not treat standalone as flatten ceph: fix atomic_open snapdir ceph: properly mark empty directory as complete client: include kernel version in client metadata ceph: provide seperate {inode,file}_operations for snapdir ceph: fix request time stamp encoding ceph: fix reading inline data when i_size > PAGE_SIZE ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_close_sessions) ceph: avoid block operation when !TASK_RUNNING (ceph_get_caps) ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync) rbd: fix error paths in rbd_dev_refresh() ...
-rw-r--r--MAINTAINERS7
-rw-r--r--drivers/block/rbd.c193
-rw-r--r--fs/ceph/acl.c14
-rw-r--r--fs/ceph/addr.c19
-rw-r--r--fs/ceph/caps.c127
-rw-r--r--fs/ceph/dir.c33
-rw-r--r--fs/ceph/file.c37
-rw-r--r--fs/ceph/inode.c41
-rw-r--r--fs/ceph/mds_client.c127
-rw-r--r--fs/ceph/mds_client.h2
-rw-r--r--fs/ceph/snap.c54
-rw-r--r--fs/ceph/super.c4
-rw-r--r--fs/ceph/super.h5
-rw-r--r--include/linux/ceph/ceph_fs.h37
-rw-r--r--include/linux/ceph/libceph.h3
-rw-r--r--include/linux/ceph/messenger.h4
-rw-r--r--include/linux/ceph/mon_client.h9
-rw-r--r--net/ceph/ceph_common.c16
-rw-r--r--net/ceph/ceph_strings.c14
-rw-r--r--net/ceph/debugfs.c2
-rw-r--r--net/ceph/messenger.c14
-rw-r--r--net/ceph/mon_client.c139
-rw-r--r--net/ceph/osd_client.c31
23 files changed, 444 insertions, 488 deletions
diff --git a/MAINTAINERS b/MAINTAINERS
index 1921ed58d1a0..7cfcee4e2bea 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2433,7 +2433,8 @@ F: arch/powerpc/oprofile/*cell*
F: arch/powerpc/platforms/cell/
CEPH DISTRIBUTED FILE SYSTEM CLIENT
-M: Sage Weil <sage@inktank.com>
+M: Yan, Zheng <zyan@redhat.com>
+M: Sage Weil <sage@redhat.com>
L: ceph-devel@vger.kernel.org
W: http://ceph.com/
T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
@@ -7998,8 +7999,8 @@ S: Supported
F: drivers/net/wireless/ath/wcn36xx/
RADOS BLOCK DEVICE (RBD)
-M: Yehuda Sadeh <yehuda@inktank.com>
-M: Sage Weil <sage@inktank.com>
+M: Ilya Dryomov <idryomov@gmail.com>
+M: Sage Weil <sage@redhat.com>
M: Alex Elder <elder@kernel.org>
M: ceph-devel@vger.kernel.org
W: http://ceph.com/
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8a86b62466f7..b40af3203089 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -38,6 +38,7 @@
#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
+#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
@@ -340,9 +341,7 @@ struct rbd_device {
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
- struct list_head rq_queue; /* incoming rq queue */
spinlock_t lock; /* queue, flags, open_count */
- struct work_struct rq_work;
struct rbd_image_header header;
unsigned long flags; /* possibly lock protected */
@@ -360,6 +359,9 @@ struct rbd_device {
atomic_t parent_ref;
struct rbd_device *parent;
+ /* Block layer tags. */
+ struct blk_mq_tag_set tag_set;
+
/* protects updating the header */
struct rw_semaphore header_rwsem;
@@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
/*
* We support a 64-bit length, but ultimately it has to be
- * passed to blk_end_request(), which takes an unsigned int.
+ * passed to the block layer, which just supports a 32-bit
+ * length field.
*/
obj_request->xferred = osd_req->r_reply_op_len[0];
rbd_assert(obj_request->xferred < (u64)UINT_MAX);
@@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
more = obj_request->which < img_request->obj_request_count - 1;
} else {
rbd_assert(img_request->rq != NULL);
- more = blk_end_request(img_request->rq, result, xferred);
+
+ more = blk_update_request(img_request->rq, result, xferred);
+ if (!more)
+ __blk_mq_end_request(img_request->rq, result);
}
return more;
@@ -3304,8 +3310,10 @@ out:
return ret;
}
-static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+static void rbd_queue_workfn(struct work_struct *work)
{
+ struct request *rq = blk_mq_rq_from_pdu(work);
+ struct rbd_device *rbd_dev = rq->q->queuedata;
struct rbd_img_request *img_request;
struct ceph_snap_context *snapc = NULL;
u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
@@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
u64 mapping_size;
int result;
+ if (rq->cmd_type != REQ_TYPE_FS) {
+ dout("%s: non-fs request type %d\n", __func__,
+ (int) rq->cmd_type);
+ result = -EIO;
+ goto err;
+ }
+
if (rq->cmd_flags & REQ_DISCARD)
op_type = OBJ_OP_DISCARD;
else if (rq->cmd_flags & REQ_WRITE)
@@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
goto err_rq; /* Shouldn't happen */
}
+ blk_mq_start_request(rq);
+
down_read(&rbd_dev->header_rwsem);
mapping_size = rbd_dev->mapping.size;
if (op_type != OBJ_OP_READ) {
@@ -3404,53 +3421,18 @@ err_rq:
rbd_warn(rbd_dev, "%s %llx at %llx result %d",
obj_op_name(op_type), length, offset, result);
ceph_put_snap_context(snapc);
- blk_end_request_all(rq, result);
+err:
+ blk_mq_end_request(rq, result);
}
-static void rbd_request_workfn(struct work_struct *work)
+static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
+ const struct blk_mq_queue_data *bd)
{
- struct rbd_device *rbd_dev =
- container_of(work, struct rbd_device, rq_work);
- struct request *rq, *next;
- LIST_HEAD(requests);
-
- spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
- list_splice_init(&rbd_dev->rq_queue, &requests);
- spin_unlock_irq(&rbd_dev->lock);
+ struct request *rq = bd->rq;
+ struct work_struct *work = blk_mq_rq_to_pdu(rq);
- list_for_each_entry_safe(rq, next, &requests, queuelist) {
- list_del_init(&rq->queuelist);
- rbd_handle_request(rbd_dev, rq);
- }
-}
-
-/*
- * Called with q->queue_lock held and interrupts disabled, possibly on
- * the way to schedule(). Do not sleep here!
- */
-static void rbd_request_fn(struct request_queue *q)
-{
- struct rbd_device *rbd_dev = q->queuedata;
- struct request *rq;
- int queued = 0;
-
- rbd_assert(rbd_dev);
-
- while ((rq = blk_fetch_request(q))) {
- /* Ignore any non-FS requests that filter through. */
- if (rq->cmd_type != REQ_TYPE_FS) {
- dout("%s: non-fs request type %d\n", __func__,
- (int) rq->cmd_type);
- __blk_end_request_all(rq, 0);
- continue;
- }
-
- list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
- queued++;
- }
-
- if (queued)
- queue_work(rbd_wq, &rbd_dev->rq_work);
+ queue_work(rbd_wq, work);
+ return BLK_MQ_RQ_QUEUE_OK;
}
/*
@@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
del_gendisk(disk);
if (disk->queue)
blk_cleanup_queue(disk->queue);
+ blk_mq_free_tag_set(&rbd_dev->tag_set);
}
put_disk(disk);
}
@@ -3694,7 +3677,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
ret = rbd_dev_header_info(rbd_dev);
if (ret)
- return ret;
+ goto out;
/*
* If there is a parent, see if it has disappeared due to the
@@ -3703,30 +3686,46 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
if (rbd_dev->parent) {
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
- return ret;
+ goto out;
}
if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
- if (rbd_dev->mapping.size != rbd_dev->header.image_size)
- rbd_dev->mapping.size = rbd_dev->header.image_size;
+ rbd_dev->mapping.size = rbd_dev->header.image_size;
} else {
/* validate mapped snapshot's EXISTS flag */
rbd_exists_validate(rbd_dev);
}
+out:
up_write(&rbd_dev->header_rwsem);
-
- if (mapping_size != rbd_dev->mapping.size)
+ if (!ret && mapping_size != rbd_dev->mapping.size)
rbd_dev_update_size(rbd_dev);
+ return ret;
+}
+
+static int rbd_init_request(void *data, struct request *rq,
+ unsigned int hctx_idx, unsigned int request_idx,
+ unsigned int numa_node)
+{
+ struct work_struct *work = blk_mq_rq_to_pdu(rq);
+
+ INIT_WORK(work, rbd_queue_workfn);
return 0;
}
+static struct blk_mq_ops rbd_mq_ops = {
+ .queue_rq = rbd_queue_rq,
+ .map_queue = blk_mq_map_queue,
+ .init_request = rbd_init_request,
+};
+
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
struct gendisk *disk;
struct request_queue *q;
u64 segment_size;
+ int err;
/* create gendisk info */
disk = alloc_disk(single_major ?
@@ -3744,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
disk->fops = &rbd_bd_ops;
disk->private_data = rbd_dev;
- q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
- if (!q)
+ memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
+ rbd_dev->tag_set.ops = &rbd_mq_ops;
+ rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
+ rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
+ rbd_dev->tag_set.flags =
+ BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+ rbd_dev->tag_set.nr_hw_queues = 1;
+ rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
+
+ err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
+ if (err)
goto out_disk;
+ q = blk_mq_init_queue(&rbd_dev->tag_set);
+ if (IS_ERR(q)) {
+ err = PTR_ERR(q);
+ goto out_tag_set;
+ }
+
/* We use the default size, but let's be explicit about it. */
blk_queue_physical_block_size(q, SECTOR_SIZE);
@@ -3773,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
rbd_dev->disk = disk;
return 0;
+out_tag_set:
+ blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk:
put_disk(disk);
-
- return -ENOMEM;
+ return err;
}
/*
@@ -4033,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
return NULL;
spin_lock_init(&rbd_dev->lock);
- INIT_LIST_HEAD(&rbd_dev->rq_queue);
- INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
rbd_dev->flags = 0;
atomic_set(&rbd_dev->parent_ref, 0);
INIT_LIST_HEAD(&rbd_dev->node);
@@ -4274,32 +4287,22 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
}
/*
- * We always update the parent overlap. If it's zero we
- * treat it specially.
+ * We always update the parent overlap. If it's zero we issue
+ * a warning, as we will proceed as if there was no parent.
*/
- rbd_dev->parent_overlap = overlap;
if (!overlap) {
-
- /* A null parent_spec indicates it's the initial probe */
-
if (parent_spec) {
- /*
- * The overlap has become zero, so the clone
- * must have been resized down to 0 at some
- * point. Treat this the same as a flatten.
- */
- rbd_dev_parent_put(rbd_dev);
- pr_info("%s: clone image now standalone\n",
- rbd_dev->disk->disk_name);
+ /* refresh, careful to warn just once */
+ if (rbd_dev->parent_overlap)
+ rbd_warn(rbd_dev,
+ "clone now standalone (overlap became 0)");
} else {
- /*
- * For the initial probe, if we find the
- * overlap is zero we just pretend there was
- * no parent image.
- */
- rbd_warn(rbd_dev, "ignoring parent with overlap 0");
+ /* initial probe */
+ rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
}
}
+ rbd_dev->parent_overlap = overlap;
+
out:
ret = 0;
out_err:
@@ -4771,36 +4774,6 @@ static inline size_t next_token(const char **buf)
}
/*
- * Finds the next token in *buf, and if the provided token buffer is
- * big enough, copies the found token into it. The result, if
- * copied, is guaranteed to be terminated with '\0'. Note that *buf
- * must be terminated with '\0' on entry.
- *
- * Returns the length of the token found (not including the '\0').
- * Return value will be 0 if no token is found, and it will be >=
- * token_size if the token would not fit.
- *
- * The *buf pointer will be updated to point beyond the end of the
- * found token. Note that this occurs even if the token buffer is
- * too small to hold it.
- */
-static inline size_t copy_token(const char **buf,
- char *token,
- size_t token_size)
-{
- size_t len;
-
- len = next_token(buf);
- if (len < token_size) {
- memcpy(token, *buf, len);
- *(token + len) = '\0';
- }
- *buf += len;
-
- return len;
-}
-
-/*
* Finds the next token in *buf, dynamically allocates a buffer big
* enough to hold a copy of it, and copies the token into the new
* buffer. The copy is guaranteed to be terminated with '\0'. Note
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 5bd853ba44ff..64fa248343f6 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode,
spin_unlock(&ci->i_ceph_lock);
}
-static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
- int type)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct posix_acl *acl = ACL_NOT_CACHED;
-
- spin_lock(&ci->i_ceph_lock);
- if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
- acl = get_cached_acl(inode, type);
- spin_unlock(&ci->i_ceph_lock);
-
- return acl;
-}
-
struct posix_acl *ceph_get_acl(struct inode *inode, int type)
{
int size;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 24be059fd1f8..fd5599d32362 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page)
u64 len = PAGE_CACHE_SIZE;
if (off >= i_size_read(inode)) {
- zero_user_segment(page, err, PAGE_CACHE_SIZE);
+ zero_user_segment(page, 0, PAGE_CACHE_SIZE);
SetPageUptodate(page);
return 0;
}
- /*
- * Uptodate inline data should have been added into page cache
- * while getting Fcr caps.
- */
- if (ci->i_inline_version != CEPH_INLINE_NONE)
- return -EINVAL;
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ /*
+ * Uptodate inline data should have been added
+ * into page cache while getting Fcr caps.
+ */
+ if (off == 0)
+ return -EINVAL;
+ zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+ SetPageUptodate(page);
+ return 0;
+ }
err = ceph_readpage_from_fscache(inode, page);
if (err == 0)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b93c631c6c87..8172775428a0 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode,
struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
realmino);
if (realm) {
- ceph_get_snap_realm(mdsc, realm);
spin_lock(&realm->inodes_with_caps_lock);
ci->i_snap_realm = realm;
list_add(&ci->i_snap_realm_item,
@@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode,
spin_lock(&mdsc->cap_dirty_lock);
list_del_init(&ci->i_dirty_item);
- ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
if (list_empty(&ci->i_flushing_item)) {
+ ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
mdsc->num_cap_flushing++;
dout(" inode %p now flushing seq %lld\n", inode,
@@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
* requested from the MDS.
*/
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
- loff_t endoff, int *got, struct page **pinned_page,
- int *check_max, int *err)
+ loff_t endoff, int *got, int *check_max, int *err)
{
struct inode *inode = &ci->vfs_inode;
int ret = 0;
- int have, implemented, _got = 0;
+ int have, implemented;
int file_wanted;
dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want));
-again:
+
spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */
@@ -2138,50 +2136,34 @@ again:
inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking));
if ((revoking & not) == 0) {
- _got = need | (have & want);
- __take_cap_refs(ci, _got);
+ *got = need | (have & want);
+ __take_cap_refs(ci, *got);
ret = 1;
}
} else {
+ int session_readonly = false;
+ if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
+ struct ceph_mds_session *s = ci->i_auth_cap->session;
+ spin_lock(&s->s_cap_lock);
+ session_readonly = s->s_readonly;
+ spin_unlock(&s->s_cap_lock);
+ }
+ if (session_readonly) {
+ dout("get_cap_refs %p needed %s but mds%d readonly\n",
+ inode, ceph_cap_string(need), ci->i_auth_cap->mds);
+ *err = -EROFS;
+ ret = 1;
+ goto out_unlock;
+ }
+
dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need));
}
out_unlock:
spin_unlock(&ci->i_ceph_lock);
- if (ci->i_inline_version != CEPH_INLINE_NONE &&
- (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
- i_size_read(inode) > 0) {
- int ret1;
- struct page *page = find_get_page(inode->i_mapping, 0);
- if (page) {
- if (PageUptodate(page)) {
- *pinned_page = page;
- goto out;
- }
- page_cache_release(page);
- }
- /*
- * drop cap refs first because getattr while holding
- * caps refs can cause deadlock.
- */
- ceph_put_cap_refs(ci, _got);
- _got = 0;
-
- /* getattr request will bring inline data into page cache */
- ret1 = __ceph_do_getattr(inode, NULL,
- CEPH_STAT_CAP_INLINE_DATA, true);
- if (ret1 >= 0) {
- ret = 0;
- goto again;
- }
- *err = ret1;
- ret = 1;
- }
-out:
dout("get_cap_refs %p ret %d got %s\n", inode,
- ret, ceph_cap_string(_got));
- *got = _got;
+ ret, ceph_cap_string(*got));
return ret;
}
@@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff)
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page)
{
- int check_max, ret, err;
+ int _got, check_max, ret, err = 0;
retry:
if (endoff > 0)
check_max_size(&ci->vfs_inode, endoff);
+ _got = 0;
check_max = 0;
- err = 0;
ret = wait_event_interruptible(ci->i_cap_wq,
- try_get_cap_refs(ci, need, want, endoff,
- got, pinned_page,
- &check_max, &err));
+ try_get_cap_refs(ci, need, want, endoff,
+ &_got, &check_max, &err));
if (err)
ret = err;
+ if (ret < 0)
+ return ret;
+
if (check_max)
goto retry;
- return ret;
+
+ if (ci->i_inline_version != CEPH_INLINE_NONE &&
+ (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+ i_size_read(&ci->vfs_inode) > 0) {
+ struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
+ if (page) {
+ if (PageUptodate(page)) {
+ *pinned_page = page;
+ goto out;
+ }
+ page_cache_release(page);
+ }
+ /*
+ * drop cap refs first because getattr while holding
+ * caps refs can cause deadlock.
+ */
+ ceph_put_cap_refs(ci, _got);
+ _got = 0;
+
+ /* getattr request will bring inline data into page cache */
+ ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+ CEPH_STAT_CAP_INLINE_DATA, true);
+ if (ret < 0)
+ return ret;
+ goto retry;
+ }
+out:
+ *got = _got;
+ return 0;
}
/*
@@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode)
*/
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
- void *snaptrace, int snaptrace_len,
u64 inline_version,
void *inline_data, int inline_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session,
struct ceph_cap *cap, int issued)
__releases(ci->i_ceph_lock)
+ __releases(mdsc->snap_rwsem)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
spin_unlock(&ci->i_ceph_lock);
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
- down_write(&mdsc->snap_rwsem);
- ceph_update_snap_trace(mdsc, snaptrace,
- snaptrace + snaptrace_len, false);
- downgrade_write(&mdsc->snap_rwsem);
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
if (newcaps & ~issued)
@@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_cap *cap;
struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL;
+ struct ceph_snap_realm *realm;
int mds = session->s_mds;
int op, issued;
u32 seq, mseq;
@@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
goto done_unlocked;
case CEPH_CAP_OP_IMPORT:
+ realm = NULL;
+ if (snaptrace_len) {
+ down_write(&mdsc->snap_rwsem);
+ ceph_update_snap_trace(mdsc, snaptrace,
+ snaptrace + snaptrace_len,
+ false, &realm);
+ downgrade_write(&mdsc->snap_rwsem);
+ } else {
+ down_read(&mdsc->snap_rwsem);
+ }
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
- handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
+ handle_cap_grant(mdsc, inode, h,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
+ if (realm)
+ ceph_put_snap_realm(mdsc, realm);
goto done_unlocked;
}
@@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
- handle_cap_grant(mdsc, inode, h, NULL, 0,
+ handle_cap_grant(mdsc, inode, h,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
goto done_unlocked;
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index c241603764fd..0411dbb15815 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -26,8 +26,6 @@
* point by name.
*/
-const struct inode_operations ceph_dir_iops;
-const struct file_operations ceph_dir_fops;
const struct dentry_operations ceph_dentry_ops;
/*
@@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
/*
* We created the item, then did a lookup, and found
* it was already linked to another inode we already
- * had in our cache (and thus got spliced). Link our
- * dentry to that inode, but don't hash it, just in
- * case the VFS wants to dereference it.
+ * had in our cache (and thus got spliced). To not
+ * confuse VFS (especially when inode is a directory),
+ * we don't link our dentry to that inode, return an
+ * error instead.
+ *
+ * This event should be rare and it happens only when
+ * we talk to old MDS. Recent MDS does not send traceless
+ * reply for request that creates new inode.
*/
- BUG_ON(!result->d_inode);
- d_instantiate(dentry, result->d_inode);
- return 0;
+ d_drop(result);
+ return -ESTALE;
}
return PTR_ERR(result);
}
@@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = {
.fsync = ceph_dir_fsync,
};
+const struct file_operations ceph_snapdir_fops = {
+ .iterate = ceph_readdir,
+ .llseek = ceph_dir_llseek,
+ .open = ceph_open,
+ .release = ceph_release,
+};
+
const struct inode_operations ceph_dir_iops = {
.lookup = ceph_lookup,
.permission = ceph_permission,
@@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = {
.atomic_open = ceph_atomic_open,
};
+const struct inode_operations ceph_snapdir_iops = {
+ .lookup = ceph_lookup,
+ .permission = ceph_permission,
+ .getattr = ceph_getattr,
+ .mkdir = ceph_mkdir,
+ .rmdir = ceph_unlink,
+};
+
const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate,
.d_release = ceph_d_release,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 905986dd4c3c..a3d774b35149 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
err = ceph_mdsc_do_request(mdsc,
(flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
req);
+ err = ceph_handle_snapdir(req, dentry, err);
if (err)
goto out_req;
- err = ceph_handle_snapdir(req, dentry, err);
if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
@@ -392,13 +392,14 @@ more:
if (ret >= 0) {
int didpages;
if (was_short && (pos + ret < inode->i_size)) {
- u64 tmp = min(this_len - ret,
- inode->i_size - pos - ret);
+ int zlen = min(this_len - ret,
+ inode->i_size - pos - ret);
+ int zoff = (o_direct ? buf_align : io_align) +
+ read + ret;
dout(" zero gap %llu to %llu\n",
- pos + ret, pos + ret + tmp);
- ceph_zero_page_vector_range(page_align + read + ret,
- tmp, pages);
- ret += tmp;
+ pos + ret, pos + ret + zlen);
+ ceph_zero_page_vector_range(zoff, zlen, pages);
+ ret += zlen;
}
didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
@@ -878,28 +879,34 @@ again:
i_size = i_size_read(inode);
if (retry_op == READ_INLINE) {
- /* does not support inline data > PAGE_SIZE */
- if (i_size > PAGE_CACHE_SIZE) {
- ret = -EIO;
- } else if (iocb->ki_pos < i_size) {
+ BUG_ON(ret > 0 || read > 0);
+ if (iocb->ki_pos < i_size &&
+ iocb->ki_pos < PAGE_CACHE_SIZE) {
loff_t end = min_t(loff_t, i_size,
iocb->ki_pos + len);
+ end = min_t(loff_t, end, PAGE_CACHE_SIZE);
if (statret < end)
zero_user_segment(page, statret, end);
ret = copy_page_to_iter(page,
iocb->ki_pos & ~PAGE_MASK,
end - iocb->ki_pos, to);
iocb->ki_pos += ret;
- } else {
- ret = 0;
+ read += ret;
+ }
+ if (iocb->ki_pos < i_size && read < len) {
+ size_t zlen = min_t(size_t, len - read,
+ i_size - iocb->ki_pos);
+ ret = iov_iter_zero(zlen, to);
+ iocb->ki_pos += ret;
+ read += ret;
}
__free_pages(page, 0);
- return ret;
+ return read;
}
/* hit EOF or hole? */
if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
- ret < len) {
+ ret < len) {
dout("sync_read hit hole, ppos %lld < size %lld"
", reading more\n", iocb->ki_pos,
inode->i_size);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 6b5173605154..119c43c80638 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent)
inode->i_mode = parent->i_mode;
inode->i_uid = parent->i_uid;
inode->i_gid = parent->i_gid;
- inode->i_op = &ceph_dir_iops;
- inode->i_fop = &ceph_dir_fops;
+ inode->i_op = &ceph_snapdir_iops;
+ inode->i_fop = &ceph_snapdir_fops;
ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
ci->i_rbytes = 0;
return inode;
@@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
ceph_vinop(inode), inode->i_mode);
}
- /* set dir completion flag? */
- if (S_ISDIR(inode->i_mode) &&
- ci->i_files == 0 && ci->i_subdirs == 0 &&
- ceph_snap(inode) == CEPH_NOSNAP &&
- (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
- (issued & CEPH_CAP_FILE_EXCL) == 0 &&
- !__ceph_dir_is_complete(ci)) {
- dout(" marking %p complete (empty)\n", inode);
- __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
- ci->i_ordered_count);
- }
-
/* were we issued a capability? */
if (info->cap.caps) {
if (ceph_snap(inode) == CEPH_NOSNAP) {
+ unsigned caps = le32_to_cpu(info->cap.caps);
ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id),
- cap_fmode,
- le32_to_cpu(info->cap.caps),
+ cap_fmode, caps,
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq),
le64_to_cpu(info->cap.realm),
info->cap.flags, &new_cap);
+
+ /* set dir completion flag? */
+ if (S_ISDIR(inode->i_mode) &&
+ ci->i_files == 0 && ci->i_subdirs == 0 &&
+ (caps & CEPH_CAP_FILE_SHARED) &&
+ (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+ !__ceph_dir_is_complete(ci)) {
+ dout(" marking %p complete (empty)\n", inode);
+ __ceph_dir_set_complete(ci,
+ atomic_read(&ci->i_release_count),
+ ci->i_ordered_count);
+ }
+
wake = true;
} else {
dout(" %p got snap_caps %s\n", inode,
@@ -1446,12 +1447,14 @@ retry_lookup:
}
if (!dn->d_inode) {
- dn = splice_dentry(dn, in, NULL);
- if (IS_ERR(dn)) {
- err = PTR_ERR(dn);
+ struct dentry *realdn = splice_dentry(dn, in, NULL);
+ if (IS_ERR(realdn)) {
+ err = PTR_ERR(realdn);
+ d_drop(dn);
dn = NULL;
goto next_item;
}
+ dn = realdn;
}
di = dn->d_