diff options
-rw-r--r-- | drivers/block/rbd.c | 111 | ||||
-rw-r--r-- | fs/ceph/acl.c | 4 | ||||
-rw-r--r-- | fs/ceph/addr.c | 308 | ||||
-rw-r--r-- | fs/ceph/caps.c | 836 | ||||
-rw-r--r-- | fs/ceph/dir.c | 383 | ||||
-rw-r--r-- | fs/ceph/file.c | 61 | ||||
-rw-r--r-- | fs/ceph/inode.c | 155 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 425 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 23 | ||||
-rw-r--r-- | fs/ceph/snap.c | 173 | ||||
-rw-r--r-- | fs/ceph/super.c | 25 | ||||
-rw-r--r-- | fs/ceph/super.h | 125 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 65 | ||||
-rw-r--r-- | include/linux/ceph/libceph.h | 21 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 2 | ||||
-rw-r--r-- | include/linux/crush/crush.h | 40 | ||||
-rw-r--r-- | include/linux/crush/hash.h | 6 | ||||
-rw-r--r-- | include/linux/crush/mapper.h | 2 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 50 | ||||
-rw-r--r-- | net/ceph/crush/crush.c | 13 | ||||
-rw-r--r-- | net/ceph/crush/crush_ln_table.h | 32 | ||||
-rw-r--r-- | net/ceph/crush/hash.c | 8 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 148 | ||||
-rw-r--r-- | net/ceph/messenger.c | 3 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 13 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 42 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 2 | ||||
-rw-r--r-- | net/ceph/pagevec.c | 5 |
28 files changed, 2010 insertions, 1071 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index ec6c5c6e1ac9..d94529d5c8e9 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -346,6 +346,7 @@ struct rbd_device { struct rbd_image_header header; unsigned long flags; /* possibly lock protected */ struct rbd_spec *spec; + struct rbd_options *opts; char *header_name; @@ -724,34 +725,36 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) } /* - * mount options + * (Per device) rbd map options */ enum { + Opt_queue_depth, Opt_last_int, /* int args above */ Opt_last_string, /* string args above */ Opt_read_only, Opt_read_write, - /* Boolean args above */ - Opt_last_bool, + Opt_err }; static match_table_t rbd_opts_tokens = { + {Opt_queue_depth, "queue_depth=%d"}, /* int args above */ /* string args above */ {Opt_read_only, "read_only"}, {Opt_read_only, "ro"}, /* Alternate spelling */ {Opt_read_write, "read_write"}, {Opt_read_write, "rw"}, /* Alternate spelling */ - /* Boolean args above */ - {-1, NULL} + {Opt_err, NULL} }; struct rbd_options { + int queue_depth; bool read_only; }; +#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ #define RBD_READ_ONLY_DEFAULT false static int parse_rbd_opts_token(char *c, void *private) @@ -761,27 +764,27 @@ static int parse_rbd_opts_token(char *c, void *private) int token, intval, ret; token = match_token(c, rbd_opts_tokens, argstr); - if (token < 0) - return -EINVAL; - if (token < Opt_last_int) { ret = match_int(&argstr[0], &intval); if (ret < 0) { - pr_err("bad mount option arg (not int) " - "at '%s'\n", c); + pr_err("bad mount option arg (not int) at '%s'\n", c); return ret; } dout("got int token %d val %d\n", token, intval); } else if (token > Opt_last_int && token < Opt_last_string) { - dout("got string token %d val %s\n", token, - argstr[0].from); - } else if (token > Opt_last_string && token < Opt_last_bool) { - dout("got Boolean token %d\n", token); + dout("got string token %d val %s\n", token, argstr[0].from); } else { dout("got token %d\n", token); } switch (token) { + case Opt_queue_depth: + if (intval < 1) { + pr_err("queue_depth out of range\n"); + return -EINVAL; + } + rbd_opts->queue_depth = intval; + break; case Opt_read_only: rbd_opts->read_only = true; break; @@ -789,9 +792,10 @@ static int parse_rbd_opts_token(char *c, void *private) rbd_opts->read_only = false; break; default: - rbd_assert(false); - break; + /* libceph prints "bad option" msg */ + return -EINVAL; } + return 0; } @@ -1563,22 +1567,39 @@ static void rbd_obj_request_end(struct rbd_obj_request *obj_request) /* * Wait for an object request to complete. If interrupted, cancel the * underlying osd request. + * + * @timeout: in jiffies, 0 means "wait forever" */ -static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) +static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request, + unsigned long timeout) { - int ret; + long ret; dout("%s %p\n", __func__, obj_request); - - ret = wait_for_completion_interruptible(&obj_request->completion); - if (ret < 0) { - dout("%s %p interrupted\n", __func__, obj_request); + ret = wait_for_completion_interruptible_timeout( + &obj_request->completion, + ceph_timeout_jiffies(timeout)); + if (ret <= 0) { + if (ret == 0) + ret = -ETIMEDOUT; rbd_obj_request_end(obj_request); - return ret; + } else { + ret = 0; } - dout("%s %p done\n", __func__, obj_request); - return 0; + dout("%s %p ret %d\n", __func__, obj_request, (int)ret); + return ret; +} + +static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) +{ + return __rbd_obj_request_wait(obj_request, 0); +} + +static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request, + unsigned long timeout) +{ + return __rbd_obj_request_wait(obj_request, timeout); } static void rbd_img_request_complete(struct rbd_img_request *img_request) @@ -2001,11 +2022,11 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, rbd_assert(obj_request_type_valid(type)); size = strlen(object_name) + 1; - name = kmalloc(size, GFP_KERNEL); + name = kmalloc(size, GFP_NOIO); if (!name) return NULL; - obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL); + obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); if (!obj_request) { kfree(name); return NULL; @@ -2376,7 +2397,7 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, } if (opcode == CEPH_OSD_OP_DELETE) - osd_req_op_init(osd_request, num_ops, opcode); + osd_req_op_init(osd_request, num_ops, opcode, 0); else osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length, 0, 0); @@ -2848,7 +2869,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) goto out; stat_request->callback = rbd_img_obj_exists_callback; - osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, false, false); rbd_osd_req_format_read(stat_request); @@ -3122,6 +3143,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper( bool watch) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_options *opts = osdc->client->options; struct rbd_obj_request *obj_request; int ret; @@ -3148,7 +3170,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper( if (ret) goto out; - ret = rbd_obj_request_wait(obj_request); + ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout); if (ret) goto out; @@ -3750,10 +3772,9 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); rbd_dev->tag_set.ops = &rbd_mq_ops; - rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ; + rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; rbd_dev->tag_set.numa_node = NUMA_NO_NODE; - rbd_dev->tag_set.flags = - BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE; + rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE; rbd_dev->tag_set.nr_hw_queues = 1; rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); @@ -3773,6 +3794,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) /* set io sizes to object size */ segment_size = rbd_obj_bytes(&rbd_dev->header); blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); + blk_queue_max_segments(q, segment_size / SECTOR_SIZE); blk_queue_max_segment_size(q, segment_size); blk_queue_io_min(q, segment_size); blk_queue_io_opt(q, segment_size); @@ -4044,7 +4066,8 @@ static void rbd_spec_free(struct kref *kref) } static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, - struct rbd_spec *spec) + struct rbd_spec *spec, + struct rbd_options *opts) { struct rbd_device *rbd_dev; @@ -4058,8 +4081,9 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); - rbd_dev->spec = spec; rbd_dev->rbd_client = rbdc; + rbd_dev->spec = spec; + rbd_dev->opts = opts; /* Initialize the layout used for all rbd requests */ @@ -4075,6 +4099,7 @@ static void rbd_dev_destroy(struct rbd_device *rbd_dev) { rbd_put_client(rbd_dev->rbd_client); rbd_spec_put(rbd_dev->spec); + kfree(rbd_dev->opts); kfree(rbd_dev); } @@ -4933,6 +4958,7 @@ static int rbd_add_parse_args(const char *buf, goto out_mem; rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; + rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; copts = ceph_parse_options(options, mon_addrs, mon_addrs + mon_addrs_size - 1, @@ -4963,8 +4989,8 @@ out_err: */ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) { + struct ceph_options *opts = rbdc->client->options; u64 newest_epoch; - unsigned long timeout = rbdc->client->options->mount_timeout * HZ; int tries = 0; int ret; @@ -4979,7 +5005,8 @@ again: if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { ceph_monc_request_next_osdmap(&rbdc->client->monc); (void) ceph_monc_wait_osdmap(&rbdc->client->monc, - newest_epoch, timeout); + newest_epoch, + opts->mount_timeout); goto again; } else { /* the osdmap we have is new enough */ @@ -5148,7 +5175,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev) rbdc = __rbd_get_client(rbd_dev->rbd_client); ret = -ENOMEM; - parent = rbd_dev_create(rbdc, parent_spec); + parent = rbd_dev_create(rbdc, parent_spec, NULL); if (!parent) goto out_err; @@ -5394,9 +5421,6 @@ static ssize_t do_rbd_add(struct bus_type *bus, rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); if (rc < 0) goto err_out_module; - read_only = rbd_opts->read_only; - kfree(rbd_opts); - rbd_opts = NULL; /* done with this */ rbdc = rbd_get_client(ceph_opts); if (IS_ERR(rbdc)) { @@ -5422,11 +5446,12 @@ static ssize_t do_rbd_add(struct bus_type *bus, goto err_out_client; } - rbd_dev = rbd_dev_create(rbdc, spec); + rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); if (!rbd_dev) goto err_out_client; rbdc = NULL; /* rbd_dev now owns this */ spec = NULL; /* rbd_dev now owns this */ + rbd_opts = NULL; /* rbd_dev now owns this */ rc = rbd_dev_image_probe(rbd_dev, true); if (rc < 0) @@ -5434,6 +5459,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, /* If we are mapping a snapshot it must be marked read-only */ + read_only = rbd_dev->opts->read_only; if (rbd_dev->spec->snap_id != CEPH_NOSNAP) read_only = true; rbd_dev->mapping.read_only = read_only; @@ -5458,6 +5484,7 @@ err_out_client: rbd_put_client(rbdc); err_out_args: rbd_spec_put(spec); + kfree(rbd_opts); err_out_module: module_put(THIS_MODULE); diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 64fa248343f6..8f84646f10e9 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -187,10 +187,10 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode, val_size2 = posix_acl_xattr_size(default_acl->a_count); err = -ENOMEM; - tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS); + tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL); if (!tmp_buf) goto out_err; - pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS); + pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL); if (!pagelist) goto out_err; ceph_pagelist_init(pagelist); diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index e162bcd105ee..890c50971a69 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page) inode = mapping->host; ci = ceph_inode(inode); - /* - * Note that we're grabbing a snapc ref here without holding - * any locks! - */ - snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context); - /* dirty the head */ spin_lock(&ci->i_ceph_lock); - if (ci->i_head_snapc == NULL) - ci->i_head_snapc = ceph_get_snap_context(snapc); - ++ci->i_wrbuffer_ref_head; + BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + snapc = ceph_get_snap_context(capsnap->context); + capsnap->dirty_pages++; + } else { + BUG_ON(!ci->i_head_snapc); + snapc = ceph_get_snap_context(ci->i_head_snapc); + ++ci->i_wrbuffer_ref_head; + } if (ci->i_wrbuffer_ref == 0) ihold(inode); ++ci->i_wrbuffer_ref; @@ -346,7 +350,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) /* build page vector */ nr_pages = calc_pages_for(0, len); - pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); + pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL); ret = -ENOMEM; if (!pages) goto out; @@ -358,7 +362,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) dout("start_read %p adding %p idx %lu\n", inode, page, page->index); if (add_to_page_cache_lru(page, &inode->i_data, page->index, - GFP_NOFS)) { + GFP_KERNEL)) { ceph_fscache_uncache_page(inode, page); page_cache_release(page); dout("start_read %p add_to_page_cache failed %p\n", @@ -436,7 +440,7 @@ out: * only snap context we are allowed to write back. */ static struct ceph_snap_context *get_oldest_context(struct inode *inode, - u64 *snap_size) + loff_t *snap_size) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc = NULL; @@ -476,8 +480,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) struct ceph_osd_client *osdc; struct ceph_snap_context *snapc, *oldest; loff_t page_off = page_offset(page); + loff_t snap_size = -1; long writeback_stat; - u64 truncate_size, snap_size = 0; + u64 truncate_size; u32 truncate_seq; int err = 0, len = PAGE_CACHE_SIZE; @@ -512,7 +517,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) spin_lock(&ci->i_ceph_lock); truncate_seq = ci->i_truncate_seq; truncate_size = ci->i_truncate_size; - if (!snap_size) + if (snap_size == -1) snap_size = i_size_read(inode); spin_unlock(&ci->i_ceph_lock); @@ -695,7 +700,8 @@ static int ceph_writepages_start(struct address_space *mapping, unsigned wsize = 1 << inode->i_blkbits; struct ceph_osd_request *req = NULL; int do_sync = 0; - u64 truncate_size, snap_size; + loff_t snap_size, i_size; + u64 truncate_size; u32 truncate_seq; /* @@ -741,7 +747,7 @@ static int ceph_writepages_start(struct address_space *mapping, retry: /* find oldest snap context with dirty data */ ceph_put_snap_context(snapc); - snap_size = 0; + snap_size = -1; snapc = get_oldest_context(inode, &snap_size); if (!snapc) { /* hmm, why does writepages get called when there @@ -749,16 +755,13 @@ retry: dout(" no snap context with dirty data?\n"); goto out; } - if (snap_size == 0) - snap_size = i_size_read(inode); dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); spin_lock(&ci->i_ceph_lock); truncate_seq = ci->i_truncate_seq; truncate_size = ci->i_truncate_size; - if (!snap_size) - snap_size = i_size_read(inode); + i_size = i_size_read(inode); spin_unlock(&ci->i_ceph_lock); if (last_snapc && snapc != last_snapc) { @@ -828,8 +831,10 @@ get_more_pages: dout("waiting on writeback %p\n", page); wait_on_page_writeback(page); } - if (page_offset(page) >= snap_size) { - dout("%p page eof %llu\n", page, snap_size); + if (page_offset(page) >= + (snap_size == -1 ? i_size : snap_size)) { + dout("%p page eof %llu\n", page, + (snap_size == -1 ? i_size : snap_size)); done = 1; unlock_page(page); break; @@ -884,7 +889,8 @@ get_more_pages: } if (do_sync) - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(req, 1, + CEPH_OSD_OP_STARTSYNC, 0); req->r_callback = writepages_finish; req->r_inode = inode; @@ -944,10 +950,18 @@ get_more_pages: } /* Format the osd request message and submit the write */ - offset = page_offset(pages[0]); - len = min(snap_size - offset, - (u64)locked_pages << PAGE_CACHE_SHIFT); + len = (u64)locked_pages << PAGE_CACHE_SHIFT; + if (snap_size == -1) { + len = min(len, (u64)i_size_read(inode) - offset); + /* writepages_finish() clears writeback pages + * according to the data length, so make sure + * data length covers all locked pages */ + len = max(len, 1 + + ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT)); + } else { + len = min(len, snap_size - offset); + } dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); @@ -1032,7 +1046,6 @@ static int ceph_update_writeable_page(struct file *file, { struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; loff_t page_off = pos & PAGE_CACHE_MASK; int pos_in_page = pos & ~PAGE_CACHE_MASK; int end_in_page = pos_in_page + len; @@ -1044,10 +1057,6 @@ retry_locked: /* writepages currently holds page lock, but if we change that later, */ wait_on_page_writeback(page); - /* check snap context */ - BUG_ON(!ci->i_snap_realm); - down_read(&mdsc->snap_rwsem); - BUG_ON(!ci->i_snap_realm->cached_context); snapc = page_snap_context(page); if (snapc && snapc != ci->i_head_snapc) { /* @@ -1055,7 +1064,6 @@ retry_locked: * context! is it writeable now? */ oldest = get_oldest_context(inode, NULL); - up_read(&mdsc->snap_rwsem); if (snapc->seq > oldest->seq) { ceph_put_snap_context(oldest); @@ -1112,7 +1120,6 @@ retry_locked: } /* we need to read it. */ - up_read(&mdsc->snap_rwsem); r = readpage_nounlock(file, page); if (r < 0) goto fail_nosnap; @@ -1157,16 +1164,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, /* * we don't do anything in here that simple_write_end doesn't do - * except adjust dirty page accounting and drop read lock on - * mdsc->snap_rwsem. + * except adjust dirty page accounting */ static int ceph_write_end(struct file *file, struct address_space *mapping, loff_t pos, unsigned len, unsigned copied, struct page *page, void *fsdata) { struct inode *inode = file_inode(file); - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_mds_client *mdsc = fsc->mdsc; unsigned from = pos & (PAGE_CACHE_SIZE - 1); int check_cap = 0; @@ -1188,7 +1192,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, set_page_dirty(page); unlock_page(page); - up_read(&mdsc->snap_rwsem); page_cache_release(page); if (check_cap) @@ -1314,13 +1317,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) struct inode *inode = file_inode(vma->vm_file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_cap_flush *prealloc_cf; struct page *page = vmf->page; loff_t off = page_offset(page); loff_t size = i_size_read(inode); size_t len; int want, got, ret; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return VM_FAULT_SIGBUS; + if (ci->i_inline_version != CEPH_INLINE_NONE) { struct page *locked_page = NULL; if (off == 0) { @@ -1330,8 +1337,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = ceph_uninline_data(vma->vm_file, locked_page); if (locked_page) unlock_page(locked_page); - if (ret < 0) - return VM_FAULT_SIGBUS; + if (ret < 0) { + ret = VM_FAULT_SIGBUS; + goto out_free; + } } if (off + PAGE_CACHE_SIZE <= size) @@ -1353,7 +1362,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) break; if (ret != -ERESTARTSYS) { WARN_ON(1); - return VM_FAULT_SIGBUS; + ret = VM_FAULT_SIGBUS; + goto out_free; } } dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", @@ -1373,7 +1383,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) if (ret == 0) { /* success. we'll keep the page locked. */ set_page_dirty(page); - up_read(&mdsc->snap_rwsem); ret = VM_FAULT_LOCKED; } else { if (ret == -ENOMEM) @@ -1389,7 +1398,8 @@ out: int dirty; spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1398,6 +1408,8 @@ out: dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, len, ceph_cap_string(got), ret); ceph_put_cap_refs(ci, got); +out_free: + ceph_free_cap_flush(prealloc_cf); return ret; } @@ -1509,8 +1521,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ceph_vino(inode), 0, &len, 0, 1, CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - ci->i_snap_realm->cached_context, - 0, 0, false); + ceph_empty_snapc, 0, 0, false); if (IS_ERR(req)) { err = PTR_ERR(req); goto out; @@ -1528,7 +1539,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ceph_vino(inode), 0, &len, 1, 3, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - ci->i_snap_realm->cached_context, + ceph_empty_snapc, ci->i_truncate_seq, ci->i_truncate_size, false); if (IS_ERR(req)) { @@ -1597,3 +1608,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma) vma->vm_ops = &ceph_vmops; return 0; } + +enum { + POOL_READ = 1, + POOL_WRITE = 2, +}; + +static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) +{ + struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); + struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_osd_request *rd_req = NULL, *wr_req = NULL; + struct rb_node **p, *parent; + struct ceph_pool_perm *perm; + struct page **pages; + int err = 0, err2 = 0, have = 0; + + down_read(&mdsc->pool_perm_rwsem); + p = &mdsc->pool_perm_tree.rb_node; + while (*p) { + perm = rb_entry(*p, struct ceph_pool_perm, node); + if (pool < perm->pool) + p = &(*p)->rb_left; + else if (pool > perm->pool) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } + } + up_read(&mdsc->pool_perm_rwsem); + if (*p) + goto out; + + dout("__ceph_pool_perm_get pool %u no perm cached\n", pool); + + down_write(&mdsc->pool_perm_rwsem); + parent = NULL; + while (*p) { + parent = *p; + perm = rb_entry(parent, struct ceph_pool_perm, node); + if (pool < perm->pool) + p = &(*p)->rb_left; + else if (pool > perm->pool) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } + } + if (*p) { + up_write(&mdsc->pool_perm_rwsem); + goto out; + } + + rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, + ceph_empty_snapc, + 1, false, GFP_NOFS); + if (!rd_req) { + err = -ENOMEM; + goto out_unlock; + } + + rd_req->r_flags = CEPH_OSD_FLAG_READ; + osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); + rd_req->r_base_oloc.pool = pool; + snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name), + "%llx.00000000", ci->i_vino.ino); + rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); + + wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, + ceph_empty_snapc, + 1, false, GFP_NOFS); + if (!wr_req) { + err = -ENOMEM; + goto out_unlock; + } + + wr_req->r_flags = CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); + wr_req->r_base_oloc.pool = pool; + wr_req->r_base_oid = rd_req->r_base_oid; + + /* one page should be large enough for STAT data */ + pages = ceph_alloc_page_vector(1, GFP_KERNEL); + if (IS_ERR(pages)) { + err = PTR_ERR(pages); + goto out_unlock; + } + + osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE, + 0, false, true); + ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP, + &ci->vfs_inode.i_mtime); + err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); + + ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP, + &ci->vfs_inode.i_mtime); + err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); + + if (!err) + err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req); + if (!err2) + err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req); + + if (err >= 0 || err == -ENOENT) + have |= POOL_READ; + else if (err != -EPERM) + goto out_unlock; + + if (err2 == 0 || err2 == -EEXIST) + have |= POOL_WRITE; + else if (err2 != -EPERM) { + err = err2; + goto out_unlock; + } + + perm = kmalloc(sizeof(*perm), GFP_NOFS); + if (!perm) { + err = -ENOMEM; + goto out_unlock; + } + + perm->pool = pool; + perm->perm = have; + rb_link_node(&perm->node, parent, p); + rb_insert_color(&perm->node, &mdsc->pool_perm_tree); + err = 0; +out_unlock: + up_write(&mdsc->pool_perm_rwsem); + + if (rd_req) + ceph_osdc_put_request(rd_req); + if (wr_req) + ceph_osdc_put_request(wr_req); +out: + if (!err) + err = have; + dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err); + return err; +} + +int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) +{ + u32 pool; + int ret, flags; + + if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), + NOPOOLPERM)) + return 0; + + spin_lock(&ci->i_ceph_lock); + flags = ci->i_ceph_flags; + pool = ceph_file_layout_pg_pool(ci->i_layout); + spin_unlock(&ci->i_ceph_lock); +check: + if (flags & CEPH_I_POOL_PERM) { + if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) { + dout("ceph_pool_perm_check pool %u no read perm\n", + pool); + return -EPERM; + } + if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) { + dout("ceph_pool_perm_check pool %u no write perm\n", + pool); + return -EPERM; + } + return 0; + } + + ret = __ceph_pool_perm_get(ci, pool); + if (ret < 0) + return ret; + + flags = CEPH_I_POOL_PERM; + if (ret & POOL_READ) + flags |= CEPH_I_POOL_RD; + if (ret & POOL_WRITE) + flags |= CEPH_I_POOL_WR; + + spin_lock(&ci->i_ceph_lock); + if (pool == ceph_file_layout_pg_pool(ci->i_layout)) { + ci->i_ceph_flags = flags; + } else { + pool = ceph_file_layout_pg_pool(ci->i_layout); + flags = ci->i_ceph_flags; + } + spin_unlock(&ci->i_ceph_lock); + goto check; +} + +void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc) +{ + struct ceph_pool_perm *perm; + struct rb_node *n; + + while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) { + n = rb_first(&mdsc->pool_perm_tree); + perm = rb_entry(n, struct ceph_pool_perm, node); + rb_erase(n, &mdsc->pool_perm_tree); + kfree(perm); + } +} diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index be5ea6af8366..dc10c9dd36c1 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci) used |= CEPH_CAP_PIN; if (ci->i_rd_ref) used |= CEPH_CAP_FILE_RD; - if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages) + if (ci->i_rdcache_ref || + (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ + ci->vfs_inode.i_data.nrpages)) used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref) used |= CEPH_CAP_FILE_WR; @@ -926,16 +928,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) /* remove from session list */ spin_lock(&session->s_cap_lock); - /* - * s_cap_reconnect is protected by s_cap_lock. no one changes - * s_cap_gen while session is in the reconnect state. - */ - if (queue_release && - (!session->s_cap_reconnect || - cap->cap_gen == session->s_cap_gen)) - __queue_cap_release(session, ci->i_vino.ino, cap->cap_id, - cap->mseq, cap->issue_seq); - if (session->s_cap_iterator == cap) { /* not yet, we are iterating over this very cap */ dout("__ceph_remove_cap delaying %p removal from session %p\n", @@ -948,6 +940,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) } /* protect backpointer with s_cap_lock: see iterate_session_caps */ cap->ci = NULL; + + /* + * s_cap_reconnect is pr |