diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2013-05-06 13:11:19 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2013-05-06 13:11:19 -0700 |
commit | 91f8575685e35f3bd021286bc82d26397458f5a9 (patch) | |
tree | 09de8d889758a12071adb9427ed741e27c907aa6 /fs | |
parent | 2e378f3eebd28feefbb1f9953834a5a19482f053 (diff) | |
parent | b5b09be30cf99f9c699e825629f02e3bce555d44 (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Alex Elder:
"This is a big pull.
Most of it is culmination of Alex's work to implement RBD image
layering, which is now complete (yay!).
There is also some work from Yan to fix i_mutex behavior surrounding
writes in cephfs, a sync write fix, a fix for RBD images that get
resized while they are mapped, and a few patches from me that resolve
annoying auth warnings and fix several bugs in the ceph auth code."
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (254 commits)
rbd: fix image request leak on parent read
libceph: use slab cache for osd client requests
libceph: allocate ceph message data with a slab allocator
libceph: allocate ceph messages with a slab allocator
rbd: allocate image object names with a slab allocator
rbd: allocate object requests with a slab allocator
rbd: allocate name separate from obj_request
rbd: allocate image requests with a slab allocator
rbd: use binary search for snapshot lookup
rbd: clear EXISTS flag if mapped snapshot disappears
rbd: kill off the snapshot list
rbd: define rbd_snap_size() and rbd_snap_features()
rbd: use snap_id not index to look up snap info
rbd: look up snapshot name in names buffer
rbd: drop obj_request->version
rbd: drop rbd_obj_method_sync() version parameter
rbd: more version parameter removal
rbd: get rid of some version parameters
rbd: stop tracking header object version
rbd: snap names are pointer to constant data
...
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/addr.c | 222 | ||||
-rw-r--r-- | fs/ceph/caps.c | 33 | ||||
-rw-r--r-- | fs/ceph/dir.c | 65 | ||||
-rw-r--r-- | fs/ceph/file.c | 241 | ||||
-rw-r--r-- | fs/ceph/inode.c | 59 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 5 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 79 | ||||
-rw-r--r-- | fs/ceph/mdsmap.c | 8 | ||||
-rw-r--r-- | fs/ceph/snap.c | 3 | ||||
-rw-r--r-- | fs/ceph/super.c | 7 | ||||
-rw-r--r-- | fs/ceph/super.h | 65 |
11 files changed, 398 insertions, 389 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index a60ea977af6f..3e68ac101040 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -236,15 +236,21 @@ static int ceph_readpage(struct file *filp, struct page *page) static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) { struct inode *inode = req->r_inode; + struct ceph_osd_data *osd_data; int rc = req->r_result; int bytes = le32_to_cpu(msg->hdr.data_len); + int num_pages; int i; dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ - for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) { - struct page *page = req->r_pages[i]; + osd_data = osd_req_op_extent_osd_data(req, 0); + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); + num_pages = calc_pages_for((u64)osd_data->alignment, + (u64)osd_data->length); + for (i = 0; i < num_pages; i++) { + struct page *page = osd_data->pages[i]; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ @@ -257,8 +263,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) SetPageUptodate(page); unlock_page(page); page_cache_release(page); + bytes -= PAGE_CACHE_SIZE; } - kfree(req->r_pages); + kfree(osd_data->pages); } static void ceph_unlock_page_vector(struct page **pages, int num_pages) @@ -279,6 +286,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) &ceph_inode_to_client(inode)->client->osdc; struct ceph_inode_info *ci = ceph_inode(inode); struct page *page = list_entry(page_list->prev, struct page, lru); + struct ceph_vino vino; struct ceph_osd_request *req; u64 off; u64 len; @@ -303,18 +311,17 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) len = nr_pages << PAGE_CACHE_SHIFT; dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages, off, len); - - req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), - off, &len, - CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, - NULL, 0, + vino = ceph_vino(inode); + req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, + 1, CEPH_OSD_OP_READ, + CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, ci->i_truncate_size, - NULL, false, 0); + false); if (IS_ERR(req)) return PTR_ERR(req); /* build page vector */ - nr_pages = len >> PAGE_CACHE_SHIFT; + nr_pages = calc_pages_for(0, len); pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); ret = -ENOMEM; if (!pages) @@ -336,11 +343,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - req->r_pages = pages; - req->r_num_pages = nr_pages; + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false); req->r_callback = finish_read; req->r_inode = inode; + ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); + dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); ret = ceph_osdc_start_request(osdc, req, false); if (ret < 0) @@ -373,7 +381,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT; - dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages, + dout("readpages %p file %p nr_pages %d max %d\n", inode, + file, nr_pages, max); while (!list_empty(page_list)) { rc = start_read(inode, page_list, max); @@ -548,17 +557,23 @@ static void writepages_finish(struct ceph_osd_request *req, { struct inode *inode = req->r_inode; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_osd_data *osd_data; unsigned wrote; struct page *page; + int num_pages; int i; struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; int rc = req->r_result; - u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length); + u64 bytes = req->r_ops[0].extent.length; struct ceph_fs_client *fsc = ceph_inode_to_client(inode); long writeback_stat; unsigned issued = ceph_caps_issued(ci); + osd_data = osd_req_op_extent_osd_data(req, 0); + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); + num_pages = calc_pages_for((u64)osd_data->alignment, + (u64)osd_data->length); if (rc >= 0) { /* * Assume we wrote the pages we originally sent. The @@ -566,7 +581,7 @@ static void writepages_finish(struct ceph_osd_request *req, * raced with a truncation and was adjusted at the osd, * so don't believe the reply. */ - wrote = req->r_num_pages; + wrote = num_pages; } else { wrote = 0; mapping_set_error(mapping, rc); @@ -575,8 +590,8 @@ static void writepages_finish(struct ceph_osd_request *req, inode, rc, bytes, wrote); /* clean all pages */ - for (i = 0; i < req->r_num_pages; i++) { - page = req->r_pages[i]; + for (i = 0; i < num_pages; i++) { + page = osd_data->pages[i]; BUG_ON(!page); WARN_ON(!PageUptodate(page)); @@ -605,32 +620,34 @@ static void writepages_finish(struct ceph_osd_request *req, unlock_page(page); } dout("%p wrote+cleaned %d pages\n", inode, wrote); - ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc); + ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc); - ceph_release_pages(req->r_pages, req->r_num_pages); - if (req->r_pages_from_pool) - mempool_free(req->r_pages, + ceph_release_pages(osd_data->pages, num_pages); + if (osd_data->pages_from_pool) + mempool_free(osd_data->pages, ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); else - kfree(req->r_pages); + kfree(osd_data->pages); ceph_osdc_put_request(req); } -/* - * allocate a page vec, either directly, or if necessary, via a the - * mempool. we avoid the mempool if we can because req->r_num_pages - * may be less than the maximum write size. - */ -static void alloc_page_vec(struct ceph_fs_client *fsc, - struct ceph_osd_request *req) +static struct ceph_osd_request * +ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len, + struct ceph_snap_context *snapc, int num_ops) { - req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages, - GFP_NOFS); - if (!req->r_pages) { - req->r_pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS); - req->r_pages_from_pool = 1; - WARN_ON(!req->r_pages); - } + struct ceph_fs_client *fsc; + struct ceph_inode_info *ci; + struct ceph_vino vino; + + fsc = ceph_inode_to_client(inode); + ci = ceph_inode(inode); + vino = ceph_vino(inode); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + + return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, offset, len, num_ops, CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK, + snapc, ci->i_truncate_seq, ci->i_truncate_size, true); } /* @@ -653,7 +670,7 @@ static int ceph_writepages_start(struct address_space *mapping, unsigned wsize = 1 << inode->i_blkbits; struct ceph_osd_request *req = NULL; int do_sync; - u64 snap_size = 0; + u64 snap_size; /* * Include a 'sync' in the OSD request if this is a data @@ -699,6 +716,7 @@ static int ceph_writepages_start(struct address_space *mapping, retry: /* find oldest snap context with dirty data */ ceph_put_snap_context(snapc); + snap_size = 0; snapc = get_oldest_context(inode, &snap_size); if (!snapc) { /* hmm, why does writepages get called when there @@ -706,6 +724,8 @@ retry: dout(" no snap context with dirty data?\n"); goto out; } + if (snap_size == 0) + snap_size = i_size_read(inode); dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); if (last_snapc && snapc != last_snapc) { @@ -718,10 +738,14 @@ retry: last_snapc = snapc; while (!done && index <= end) { + int num_ops = do_sync ? 2 : 1; + struct ceph_vino vino; unsigned i; int first; pgoff_t next; int pvec_pages, locked_pages; + struct page **pages = NULL; + mempool_t *pool = NULL; /* Becomes non-null if mempool used */ struct page *page; int want; u64 offset, len; @@ -773,11 +797,8 @@ get_more_pages: dout("waiting on writeback %p\n", page); wait_on_page_writeback(page); } - if ((snap_size && page_offset(page) > snap_size) || - (!snap_size && - page_offset(page) > i_size_read(inode))) { - dout("%p page eof %llu\n", page, snap_size ? - snap_size : i_size_read(inode)); + if (page_offset(page) >= snap_size) { + dout("%p page eof %llu\n", page, snap_size); done = 1; unlock_page(page); break; @@ -805,22 +826,23 @@ get_more_pages: break; } - /* ok */ + /* + * We have something to write. If this is + * the first locked page this time through, + * allocate an osd request and a page array + * that it will use. + */ if (locked_pages == 0) { + size_t size; + + BUG_ON(pages); + /* prepare async write request */ - offset = (u64) page_offset(page); + offset = (u64)page_offset(page); len = wsize; - req = ceph_osdc_new_request(&fsc->client->osdc, - &ci->i_layout, - ceph_vino(inode), - offset, &len, - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ONDISK, - snapc, do_sync, - ci->i_truncate_seq, - ci->i_truncate_size, - &inode->i_mtime, true, 0); + req = ceph_writepages_osd_request(inode, + offset, &len, snapc, + num_ops); if (IS_ERR(req)) { rc = PTR_ERR(req); @@ -828,11 +850,17 @@ get_more_pages: break; } - max_pages = req->r_num_pages; - - alloc_page_vec(fsc, req); req->r_callback = writepages_finish; req->r_inode = inode; + + max_pages = calc_pages_for(0, (u64)len); + size = max_pages * sizeof (*pages); + pages = kmalloc(size, GFP_NOFS); + if (!pages) { + pool = fsc->wb_pagevec_pool; + pages = mempool_alloc(pool, GFP_NOFS); + BUG_ON(!pages); + } } /* note position of first page in pvec */ @@ -850,7 +878,7 @@ get_more_pages: } set_page_writeback(page); - req->r_pages[locked_pages] = page; + pages[locked_pages] = page; locked_pages++; next = page->index + 1; } @@ -879,18 +907,27 @@ get_more_pages: pvec.nr -= i-first; } - /* submit the write */ - offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT; - len = min((snap_size ? snap_size : i_size_read(inode)) - offset, + /* Format the osd request message and submit the write */ + + offset = page_offset(pages[0]); + len = min(snap_size - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); - /* revise final length, page count */ - req->r_num_pages = locked_pages; - req->r_request_ops[0].extent.length = cpu_to_le64(len); - req->r_request_ops[0].payload_len = cpu_to_le32(len); - req->r_request->hdr.data_len = cpu_to_le32(len); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, + !!pool, false); + + pages = NULL; /* request message now owns the pages array */ + pool = NULL; + + /* Update the write op length in case we changed it */ + + osd_req_op_extent_update(req, 0, len); + + vino = ceph_vino(inode); + ceph_osdc_build_request(req, offset, snapc, vino.snap, + &inode->i_mtime); rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); BUG_ON(rc); @@ -1067,51 +1104,23 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, struct page **pagep, void **fsdata) { struct inode *inode = file_inode(file); - struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_file_info *fi = file->private_data; struct page *page; pgoff_t index = pos >> PAGE_CACHE_SHIFT; - int r, want, got = 0; - - if (fi->fmode & CEPH_FILE_MODE_LAZY) - want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; - else - want = CEPH_CAP_FILE_BUFFER; - - dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n", - inode, ceph_vinop(inode), pos, len, inode->i_size); - r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len); - if (r < 0) - return r; - dout("write_begin %p %llx.%llx %llu~%u got cap refs on %s\n", - inode, ceph_vinop(inode), pos, len, ceph_cap_string(got)); - if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) { - ceph_put_cap_refs(ci, got); - return -EAGAIN; - } + int r; do { /* get a page */ page = grab_cache_page_write_begin(mapping, index, 0); - if (!page) { - r = -ENOMEM; - break; - } + if (!page) + return -ENOMEM; + *pagep = page; dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len); r = ceph_update_writeable_page(file, pos, len, page); - if (r) - page_cache_release(page); } while (r == -EAGAIN); - if (r) { - ceph_put_cap_refs(ci, got); - } else { - *pagep = page; - *(int *)fsdata = got; - } return r; } @@ -1125,12 +1134,10 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, struct page *page, void *fsdata) { struct inode *inode = file_inode(file); - struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_mds_client *mdsc = fsc->mdsc; unsigned from = pos & (PAGE_CACHE_SIZE - 1); int check_cap = 0; - int got = (unsigned long)fsdata; dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, inode, page, (int)pos, (int)copied, (int)len); @@ -1153,19 +1160,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, up_read(&mdsc->snap_rwsem); page_cache_release(page); - if (copied > 0) { - int dirty; - spin_lock(&ci->i_ceph_lock); - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); - spin_unlock(&ci->i_ceph_lock); - if (dirty) - __mark_inode_dirty(inode, dirty); - } - - dout("write_end %p %llx.%llx %llu~%u dropping cap refs on %s\n", - inode, ceph_vinop(inode), pos, len, ceph_cap_string(got)); - ceph_put_cap_refs(ci, got); - if (check_cap) ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 78e2f575247d..da0f9b8a3bcb 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -490,15 +490,17 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, ci->i_rdcache_gen++; /* - * if we are newly issued FILE_SHARED, clear D_COMPLETE; we + * if we are newly issued FILE_SHARED, mark dir not complete; we * don't know what happened to this directory while we didn't * have the cap. */ if ((issued & CEPH_CAP_FILE_SHARED) && (had & CEPH_CAP_FILE_SHARED) == 0) { ci->i_shared_gen++; - if (S_ISDIR(ci->vfs_inode.i_mode)) - ceph_dir_clear_complete(&ci->vfs_inode); + if (S_ISDIR(ci->vfs_inode.i_mode)) { + dout(" marking %p NOT complete\n", &ci->vfs_inode); + __ceph_dir_clear_complete(ci); + } } } @@ -553,6 +555,7 @@ retry: cap->implemented = 0; cap->mds = mds; cap->mds_wanted = 0; + cap->mseq = 0; cap->ci = ci; __insert_cap_node(ci, cap); @@ -628,7 +631,10 @@ retry: cap->cap_id = cap_id; cap->issued = issued; cap->implemented |= issued; - cap->mds_wanted |= wanted; + if (mseq > cap->mseq) + cap->mds_wanted = wanted; + else + cap->mds_wanted |= wanted; cap->seq = seq; cap->issue_seq = seq; cap->mseq = mseq; @@ -997,9 +1003,9 @@ static int send_cap_msg(struct ceph_mds_session *session, return 0; } -static void __queue_cap_release(struct ceph_mds_session *session, - u64 ino, u64 cap_id, u32 migrate_seq, - u32 issue_seq) +void __queue_cap_release(struct ceph_mds_session *session, + u64 ino, u64 cap_id, u32 migrate_seq, + u32 issue_seq) { struct ceph_msg *msg; struct ceph_mds_cap_release *head; @@ -2046,6 +2052,13 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, goto out; } + /* finish pending truncate */ + while (ci->i_truncate_pending) { + spin_unlock(&ci->i_ceph_lock); + __ceph_do_pending_vmtruncate(inode, !(need & CEPH_CAP_FILE_WR)); + spin_lock(&ci->i_ceph_lock); + } + if (need & CEPH_CAP_FILE_WR) { if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { dout("get_cap_refs %p endoff %llu > maxsize %llu\n", @@ -2067,12 +2080,6 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, } have = __ceph_caps_issued(ci, &implemented); - /* - * disallow writes while a truncate is pending - */ - if (ci->i_truncate_pending) - have &= ~CEPH_CAP_FILE_WR; - if ((have & need) == need) { /* * Look at (implemented & ~have & not) so that we keep waiting diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 6d797f46d772..f02d82b7933e 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -107,7 +107,7 @@ static unsigned fpos_off(loff_t p) * falling back to a "normal" sync readdir if any dentries in the dir * are dropped. * - * D_COMPLETE tells indicates we have all dentries in the dir. It is + * Complete dir indicates that we have all dentries in the dir. It is * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by * the MDS if/when the directory is modified). */ @@ -198,8 +198,8 @@ more: filp->f_pos++; /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_dir_test_complete(dir)) { - dout(" lost D_COMPLETE on %p; falling back to mds\n", dir); + if (!ceph_dir_is_complete(dir)) { + dout(" lost dir complete on %p; falling back to mds\n", dir); err = -EAGAIN; goto out; } @@ -258,7 +258,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) if (filp->f_pos == 0) { /* note dir version at start of readdir so we can tell * if any dentries get dropped */ - fi->dir_release_count = ci->i_release_count; + fi->dir_release_count = atomic_read(&ci->i_release_count); dout("readdir off 0 -> '.'\n"); if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0), @@ -284,7 +284,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) if ((filp->f_pos == 2 || fi->dentry) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && - ceph_dir_test_complete(inode) && + __ceph_dir_is_complete(ci) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { spin_unlock(&ci->i_ceph_lock); err = __dcache_readdir(filp, dirent, filldir); @@ -350,7 +350,8 @@ more: if (!req->r_did_prepopulate) { dout("readdir !did_prepopulate"); - fi->dir_release_count--; /* preclude D_COMPLETE */ + /* preclude from marking dir complete */ + fi->dir_release_count--; } /* note next offset and last dentry name */ @@ -428,8 +429,9 @@ more: * the complete dir contents in our cache. */ spin_lock(&ci->i_ceph_lock); - if (ci->i_release_count == fi->dir_release_count) { - ceph_dir_set_complete(inode); + if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { + dout(" marking %p complete\n", inode); + __ceph_dir_set_complete(ci, fi->dir_release_count); ci->i_max_offset = filp->f_pos; } spin_unlock(&ci->i_ceph_lock); @@ -604,7 +606,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, fsc->mount_options->snapdir_name, dentry->d_name.len) && !is_root_ceph_dentry(dir, dentry) && - ceph_dir_test_complete(dir) && + __ceph_dir_is_complete(ci) && (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { spin_unlock(&ci->i_ceph_lock); dout(" dir %p complete, -ENOENT\n", dir); @@ -1065,44 +1067,6 @@ static int ceph_snapdir_d_revalidate(struct dentry *dentry, } /* - * Set/clear/test dir complete flag on the dir's dentry. - */ -void ceph_dir_set_complete(struct inode *inode) -{ - struct dentry *dentry = d_find_any_alias(inode); - - if (dentry && ceph_dentry(dentry) && - ceph_test_mount_opt(ceph_sb_to_client(dentry->d_sb), DCACHE)) { - dout(" marking %p (%p) complete\n", inode, dentry); - set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags); - } - dput(dentry); -} - -void ceph_dir_clear_complete(struct inode *inode) -{ - struct dentry *dentry = d_find_any_alias(inode); - - if (dentry && ceph_dentry(dentry)) { - dout(" marking %p (%p) complete\n", inode, dentry); - set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags); - } - dput(dentry); -} - -bool ceph_dir_test_complete(struct inode *inode) -{ - struct dentry *dentry = d_find_any_alias(inode); - - if (dentry && ceph_dentry(dentry)) { - dout(" marking %p (%p) NOT complete\n", inode, dentry); - clear_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags); - } - dput(dentry); - return false; -} - -/* * When the VFS prunes a dentry from the cache, we need to clear the * complete flag on the parent directory. * @@ -1110,15 +1074,13 @@ bool ceph_dir_test_complete(struct inode *inode) */ static void ceph_d_prune(struct dentry *dentry) { - struct ceph_dentry_info *di; - dout("ceph_d_prune %p\n", dentry); /* do we have a valid parent? */ if (IS_ROOT(dentry)) return; - /* if we are not hashed, we don't affect D_COMPLETE */ + /* if we are not hashed, we don't affect dir's completeness */ if (d_unhashed(dentry)) return; @@ -1126,8 +1088,7 @@ static void ceph_d_prune(struct dentry *dentry) * we hold d_lock, so d_parent is stable, and d_fsdata is never * cleared until d_release */ - di = ceph_dentry(dentry->d_parent); - clear_bit(CEPH_D_COMPLETE, &di->flags); + ceph_dir_clear_complete(dentry->d_parent->d_inode); } /* diff --git a/fs/ceph/file.c b/fs/ceph/file.c index bf338d9b67e3..d70830c66833 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -446,19 +446,35 @@ done: } /* - * Write commit callback, called if we requested both an ACK and - * ONDISK commit reply from the OSD. + * Write commit request unsafe callback, called to tell us when a + * request is unsafe (that is, in flight--has been handed to the + * messenger to send to its target osd). It is called again when + * we've received a response message indicating the request is + * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request + * is completed early (and unsuccessfully) due to a timeout or + * interrupt. + * + * This is used if we requested both an ACK and ONDISK commit reply + * from the OSD. */ -static void sync_write_commit(struct ceph_osd_request *req, - struct ceph_msg *msg) +static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) { struct ceph_inode_info *ci = ceph_inode(req->r_inode); - dout("sync_write_commit %p tid %llu\n", req, req->r_tid); - spin_lock(&ci->i_unsafe_lock); - list_del_init(&req->r_unsafe_item); - spin_unlock(&ci->i_unsafe_lock); - ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); + dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid, + unsafe ? "un" : ""); + if (unsafe) { + ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_item, + &ci->i_unsafe_writes); + spin_unlock(&ci->i_unsafe_lock); + } else { + spin_lock(&ci->i_unsafe_lock); + list_del_init(&req->r_unsafe_item); + spin_unlock(&ci->i_unsafe_lock); + ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); + } } /* @@ -470,36 +486,33 @@ static void sync_write_commit(struct ceph_osd_request *req, * objects, rollback on failure, etc.) */ static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t left, loff_t *offset) + size_t left, loff_t pos, loff_t *ppos) { struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; struct ceph_osd_request *req; + int num_ops = 1; struct page **pages; int num_pages; - long long unsigned pos; u64 len; int written = 0; int flags; - int do_sync = 0; int check_caps = 0; int page_align, io_align; unsigned long buf_align; int ret; struct timespec mtime = CURRENT_TIME; + bool own_pages = false; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_write on file %p %lld~%u %s\n", file, *offset, + dout("sync_write on file %p %lld~%u %s\n", file, pos, (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); - if (file->f_flags & O_APPEND) - pos = i_size_read(inode); - else - pos = *offset; - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); if (ret < 0) return ret; @@ -516,7 +529,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) flags |= CEPH_OSD_FLAG_ACK; else - do_sync = 1; + num_ops++; /* Also include a 'startsync' command. */ /* * we may need to do multiple writes here if we span an object @@ -526,25 +539,20 @@ more: io_align = pos & ~PAGE_MASK; buf_align = (unsigned long)data & ~PAGE_MASK; len = left; - if (file->f_flags & O_DIRECT) { - /* write from beginning of first page, regardless of - io alignment */ - page_align = (pos - io_align + buf_align) & ~PAGE_MASK; - num_pages = calc_pages_for((unsigned long)data, len); - } else { - page_align = pos & ~PAGE_MASK; - num_pages = calc_pages_for(pos, len); - } + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - ceph_vino(inode), pos, &len, - CEPH_OSD_OP_WRITE, flags, - ci->i_snap_realm->cached_context, - do_sync, + vino, pos, &len, num_ops, + CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, - &mtime, false, page_align); + false); if (IS_ERR(req)) return PTR_ERR(req); + /* write from beginning of first page, regardless of io alignment */ + page_align = file->f_flags & O_DIRECT ? buf_align : io_align; + num_pages = calc_pages_for(page_align, len); if (file->f_flags & O_DIRECT) { pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { @@ -572,36 +580,20 @@ more: if ((file->f_flags & O_SYNC) == 0) { /* get a second commit callback */ - req->r_safe_callback = sync_write_commit; - req->r_own_pages = 1; + req->r_unsafe_callback = ceph_sync_write_unsafe; + req->r_inode = inode; + own_pages = true; } } - req->r_pages = pages; - req->r_num_pages = num_pages; - req->r_inode = inode; + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, own_pages); + + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); - if (!ret) { - if (req->r_safe_callback) { - /* - * Add to inode unsafe list only after we - * start_request so that a tid has been assigned. - */ - spin_lock(&ci->i_unsafe_lock); - list_add_tail(&req->r_unsafe_item, - &ci->i_unsafe_writes); - spin_unlock(&ci->i_unsafe_lock); - ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); - } - + if (!ret) ret = ceph_osdc_wait_request(&fsc->client->osdc, req); - if (ret < 0 && req->r_safe_callback) { - spin_lock(&ci->i_unsafe_lock); - list_del_init(&req->r_unsafe_item); - spin_unlock(&ci->i_unsafe_lock); - ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); - } - } if (file->f_flags & O_DIRECT) ceph_put_page_vector(pages, num_pages, false); @@ -614,12 +606,12 @@ out: pos += len; written += len; left -= len; - data += written; + data += len; if (left) goto more; ret = written; - *offset = pos; + *ppos = pos; if (pos > i_size_read(inode)) check_caps = ceph_inode_set_size(inode, pos); if (check_caps) @@ -653,7 +645,6 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", inode, ceph_vinop(inode), pos, (unsigned)len, inode); again: - __ceph_do_pending_vmtruncate(inode); if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else @@ -717,55 +708,75 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; - loff_t endoff = pos + iov->iov_len; - int got = 0; - int ret, err, written; + ssize_t count, written = 0; + int err, want, got; + bool hold_mutex; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; -retry_snap: - written = 0; - if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) - return -ENOSPC; - __ceph_do_pending_vmtruncate(inode); + sb_start_write(inode->i_sb); + mutex_lock(&inode->i_mutex); + hold_mutex = true; - /* - * try to do a buffered write. if we don't have sufficient - * caps, we'll get -EAGAIN from generic_file_aio_write, or a - * short write if we only get caps for some pages. - */ - if (!(iocb->ki_filp->f_flags & O_DIRECT) && - !(inode->i_sb->s_flags & MS_SYNCHRONOUS) && - !(fi->flags & CEPH_F_SYNC)) { - ret = generic_file_aio_write(iocb, iov, nr_segs, pos); - if (ret >= 0) - written = ret; - - if ((ret >= 0 || ret == -EIOCBQUEUED) && - ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) - || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { - err = vfs_fsync_range(file, pos, pos + written - 1, 1); - if (err < 0) - ret = err; - } - if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff) - goto out; + err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ); + if (err) + goto out; + + /* We can write back this queue in page reclaim */ + current->backing_dev_info = file->f_mapping->backing_dev_info; + + err = generic_write_checks(file, &pos, &count, S_ISBLK(inode->i_mode)); + if (err) + goto out; + + if (count == 0) + goto out; + + err = file_remove_suid(file); + if (err) + goto out; + + err = file_update_time(file); + if (err) + goto out; + +retry_snap: + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) { + err = -ENOSPC; + goto out; } - dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n", - inode, ceph_vinop(inode), pos + written, - (unsigned)iov->iov_len - written, inode->i_size); - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff); - if (ret < 0) + dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", + inode, ceph_vinop(inode), pos, count, inode->i_size); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + got = 0; |