diff options
-rw-r--r-- | MAINTAINERS | 7 | ||||
-rw-r--r-- | drivers/block/rbd.c | 193 | ||||
-rw-r--r-- | fs/ceph/acl.c | 14 | ||||
-rw-r--r-- | fs/ceph/addr.c | 19 | ||||
-rw-r--r-- | fs/ceph/caps.c | 127 | ||||
-rw-r--r-- | fs/ceph/dir.c | 33 | ||||
-rw-r--r-- | fs/ceph/file.c | 37 | ||||
-rw-r--r-- | fs/ceph/inode.c | 41 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 127 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 2 | ||||
-rw-r--r-- | fs/ceph/snap.c | 54 | ||||
-rw-r--r-- | fs/ceph/super.c | 4 | ||||
-rw-r--r-- | fs/ceph/super.h | 5 | ||||
-rw-r--r-- | include/linux/ceph/ceph_fs.h | 37 | ||||
-rw-r--r-- | include/linux/ceph/libceph.h | 3 | ||||
-rw-r--r-- | include/linux/ceph/messenger.h | 4 | ||||
-rw-r--r-- | include/linux/ceph/mon_client.h | 9 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 16 | ||||
-rw-r--r-- | net/ceph/ceph_strings.c | 14 | ||||
-rw-r--r-- | net/ceph/debugfs.c | 2 | ||||
-rw-r--r-- | net/ceph/messenger.c | 14 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 139 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 31 |
23 files changed, 444 insertions, 488 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index 1921ed58d1a0..7cfcee4e2bea 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -2433,7 +2433,8 @@ F: arch/powerpc/oprofile/*cell* F: arch/powerpc/platforms/cell/ CEPH DISTRIBUTED FILE SYSTEM CLIENT -M: Sage Weil <sage@inktank.com> +M: Yan, Zheng <zyan@redhat.com> +M: Sage Weil <sage@redhat.com> L: ceph-devel@vger.kernel.org W: http://ceph.com/ T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git @@ -7998,8 +7999,8 @@ S: Supported F: drivers/net/wireless/ath/wcn36xx/ RADOS BLOCK DEVICE (RBD) -M: Yehuda Sadeh <yehuda@inktank.com> -M: Sage Weil <sage@inktank.com> +M: Ilya Dryomov <idryomov@gmail.com> +M: Sage Weil <sage@redhat.com> M: Alex Elder <elder@kernel.org> M: ceph-devel@vger.kernel.org W: http://ceph.com/ diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 8a86b62466f7..b40af3203089 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -38,6 +38,7 @@ #include <linux/kernel.h> #include <linux/device.h> #include <linux/module.h> +#include <linux/blk-mq.h> #include <linux/fs.h> #include <linux/blkdev.h> #include <linux/slab.h> @@ -340,9 +341,7 @@ struct rbd_device { char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ - struct list_head rq_queue; /* incoming rq queue */ spinlock_t lock; /* queue, flags, open_count */ - struct work_struct rq_work; struct rbd_image_header header; unsigned long flags; /* possibly lock protected */ @@ -360,6 +359,9 @@ struct rbd_device { atomic_t parent_ref; struct rbd_device *parent; + /* Block layer tags. */ + struct blk_mq_tag_set tag_set; + /* protects updating the header */ struct rw_semaphore header_rwsem; @@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, /* * We support a 64-bit length, but ultimately it has to be - * passed to blk_end_request(), which takes an unsigned int. + * passed to the block layer, which just supports a 32-bit + * length field. */ obj_request->xferred = osd_req->r_reply_op_len[0]; rbd_assert(obj_request->xferred < (u64)UINT_MAX); @@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) more = obj_request->which < img_request->obj_request_count - 1; } else { rbd_assert(img_request->rq != NULL); - more = blk_end_request(img_request->rq, result, xferred); + + more = blk_update_request(img_request->rq, result, xferred); + if (!more) + __blk_mq_end_request(img_request->rq, result); } return more; @@ -3304,8 +3310,10 @@ out: return ret; } -static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) +static void rbd_queue_workfn(struct work_struct *work) { + struct request *rq = blk_mq_rq_from_pdu(work); + struct rbd_device *rbd_dev = rq->q->queuedata; struct rbd_img_request *img_request; struct ceph_snap_context *snapc = NULL; u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; @@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) u64 mapping_size; int result; + if (rq->cmd_type != REQ_TYPE_FS) { + dout("%s: non-fs request type %d\n", __func__, + (int) rq->cmd_type); + result = -EIO; + goto err; + } + if (rq->cmd_flags & REQ_DISCARD) op_type = OBJ_OP_DISCARD; else if (rq->cmd_flags & REQ_WRITE) @@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) goto err_rq; /* Shouldn't happen */ } + blk_mq_start_request(rq); + down_read(&rbd_dev->header_rwsem); mapping_size = rbd_dev->mapping.size; if (op_type != OBJ_OP_READ) { @@ -3404,53 +3421,18 @@ err_rq: rbd_warn(rbd_dev, "%s %llx at %llx result %d", obj_op_name(op_type), length, offset, result); ceph_put_snap_context(snapc); - blk_end_request_all(rq, result); +err: + blk_mq_end_request(rq, result); } -static void rbd_request_workfn(struct work_struct *work) +static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx, + const struct blk_mq_queue_data *bd) { - struct rbd_device *rbd_dev = - container_of(work, struct rbd_device, rq_work); - struct request *rq, *next; - LIST_HEAD(requests); - - spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */ - list_splice_init(&rbd_dev->rq_queue, &requests); - spin_unlock_irq(&rbd_dev->lock); + struct request *rq = bd->rq; + struct work_struct *work = blk_mq_rq_to_pdu(rq); - list_for_each_entry_safe(rq, next, &requests, queuelist) { - list_del_init(&rq->queuelist); - rbd_handle_request(rbd_dev, rq); - } -} - -/* - * Called with q->queue_lock held and interrupts disabled, possibly on - * the way to schedule(). Do not sleep here! - */ -static void rbd_request_fn(struct request_queue *q) -{ - struct rbd_device *rbd_dev = q->queuedata; - struct request *rq; - int queued = 0; - - rbd_assert(rbd_dev); - - while ((rq = blk_fetch_request(q))) { - /* Ignore any non-FS requests that filter through. */ - if (rq->cmd_type != REQ_TYPE_FS) { - dout("%s: non-fs request type %d\n", __func__, - (int) rq->cmd_type); - __blk_end_request_all(rq, 0); - continue; - } - - list_add_tail(&rq->queuelist, &rbd_dev->rq_queue); - queued++; - } - - if (queued) - queue_work(rbd_wq, &rbd_dev->rq_work); + queue_work(rbd_wq, work); + return BLK_MQ_RQ_QUEUE_OK; } /* @@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev) del_gendisk(disk); if (disk->queue) blk_cleanup_queue(disk->queue); + blk_mq_free_tag_set(&rbd_dev->tag_set); } put_disk(disk); } @@ -3694,7 +3677,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev) ret = rbd_dev_header_info(rbd_dev); if (ret) - return ret; + goto out; /* * If there is a parent, see if it has disappeared due to the @@ -3703,30 +3686,46 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev) if (rbd_dev->parent) { ret = rbd_dev_v2_parent_info(rbd_dev); if (ret) - return ret; + goto out; } if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { - if (rbd_dev->mapping.size != rbd_dev->header.image_size) - rbd_dev->mapping.size = rbd_dev->header.image_size; + rbd_dev->mapping.size = rbd_dev->header.image_size; } else { /* validate mapped snapshot's EXISTS flag */ rbd_exists_validate(rbd_dev); } +out: up_write(&rbd_dev->header_rwsem); - - if (mapping_size != rbd_dev->mapping.size) + if (!ret && mapping_size != rbd_dev->mapping.size) rbd_dev_update_size(rbd_dev); + return ret; +} + +static int rbd_init_request(void *data, struct request *rq, + unsigned int hctx_idx, unsigned int request_idx, + unsigned int numa_node) +{ + struct work_struct *work = blk_mq_rq_to_pdu(rq); + + INIT_WORK(work, rbd_queue_workfn); return 0; } +static struct blk_mq_ops rbd_mq_ops = { + .queue_rq = rbd_queue_rq, + .map_queue = blk_mq_map_queue, + .init_request = rbd_init_request, +}; + static int rbd_init_disk(struct rbd_device *rbd_dev) { struct gendisk *disk; struct request_queue *q; u64 segment_size; + int err; /* create gendisk info */ disk = alloc_disk(single_major ? @@ -3744,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) disk->fops = &rbd_bd_ops; disk->private_data = rbd_dev; - q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); - if (!q) + memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); + rbd_dev->tag_set.ops = &rbd_mq_ops; + rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ; + rbd_dev->tag_set.numa_node = NUMA_NO_NODE; + rbd_dev->tag_set.flags = + BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE; + rbd_dev->tag_set.nr_hw_queues = 1; + rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); + + err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); + if (err) goto out_disk; + q = blk_mq_init_queue(&rbd_dev->tag_set); + if (IS_ERR(q)) { + err = PTR_ERR(q); + goto out_tag_set; + } + /* We use the default size, but let's be explicit about it. */ blk_queue_physical_block_size(q, SECTOR_SIZE); @@ -3773,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) rbd_dev->disk = disk; return 0; +out_tag_set: + blk_mq_free_tag_set(&rbd_dev->tag_set); out_disk: put_disk(disk); - - return -ENOMEM; + return err; } /* @@ -4033,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, return NULL; spin_lock_init(&rbd_dev->lock); - INIT_LIST_HEAD(&rbd_dev->rq_queue); - INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn); rbd_dev->flags = 0; atomic_set(&rbd_dev->parent_ref, 0); INIT_LIST_HEAD(&rbd_dev->node); @@ -4274,32 +4287,22 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) } /* - * We always update the parent overlap. If it's zero we - * treat it specially. + * We always update the parent overlap. If it's zero we issue + * a warning, as we will proceed as if there was no parent. */ - rbd_dev->parent_overlap = overlap; if (!overlap) { - - /* A null parent_spec indicates it's the initial probe */ - if (parent_spec) { - /* - * The overlap has become zero, so the clone - * must have been resized down to 0 at some - * point. Treat this the same as a flatten. - */ - rbd_dev_parent_put(rbd_dev); - pr_info("%s: clone image now standalone\n", - rbd_dev->disk->disk_name); + /* refresh, careful to warn just once */ + if (rbd_dev->parent_overlap) + rbd_warn(rbd_dev, + "clone now standalone (overlap became 0)"); } else { - /* - * For the initial probe, if we find the - * overlap is zero we just pretend there was - * no parent image. - */ - rbd_warn(rbd_dev, "ignoring parent with overlap 0"); + /* initial probe */ + rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); } } + rbd_dev->parent_overlap = overlap; + out: ret = 0; out_err: @@ -4771,36 +4774,6 @@ static inline size_t next_token(const char **buf) } /* - * Finds the next token in *buf, and if the provided token buffer is - * big enough, copies the found token into it. The result, if - * copied, is guaranteed to be terminated with '\0'. Note that *buf - * must be terminated with '\0' on entry. - * - * Returns the length of the token found (not including the '\0'). - * Return value will be 0 if no token is found, and it will be >= - * token_size if the token would not fit. - * - * The *buf pointer will be updated to point beyond the end of the - * found token. Note that this occurs even if the token buffer is - * too small to hold it. - */ -static inline size_t copy_token(const char **buf, - char *token, - size_t token_size) -{ - size_t len; - - len = next_token(buf); - if (len < token_size) { - memcpy(token, *buf, len); - *(token + len) = '\0'; - } - *buf += len; - - return len; -} - -/* * Finds the next token in *buf, dynamically allocates a buffer big * enough to hold a copy of it, and copies the token into the new * buffer. The copy is guaranteed to be terminated with '\0'. Note diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 5bd853ba44ff..64fa248343f6 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode, spin_unlock(&ci->i_ceph_lock); } -static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode, - int type) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - struct posix_acl *acl = ACL_NOT_CACHED; - - spin_lock(&ci->i_ceph_lock); - if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0)) - acl = get_cached_acl(inode, type); - spin_unlock(&ci->i_ceph_lock); - - return acl; -} - struct posix_acl *ceph_get_acl(struct inode *inode, int type) { int size; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 24be059fd1f8..fd5599d32362 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page) u64 len = PAGE_CACHE_SIZE; if (off >= i_size_read(inode)) { - zero_user_segment(page, err, PAGE_CACHE_SIZE); + zero_user_segment(page, 0, PAGE_CACHE_SIZE); SetPageUptodate(page); return 0; } - /* - * Uptodate inline data should have been added into page cache - * while getting Fcr caps. - */ - if (ci->i_inline_version != CEPH_INLINE_NONE) - return -EINVAL; + if (ci->i_inline_version != CEPH_INLINE_NONE) { + /* + * Uptodate inline data should have been added + * into page cache while getting Fcr caps. + */ + if (off == 0) + return -EINVAL; + zero_user_segment(page, 0, PAGE_CACHE_SIZE); + SetPageUptodate(page); + return 0; + } err = ceph_readpage_from_fscache(inode, page); if (err == 0) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index b93c631c6c87..8172775428a0 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode, struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, realmino); if (realm) { - ceph_get_snap_realm(mdsc, realm); spin_lock(&realm->inodes_with_caps_lock); ci->i_snap_realm = realm; list_add(&ci->i_snap_realm_item, @@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode, spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); - ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; if (list_empty(&ci->i_flushing_item)) { + ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); mdsc->num_cap_flushing++; dout(" inode %p now flushing seq %lld\n", inode, @@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) * requested from the MDS. */ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - loff_t endoff, int *got, struct page **pinned_page, - int *check_max, int *err) + loff_t endoff, int *got, int *check_max, int *err) { struct inode *inode = &ci->vfs_inode; int ret = 0; - int have, implemented, _got = 0; + int have, implemented; int file_wanted; dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); -again: + spin_lock(&ci->i_ceph_lock); /* make sure file is actually open */ @@ -2138,50 +2136,34 @@ again: inode, ceph_cap_string(have), ceph_cap_string(not), ceph_cap_string(revoking)); if ((revoking & not) == 0) { - _got = need | (have & want); - __take_cap_refs(ci, _got); + *got = need | (have & want); + __take_cap_refs(ci, *got); ret = 1; } } else { + int session_readonly = false; + if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) { + struct ceph_mds_session *s = ci->i_auth_cap->session; + spin_lock(&s->s_cap_lock); + session_readonly = s->s_readonly; + spin_unlock(&s->s_cap_lock); + } + if (session_readonly) { + dout("get_cap_refs %p needed %s but mds%d readonly\n", + inode, ceph_cap_string(need), ci->i_auth_cap->mds); + *err = -EROFS; + ret = 1; + goto out_unlock; + } + dout("get_cap_refs %p have %s needed %s\n", inode, ceph_cap_string(have), ceph_cap_string(need)); } out_unlock: spin_unlock(&ci->i_ceph_lock); - if (ci->i_inline_version != CEPH_INLINE_NONE && - (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - i_size_read(inode) > 0) { - int ret1; - struct page *page = find_get_page(inode->i_mapping, 0); - if (page) { - if (PageUptodate(page)) { - *pinned_page = page; - goto out; - } - page_cache_release(page); - } - /* - * drop cap refs first because getattr while holding - * caps refs can cause deadlock. - */ - ceph_put_cap_refs(ci, _got); - _got = 0; - - /* getattr request will bring inline data into page cache */ - ret1 = __ceph_do_getattr(inode, NULL, - CEPH_STAT_CAP_INLINE_DATA, true); - if (ret1 >= 0) { - ret = 0; - goto again; - } - *err = ret1; - ret = 1; - } -out: dout("get_cap_refs %p ret %d got %s\n", inode, - ret, ceph_cap_string(_got)); - *got = _got; + ret, ceph_cap_string(*got)); return ret; } @@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff) int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page) { - int check_max, ret, err; + int _got, check_max, ret, err = 0; retry: if (endoff > 0) check_max_size(&ci->vfs_inode, endoff); + _got = 0; check_max = 0; - err = 0; ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, endoff, - got, pinned_page, - &check_max, &err)); + try_get_cap_refs(ci, need, want, endoff, + &_got, &check_max, &err)); if (err) ret = err; + if (ret < 0) + return ret; + if (check_max) goto retry; - return ret; + + if (ci->i_inline_version != CEPH_INLINE_NONE && + (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + i_size_read(&ci->vfs_inode) > 0) { + struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + *pinned_page = page; + goto out; + } + page_cache_release(page); + } + /* + * drop cap refs first because getattr while holding + * caps refs can cause deadlock. + */ + ceph_put_cap_refs(ci, _got); + _got = 0; + + /* getattr request will bring inline data into page cache */ + ret = __ceph_do_getattr(&ci->vfs_inode, NULL, + CEPH_STAT_CAP_INLINE_DATA, true); + if (ret < 0) + return ret; + goto retry; + } +out: + *got = _got; + return 0; } /* @@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode) */ static void handle_cap_grant(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *grant, - void *snaptrace, int snaptrace_len, u64 inline_version, void *inline_data, int inline_len, struct ceph_buffer *xattr_buf, struct ceph_mds_session *session, struct ceph_cap *cap, int issued) __releases(ci->i_ceph_lock) + __releases(mdsc->snap_rwsem) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, spin_unlock(&ci->i_ceph_lock); if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { - down_write(&mdsc->snap_rwsem); - ceph_update_snap_trace(mdsc, snaptrace, - snaptrace + snaptrace_len, false); - downgrade_write(&mdsc->snap_rwsem); kick_flushing_inode_caps(mdsc, session, inode); up_read(&mdsc->snap_rwsem); if (newcaps & ~issued) @@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_cap *cap; struct ceph_mds_caps *h; struct ceph_mds_cap_peer *peer = NULL; + struct ceph_snap_realm *realm; int mds = session->s_mds; int op, issued; u32 seq, mseq; @@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done_unlocked; case CEPH_CAP_OP_IMPORT: + realm = NULL; + if (snaptrace_len) { + down_write(&mdsc->snap_rwsem); + ceph_update_snap_trace(mdsc, snaptrace, + snaptrace + snaptrace_len, + false, &realm); + downgrade_write(&mdsc->snap_rwsem); + } else { + down_read(&mdsc->snap_rwsem); + } handle_cap_import(mdsc, inode, h, peer, session, &cap, &issued); - handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, + handle_cap_grant(mdsc, inode, h, inline_version, inline_data, inline_len, msg->middle, session, cap, issued); + if (realm) + ceph_put_snap_realm(mdsc, realm); goto done_unlocked; } @@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_GRANT: __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); - handle_cap_grant(mdsc, inode, h, NULL, 0, + handle_cap_grant(mdsc, inode, h, inline_version, inline_data, inline_len, msg->middle, session, cap, issued); goto done_unlocked; diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index c241603764fd..0411dbb15815 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -26,8 +26,6 @@ * point by name. */ -const struct inode_operations ceph_dir_iops; -const struct file_operations ceph_dir_fops; const struct dentry_operations ceph_dentry_ops; /* @@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry) /* * We created the item, then did a lookup, and found * it was already linked to another inode we already - * had in our cache (and thus got spliced). Link our - * dentry to that inode, but don't hash it, just in - * case the VFS wants to dereference it. + * had in our cache (and thus got spliced). To not + * confuse VFS (especially when inode is a directory), + * we don't link our dentry to that inode, return an + * error instead. + * + * This event should be rare and it happens only when + * we talk to old MDS. Recent MDS does not send traceless + * reply for request that creates new inode. */ - BUG_ON(!result->d_inode); - d_instantiate(dentry, result->d_inode); - return 0; + d_drop(result); + return -ESTALE; } return PTR_ERR(result); } @@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = { .fsync = ceph_dir_fsync, }; +const struct file_operations ceph_snapdir_fops = { + .iterate = ceph_readdir, + .llseek = ceph_dir_llseek, + .open = ceph_open, + .release = ceph_release, +}; + const struct inode_operations ceph_dir_iops = { .lookup = ceph_lookup, .permission = ceph_permission, @@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = { .atomic_open = ceph_atomic_open, }; +const struct inode_operations ceph_snapdir_iops = { + .lookup = ceph_lookup, + .permission = ceph_permission, + .getattr = ceph_getattr, + .mkdir = ceph_mkdir, + .rmdir = ceph_unlink, +}; + const struct dentry_operations ceph_dentry_ops = { .d_revalidate = ceph_d_revalidate, .d_release = ceph_d_release, diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 905986dd4c3c..a3d774b35149 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); + err = ceph_handle_snapdir(req, dentry, err); if (err) goto out_req; - err = ceph_handle_snapdir(req, dentry, err); if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); @@ -392,13 +392,14 @@ more: if (ret >= 0) { int didpages; if (was_short && (pos + ret < inode->i_size)) { - u64 tmp = min(this_len - ret, - inode->i_size - pos - ret); + int zlen = min(this_len - ret, + inode->i_size - pos - ret); + int zoff = (o_direct ? buf_align : io_align) + + read + ret; dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + tmp); - ceph_zero_page_vector_range(page_align + read + ret, - tmp, pages); - ret += tmp; + pos + ret, pos + ret + zlen); + ceph_zero_page_vector_range(zoff, zlen, pages); + ret += zlen; } didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; @@ -878,28 +879,34 @@ again: i_size = i_size_read(inode); if (retry_op == READ_INLINE) { - /* does not support inline data > PAGE_SIZE */ - if (i_size > PAGE_CACHE_SIZE) { - ret = -EIO; - } else if (iocb->ki_pos < i_size) { + BUG_ON(ret > 0 || read > 0); + if (iocb->ki_pos < i_size && + iocb->ki_pos < PAGE_CACHE_SIZE) { loff_t end = min_t(loff_t, i_size, iocb->ki_pos + len); + end = min_t(loff_t, end, PAGE_CACHE_SIZE); if (statret < end) zero_user_segment(page, statret, end); ret = copy_page_to_iter(page, iocb->ki_pos & ~PAGE_MASK, end - iocb->ki_pos, to); iocb->ki_pos += ret; - } else { - ret = 0; + read += ret; + } + if (iocb->ki_pos < i_size && read < len) { + size_t zlen = min_t(size_t, len - read, + i_size - iocb->ki_pos); + ret = iov_iter_zero(zlen, to); + iocb->ki_pos += ret; + read += ret; } __free_pages(page, 0); - return ret; + return read; } /* hit EOF or hole? */ if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && - ret < len) { + ret < len) { dout("sync_read hit hole, ppos %lld < size %lld" ", reading more\n", iocb->ki_pos, inode->i_size); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 6b5173605154..119c43c80638 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent) inode->i_mode = parent->i_mode; inode->i_uid = parent->i_uid; inode->i_gid = parent->i_gid; - inode->i_op = &ceph_dir_iops; - inode->i_fop = &ceph_dir_fops; + inode->i_op = &ceph_snapdir_iops; + inode->i_fop = &ceph_snapdir_fops; ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */ ci->i_rbytes = 0; return inode; @@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ceph_vinop(inode), inode->i_mode); } - /* set dir completion flag? */ - if (S_ISDIR(inode->i_mode) && - ci->i_files == 0 && ci->i_subdirs == 0 && - ceph_snap(inode) == CEPH_NOSNAP && - (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && - (issued & CEPH_CAP_FILE_EXCL) == 0 && - !__ceph_dir_is_complete(ci)) { - dout(" marking %p complete (empty)\n", inode); - __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count), - ci->i_ordered_count); - } - /* were we issued a capability? */ if (info->cap.caps) { if (ceph_snap(inode) == CEPH_NOSNAP) { + unsigned caps = le32_to_cpu(info->cap.caps); ceph_add_cap(inode, session, le64_to_cpu(info->cap.cap_id), - cap_fmode, - le32_to_cpu(info->cap.caps), + cap_fmode, caps, le32_to_cpu(info->cap.wanted), le32_to_cpu(info->cap.seq), le32_to_cpu(info->cap.mseq), le64_to_cpu(info->cap.realm), info->cap.flags, &new_cap); + + /* set dir completion flag? */ + if (S_ISDIR(inode->i_mode) && + ci->i_files == 0 && ci->i_subdirs == 0 && + (caps & CEPH_CAP_FILE_SHARED) && + (issued & CEPH_CAP_FILE_EXCL) == 0 && + !__ceph_dir_is_complete(ci)) { + dout(" marking %p complete (empty)\n", inode); + __ceph_dir_set_complete(ci, + atomic_read(&ci->i_release_count), + ci->i_ordered_count); + } + wake = true; } else { dout(" %p got snap_caps %s\n", inode, @@ -1446,12 +1447,14 @@ retry_lookup: } if (!dn->d_inode) { - dn = splice_dentry(dn, in, NULL); - if (IS_ERR(dn)) { - err = PTR_ERR(dn); + struct dentry *realdn = splice_dentry(dn, in, NULL); + if (IS_ERR(realdn)) { + err = PTR_ERR(realdn); + d_drop(dn); dn = NULL; goto next_item; } + dn = realdn; } di = dn->d_fsdata; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 5f62fb7a5d0a..71c073f38e54 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, mdsc->max_sessions = newmax; } mdsc->sessions[mds] = s; + atomic_inc(&mdsc->num_sessions); atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, @@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc, mdsc->sessions[s->s_mds] = NULL; ceph_con_close(&s->s_con); ceph_put_mds_session(s); + atomic_dec(&mdsc->num_sessions); } /* @@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 struct ceph_options *opt = mdsc->fsc->client->options; void *p; - const char* metadata[3][2] = { + const char* metadata[][2] = { {"hostname", utsname()->nodename}, + {"kernel_version", utsname()->release}, {"entity_id", opt->name ? opt->name : ""}, {NULL, NULL} }; @@ -1464,19 +1467,33 @@ out_unlocked: return err; } +static int check_cap_flush(struct inode *inode, u64 want_flush_seq) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + spin_lock(&ci->i_ceph_lock); + if (ci->i_flushing_caps) + ret = ci->i_cap_flush_seq >= want_flush_seq; + else + ret = 1; + spin_unlock(&ci->i_ceph_lock); + return ret; +} + /* * flush all dirty inode data to disk. * * returns true if we've flushed through want_flush_seq */ -static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) +static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) { - int mds, ret = 1; + int mds; dout("check_cap_flush want %lld\n", want_flush_seq); mutex_lock(&mdsc->mutex); - for (mds = 0; ret && mds < mdsc->ma |