summaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2015-07-02 11:35:00 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2015-07-02 11:35:00 -0700
commit0c76c6ba246043bbc5c0f9620a0645ae78217421 (patch)
tree644a4db58706c4e97478951f0a3a0087ddf26e5e /fs/ceph
parent8688d9540cc6e17df4cba71615e27f04e0378fe6 (diff)
parent5a60e87603c4c533492c515b7f62578189b03c9c (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil: "We have a pile of bug fixes from Ilya, including a few patches that sync up the CRUSH code with the latest from userspace. There is also a long series from Zheng that fixes various issues with snapshots, inline data, and directory fsync, some simplification and improvement in the cap release code, and a rework of the caching of directory contents. To top it off there are a few small fixes and cleanups from Benoit and Hong" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits) rbd: use GFP_NOIO in rbd_obj_request_create() crush: fix a bug in tree bucket decode libceph: Fix ceph_tcp_sendpage()'s more boolean usage libceph: Remove spurious kunmap() of the zero page rbd: queue_depth map option rbd: store rbd_options in rbd_device rbd: terminate rbd_opts_tokens with Opt_err ceph: fix ceph_writepages_start() rbd: bump queue_max_segments ceph: rework dcache readdir crush: sync up with userspace crush: fix crash from invalid 'take' argument ceph: switch some GFP_NOFS memory allocation to GFP_KERNEL ceph: pre-allocate data structure that tracks caps flushing ceph: re-send flushing caps (which are revoked) in reconnect stage ceph: send TID of the oldest pending caps flush to MDS ceph: track pending caps flushing globally ceph: track pending caps flushing accurately libceph: fix wrong name "Ceph filesystem for Linux" ceph: fix directory fsync ...
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/acl.c4
-rw-r--r--fs/ceph/addr.c308
-rw-r--r--fs/ceph/caps.c836
-rw-r--r--fs/ceph/dir.c383
-rw-r--r--fs/ceph/file.c61
-rw-r--r--fs/ceph/inode.c155
-rw-r--r--fs/ceph/mds_client.c425
-rw-r--r--fs/ceph/mds_client.h23
-rw-r--r--fs/ceph/snap.c173
-rw-r--r--fs/ceph/super.c25
-rw-r--r--fs/ceph/super.h125
-rw-r--r--fs/ceph/xattr.c65
12 files changed, 1689 insertions, 894 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 64fa248343f6..8f84646f10e9 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -187,10 +187,10 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
val_size2 = posix_acl_xattr_size(default_acl->a_count);
err = -ENOMEM;
- tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
+ tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
if (!tmp_buf)
goto out_err;
- pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
+ pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL);
if (!pagelist)
goto out_err;
ceph_pagelist_init(pagelist);
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index e162bcd105ee..890c50971a69 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page)
inode = mapping->host;
ci = ceph_inode(inode);
- /*
- * Note that we're grabbing a snapc ref here without holding
- * any locks!
- */
- snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
-
/* dirty the head */
spin_lock(&ci->i_ceph_lock);
- if (ci->i_head_snapc == NULL)
- ci->i_head_snapc = ceph_get_snap_context(snapc);
- ++ci->i_wrbuffer_ref_head;
+ BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
+ if (__ceph_have_pending_cap_snap(ci)) {
+ struct ceph_cap_snap *capsnap =
+ list_last_entry(&ci->i_cap_snaps,
+ struct ceph_cap_snap,
+ ci_item);
+ snapc = ceph_get_snap_context(capsnap->context);
+ capsnap->dirty_pages++;
+ } else {
+ BUG_ON(!ci->i_head_snapc);
+ snapc = ceph_get_snap_context(ci->i_head_snapc);
+ ++ci->i_wrbuffer_ref_head;
+ }
if (ci->i_wrbuffer_ref == 0)
ihold(inode);
++ci->i_wrbuffer_ref;
@@ -346,7 +350,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
/* build page vector */
nr_pages = calc_pages_for(0, len);
- pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+ pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
ret = -ENOMEM;
if (!pages)
goto out;
@@ -358,7 +362,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
dout("start_read %p adding %p idx %lu\n", inode, page,
page->index);
if (add_to_page_cache_lru(page, &inode->i_data, page->index,
- GFP_NOFS)) {
+ GFP_KERNEL)) {
ceph_fscache_uncache_page(inode, page);
page_cache_release(page);
dout("start_read %p add_to_page_cache failed %p\n",
@@ -436,7 +440,7 @@ out:
* only snap context we are allowed to write back.
*/
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
- u64 *snap_size)
+ loff_t *snap_size)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc = NULL;
@@ -476,8 +480,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
struct ceph_osd_client *osdc;
struct ceph_snap_context *snapc, *oldest;
loff_t page_off = page_offset(page);
+ loff_t snap_size = -1;
long writeback_stat;
- u64 truncate_size, snap_size = 0;
+ u64 truncate_size;
u32 truncate_seq;
int err = 0, len = PAGE_CACHE_SIZE;
@@ -512,7 +517,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
- if (!snap_size)
+ if (snap_size == -1)
snap_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);
@@ -695,7 +700,8 @@ static int ceph_writepages_start(struct address_space *mapping,
unsigned wsize = 1 << inode->i_blkbits;
struct ceph_osd_request *req = NULL;
int do_sync = 0;
- u64 truncate_size, snap_size;
+ loff_t snap_size, i_size;
+ u64 truncate_size;
u32 truncate_seq;
/*
@@ -741,7 +747,7 @@ static int ceph_writepages_start(struct address_space *mapping,
retry:
/* find oldest snap context with dirty data */
ceph_put_snap_context(snapc);
- snap_size = 0;
+ snap_size = -1;
snapc = get_oldest_context(inode, &snap_size);
if (!snapc) {
/* hmm, why does writepages get called when there
@@ -749,16 +755,13 @@ retry:
dout(" no snap context with dirty data?\n");
goto out;
}
- if (snap_size == 0)
- snap_size = i_size_read(inode);
dout(" oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps);
spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
- if (!snap_size)
- snap_size = i_size_read(inode);
+ i_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);
if (last_snapc && snapc != last_snapc) {
@@ -828,8 +831,10 @@ get_more_pages:
dout("waiting on writeback %p\n", page);
wait_on_page_writeback(page);
}
- if (page_offset(page) >= snap_size) {
- dout("%p page eof %llu\n", page, snap_size);
+ if (page_offset(page) >=
+ (snap_size == -1 ? i_size : snap_size)) {
+ dout("%p page eof %llu\n", page,
+ (snap_size == -1 ? i_size : snap_size));
done = 1;
unlock_page(page);
break;
@@ -884,7 +889,8 @@ get_more_pages:
}
if (do_sync)
- osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+ osd_req_op_init(req, 1,
+ CEPH_OSD_OP_STARTSYNC, 0);
req->r_callback = writepages_finish;
req->r_inode = inode;
@@ -944,10 +950,18 @@ get_more_pages:
}
/* Format the osd request message and submit the write */
-
offset = page_offset(pages[0]);
- len = min(snap_size - offset,
- (u64)locked_pages << PAGE_CACHE_SHIFT);
+ len = (u64)locked_pages << PAGE_CACHE_SHIFT;
+ if (snap_size == -1) {
+ len = min(len, (u64)i_size_read(inode) - offset);
+ /* writepages_finish() clears writeback pages
+ * according to the data length, so make sure
+ * data length covers all locked pages */
+ len = max(len, 1 +
+ ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
+ } else {
+ len = min(len, snap_size - offset);
+ }
dout("writepages got %d pages at %llu~%llu\n",
locked_pages, offset, len);
@@ -1032,7 +1046,6 @@ static int ceph_update_writeable_page(struct file *file,
{
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
loff_t page_off = pos & PAGE_CACHE_MASK;
int pos_in_page = pos & ~PAGE_CACHE_MASK;
int end_in_page = pos_in_page + len;
@@ -1044,10 +1057,6 @@ retry_locked:
/* writepages currently holds page lock, but if we change that later, */
wait_on_page_writeback(page);
- /* check snap context */
- BUG_ON(!ci->i_snap_realm);
- down_read(&mdsc->snap_rwsem);
- BUG_ON(!ci->i_snap_realm->cached_context);
snapc = page_snap_context(page);
if (snapc && snapc != ci->i_head_snapc) {
/*
@@ -1055,7 +1064,6 @@ retry_locked:
* context! is it writeable now?
*/
oldest = get_oldest_context(inode, NULL);
- up_read(&mdsc->snap_rwsem);
if (snapc->seq > oldest->seq) {
ceph_put_snap_context(oldest);
@@ -1112,7 +1120,6 @@ retry_locked:
}
/* we need to read it. */
- up_read(&mdsc->snap_rwsem);
r = readpage_nounlock(file, page);
if (r < 0)
goto fail_nosnap;
@@ -1157,16 +1164,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
/*
* we don't do anything in here that simple_write_end doesn't do
- * except adjust dirty page accounting and drop read lock on
- * mdsc->snap_rwsem.
+ * except adjust dirty page accounting
*/
static int ceph_write_end(struct file *file, struct address_space *mapping,
loff_t pos, unsigned len, unsigned copied,
struct page *page, void *fsdata)
{
struct inode *inode = file_inode(file);
- struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- struct ceph_mds_client *mdsc = fsc->mdsc;
unsigned from = pos & (PAGE_CACHE_SIZE - 1);
int check_cap = 0;
@@ -1188,7 +1192,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
set_page_dirty(page);
unlock_page(page);
- up_read(&mdsc->snap_rwsem);
page_cache_release(page);
if (check_cap)
@@ -1314,13 +1317,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
struct inode *inode = file_inode(vma->vm_file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data;
- struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+ struct ceph_cap_flush *prealloc_cf;
struct page *page = vmf->page;
loff_t off = page_offset(page);
loff_t size = i_size_read(inode);
size_t len;
int want, got, ret;
+ prealloc_cf = ceph_alloc_cap_flush();
+ if (!prealloc_cf)
+ return VM_FAULT_SIGBUS;
+
if (ci->i_inline_version != CEPH_INLINE_NONE) {
struct page *locked_page = NULL;
if (off == 0) {
@@ -1330,8 +1337,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = ceph_uninline_data(vma->vm_file, locked_page);
if (locked_page)
unlock_page(locked_page);
- if (ret < 0)
- return VM_FAULT_SIGBUS;
+ if (ret < 0) {
+ ret = VM_FAULT_SIGBUS;
+ goto out_free;
+ }
}
if (off + PAGE_CACHE_SIZE <= size)
@@ -1353,7 +1362,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
break;
if (ret != -ERESTARTSYS) {
WARN_ON(1);
- return VM_FAULT_SIGBUS;
+ ret = VM_FAULT_SIGBUS;
+ goto out_free;
}
}
dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
@@ -1373,7 +1383,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
if (ret == 0) {
/* success. we'll keep the page locked. */
set_page_dirty(page);
- up_read(&mdsc->snap_rwsem);
ret = VM_FAULT_LOCKED;
} else {
if (ret == -ENOMEM)
@@ -1389,7 +1398,8 @@ out:
int dirty;
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
- dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+ dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+ &prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
@@ -1398,6 +1408,8 @@ out:
dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
inode, off, len, ceph_cap_string(got), ret);
ceph_put_cap_refs(ci, got);
+out_free:
+ ceph_free_cap_flush(prealloc_cf);
return ret;
}
@@ -1509,8 +1521,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_vino(inode), 0, &len, 0, 1,
CEPH_OSD_OP_CREATE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
- ci->i_snap_realm->cached_context,
- 0, 0, false);
+ ceph_empty_snapc, 0, 0, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
@@ -1528,7 +1539,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_vino(inode), 0, &len, 1, 3,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
- ci->i_snap_realm->cached_context,
+ ceph_empty_snapc,
ci->i_truncate_seq, ci->i_truncate_size,
false);
if (IS_ERR(req)) {
@@ -1597,3 +1608,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma)
vma->vm_ops = &ceph_vmops;
return 0;
}
+
+enum {
+ POOL_READ = 1,
+ POOL_WRITE = 2,
+};
+
+static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
+{
+ struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
+ struct ceph_mds_client *mdsc = fsc->mdsc;
+ struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
+ struct rb_node **p, *parent;
+ struct ceph_pool_perm *perm;
+ struct page **pages;
+ int err = 0, err2 = 0, have = 0;
+
+ down_read(&mdsc->pool_perm_rwsem);
+ p = &mdsc->pool_perm_tree.rb_node;
+ while (*p) {
+ perm = rb_entry(*p, struct ceph_pool_perm, node);
+ if (pool < perm->pool)
+ p = &(*p)->rb_left;
+ else if (pool > perm->pool)
+ p = &(*p)->rb_right;
+ else {
+ have = perm->perm;
+ break;
+ }
+ }
+ up_read(&mdsc->pool_perm_rwsem);
+ if (*p)
+ goto out;
+
+ dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);
+
+ down_write(&mdsc->pool_perm_rwsem);
+ parent = NULL;
+ while (*p) {
+ parent = *p;
+ perm = rb_entry(parent, struct ceph_pool_perm, node);
+ if (pool < perm->pool)
+ p = &(*p)->rb_left;
+ else if (pool > perm->pool)
+ p = &(*p)->rb_right;
+ else {
+ have = perm->perm;
+ break;
+ }
+ }
+ if (*p) {
+ up_write(&mdsc->pool_perm_rwsem);
+ goto out;
+ }
+
+ rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
+ ceph_empty_snapc,
+ 1, false, GFP_NOFS);
+ if (!rd_req) {
+ err = -ENOMEM;
+ goto out_unlock;
+ }
+
+ rd_req->r_flags = CEPH_OSD_FLAG_READ;
+ osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
+ rd_req->r_base_oloc.pool = pool;
+ snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name),
+ "%llx.00000000", ci->i_vino.ino);
+ rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
+
+ wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
+ ceph_empty_snapc,
+ 1, false, GFP_NOFS);
+ if (!wr_req) {
+ err = -ENOMEM;
+ goto out_unlock;
+ }
+
+ wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+ osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
+ wr_req->r_base_oloc.pool = pool;
+ wr_req->r_base_oid = rd_req->r_base_oid;
+
+ /* one page should be large enough for STAT data */
+ pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+ if (IS_ERR(pages)) {
+ err = PTR_ERR(pages);
+ goto out_unlock;
+ }
+
+ osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
+ 0, false, true);
+ ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
+ &ci->vfs_inode.i_mtime);
+ err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
+
+ ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP,
+ &ci->vfs_inode.i_mtime);
+ err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
+
+ if (!err)
+ err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
+ if (!err2)
+ err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);
+
+ if (err >= 0 || err == -ENOENT)
+ have |= POOL_READ;
+ else if (err != -EPERM)
+ goto out_unlock;
+
+ if (err2 == 0 || err2 == -EEXIST)
+ have |= POOL_WRITE;
+ else if (err2 != -EPERM) {
+ err = err2;
+ goto out_unlock;
+ }
+
+ perm = kmalloc(sizeof(*perm), GFP_NOFS);
+ if (!perm) {
+ err = -ENOMEM;
+ goto out_unlock;
+ }
+
+ perm->pool = pool;
+ perm->perm = have;
+ rb_link_node(&perm->node, parent, p);
+ rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
+ err = 0;
+out_unlock:
+ up_write(&mdsc->pool_perm_rwsem);
+
+ if (rd_req)
+ ceph_osdc_put_request(rd_req);
+ if (wr_req)
+ ceph_osdc_put_request(wr_req);
+out:
+ if (!err)
+ err = have;
+ dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
+ return err;
+}
+
+int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
+{
+ u32 pool;
+ int ret, flags;
+
+ if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
+ NOPOOLPERM))
+ return 0;
+
+ spin_lock(&ci->i_ceph_lock);
+ flags = ci->i_ceph_flags;
+ pool = ceph_file_layout_pg_pool(ci->i_layout);
+ spin_unlock(&ci->i_ceph_lock);
+check:
+ if (flags & CEPH_I_POOL_PERM) {
+ if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
+ dout("ceph_pool_perm_check pool %u no read perm\n",
+ pool);
+ return -EPERM;
+ }
+ if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
+ dout("ceph_pool_perm_check pool %u no write perm\n",
+ pool);
+ return -EPERM;
+ }
+ return 0;
+ }
+
+ ret = __ceph_pool_perm_get(ci, pool);
+ if (ret < 0)
+ return ret;
+
+ flags = CEPH_I_POOL_PERM;
+ if (ret & POOL_READ)
+ flags |= CEPH_I_POOL_RD;
+ if (ret & POOL_WRITE)
+ flags |= CEPH_I_POOL_WR;
+
+ spin_lock(&ci->i_ceph_lock);
+ if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
+ ci->i_ceph_flags = flags;
+ } else {
+ pool = ceph_file_layout_pg_pool(ci->i_layout);
+ flags = ci->i_ceph_flags;
+ }
+ spin_unlock(&ci->i_ceph_lock);
+ goto check;
+}
+
+void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
+{
+ struct ceph_pool_perm *perm;
+ struct rb_node *n;
+
+ while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
+ n = rb_first(&mdsc->pool_perm_tree);
+ perm = rb_entry(n, struct ceph_pool_perm, node);
+ rb_erase(n, &mdsc->pool_perm_tree);
+ kfree(perm);
+ }
+}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index be5ea6af8366..dc10c9dd36c1 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
used |= CEPH_CAP_PIN;
if (ci->i_rd_ref)
used |= CEPH_CAP_FILE_RD;
- if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
+ if (ci->i_rdcache_ref ||
+ (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+ ci->vfs_inode.i_data.nrpages))
used |= CEPH_CAP_FILE_CACHE;
if (ci->i_wr_ref)
used |= CEPH_CAP_FILE_WR;
@@ -926,16 +928,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
/* remove from session list */
spin_lock(&session->s_cap_lock);
- /*
- * s_cap_reconnect is protected by s_cap_lock. no one changes
- * s_cap_gen while session is in the reconnect state.
- */
- if (queue_release &&
- (!session->s_cap_reconnect ||
- cap->cap_gen == session->s_cap_gen))
- __queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
- cap->mseq, cap->issue_seq);
-
if (session->s_cap_iterator == cap) {
/* not yet, we are iterating over this very cap */
dout("__ceph_remove_cap delaying %p removal from session %p\n",
@@ -948,6 +940,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
}
/* protect backpointer with s_cap_lock: see iterate_session_caps */
cap->ci = NULL;
+
+ /*
+ * s_cap_reconnect is protected by s_cap_lock. no one changes
+ * s_cap_gen while session is in the reconnect state.
+ */
+ if (queue_release &&
+ (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
+ cap->queue_release = 1;
+ if (removed) {
+ list_add_tail(&cap->session_caps,
+ &session->s_cap_releases);
+ session->s_num_cap_releases++;
+ removed = 0;
+ }
+ } else {
+ cap->queue_release = 0;
+ }
+ cap->cap_ino = ci->i_vino.ino;
+
spin_unlock(&session->s_cap_lock);
/* remove from inode list */
@@ -977,8 +988,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
static int send_cap_msg(struct ceph_mds_session *session,
u64 ino, u64 cid, int op,
int caps, int wanted, int dirty,
- u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq,
- u64 size, u64 max_size,
+ u32 seq, u64 flush_tid, u64 oldest_flush_tid,
+ u32 issue_seq, u32 mseq, u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
u64 time_warp_seq,
kuid_t uid, kgid_t gid, umode_t mode,
@@ -992,20 +1003,23 @@ static int send_cap_msg(struct ceph_mds_session *session,
size_t extra_len;
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
- " seq %u/%u mseq %u follows %lld size %llu/%llu"
+ " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
" xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
ceph_cap_string(dirty),
- seq, issue_seq, mseq, follows, size, max_size,
+ seq, issue_seq, flush_tid, oldest_flush_tid,
+ mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
- /* flock buffer size + inline version + inline data size */
- extra_len = 4 + 8 + 4;
+ /* flock buffer size + inline version + inline data size +
+ * osd_epoch_barrier + oldest_flush_tid */
+ extra_len = 4 + 8 + 4 + 4 + 8;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg)
return -ENOMEM;
+ msg->hdr.version = cpu_to_le16(6);
msg->hdr.tid = cpu_to_le64(flush_tid);
fc = msg->front.iov_base;
@@ -1041,6 +1055,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
+ /* osd_epoch_barrier */
+ ceph_encode_32(&p, 0);
+ /* oldest_flush_tid */
+ ceph_encode_64(&p, oldest_flush_tid);
fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_buf) {
@@ -1053,44 +1071,6 @@ static int send_cap_msg(struct ceph_mds_session *session,
return 0;
}
-void __queue_cap_release(struct ceph_mds_session *session,
- u64 ino, u64 cap_id, u32 migrate_seq,
- u32 issue_seq)
-{
- struct ceph_msg *msg;
- struct ceph_mds_cap_release *head;
- struct ceph_mds_cap_item *item;
-
- BUG_ON(!session->s_num_cap_releases);
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
-
- dout(" adding %llx release to mds%d msg %p (%d left)\n",
- ino, session->s_mds, msg, session->s_num_cap_releases);
-
- BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
- head = msg->front.iov_base;
- le32_add_cpu(&head->num, 1);
- item = msg->front.iov_base + msg->front.iov_len;
- item->ino = cpu_to_le64(ino);
- item->cap_id = cpu_to_le64(cap_id);
- item->migrate_seq = cpu_to_le32(migrate_seq);
- item->seq = cpu_to_le32(issue_seq);
-
- session->s_num_cap_releases--;
-
- msg->front.iov_len += sizeof(*item);
- if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
- dout(" release msg %p full\n", msg);
- list_move_tail(&msg->list_head, &session->s_cap_releases_done);
- } else {
- dout(" release msg %p at %d/%d (%d)\n", msg,
- (int)le32_to_cpu(head->num),
- (int)CEPH_CAPS_PER_RELEASE,
- (int)msg->front.iov_len);
- }
-}
-
/*
* Queue cap releases when an inode is dropped from our cache. Since
* inode is about to be destroyed, there is no need for i_ceph_lock.
@@ -1127,7 +1107,7 @@ void ceph_queue_caps_release(struct inode *inode)
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int op, int used, int want, int retain, int flushing,
- unsigned *pflush_tid)
+ u64 flush_tid, u64 oldest_flush_tid)
__releases(cap->ci->i_ceph_lock)
{
struct ceph_inode_info *ci = cap->ci;
@@ -1145,8 +1125,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
u64 xattr_version = 0;
struct ceph_buffer *xattr_blob = NULL;
int delayed = 0;
- u64 flush_tid = 0;
- int i;
int ret;
bool inline_data;
@@ -1190,26 +1168,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
- if (flushing) {
- /*
- * assign a tid for flush operations so we can avoid
- * flush1 -> dirty1 -> flush2 -> flushack1 -> mark
- * clean type races. track latest tid for every bit
- * so we can handle flush AxFw, flush Fw, and have the
- * first ack clean Ax.
- */
- flush_tid = ++ci->i_cap_flush_last_tid;
- if (pflush_tid)
- *pflush_tid = flush_tid;
- dout(" cap_flush_tid %d\n", (int)flush_tid);
- for (i = 0; i < CEPH_CAP_BITS; i++)
- if (flushing & (1 << i))
- ci->i_cap_flush_tid[i] = flush_tid;
-
- follows = ci->i_head_snapc->seq;
- } else {
- follows = 0;
- }
+ follows = flushing ? ci->i_head_snapc->seq : 0;
keep = cap->implemented;
seq = cap->seq;
@@ -1237,7 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
spin_unlock(&ci->i_ceph_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
- op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
+ op, keep, want, flushing, seq,
+ flush_tid, oldest_flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
follows, inline_data);
@@ -1259,14 +1219,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
* asynchronously back to the MDS once sync writes complete and dirty
* data is written out.
*
- * Unless @again is true, skip cap_snaps that were already sent to
+ * Unless @kick is true, skip cap_snaps that were already sent to
* the MDS (i.e., during this session).
*
* Called under i_ceph_lock. Takes s_mutex as needed.
*/
void __ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession,
- int again)
+ int kick)
__releases(ci->i_ceph_lock)
__acquires(ci->i_ceph_lock)
{
@@ -1297,11 +1257,8 @@ retry:
if (capsnap->dirty_pages || capsnap->writing)
break;
- /*
- * if cap writeback already occurred, we should have dropped
- * the capsnap in ceph_put_wrbuffer_cap_refs.
- */
- BUG_ON(capsnap->dirty == 0);
+ /* should be removed by ceph_try_drop_cap_snap() */
+ BUG_ON(!capsnap->need_flush);
/* pick mds, take s_mutex */
if (ci->i_auth_cap == NULL) {
@@ -1310,7 +1267,7 @@ retry:
}
/* only flush each capsnap once */
- if (!again && !list_empty(&capsnap->flushing_item)) {
+ if (!kick && !list_empty(&capsnap->flushing_item)) {
dout("already flushed %p, skipping\n", capsnap);
continue;
}
@@ -1320,6 +1277,9 @@ retry:
if (session && session->s_mds != mds) {
dout("oops, wrong session %p mutex\n", session);
+ if (kick)
+ goto out;
+
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
session = NULL;
@@ -1343,20 +1303,22 @@ retry:
goto retry;
}
- capsnap->flush_tid = ++ci->i_cap_flush_last_tid;
+ spin_lock(&mdsc->cap_dirty_lock);
+ capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
+ spin_unlock(&mdsc->cap_dirty_lock);
+
atomic_inc(&capsnap->nref);
- if (!list_empty(&capsnap->flushing_item))
- list_del_init(&capsnap->flushing_item);
- list_add_tail(&capsnap->flushing_item,
- &session->s_cap_snaps_flushing);
+ if (list_empty(&capsnap->flushing_item))
+ list_add_tail(&capsnap->flushing_item,
+ &session->s_cap_snaps_flushing);
spin_unlock(&ci->i_ceph_lock);
dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
inode, capsnap, capsnap->follows, capsnap->flush_tid);
send_cap_msg(session, ceph_vino(inode).ino, 0,
CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
- capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
- capsnap->size, 0,
+ capsnap->dirty, 0, capsnap->flush_tid, 0,
+ 0, mseq, capsnap->size, 0,
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
@@ -1396,7 +1358,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
* Caller is then responsible for calling __mark_inode_dirty with the
* returned flags value.
*/
-int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
+int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
+ struct ceph_cap_flush **pcf)
{
struct ceph_mds_client *mdsc =
ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
@@ -1416,9 +1379,14 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
ceph_cap_string(was | mask));
ci->i_dirty_caps |= mask;
if (was == 0) {
- if (!ci->i_head_snapc)
+ WARN_ON_ONCE(ci->i_prealloc_cap_flush);
+ swap(ci->i_prealloc_cap_flush, *pcf);
+
+ if (!ci->i_head_snapc) {
+ WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
ci->i_head_snapc = ceph_get_snap_context(
ci->i_snap_realm->cached_context);
+ }
dout(" inode %p now dirty snapc %p auth cap %p\n",
&ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
BUG_ON(!list_empty(&ci->i_dirty_item));
@@ -1429,6 +1397,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
ihold(inode);
dirty |= I_DIRTY_SYNC;
}
+ } else {
+ WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
}
BUG_ON(list_empty(&ci->i_dirty_item));
if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
@@ -1438,6 +1408,74 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
return dirty;
}
+static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
+ struct ceph_cap_flush *cf)
+{
+ struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_cap_flush *other = NULL;
+
+ while (*p) {
+ parent = *p;
+ other = rb_entry(parent, struct ceph_cap_flush, i_node);
+
+ if (cf->tid < other->tid)
+ p = &(*p)->rb_left;
+ else if (cf->tid > other->tid)
+ p = &(*p)->rb_right;
+ else
+ BUG();
+ }
+
+ rb_link_node(&cf->i_node, parent, p);
+ rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
+}
+
+static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
+ struct ceph_cap_flush *cf)
+{
+ struct rb_node **p = &mdsc->cap_flush_tree.rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_cap_flush *other = NULL;
+
+ while (*p) {
+ parent = *p;
+ other = rb_entry(parent, struct ceph_cap_flush, g_node);
+
+ if (cf->tid < other->tid)
+ p = &(*p)->rb_left;
+ else if (cf->tid > other->tid)
+ p = &(*p)->rb_right;
+ else
+ BUG();
+ }
+
+ rb_link_node(&cf->g_node, parent, p);
+ rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
+}
+
+struct ceph_cap_flush *ceph_alloc_cap_flush(void)
+{
+ return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+}
+
+void ceph_free_cap_flush(struct ceph_cap_flush *cf)
+{
+ if (cf)
+ kmem_cache_free(ceph_cap_flush_cachep, cf);
+}
+
+static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
+{
+ struct rb_node *n = rb_first(&mdsc->cap_flush_tree);
+ if (n) {
+ struct ceph_cap_flush *cf =
+ rb_entry(n, struct ceph_cap_flush, g_node);
+ return cf->tid;
+ }
+ return 0;
+}
+
/*
* Add dirty inode to the flushing list. Assigned a seq number so we
* can wait for caps to flush without starving.
@@ -1445,14 +1483,17 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
* Called under i_ceph_lock.
*/
static int __mark_caps_flushing(struct inode *inode,
- struct ceph_mds_session *session)
+ struct ceph_mds_session *session,
+