summaryrefslogtreecommitdiff
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/acl.c16
-rw-r--r--fs/ceph/addr.c412
-rw-r--r--fs/ceph/cache.c145
-rw-r--r--fs/ceph/cache.h44
-rw-r--r--fs/ceph/caps.c949
-rw-r--r--fs/ceph/debugfs.c2
-rw-r--r--fs/ceph/dir.c456
-rw-r--r--fs/ceph/export.c10
-rw-r--r--fs/ceph/file.c236
-rw-r--r--fs/ceph/inode.c240
-rw-r--r--fs/ceph/ioctl.c44
-rw-r--r--fs/ceph/mds_client.c508
-rw-r--r--fs/ceph/mds_client.h38
-rw-r--r--fs/ceph/mdsmap.c43
-rw-r--r--fs/ceph/snap.c10
-rw-r--r--fs/ceph/super.c82
-rw-r--r--fs/ceph/super.h70
-rw-r--r--fs/ceph/xattr.c319
18 files changed, 2018 insertions, 1606 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index f19708487e2f..4f67227f69a5 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -37,6 +37,8 @@ static inline void ceph_set_cached_acl(struct inode *inode,
spin_lock(&ci->i_ceph_lock);
if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
set_cached_acl(inode, type, acl);
+ else
+ forget_cached_acl(inode, type);
spin_unlock(&ci->i_ceph_lock);
}
@@ -88,7 +90,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
char *value = NULL;
struct iattr newattrs;
umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
- struct dentry *dentry;
switch (type) {
case ACL_TYPE_ACCESS:
@@ -126,29 +127,26 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
goto out_free;
}
- dentry = d_find_alias(inode);
if (new_mode != old_mode) {
newattrs.ia_mode = new_mode;
newattrs.ia_valid = ATTR_MODE;
- ret = ceph_setattr(dentry, &newattrs);
+ ret = __ceph_setattr(inode, &newattrs);
if (ret)
- goto out_dput;
+ goto out_free;
}
- ret = __ceph_setxattr(dentry, name, value, size, 0);
+ ret = __ceph_setxattr(inode, name, value, size, 0);
if (ret) {
if (new_mode != old_mode) {
newattrs.ia_mode = old_mode;
newattrs.ia_valid = ATTR_MODE;
- ceph_setattr(dentry, &newattrs);
+ __ceph_setattr(inode, &newattrs);
}
- goto out_dput;
+ goto out_free;
}
ceph_set_cached_acl(inode, type, acl);
-out_dput:
- dput(dentry);
out_free:
kfree(value);
out:
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index fc5cae2a0db2..d5b6f959a3c3 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -143,7 +143,7 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
inode = page->mapping->host;
ci = ceph_inode(inode);
- if (offset != 0 || length != PAGE_CACHE_SIZE) {
+ if (offset != 0 || length != PAGE_SIZE) {
dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
inode, page, page->index, offset, length);
return;
@@ -197,10 +197,10 @@ static int readpage_nounlock(struct file *filp, struct page *page)
&ceph_inode_to_client(inode)->client->osdc;
int err = 0;
u64 off = page_offset(page);
- u64 len = PAGE_CACHE_SIZE;
+ u64 len = PAGE_SIZE;
if (off >= i_size_read(inode)) {
- zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+ zero_user_segment(page, 0, PAGE_SIZE);
SetPageUptodate(page);
return 0;
}
@@ -212,7 +212,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
*/
if (off == 0)
return -EINVAL;
- zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+ zero_user_segment(page, 0, PAGE_SIZE);
SetPageUptodate(page);
return 0;
}
@@ -234,9 +234,9 @@ static int readpage_nounlock(struct file *filp, struct page *page)
ceph_fscache_readpage_cancel(inode, page);
goto out;
}
- if (err < PAGE_CACHE_SIZE)
+ if (err < PAGE_SIZE)
/* zero fill remainder of page */
- zero_user_segment(page, err, PAGE_CACHE_SIZE);
+ zero_user_segment(page, err, PAGE_SIZE);
else
flush_dcache_page(page);
@@ -257,12 +257,12 @@ static int ceph_readpage(struct file *filp, struct page *page)
/*
* Finish an async read(ahead) op.
*/
-static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void finish_read(struct ceph_osd_request *req)
{
struct inode *inode = req->r_inode;
struct ceph_osd_data *osd_data;
- int rc = req->r_result;
- int bytes = le32_to_cpu(msg->hdr.data_len);
+ int rc = req->r_result <= 0 ? req->r_result : 0;
+ int bytes = req->r_result >= 0 ? req->r_result : 0;
int num_pages;
int i;
@@ -276,12 +276,14 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
for (i = 0; i < num_pages; i++) {
struct page *page = osd_data->pages[i];
- if (rc < 0 && rc != -ENOENT)
+ if (rc < 0 && rc != -ENOENT) {
+ ceph_fscache_readpage_cancel(inode, page);
goto unlock;
- if (bytes < (int)PAGE_CACHE_SIZE) {
+ }
+ if (bytes < (int)PAGE_SIZE) {
/* zero (remainder of) page */
int s = bytes < 0 ? 0 : bytes;
- zero_user_segment(page, s, PAGE_CACHE_SIZE);
+ zero_user_segment(page, s, PAGE_SIZE);
}
dout("finish_read %p uptodate %p idx %lu\n", inode, page,
page->index);
@@ -290,8 +292,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
ceph_readpage_to_fscache(inode, page);
unlock:
unlock_page(page);
- page_cache_release(page);
- bytes -= PAGE_CACHE_SIZE;
+ put_page(page);
+ bytes -= PAGE_SIZE;
}
kfree(osd_data->pages);
}
@@ -336,7 +338,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
if (max && nr_pages == max)
break;
}
- len = nr_pages << PAGE_CACHE_SHIFT;
+ len = nr_pages << PAGE_SHIFT;
dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
off, len);
vino = ceph_vino(inode);
@@ -364,7 +366,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
if (add_to_page_cache_lru(page, &inode->i_data, page->index,
GFP_KERNEL)) {
ceph_fscache_uncache_page(inode, page);
- page_cache_release(page);
+ put_page(page);
dout("start_read %p add_to_page_cache failed %p\n",
inode, page);
nr_pages = i;
@@ -376,8 +378,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
req->r_callback = finish_read;
req->r_inode = inode;
- ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
-
dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
ret = ceph_osdc_start_request(osdc, req, false);
if (ret < 0)
@@ -415,8 +415,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
if (rc == 0)
goto out;
- if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
- max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
+ if (fsc->mount_options->rsize >= PAGE_SIZE)
+ max = (fsc->mount_options->rsize + PAGE_SIZE - 1)
>> PAGE_SHIFT;
dout("readpages %p file %p nr_pages %d max %d\n", inode,
@@ -484,7 +484,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
long writeback_stat;
u64 truncate_size;
u32 truncate_seq;
- int err = 0, len = PAGE_CACHE_SIZE;
+ int err = 0, len = PAGE_SIZE;
dout("writepage %p idx %lu\n", page, page->index);
@@ -537,8 +537,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
- ceph_readpage_to_fscache(inode, page);
-
set_page_writeback(page);
err = ceph_osdc_writepages(osdc, ceph_vino(inode),
&ci->i_layout, snapc,
@@ -546,11 +544,21 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
truncate_seq, truncate_size,
&inode->i_mtime, &page, 1);
if (err < 0) {
- dout("writepage setting page/mapping error %d %p\n", err, page);
+ struct writeback_control tmp_wbc;
+ if (!wbc)
+ wbc = &tmp_wbc;
+ if (err == -ERESTARTSYS) {
+ /* killed by SIGKILL */
+ dout("writepage interrupted page %p\n", page);
+ redirty_page_for_writepage(wbc, page);
+ end_page_writeback(page);
+ goto out;
+ }
+ dout("writepage setting page/mapping error %d %p\n",
+ err, page);
SetPageError(page);
mapping_set_error(&inode->i_data, err);
- if (wbc)
- wbc->pages_skipped++;
+ wbc->pages_skipped++;
} else {
dout("writepage cleaned page %p\n", page);
err = 0; /* vfs expects us to return 0 */
@@ -571,12 +579,16 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
BUG_ON(!inode);
ihold(inode);
err = writepage_nounlock(page, wbc);
+ if (err == -ERESTARTSYS) {
+ /* direct memory reclaimer was killed by SIGKILL. return 0
+ * to prevent caller from setting mapping/page error */
+ err = 0;
+ }
unlock_page(page);
iput(inode);
return err;
}
-
/*
* lame release_pages helper. release_pages() isn't exported to
* modules.
@@ -600,8 +612,7 @@ static void ceph_release_pages(struct page **pages, int num)
* If we get an error, set the mapping error bit, but not the individual
* page error bits.
*/
-static void writepages_finish(struct ceph_osd_request *req,
- struct ceph_msg *msg)
+static void writepages_finish(struct ceph_osd_request *req)
{
struct inode *inode = req->r_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
@@ -615,7 +626,6 @@ static void writepages_finish(struct ceph_osd_request *req,
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
bool remove_page;
-
dout("writepages_finish %p rc %d\n", inode, rc);
if (rc < 0)
mapping_set_error(mapping, rc);
@@ -650,6 +660,9 @@ static void writepages_finish(struct ceph_osd_request *req,
clear_bdi_congested(&fsc->backing_dev_info,
BLK_RW_ASYNC);
+ if (rc < 0)
+ SetPageError(page);
+
ceph_put_snap_context(page_snap_context(page));
page->private = 0;
ClearPagePrivate(page);
@@ -718,16 +731,19 @@ static int ceph_writepages_start(struct address_space *mapping,
(wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
- pr_warn("writepage_start %p on forced umount\n", inode);
- truncate_pagecache(inode, 0);
+ if (ci->i_wrbuffer_ref > 0) {
+ pr_warn_ratelimited(
+ "writepage_start %p %lld forced umount\n",
+ inode, ceph_ino(inode));
+ }
mapping_set_error(mapping, -EIO);
return -EIO; /* we're in a forced umount, don't write! */
}
if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
wsize = fsc->mount_options->wsize;
- if (wsize < PAGE_CACHE_SIZE)
- wsize = PAGE_CACHE_SIZE;
- max_pages_ever = wsize >> PAGE_CACHE_SHIFT;
+ if (wsize < PAGE_SIZE)
+ wsize = PAGE_SIZE;
+ max_pages_ever = wsize >> PAGE_SHIFT;
pagevec_init(&pvec, 0);
@@ -737,8 +753,8 @@ static int ceph_writepages_start(struct address_space *mapping,
end = -1;
dout(" cyclic, start at %lu\n", start);
} else {
- start = wbc->range_start >> PAGE_CACHE_SHIFT;
- end = wbc->range_end >> PAGE_CACHE_SHIFT;
+ start = wbc->range_start >> PAGE_SHIFT;
+ end = wbc->range_end >> PAGE_SHIFT;
if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
range_whole = 1;
should_loop = 0;
@@ -887,7 +903,7 @@ get_more_pages:
num_ops = 1 + do_sync;
strip_unit_end = page->index +
- ((len - 1) >> PAGE_CACHE_SHIFT);
+ ((len - 1) >> PAGE_SHIFT);
BUG_ON(pages);
max_pages = calc_pages_for(0, (u64)len);
@@ -901,7 +917,7 @@ get_more_pages:
len = 0;
} else if (page->index !=
- (offset + len) >> PAGE_CACHE_SHIFT) {
+ (offset + len) >> PAGE_SHIFT) {
if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
CEPH_OSD_MAX_OPS)) {
redirty_page_for_writepage(wbc, page);
@@ -929,7 +945,7 @@ get_more_pages:
pages[locked_pages] = page;
locked_pages++;
- len += PAGE_CACHE_SIZE;
+ len += PAGE_SIZE;
}
/* did we get anything? */
@@ -981,7 +997,7 @@ new_request:
BUG_ON(IS_ERR(req));
}
BUG_ON(len < page_offset(pages[locked_pages - 1]) +
- PAGE_CACHE_SIZE - offset);
+ PAGE_SIZE - offset);
req->r_callback = writepages_finish;
req->r_inode = inode;
@@ -1011,7 +1027,7 @@ new_request:
}
set_page_writeback(pages[i]);
- len += PAGE_CACHE_SIZE;
+ len += PAGE_SIZE;
}
if (snap_size != -1) {
@@ -1020,7 +1036,7 @@ new_request:
/* writepages_finish() clears writeback pages
* according to the data length, so make sure
* data length covers all locked pages */
- u64 min_len = len + 1 - PAGE_CACHE_SIZE;
+ u64 min_len = len + 1 - PAGE_SIZE;
len = min(len, (u64)i_size_read(inode) - offset);
len = max(len, min_len);
}
@@ -1063,10 +1079,7 @@ new_request:
pages = NULL;
}
- vino = ceph_vino(inode);
- ceph_osdc_build_request(req, offset, snapc, vino.snap,
- &inode->i_mtime);
-
+ req->r_mtime = inode->i_mtime;
rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
BUG_ON(rc);
req = NULL;
@@ -1099,8 +1112,7 @@ release_pvec_pages:
mapping->writeback_index = index;
out:
- if (req)
- ceph_osdc_put_request(req);
+ ceph_osdc_put_request(req);
ceph_put_snap_context(snapc);
dout("writepages done, rc = %d\n", rc);
return rc;
@@ -1134,14 +1146,21 @@ static int ceph_update_writeable_page(struct file *file,
struct page *page)
{
struct inode *inode = file_inode(file);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
- loff_t page_off = pos & PAGE_CACHE_MASK;
- int pos_in_page = pos & ~PAGE_CACHE_MASK;
+ loff_t page_off = pos & PAGE_MASK;
+ int pos_in_page = pos & ~PAGE_MASK;
int end_in_page = pos_in_page + len;
loff_t i_size;
int r;
struct ceph_snap_context *snapc, *oldest;
+ if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+ dout(" page %p forced umount\n", page);
+ unlock_page(page);
+ return -EIO;
+ }
+
retry_locked:
/* writepages currently holds page lock, but if we change that later, */
wait_on_page_writeback(page);
@@ -1165,7 +1184,7 @@ retry_locked:
snapc = ceph_get_snap_context(snapc);
unlock_page(page);
ceph_queue_writeback(inode);
- r = wait_event_interruptible(ci->i_cap_wq,
+ r = wait_event_killable(ci->i_cap_wq,
context_is_writeable_or_written(inode, snapc));
ceph_put_snap_context(snapc);
if (r == -ERESTARTSYS)
@@ -1191,7 +1210,7 @@ retry_locked:
}
/* full page? */
- if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
+ if (pos_in_page == 0 && len == PAGE_SIZE)
return 0;
/* past end of file? */
@@ -1199,12 +1218,12 @@ retry_locked:
if (page_off >= i_size ||
(pos_in_page == 0 && (pos+len) >= i_size &&
- end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
+ end_in_page - pos_in_page != PAGE_SIZE)) {
dout(" zeroing %p 0 - %d and %d - %d\n",
- page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
+ page, pos_in_page, end_in_page, (int)PAGE_SIZE);
zero_user_segments(page,
0, pos_in_page,
- end_in_page, PAGE_CACHE_SIZE);
+ end_in_page, PAGE_SIZE);
return 0;
}
@@ -1228,7 +1247,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
{
struct inode *inode = file_inode(file);
struct page *page;
- pgoff_t index = pos >> PAGE_CACHE_SHIFT;
+ pgoff_t index = pos >> PAGE_SHIFT;
int r;
do {
@@ -1242,7 +1261,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
r = ceph_update_writeable_page(file, pos, len, page);
if (r < 0)
- page_cache_release(page);
+ put_page(page);
else
*pagep = page;
} while (r == -EAGAIN);
@@ -1259,7 +1278,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
struct page *page, void *fsdata)
{
struct inode *inode = file_inode(file);
- unsigned from = pos & (PAGE_CACHE_SIZE - 1);
+ unsigned from = pos & (PAGE_SIZE - 1);
int check_cap = 0;
dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
@@ -1279,7 +1298,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
set_page_dirty(page);
unlock_page(page);
- page_cache_release(page);
+ put_page(page);
if (check_cap)
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
@@ -1292,8 +1311,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
* intercept O_DIRECT reads and writes early, this function should
* never get called.
*/
-static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter,
- loff_t pos)
+static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
{
WARN_ON(1);
return -EINVAL;
@@ -1312,6 +1330,17 @@ const struct address_space_operations ceph_aops = {
.direct_IO = ceph_direct_io,
};
+static void ceph_block_sigs(sigset_t *oldset)
+{
+ sigset_t mask;
+ siginitsetinv(&mask, sigmask(SIGKILL));
+ sigprocmask(SIG_BLOCK, &mask, oldset);
+}
+
+static void ceph_restore_sigs(sigset_t *oldset)
+{
+ sigprocmask(SIG_SETMASK, oldset, NULL);
+}
/*
* vm ops
@@ -1322,28 +1351,26 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data;
struct page *pinned_page = NULL;
- loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
+ loff_t off = vmf->pgoff << PAGE_SHIFT;
int want, got, ret;
+ sigset_t oldset;
+
+ ceph_block_sigs(&oldset);
dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
- inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE);
+ inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
- while (1) {
- got = 0;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
- -1, &got, &pinned_page);
- if (ret == 0)
- break;
- if (ret != -ERESTARTSYS) {
- WARN_ON(1);
- return VM_FAULT_SIGBUS;
- }
- }
+
+ got = 0;
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
+ if (ret < 0)
+ goto out_restore;
+
dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
- inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
+ inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
ci->i_inline_version == CEPH_INLINE_NONE)
@@ -1352,16 +1379,16 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
- inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
+ inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got), ret);
if (pinned_page)
- page_cache_release(pinned_page);
+ put_page(pinned_page);
ceph_put_cap_refs(ci, got);
if (ret != -EAGAIN)
- return ret;
+ goto out_restore;
/* read inline data */
- if (off >= PAGE_CACHE_SIZE) {
+ if (off >= PAGE_SIZE) {
/* does not support inline data > PAGE_SIZE */
ret = VM_FAULT_SIGBUS;
} else {
@@ -1372,27 +1399,35 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
~__GFP_FS));
if (!page) {
ret = VM_FAULT_OOM;
- goto out;
+ goto out_inline;
}
ret1 = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret1 < 0 || off >= i_size_read(inode)) {
unlock_page(page);
- page_cache_release(page);
- ret = VM_FAULT_SIGBUS;
- goto out;
+ put_page(page);
+ if (ret1 < 0)
+ ret = ret1;
+ else
+ ret = VM_FAULT_SIGBUS;
+ goto out_inline;
}
- if (ret1 < PAGE_CACHE_SIZE)
- zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
+ if (ret1 < PAGE_SIZE)
+ zero_user_segment(page, ret1, PAGE_SIZE);
else
flush_dcache_page(page);
SetPageUptodate(page);
vmf->page = page;
ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
+out_inline:
+ dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
+ inode, off, (size_t)PAGE_SIZE, ret);
}
-out:
- dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
- inode, off, (size_t)PAGE_CACHE_SIZE, ret);
+out_restore:
+ ceph_restore_sigs(&oldset);
+ if (ret < 0)
+ ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
+
return ret;
}
@@ -1410,10 +1445,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
loff_t size = i_size_read(inode);
size_t len;
int want, got, ret;
+ sigset_t oldset;
prealloc_cf = ceph_alloc_cap_flush();
if (!prealloc_cf)
- return VM_FAULT_SIGBUS;
+ return VM_FAULT_OOM;
+
+ ceph_block_sigs(&oldset);
if (ci->i_inline_version != CEPH_INLINE_NONE) {
struct page *locked_page = NULL;
@@ -1424,16 +1462,14 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = ceph_uninline_data(vma->vm_file, locked_page);
if (locked_page)
unlock_page(locked_page);
- if (ret < 0) {
- ret = VM_FAULT_SIGBUS;
+ if (ret < 0)
goto out_free;
- }
}
- if (off + PAGE_CACHE_SIZE <= size)
- len = PAGE_CACHE_SIZE;
+ if (off + PAGE_SIZE <= size)
+ len = PAGE_SIZE;
else
- len = size & ~PAGE_CACHE_MASK;
+ len = size & ~PAGE_MASK;
dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
inode, ceph_vinop(inode), off, len, size);
@@ -1441,45 +1477,36 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
- while (1) {
- got = 0;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
- &got, NULL);
- if (ret == 0)
- break;
- if (ret != -ERESTARTSYS) {
- WARN_ON(1);
- ret = VM_FAULT_SIGBUS;
- goto out_free;
- }
- }
+
+ got = 0;
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
+ &got, NULL);
+ if (ret < 0)
+ goto out_free;
+
dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
inode, off, len, ceph_cap_string(got));
/* Update time before taking page lock */
file_update_time(vma->vm_file);
- lock_page(page);
+ do {
+ lock_page(page);
- ret = VM_FAULT_NOPAGE;
- if ((off > size) ||
- (page->mapping != inode->i_mapping)) {
- unlock_page(page);
- goto out;
- }
+ if ((off > size) || (page->mapping != inode->i_mapping)) {
+ unlock_page(page);
+ ret = VM_FAULT_NOPAGE;
+ break;
+ }
+
+ ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
+ if (ret >= 0) {
+ /* success. we'll keep the page locked. */
+ set_page_dirty(page);
+ ret = VM_FAULT_LOCKED;
+ }
+ } while (ret == -EAGAIN);
- ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
- if (ret >= 0) {
- /* success. we'll keep the page locked. */
- set_page_dirty(page);
- ret = VM_FAULT_LOCKED;
- } else {
- if (ret == -ENOMEM)
- ret = VM_FAULT_OOM;
- else
- ret = VM_FAULT_SIGBUS;
- }
-out:
if (ret == VM_FAULT_LOCKED ||
ci->i_inline_version != CEPH_INLINE_NONE) {
int dirty;
@@ -1496,8 +1523,10 @@ out:
inode, off, len, ceph_cap_string(got), ret);
ceph_put_cap_refs(ci, got);
out_free:
+ ceph_restore_sigs(&oldset);
ceph_free_cap_flush(prealloc_cf);
-
+ if (ret < 0)
+ ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
return ret;
}
@@ -1519,7 +1548,7 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
return;
if (PageUptodate(page)) {
unlock_page(page);
- page_cache_release(page);
+ put_page(page);
return;
}
}
@@ -1534,14 +1563,14 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
}
if (page != locked_page) {
- if (len < PAGE_CACHE_SIZE)
- zero_user_segment(page, len, PAGE_CACHE_SIZE);
+ if (len < PAGE_SIZE)
+ zero_user_segment(page, len, PAGE_SIZE);
else
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
- page_cache_release(page);
+ put_page(page);
}
}
@@ -1578,7 +1607,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
from_pagecache = true;
lock_page(page);
} else {
- page_cache_release(page);
+ put_page(page);
page = NULL;
}
}
@@ -1586,8 +1615,8 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
if (page) {
len = i_size_read(inode);
- if (len > PAGE_CACHE_SIZE)
- len = PAGE_CACHE_SIZE;
+ if (len > PAGE_SIZE)
+ len = PAGE_SIZE;
} else {
page = __page_cache_alloc(GFP_NOFS);
if (!page) {
@@ -1615,7 +1644,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
goto out;
}
- ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+ req->r_mtime = inode->i_mtime;
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1658,7 +1687,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
goto out_put;
}
- ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+ req->r_mtime = inode->i_mtime;
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1670,7 +1699,7 @@ out:
if (page && page != locked_page) {
if (from_pagecache) {
unlock_page(page);
- page_cache_release(page);
+ put_page(page);
} else
__free_pages(page, 0);
}
@@ -1701,7 +1730,8 @@ enum {
POOL_WRITE = 2,
};
-static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
+static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
+ s64 pool, struct ceph_string *pool_ns)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1709,6 +1739,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
struct rb_node **p, *parent;
struct ceph_pool_perm *perm;
struct page **pages;
+ size_t pool_ns_len;
int err = 0, err2 = 0, have = 0;
down_read(&mdsc->pool_perm_rwsem);
@@ -1720,17 +1751,31 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
else if (pool > perm->pool)
p = &(*p)->rb_right;
else {
- have = perm->perm;
- break;
+ int ret = ceph_compare_string(pool_ns,
+ perm->pool_ns,
+ perm->pool_ns_len);
+ if (ret < 0)
+ p = &(*p)->rb_left;
+ else if (ret > 0)
+ p = &(*p)->rb_right;
+ else {
+ have = perm->perm;
+ break;
+ }
}
}
up_read(&mdsc->pool_perm_rwsem);
if (*p)
goto out;
- dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);
+ if (pool_ns)
+ dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
+ pool, (int)pool_ns->len, pool_ns->str);
+ else
+ dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
down_write(&mdsc->pool_perm_rwsem);
+ p = &mdsc->pool_perm_tree.rb_node;
parent = NULL;
while (*p) {
parent = *p;
@@ -1740,8 +1785,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
else if (pool > perm->pool)
p = &(*p)->rb_right;
else {
- have = perm->perm;
- break;
+ int ret = ceph_compare_string(pool_ns,
+ perm->pool_ns,
+ perm->pool_ns_len);
+ if (ret < 0)
+ p = &(*p)->rb_left;
+ else if (ret > 0)
+ p = &(*p)->rb_right;
+ else {
+ have = perm->perm;
+ break;
+ }
}
}
if (*p) {
@@ -1759,9 +1813,13 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
rd_req->r_flags = CEPH_OSD_FLAG_READ;
osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
rd_req->r_base_oloc.pool = pool;
- snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name),
- "%llx.00000000", ci->i_vino.ino);
- rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
+ if (pool_ns)
+ rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
+ ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
+
+ err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
+ if (err)
+ goto out_unlock;
wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1, false, GFP_NOFS);
@@ -1770,11 +1828,14 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
goto out_unlock;
}
- wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
- CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+ wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;
osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
- wr_req->r_base_oloc.pool = pool;
- wr_req->r_base_oid = rd_req->r_base_oid;
+ ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
+ ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);
+
+ err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
+ if (err)
+ goto out_unlock;
/* one page should be large enough for STAT data */
pages = ceph_alloc_page_vector(1, GFP_KERNEL);
@@ -1785,12 +1846,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
0, false, true);
- ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
- &ci->vfs_inode.i_mtime);
err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
- ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP,
- &ci->vfs_inode.i_mtime);
+ wr_req->r_mtime = ci->vfs_inode.i_mtime;
err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
if (!err)
@@ -1810,7 +1868,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
goto out_unlock;
}
- perm = kmalloc(sizeof(*perm), GFP_NOFS);
+ pool_ns_len = pool_ns ? pool_ns->len : 0;
+ perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
if (!perm) {
err = -ENOMEM;
goto out_unlock;
@@ -1818,56 +1877,62 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
perm->pool = pool;
perm->perm = have;
+ perm->pool_ns_len = pool_ns_len;
+ if (pool_ns_len > 0)
+ memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
+ perm->pool_ns[pool_ns_len] = 0;
+
rb_link_node(&perm->node, parent, p);
rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
err = 0;
out_unlock:
up_write(&mdsc->pool_perm_rwsem);
- if (rd_req)
- ceph_osdc_put_request(rd_req);
- if (wr_req)
- ceph_osdc_put_request(wr_req);
+ ceph_osdc_put_request(rd_req);
+ ceph_osdc_put_request(wr_req);
out:
if (!err)
err = have;
- dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
+ if (pool_ns)
+ dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
+ pool, (int)pool_ns->len, pool_ns->str, err);
+ else
+ dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
return err;
}
int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
- u32 pool;
+ s64 pool;
+ struct ceph_string *pool_ns;
int ret, flags;
- /* does not support pool namespace yet */
- if (ci->i_pool_ns_len)
- return -EIO;
-
if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
NOPOOLPERM))
return 0;
spin_lock(&ci->i_ceph_lock);
flags = ci->i_ceph_flags;
- pool = ceph_file_layout_pg_pool(ci->i_layout);
+ pool = ci->i_layout.pool_id;
spin_unlock(&ci->i_ceph_lock);
check:
if (flags & CEPH_I_POOL_PERM) {
if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
- dout("ceph_pool_perm_check pool %u no read perm\n",
+ dout("ceph_pool_perm_check pool %lld no read perm\n",
pool);
return -EPERM;
}
if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
- dout("ceph_pool_perm_check pool %u no write perm\n",
+ dout("ceph_pool_perm_check pool %lld no write perm\n",
pool);
return -EPERM;
}
return 0;
}
- ret = __ceph_pool_perm_get(ci, pool);
+ pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
+ ret = __ceph_pool_perm_get(ci, pool, pool_ns);
+ ceph_put_string(pool_ns);
if (ret < 0)
return ret;
@@ -1878,10 +1943,11 @@ check:
flags |= CEPH_I_POOL_WR;
spin_lock(&ci->i_ceph_lock);
- if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
- ci->i_ceph_flags = flags;
+ if (pool == ci->i_layout.pool_id &&
+ pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
+ ci->i_ceph_flags |= flags;
} else {
- pool = ceph_file_layout_pg_pool(ci->i_layout);
+ pool = ci->i_layout.pool_id;
flags = ci->i_ceph_flags;
}
spin_unlock(&ci->i_ceph_lock);
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index a351480dbabc..5bc5d37b1217 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -25,6 +25,7 @@
#include "cache.h"
struct ceph_aux_inode {
+ u64 version;
struct timespec mtime;
loff_t size;
};
@@ -69,15 +70,8 @@ int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index,
&ceph_fscache_fsid_object_def,
fsc, true);
-
- if (fsc->fscache == NULL) {
- pr_err("Unable to resgister fsid: %p fscache cookie", fsc);
- return 0;
- }
-
- fsc->revalidate_wq = alloc_workqueue("ceph-revalidate", 0, 1);
- if (fsc->revalidate_wq == NULL)
- return -ENOMEM;
+ if (!fsc->fscache)
+ pr_err("Unable to register fsid: %p fscache cookie\n", fsc);
return 0;
}
@@ -105,6 +99,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
const struct inode* inode = &ci->vfs_inode;
memset(&aux, 0, sizeof(aux));
+ aux.version = ci->i_version;
aux.mtime = inode->i_mtime;
aux.size = i_size_read(inode);
@@ -131,6 +126,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
return FSCACHE_CHECKAUX_OBSOLETE;
memset(&aux, 0, sizeof(aux));
+ aux.version = ci->i_version;
aux.mtime = inode->i_mtime;
aux.size = i_size_read(inode);
@@ -181,32 +177,26 @@ static const struct fscache_cookie_def ceph_fscache_inode_object_def = {
.now_uncached = ceph_fscache_inode_now_uncached,
};
-void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
- struct ceph_inode_info* ci)
+void ceph_fscache_register_inode_cookie(struct inode *inode)
{
- struct inode* inode = &ci->vfs_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
/* No caching for filesystem */
if (fsc->fscache == NULL)
return;
/* Only cache for regular files that are read only */
- if ((ci->vfs_inode.i_mode & S_IFREG) == 0)
+ if (!S_ISREG(inode->i_mode))
return;
- /* Avoid multiple racing open requests */
- inode_lock(inode);
-
- if (ci->fscache)
- goto done;
-
- ci->fscache = fscache_acquire_cookie(fsc->fscache,
- &ceph_fscache_inode_object_def,
- ci, true);
- fscache_check_consistency(ci->fscache);
-done:
+ inode_lock_nested(inode, I_MUTEX_CHILD);
+ if (!ci->fscache) {
+ ci->fscache = fscache_acquire_cookie(fsc->fscache,
+ &ceph_fscache_inode_object_def,
+ ci, false);
+ }
inode_unlock(inode);
-
}
void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
@@ -222,6 +212,34 @@ void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
fscache_relinquish_cookie(cookie, 0);
}
+static bool ceph_fscache_can_enable(void *data)
+{
+ struct inode *inode = data;
+ return !inode_is_open_for_write(inode);
+}
+
+void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ if (!fscache_cookie_valid(ci->fscache))
+ return;
+
+ if (inode_is_open_for_write(inode)) {
+ dout("fscache_file_set_cookie %p %p disabling cache\n",
+ inode, filp);
+ fscache_disable_cookie(ci->fscache, false);
+ fscache_uncache_all_inode_pages(ci->fscache, inode);
+ } else {
+ fscache_enable_cookie(ci->fscache, ceph_fscache_can_enable,
+ inode);
+ if (fscache_cookie_enabled(ci->fscache)) {
+ dout("fscache_file_set_cookie %p %p enabing cache\n",
+ inode, filp);
+ }
+ }
+}
+
static void ceph_vfs_readpage_complete(struct page *page, void *data, int error)
{
if (!error)
@@ -236,10 +254,9 @@ static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int
unlock_page(page);
}
-static inline int cache_valid(struct ceph_inode_info *ci)
+static inline bool cache_valid(struct ceph_inode_info *ci)
{
- return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) &&
- (ci->i_fscache_gen == ci->i_rdcache_gen));
+ return ci->i_fscache_gen == ci->i_rdcache_gen;
}
@@ -332,69 +349,27 @@ void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
{
- if (fsc->revalidate_wq)
- destroy_workqueue(fsc->revalidate_wq);
-
fscache_relinquish_cookie(fsc->fscache, 0);
fsc->fscache = NULL;
}
-static void ceph_revalidate_work(struct work_struct *work)
-{
- int issued;
- u32 orig_gen;
- struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
- i_revalidate_work);
- struct inode *inode = &ci->vfs_inode;
-
- spin_lock(&ci->i_ceph_lock);
- issued = __ceph_caps_issued(ci, NULL);
- orig_gen = ci->i_rdcache_gen;
- spin_unlock(&ci->i_ceph_lock);
-
- if (!(issued & CEPH_CAP_FILE_CACHE)) {
- dout("revalidate_work lost cache before validation %p\n",
- inode);
- goto out;
- }
-
- if (!fscache_check_consistency(ci->fscache))
- fscache_invalidate(ci->fscache);
-
- spin_lock(&ci->i_ceph_lock);
- /* Update the new valid generation (backwards sanity check too) */
- if (orig_gen > ci->i_fscache_gen) {
- ci->i_fscache_gen = orig_gen;
- }
- spin_unlock(&ci->i_ceph_lock);
-
-out:
- iput(&ci->vfs_inode);
-}
-
-void ceph_queue_revalidate(struct inode *inode)
+/*
+ * caller should hold CEPH_CAP_FILE_{RD,CACHE}
+ */
+void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci)
{
- struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
- struct ceph_inode_info *ci = ceph_inode(inode);
-
- if (fsc->revalidate_wq == NULL || ci->fscache == NULL)
+ if (cache_valid(ci))
return;
- ihold(inode);
-
- if (queue_work(ceph_sb_to_client(inode->i_sb)->revalidate_wq,
- &ci->i_revalidate_work)) {
- dout("ceph_queue_revalidate %p\n", inode);
- } else {
- dout("ceph_queue_revalidate %p failed\n)", inode);
- iput(inode);
+ /* resue i_truncate_mutex. There should be no pending
+ * truncate while the caller holds CEPH_CAP_FILE_RD */
+ mutex_lock(&ci->i_truncate_mutex);
+ if (!cache_valid(ci)) {
+ if (fscache_check_consistency(ci->fscache))
+ fscache_invalidate(ci->fscache);
+ spin_lock(&ci->i_ceph_lock);
+ ci->i_fscache_gen = ci->i_rdcache_gen;
+ spin_unlock(&ci->i_ceph_lock);
}
-}
-
-void ceph_fscache_inode_init(struct ceph_inode_info *ci)
-{
- ci->fscache = NULL;
- /* The first load is verifed cookie open time */
- ci->i_fscache_gen = 1;
- INIT_WORK(&ci->i_revalidate_work, ceph_revalidate_work);
+ mutex_unlock(&ci->i_truncate_mutex);
}
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
index 5ac591bd012b..7e72c7594f0c 100644
--- a/fs/ceph/cache.h
+++ b/fs/ceph/cache.h
@@ -34,10 +34,10 @@ void ceph_fscache_unregister(void);
int ceph_fscache_register_fs(struct ceph_fs_client* fsc);
void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc);
-void ceph_fscache_inode_init(struct ceph_inode_info *ci);
-void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
- struct ceph_inode_info* ci);
+void ceph_fscache_register_inode_cookie(struct inode *inode);
void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci);
+void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp);
+void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci);
int ceph_readpage_from_fscache(struct inode *inode, struct page *page);
int ceph_readpages_from_fscache(struct inode *inode,
@@ -46,12 +46,11 @@ int ceph_readpages_from_fscache(struct inode *inode,
unsigned *nr_pages);
void ceph_readpage_to_fscache(struct inode *inode, struct page *page);
void ceph_invalidate_fscache_page(struct inode* inode, struct page *page);
-void ceph_queue_revalidate(struct inode *inode);
-static inline void ceph_fscache_update_objectsize(struct inode *inode)
+static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
{
- struct ceph_inode_info *ci = ceph_inode(inode);
- fscache_attr_changed(ci->fscache);
+ ci->fscache = NULL;
+ ci->i_fscache_gen = 0;
}
static inline void ceph_fscache_invalidate(struct inode *inode)
@@ -88,6 +87,11 @@ static inline void ceph_fscache_readpages_cancel(struct inode *inode,
return fscache_readpages_cancel(ci->fscache, pages);
}
+static inline void ceph_disable_fscache_readpage(struct ceph_inode_info *ci)
+{
+ ci->i_fscache_gen = ci->i_rdcache_gen - 1;
+}
+
#else
static inline int ceph_fscache_register(void)
@@ -112,8 +116,20 @@ static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
{
}
-static inline void ceph_fscache_register_inode_cookie(struct ceph_fs_client* parent_fsc,
- struct ceph_inode_info* ci)
+static inline void ceph_fscache_register_inode_cookie(struct inode *inode)
+{
+}
+
+static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
+{
+}
+
+static inline void ceph_fscache_file_set_cookie(struct inode *inode,
+ struct file *filp)
+{
+}
+
+static inline void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci)
{
}
@@ -141,10 +157,6 @@ static inline void ceph_readpage_to_fscache(struct inode *inode,
{
}
-static inline void ceph_fscache_update_objectsize(struct inode *inode)
-{
-}
-
static inline void ceph_fscache_invalidate(struct inode *inode)
{
}
@@ -154,10 +166,6 @@ static inline void ceph_invalidate_fscache_page(struct inode *inode,
{
}
-static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
-{
-}
-
static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
{
return 1;
@@ -173,7 +181,7 @@ static inline void ceph_fscache_readpages_cancel(struct inode *inode,
{
}
-static inline void ceph_queue_revalidate(struct inode *inode)
+static inline void ceph_disable_fscache_readpage(struct ceph_inode_info *ci)
{
}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index de17bb232ff8..99115cae1652 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -40,6 +40,11 @@
* cluster to release server state.
*/
+static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
+static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ struct ceph_inode_info *ci,
+ u64 oldest_flush_tid);
/*
* Generate readable cap strings for debugging output.
@@ -849,12 +854,14 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
*/
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
- int want = 0;
- int mode;
- for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
- if (ci->i_nr_by_mode[mode])
- want |= ceph_caps_for_mode(mode);
- return want;
+ int i, bits = 0;
+ for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
+ if (ci->i_nr_by_mode[i])
+ bits |= 1 << i;
+ }
+ if (bits == 0)
+ return 0;
+ return ceph_caps_for_mode(bits >> 1);
}
/*
@@ -991,7 +998,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
u32 seq, u64 flush_tid, u64 oldest_flush_tid,
u32 issue_seq, u32 mseq, u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
- struct timespec *ctime, u64 time_warp_seq,
+ struct timespec *ctime, u32 time_warp_seq,
kuid_t uid, kgid_t gid, umode_t mode,
u64 xattr_version,
struct ceph_buffer *xattrs_buf,
@@ -1116,8 +1123,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
struct inode *inode = &ci->vfs_inode;
u64 cap_id = cap->cap_id;
int held, revoking, dropping, keep;
- u64 seq, issue_seq, mseq, time_warp_seq, follows;
- u64 size, max_size;
+ u64 follows, size, max_size;
+ u32 seq, issue_seq, mseq, time_warp_seq;
struct timespec mtime, atime, ctime;
int wake = 0;
umode_t mode;
@@ -1215,6 +1222,22 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
return delayed;
}
+static inline int __send_flush_snap(struct inode *inode,
+ struct ceph_mds_session *session,
+ struct ceph_cap_snap *capsnap,
+ u32 mseq, u64 oldest_flush_tid)
+{
+ return send_cap_msg(session, ceph_vino(inode).ino, 0,
+ CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
+ capsnap->dirty, 0, capsnap->cap_flush.tid,
+ oldest_flush_tid, 0, mseq, capsnap->size, 0,
+ &capsnap->mtime, &capsnap->atime,
+ &capsnap->ctime, capsnap->time_warp_seq,
+ capsnap->uid, capsnap->gid, capsnap->mode,
+ capsnap->xattr_version, capsnap->xattr_blob,
+ capsnap->follows, capsnap->inline_data);
+}
+
/*
* When a snapshot is taken, clients accumulate dirty metadata on
* inodes with capabilities in ceph_cap_snaps to describe the file
@@ -1222,37 +1245,22 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
* asynchronously back to the MDS once sync writes complete and dirty
* data is written out.
*
- * Unless @kick is true, skip cap_snaps that were already sent to
- * the MDS (i.e., during this session).
- *
* Called under i_ceph_lock. Takes s_mutex as needed.
*/
-void __ceph_flush_snaps(struct ceph_inode_info *ci,
- struct ceph_mds_session **psession,
- int kick)
+static void __ceph_flush_snaps(struct ceph_inode_info *ci,
+ struct ceph_mds_session *session)
__releases(ci->i_ceph_lock)
__acquires(ci->i_ceph_lock)
{
struct inode *inode = &ci->vfs_inode;
- int mds;
+ struct ceph_mds_client *mdsc = session->s_mdsc;
struct ceph_cap_snap *capsnap;
- u32 mseq;
- struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
- struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
- session->s_mutex */
- u64 next_follows = 0; /* keep track of how far we've gotten through the
- i_cap_snaps list, and skip these entries next time
- around to avoid an infinite loop */
+ u64 oldest_flush_tid = 0;
+ u64 first_tid = 1, last_tid = 0;
- if (psession)
- session = *psession;
+ dout("__flush_snaps %p session %p\n", inode, session);
- dout("__flush_snaps %p\n", inode);
-retry:
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
- /* avoid an infiniute loop after retry */
- if (capsnap->follows < next_follows)
- continue;
/*
* we need to wait for sync writes to complete and for dirty
* pages to be written out.
@@ -1263,97 +1271,129 @@ retry:
/* should be removed by ceph_try_drop_cap_snap() */
BUG_ON(!capsnap->need_flush);
- /* pick mds, take s_mutex */
- if (ci->i_auth_cap == NULL) {
- dout("no auth cap (migrating?), doing nothing\n");
- goto out;
- }
-
/* only flush each capsnap once */
- if (!kick && !list_empty(&capsnap->flushing_item)) {
- dout("already flushed %p, skipping\n", capsnap);
+ if (capsnap->cap_flush.tid > 0) {
+ dout(" already flushed %p, skipping\n", capsnap);
continue;
}
- mds = ci->i_auth_cap->session->s_mds;
- mseq = ci->i_auth_cap->mseq;
+ spin_lock(&mdsc->cap_dirty_lock);
+ capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
+ list_add_tail(&capsnap->cap_flush.g_list,
+ &mdsc->cap_flush_list);
+ if (oldest_flush_tid == 0)
+ oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+ if (list_empty(&ci->i_flushing_item)) {
+ list_add_tail(&ci->i_flushing_item,
+ &session->s_cap_flushing);
+ }
+ spin_unlock(&mdsc->cap_dirty_lock);
- if (session && session->s_mds != mds) {
- dout("oops, wrong session %p mutex\n", session);
- if (kick)
- goto out;
+ list_add_tail(&capsnap->cap_flush.i_list,
+ &ci->i_cap_flush_list);
- mutex_unlock(&session->s_mutex);
- ceph_put_mds_session(session);
- session = NULL;
+ if (first_tid == 1)
+ first_tid = capsnap->cap_flush.tid;
+ last_tid = capsnap->cap_flush.tid;
+ }
+
+ ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
+
+ while (first_tid <= last_tid) {
+ struct ceph_cap *cap = ci->i_auth_cap;
+ struct ceph_cap_flush *cf;
+ int ret;
+
+ if (!(cap && cap->session == session)) {
+ dout("__flush_snaps %p auth cap %p not mds%d, "
+ "stop\n", inode, cap, session->s_mds);
+ break;
}
- if (!session) {
- spin_unlock(&ci->i_ceph_lock);
- mutex_lock(&mdsc->mutex);
- session = __ceph_lookup_mds_session(mdsc, mds);
- mutex_unlock(&mdsc->mutex);
- if (session) {
- dout("inverting session/ino locks on %p\n",
- session);
- mutex_lock(&session->s_mutex);
+
+ ret = -ENOENT;
+ list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
+ if (cf->tid >= first_tid) {
+ ret = 0;
+ break;
}
- /*
- * if session == NULL, we raced against a cap
- * deletion or migration. retry, and we'll
- * get a better @mds value next time.
- */
- spin_lock(&ci->i_ceph_lock);
- goto retry;
}
+ if (ret < 0)
+ break;
- spin_lock(&mdsc->cap_dirty_lock);
- capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
- spin_unlock(&mdsc->cap_dirty_lock);
+ first_tid = cf->tid + 1;
+ capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
atomic_inc(&capsnap->nref);
- if (list_empty(&capsnap->flushing_item))
- list_add_tail(&capsnap->flushing_item,
- &session->s_cap_snaps_flushing);
spin_unlock(&ci->i_ceph_lock);
- dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
- inode, capsnap, capsnap->follows, capsnap->flush_tid);
- send_cap_msg(session, ceph_vino(inode).ino, 0,
- CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
- capsnap->dirty, 0, capsnap->flush_tid, 0,
- 0, mseq, capsnap->size, 0,
- &capsnap->mtime, &capsnap->atime,
- &capsnap->ctime, capsnap->time_warp_seq,
- capsnap->uid, capsnap->gid, capsnap->mode,
- capsnap->xattr_version, capsnap->xattr_blob,
- capsnap->follows, capsnap->inline_data);
-
- next_follows = capsnap->follows + 1;
- ceph_put_cap_snap(capsnap);
+ dout("__flush_snaps %p capsnap %p tid %llu %s\n",
+ inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));
+ ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
+ oldest_flush_tid);
+ if (ret < 0) {
+ pr_err("__flush_snaps: error sending cap flushsnap, "
+ "ino (%llx.%llx) tid %llu follows %llu\n",
+ ceph_vinop(inode), cf->tid, capsnap->follows);
+ }
+
+ ceph_put_cap_snap(capsnap);
spin_lock(&ci->i_ceph_lock);
- goto retry;
}
+}
- /* we flushed them all; remove this inode from the queue */
- spin_lock(&mdsc->snap_flush_lock);
- list_del_init(&ci->i_snap_flush_item);
- spin_unlock(&mdsc->snap_flush_lock);
+void ceph_flush_snaps(struct ceph_inode_info *ci,
+ struct ceph_mds_session **psession)
+{
+ struct inode *inode = &ci->vfs_inode;
+ struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+ struct ceph_mds_session *session = *psession;
+ int mds;
+ dout("ceph_flush_snaps %p\n", inode);
+retry:
+ spin_lock(&ci->i_ceph_lock);
+ if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
+ dout(" no capsnap needs flush, doing nothing\n");
+ goto out;
+ }
+ if (!ci->i_auth_cap) {
+ dout(" no auth cap (migrating?), doing nothing\n");
+ goto out;
+ }
-out:
- if (psession)
- *psession = session;
- else if (session) {
+ mds = ci->i_auth_cap->session->s_mds;
+ if (session && session->s_mds != mds) {
+ dout(" oops, wrong session %p mutex\n", session);
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
+ session = NULL;
+ }
+ if (!session) {
+ spin_unlock(&ci->i_ceph_lock);
+ mutex_lock(&mdsc->mutex);
+ session = __ceph_lookup_mds_session(mdsc, mds);
+ mutex_unlock(&mdsc->mutex);
+ if (session) {
+ dout(" inverting session/ino locks on %p\n", session);
+ mutex_lock(&session->s_mutex);
+ }
+ goto retry;
}
-}
-static void ceph_flush_snaps(struct ceph_inode_info *ci)
-{
- spin_lock(&ci->i_ceph_lock);
- __ceph_flush_snaps(ci, NULL, 0);
+ __ceph_flush_snaps(ci, session);
+out:
spin_unlock(&ci->i_ceph_lock);
+
+ if (psession) {
+ *psession = session;
+ } else {
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ }
+ /* we flushed them all; remove this inode from the queue */
+ spin_lock(&mdsc->snap_flush_lock);
+ list_del_init(&ci->i_snap_flush_item);
+ spin_unlock(&mdsc->snap_flush_lock);
}
/*
@@ -1411,52 +1451,6 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
return dirty;
}
-static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
- struct ceph_cap_flush *cf)
-{
- struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
- struct rb_node *parent = NULL;
- struct ceph_cap_flush *other = NULL;
-
- while (*p) {
- parent = *p;
- other = rb_entry(parent, struct ceph_cap_flush, i_node);
-
- if (cf->tid < other->tid)
- p = &(*p)->rb_left;
- else if (cf->tid > other->tid)
- p = &(*p)->rb_right;
- else
- BUG();
- }
-
- rb_link_node(&cf->i_node, parent, p);
- rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
-}
-
-static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
- struct ceph_cap_flush *cf)
-{
- struct rb_node **p = &mdsc->cap_flush_tree.rb_node;
- struct rb_node *parent = NULL;
- struct ceph_cap_flush *other = NULL;
-
- while (*p) {
- parent = *p;
- other = rb_entry(parent, struct ceph_cap_flush, g_node);
-
- if (cf->tid < other->tid)
- p = &(*p)->rb_left;
- else if (cf->tid > other->tid)
- p = &(*p)->rb_right;
- else
- BUG();
- }
-
- rb_link_node(&cf->g_node, parent, p);
- rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
-}
-
struct ceph_cap_flush *ceph_alloc_cap_flush(void)
{
return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
@@ -1470,23 +1464,54 @@ void ceph_free_cap_flush(struct ceph_cap_flush *cf)
static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
{
- struct rb_node *n = rb_first(&mdsc->cap_flush_tree);
- if (n) {
+ if (!list_empty(&mdsc->cap_flush_list)) {
struct ceph_cap_flush *cf =
- rb_entry(n, struct ceph_cap_flush, g_node);
+ list_first_entry(&mdsc->cap_flush_list,
+ struct ceph_cap_flush, g_list);
return cf->tid;
}
return 0;
}
/*
+ * Remove cap_flush from the mdsc's or inode's flushing cap list.
+ * Return true if caller needs to wake up flush waiters.
+ */
+static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
+ struct ceph_inode_info *ci,
+ struct ceph_cap_flush *cf)
+{
+ struct ceph_cap_flush *prev;
+ bool wake = cf->wake;
+ if (mdsc) {
+ /* are there older pending cap flushes? */
+ if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
+ prev = list_prev_entry(cf, g_list);
+ prev->wake = true;
+ wake = false;
+ }
+ list_del(&cf->g_list);
+ } else if (ci) {
+ if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
+ prev = list_prev_entry(cf, i_list);
+ prev->wake = true;
+ wake = false;
+ }
+ list_del(&cf->i_list);
+ } else {
+ BUG_ON(1);
+ }
+ return wake;
+}
+
+/*
* Add dirty inode to the flushing list. Assigned a seq number so we
* can wait for caps to flush without starving.
*
* Called under i_ceph_lock.
*/
static int __mark_caps_flushing(struct inode *inode,
- struct ceph_mds_session *session,
+ struct ceph_mds_session *session, bool wake,
u64 *flush_tid, u64 *oldest_flush_tid)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -1509,26 +1534,22 @@ static int __mark_caps_flushing(struct inode *inode,
swap(cf, ci->i_prealloc_cap_flush);
cf->caps = flushing;
+ cf->wake = wake;
spin_lock(&mdsc->cap_dirty_lock);
list_del_init(&ci->i_dirty_item);
cf->tid = ++mdsc->last_cap_flush_tid;
- __add_cap_flushing_to_mdsc(mdsc, cf);
+ list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
*oldest_flush_tid = __get_oldest_flush_tid(mdsc);
if (list_empty(&ci->i_flushing_item)) {
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
mdsc->num_cap_flushing++;
- dout(" inode %p now flushing tid %llu\n", inode, cf->tid);
- } else {
- list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
- dout(" inode %p now flushing (more) tid %llu\n",
- inode, cf->tid);
}
spin_unlock(&mdsc->cap_dirty_lock);
- __add_cap_flushing_to_inode(ci, cf);
+ list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
*flush_tid = cf->tid;
return flushing;
@@ -1583,10 +1604,11 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
int mds = -1; /* keep track of how far we've gone through i_caps list
to avoid an infinite loop on retry */
struct rb_node *p;
- int tried_invalidate = 0;
- int delayed = 0, sent = 0, force_requeue = 0, num;
- int queue_invalidate = 0;
- int is_delayed = flags & CHECK_CAPS_NODELAY;
+ int delayed = 0, sent = 0, num;
+ bool is_delayed = flags & CHECK_CAPS_NODELAY;
+ bool queue_invalidate = false;
+ bool force_requeue = false;
+ bool tried_invalidate = false;
/* if we are unmounting, flush any unused caps immediately. */
if (mdsc->stopping)
@@ -1597,9 +1619,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
if (ci->i_ceph_flags & CEPH_I_FLUSH)
flags |= CHECK_CAPS_FLUSH;
- /* flush snaps first time around only */
- if (!list_empty(&ci->i_cap_snaps))
- __ceph_flush_snaps(ci, &session, 0);
goto retry_locked;
retry:
spin_lock(&ci->i_ceph_lock);
@@ -1656,7 +1675,7 @@ retry_locked:
*/
if ((!is_delayed || mdsc->stopping) &&
!S_ISDIR(inode->i_mode) && /* ignore readdir cache */
- ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
+ !(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
inode->i_data.nrpages && /* have cached pages */
(revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
@@ -1666,17 +1685,17 @@ retry_locked:
if (revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO)) {
dout("check_caps queuing invalidate\n");
- queue_invalidate = 1;
+ queue_invalidate = true;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
} else {
dout("check_caps failed to invalidate pages\n");
/* we failed to invalidate pages. check these
caps again later. */
- force_requeue = 1;
+ force_requeue = true;
__cap_set_timeouts(mdsc, ci);
}
}
- tried_invalidate = 1;
+ tried_invalidate = true;
goto retry_locked;
}
@@ -1698,8 +1717,8 @@ retry_locked:
revoking = cap->implemented & ~cap->issued;
dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
- cap->mds, cap, ceph_cap_string(cap->issued),
- ceph_cap_string(cap_used),
+ cap->mds, cap, ceph_cap_string(cap_used),
+ ceph_cap_string(cap->issued),
ceph_cap_string(cap->implemented),
ceph_cap_string(revoking));
@@ -1720,10 +1739,15 @@ retry_locked:
}
}
/* flush anything dirty? */
- if (cap == ci->i_auth_cap && (flags & CHECK_CAPS_FLUSH) &&
- ci->i_dirty_caps) {
- dout("flushing dirty caps\n");
- goto ack;
+ if (cap == ci->i_auth_cap) {
+ if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
+ dout("flushing dirty caps\n");
+ goto ack;
+ }
+ if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
+ dout("flushing snap caps\n");
+ goto ack;
+ }
}
/* completed revocation? going down and there are no caps? */
@@ -1782,6 +1806,26 @@ ack:
goto retry;
}
}
+
+ /* kick flushing and flush snaps before sending normal
+ * cap message */
+ if (cap == ci->i_auth_cap &&
+ (ci->i_ceph_flags &
+ (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
+ if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+ spin_lock(&mdsc->cap_dirty_lock);
+ oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+ spin_unlock(&mdsc->cap_dirty_lock);
+ __kick_flushing_caps(mdsc, session, ci,
+ oldest_flush_tid);
+ ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+ }
+ if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
+ __ceph_flush_snaps(ci, session);
+
+ goto retry_locked;
+ }
+
/* take snap_rwsem after session mutex */
if (!took_snap_rwsem) {
if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
@@ -1796,7 +1840,7 @@ ack:
}
if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
- flushing = __mark_caps_flushing(inode, session,
+ flushing = __mark_caps_flushing(inode, session, false,
&flush_tid,
&oldest_flush_tid);
} else {
@@ -1822,7 +1866,7 @@ ack:
* otherwise cancel.
*/
if (delayed && is_delayed)
- force_requeue = 1; /* __send_cap delayed release; requeue */
+ force_requeue = true; /* __send_cap delayed release; requeue */
if (!delayed && !is_delayed)
__cap_delay_cancel(mdsc, ci);
else if (!is_delayed || force_requeue)
@@ -1873,8 +1917,8 @@ retry:
if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
goto out;
- flushing = __mark_caps_flushing(inode, session, &flush_tid,
- &oldest_flush_tid);
+ flushing = __mark_caps_flushing(inode, session, true,
+ &flush_tid, &oldest_flush_tid);
/* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
@@ -1887,10 +1931,11 @@ retry:
spin_unlock(&ci->i_ceph_lock);
}
} else {
- struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
- if (n) {
+ if (!list_empty(&ci->i_cap_flush_list)) {
struct ceph_cap_flush *cf =
- rb_entry(n, struct ceph_cap_flush, i_node);
+ list_last_entry(&ci->i_cap_flush_list,
+ struct ceph_cap_flush, i_list);
+ cf->wake = true;
flush_tid = cf->tid;
}
flushing = ci->i_flushing_caps;
@@ -1910,14 +1955,13 @@ out:
static int caps_are_flushed(struct inode *inode, u64 flush_tid)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_cap_flush *cf;
- struct rb_node *n;
int ret = 1;
spin_lock(&ci->i_ceph_lock);
- n = rb_first(&ci->i_cap_flush_tree);
- if (n) {
- cf = rb_entry(n, struct ceph_cap_flush, i_node);
+ if (!list_empty(&ci->i_cap_flush_list)) {
+ struct ceph_cap_flush * cf =
+ list_first_entry(&ci->i_cap_flush_list,
+ struct ceph_cap_flush, i_list);
if (cf->tid <= flush_tid)
ret = 0;
}
@@ -1926,53 +1970,6 @@ static int caps_are_flushed(struct inode *inode, u64 flush_tid)
}
/*
- * Wait on any unsafe replies for the given inode. First wait on the
- * newest request, and make that the upper bound. Then, if there are
- * more requests, keep waiting on the oldest as long as it is still older
- * than the original request.
- */
-static void sync_write_wait(struct inode *inode)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct list_head *head = &ci->i_unsafe_writes;
- struct ceph_osd_request *req;
- u64 last_tid;
-
- if (!S_ISREG(inode->i_mode))
- return;
-
- spin_lock(&ci->i_unsafe_lock);
- if (list_empty(head))
- goto out;
-
- /* set upper bound as _last_ entry in chain */
- req = list_last_entry(head, struct ceph_osd_request,
- r_unsafe_item);
- last_tid = req->r_tid;
-
- do {
- ceph_osdc_get_request(req);
- spin_unlock(&ci->i_unsafe_lock);
- dout("sync_write_wait on tid %llu (until %llu)\n",
- req->r_tid, last_tid);
- wait_for_completion(&req->r_safe_completion);
- spin_lock(&ci->i_unsafe_lock);
- ceph_osdc_put_request(req);
-
- /*
- * from here on look at first entry in chain, since we
- * only want to wait for anything older than last_tid
- */
- if (list_empty(head))
- break;
- req = list_first_entry(head, struct ceph_osd_request,
- r_unsafe_item);
- } while (req->r_tid < last_tid);
-out:
- spin_unlock(&ci->i_unsafe_lock);
-}
-
-/*
* wait for any unsafe requests to complete.
*/
static int unsafe_request_wait(struct inode *inode)
@@ -2024,7 +2021,8 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
int dirty;
dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
- sync_write_wait(inode);
+
+ ceph_sync_write_wait(inode);
ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
if (ret < 0)
@@ -2087,87 +2085,74 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
return err;
}
-/*
- * After a recovering MDS goes active, we need to resend any caps
- * we were flushing.
- *
- * Caller holds session->s_mutex.
- */
-static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
-{
- struct ceph_cap_snap *capsnap;
-
- dout("kick_flushing_capsnaps mds%d\n", session->s_mds);
- list_for_each_entry(capsnap, &session->s_cap_snaps_flushing,
- flushing_item) {
- struct ceph_inode_info *ci = capsnap->ci;
- struct inode *inode = &ci->vfs_inode;
- struct ceph_cap *cap;
-
- spin_lock(&ci->i_ceph_lock);
- cap = ci->i_auth_cap;
- if (cap && cap->session == session) {
- dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
- cap, capsnap);
- __ceph_flush_snaps(ci, &session, 1);
- } else {
- pr_err("%p auth cap %p not mds%d ???\n", inode,
- cap, session->s_mds);
- }
- spin_unlock(&ci->i_ceph_lock);
- }
-}
-
-static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- struct ceph_inode_info *ci)
+static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ struct ceph_inode_info *ci,
+ u64 oldest_flush_tid)
+ __releases(ci->i_ceph_lock)
+ __acquires(ci->i_ceph_lock)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap;
struct ceph_cap_flush *cf;
- struct rb_node *n;
- int delayed = 0;
+ int ret;
u64 first_tid = 0;
- u64 oldest_flush_tid;
- spin_lock(&mdsc->cap_dirty_lock);
- oldest_flush_tid = __get_oldest_flush_tid(mdsc);
- spin_unlock(&mdsc->cap_dirty_lock);
+ list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
+ if (cf->tid < first_tid)
+ continue;
- while (true) {
- spin_lock(&ci->i_ceph_lock);
cap = ci->i_auth_cap;
if (!(cap && cap->session == session)) {
- pr_err("%p auth cap %p not mds%d ???\n", inode,
- cap, session->s_mds);
- spin_unlock(&ci->i_ceph_lock);
+ pr_err("%p auth cap %p not mds%d ???\n",
+ inode, cap, session->s_mds);
break;
}
- for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
- cf = rb_entry(n, struct ceph_cap_flush, i_node);
- if (cf->tid >= first_tid)
- break;
- }
- if (!n) {
+ first_tid = cf->tid + 1;
+
+ if (cf->caps) {
+ dout("kick_flushing_caps %p cap %p tid %llu %s\n",
+ inode, cap, cf->tid, ceph_cap_string(cf->caps));
+ ci->i_ceph_flags |= CEPH_I_NODELAY;
+ ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+ __ceph_caps_used(ci),
+ __ceph_caps_wanted(ci),
+ cap->issued | cap->implemented,
+ cf->caps, cf->tid, oldest_flush_tid);
+ if (ret) {
+ pr_err("kick_flushing_caps: error sending "
+ "cap flush, ino (%llx.%llx) "
+ "tid %llu flushing %s\n",
+ ceph_vinop(inode), cf->tid,
+ ceph_cap_string(cf->caps));
+ }
+ } else {
+ struct ceph_cap_snap *capsnap =
+ container_of(cf, struct ceph_cap_snap,
+ cap_flush);
+ dout("kick_flushing_caps %p capsnap %p tid %llu %s\n",
+ inode, capsnap, cf->tid,
+ ceph_cap_string(capsnap->dirty));
+
+ atomic_inc(&capsnap->nref);
spin_unlock(&ci->i_ceph_lock);
- break;
- }
- cf = rb_entry(n, struct ceph_cap_flush, i_node);
+ ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
+ oldest_flush_tid);
+ if (ret < 0) {
+ pr_err("kick_flushing_caps: error sending "
+ "cap flushsnap, ino (%llx.%llx) "
+ "tid %llu follows %llu\n",
+ ceph_vinop(inode), cf->tid,
+ capsnap->follows);
+ }
- first_tid = cf->tid + 1;
+ ceph_put_cap_snap(capsnap);
+ }
- dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
- cap, cf->tid, ceph_cap_string(cf->caps));
- delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
- __ceph_caps_used(ci),
- __ceph_caps_wanted(ci),
- cap->issued | cap->implemented,
- cf->caps, cf->tid, oldest_flush_tid);
+ spin_lock(&ci->i_ceph_lock);
}
- return delayed;
}
void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
@@ -2175,8 +2160,14 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
{
struct ceph_inode_info *ci;
struct ceph_cap *cap;
+ u64 oldest_flush_tid;
dout("early_kick_flushing_caps mds%d\n", session->s_mds);
+
+ spin_lock(&mdsc->cap_dirty_lock);
+ oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+ spin_unlock(&mdsc->cap_dirty_lock);
+
list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
spin_lock(&ci->i_ceph_lock);
cap = ci->i_auth_cap;
@@ -2196,10 +2187,11 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
*/
if ((cap->issued & ci->i_flushing_caps) !=
ci->i_flushing_caps) {
- spin_unlock(&ci->i_ceph_lock);
- if (!__kick_flushing_caps(mdsc, session, ci))
- continue;
- spin_lock(&ci->i_ceph_lock);
+ ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+ __kick_flushing_caps(mdsc, session, ci,
+ oldest_flush_tid);
+ } else {
+ ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
}
spin_unlock(&ci->i_ceph_lock);
@@ -2210,50 +2202,56 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_inode_info *ci;
-
- kick_flushing_capsnaps(mdsc, session);
+ struct ceph_cap *cap;
+ u64 oldest_flush_tid;
dout("kick_flushing_caps mds%d\n", session->s_mds);
+
+ spin_lock(&mdsc->cap_dirty_lock);
+ oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+ spin_unlock(&mdsc->cap_dirty_lock);
+
list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
- int delayed = __kick_flushing_caps(mdsc, session, ci);
- if (delayed) {
- spin_lock(&ci->i_ceph_lock);
- __cap_delay_requeue(mdsc, ci);
+ spin_lock(&ci->i_ceph_lock);
+ cap = ci->i_auth_cap;
+ if (!(cap && cap->session == session)) {
+ pr_err("%p auth cap %p not mds%d ???\n",
+ &ci->vfs_inode, cap, session->s_mds);
spin_unlock(&ci->i_ceph_lock);
+ continue;
+ }
+ if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+ ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+ __kick_flushing_caps(mdsc, session, ci,
+ oldest_flush_tid);
}
+ spin_unlock(&ci->i_ceph_lock);
}
}
static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct inode *inode)
+ __releases(ci->i_ceph_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap;
- spin_lock(&ci->i_ceph_lock);
cap = ci->i_auth_cap;
dout("kick_flushing_inode_caps %p flushing %s\n", inode,
ceph_cap_string(ci->i_flushing_caps));
- __ceph_flush_snaps(ci, &session, 1);
-
- if (ci->i_flushing_caps) {
- int delayed;
-
+ if (!list_empty(&ci->i_cap_flush_list)) {
+ u64 oldest_flush_tid;
spin_lock(&mdsc->cap_dirty_lock);
list_move_tail(&ci->i_flushing_item,
&cap->session->s_cap_flushing);
+ oldest_flush_tid = __get_oldest_flush_tid(mdsc);
spin_unlock(&mdsc->cap_dirty_lock);
+ ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+ __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
spin_unlock(&ci->i_ceph_lock);
-
- delayed = __kick_flushing_caps(mdsc, session, ci);
- if (delayed) {
- spin_lock(&ci->i_ceph_lock);
- __cap_delay_requeue(mdsc, ci);
- spin_unlock(&ci->i_ceph_lock);
- }
} else {
spin_unlock(&ci->i_ceph_lock);
}
@@ -2317,7 +2315,7 @@ again:
/* make sure file is actually open */
file_wanted = __ceph_caps_file_wanted(ci);
- if ((file_wanted & need) == 0) {
+ if ((file_wanted & need) != need) {
dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
ceph_cap_string(need), ceph_cap_string(file_wanted));
*err = -EBADF;
@@ -2393,6 +2391,9 @@ again:
snap_rwsem_locked = true;
}
*got = need | (have & want);
+ if ((need & CEPH_CAP_FILE_RD) &&
+ !(*got & CEPH_CAP_FILE_CACHE))
+ ceph_disable_fscache_readpage(ci);
__take_cap_refs(ci, *got, true);
ret = 1;
}
@@ -2412,12 +2413,26 @@ again:
goto out_unlock;
}
- if (!__ceph_is_any_caps(ci) &&
- ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
- dout("get_cap_refs %p forced umount\n", inode);
- *err = -EIO;
- ret = 1;
- goto out_unlock;
+ if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
+ int mds_wanted;
+ if (ACCESS_ONCE(mdsc->fsc->mount_state) ==
+ CEPH_MOUNT_SHUTDOWN) {
+ dout("get_cap_refs %p forced umount\n", inode);
+ *err = -EIO;
+ ret = 1;
+ goto out_unlock;
+ }
+ mds_wanted = __ceph_caps_mds_wanted(ci);
+ if ((mds_wanted & need) != need) {
+ dout("get_cap_refs %p caps were dropped"
+ " (session killed?)\n", inode);
+ *err = -ESTALE;
+ ret = 1;
+ goto out_unlock;
+ }
+ if ((mds_wanted & file_wanted) ==
+ (file_wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR)))
+ ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
}
dout("get_cap_refs %p have %s needed %s\n", inode,
@@ -2487,7 +2502,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (err == -EAGAIN)
continue;
if (err < 0)
- return err;
+ ret = err;
} else {
ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, endoff,
@@ -2496,8 +2511,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
continue;
if (err < 0)
ret = err;
- if (ret < 0)
- return ret;
+ }
+ if (ret < 0) {
+ if (err == -ESTALE) {
+ /* session was killed, try renew caps */
+ ret = ceph_renew_caps(&ci->vfs_inode);
+ if (ret == 0)
+ continue;
+ }
+ return ret;
}
if (ci->i_inline_version != CEPH_INLINE_NONE &&
@@ -2510,7 +2532,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
*pinned_page = page;
break;
}
- page_cache_release(page);
+ put_page(page);
}
/*
* drop cap refs first because getattr while
@@ -2533,6 +2555,9 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
break;
}
+ if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
+ ceph_fscache_revalidate_cookie(ci);
+
*got = _got;
return 0;
}
@@ -2553,16 +2578,19 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
* drop cap_snap that is not associated with any snapshot.
* we don't need to send FLUSHSNAP message for it.
*/
-static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap)
+static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
+ struct ceph_cap_snap *capsnap)
{
if (!capsnap->need_flush &&
!capsnap->writing && !capsnap->dirty_pages) {
-
dout("dropping cap_snap %p follows %llu\n",
capsnap, capsnap->follows);
+ BUG_ON(capsnap->cap_flush.tid > 0);
ceph_put_snap_context(capsnap->context);
+ if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
+ ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
+
list_del(&capsnap->ci_item);
- list_del(&capsnap->flushing_item);
ceph_put_cap_snap(capsnap);
return 1;
}
@@ -2609,7 +2637,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
struct ceph_cap_snap,
ci_item);
capsnap->writing = 0;
- if (ceph_try_drop_cap_snap(capsnap))
+ if (ceph_try_drop_cap_snap(ci, capsnap))
put++;
else if (__ceph_finish_cap_snap(ci, capsnap))
flushsnaps = 1;
@@ -2634,7 +2662,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
if (last && !flushsnaps)
ceph_check_caps(ci, 0, NULL);
else if (flushsnaps)
- ceph_flush_snaps(ci);
+ ceph_flush_snaps(ci, NULL);
if (wake)
wake_up_all(&ci->i_cap_wq);
while (put-- > 0)
@@ -2652,15 +2680,19 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc)
{
struct inode *inode = &ci->vfs_inode;
- int last = 0;
- int complete_capsnap = 0;
- int drop_capsnap = 0;
- int found = 0;
struct ceph_cap_snap *capsnap = NULL;
+ int put = 0;
+ bool last = false;
+ bool found = false;
+ bool flush_snaps = false;
+ bool complete_capsnap = false;
spin_lock(&ci->i_ceph_lock);
ci->i_wrbuffer_ref -= nr;
- last = !ci->i_wrbuffer_ref;
+ if (ci->i_wrbuffer_ref == 0) {
+ last = true;
+ put++;
+ }
if (ci->i_head_snapc == snapc) {
ci->i_wrbuffer_ref_head -= nr;
@@ -2680,15 +2712,22 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
} else {
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
if (capsnap->context == snapc) {
- found = 1;
+ found = true;
break;
}
}
BUG_ON(!found);
capsnap->dirty_pages -= nr;
if (capsnap->dirty_pages == 0) {
- complete_capsnap = 1;
- drop_capsnap = ceph_try_drop_cap_snap(capsnap);
+ complete_capsnap = true;
+ if (!capsnap->writing) {
+ if (ceph_try_drop_cap_snap(ci, capsnap)) {
+ put++;
+ } else {
+ ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
+ flush_snaps = true;
+ }
+ }
}
dout("put_wrbuffer_cap_refs on %p cap_snap %p "
" snap %lld %d/%d -> %d/%d %s%s\n",
@@ -2703,12 +2742,12 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
if (last) {
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
- iput(inode);
- } else if (complete_capsnap) {
- ceph_flush_snaps(ci);
- wake_up_all(&ci->i_cap_wq);
+ } else if (flush_snaps) {
+ ceph_flush_snaps(ci, NULL);
}
- if (drop_capsnap)
+ if (complete_capsnap)
+ wake_up_all(&ci->i_cap_wq);
+ while (put-- > 0)
iput(inode);
}
@@ -2752,12 +2791,11 @@ static void invalidate_aliases(struct inode *inode)
*/
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
- u64 inline_version,
- void *inline_data, int inline_len,
+ struct ceph_string **pns, u64 inline_version,
+ void *inline_data, u32 inline_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session,
- struct ceph_cap *cap, int issued,
- u32 pool_ns_len)
+ struct ceph_cap *cap, int issued)
__releases(ci->i_ceph_lock)
__releases(mdsc->snap_rwsem)
{
@@ -2774,7 +2812,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
bool writeback = false;
bool queue_trunc = false;
bool queue_invalidate = false;
- bool queue_revalidate = false;
bool deleted_inode = false;
bool fill_inline = false;
@@ -2807,7 +2844,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
- !ci->i_wrbuffer_ref) {
+ !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
if (try_nonblocking_invalidate(inode)) {
/* there were locked pages.. invalidate later
in a separate thread. */
@@ -2816,8 +2853,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
ci->i_rdcache_revoking = ci->i_rdcache_gen;
}
}
-
- ceph_fscache_invalidate(inode);
}
/* side effects now are allowed */
@@ -2859,11 +2894,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
}
- /* Do we need to revalidate our fscache cookie. Don't bother on the
- * first cache cap as we already validate at cookie creation time. */
- if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
- queue_revalidate = true;
-
if (newcaps & CEPH_CAP_ANY_RD) {
/* ctime/mtime/atime? */
ceph_decode_timespec(&mtime, &grant->mtime);
@@ -2876,8 +2906,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
/* file layout may have changed */
- ci->i_layout = grant->layout;
- ci->i_pool_ns_len = pool_ns_len;
+ s64 old_pool = ci->i_layout.pool_id;
+ struct ceph_string *old_ns;
+
+ ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
+ old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+ lockdep_is_held(&ci->i_ceph_lock));
+ rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
+
+ if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
+ ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
+
+ *pns = old_ns;
/* size/truncate_seq? */
queue_trunc = ceph_fill_file_size(inode, issued,
@@ -2960,23 +3000,20 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
fill_inline = true;
}
- spin_unlock(&ci->i_ceph_lock);
-
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
- kick_flushing_inode_caps(mdsc, session, inode);
- up_read(&mdsc->snap_rwsem);
if (newcaps & ~issued)
wake = true;
+ kick_flushing_inode_caps(mdsc, session, inode);
+ up_read(&mdsc->snap_rwsem);
+ } else {
+ spin_unlock(&ci->i_ceph_lock);
}
if (fill_inline)
ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
- if (queue_trunc) {
+ if (queue_trunc)
ceph_queue_vmtruncate(inode);
- ceph_queue_revalidate(inode);
- } else if (queue_revalidate)
- ceph_queue_revalidate(inode);
if (writeback)
/*
@@ -3013,23 +3050,24 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
- struct ceph_cap_flush *cf;
- struct rb_node *n;
+ struct ceph_cap_flush *cf, *tmp_cf;
LIST_HEAD(to_remove);
unsigned seq = le32_to_cpu(m->seq);
int dirty = le32_to_cpu(m->dirty);
int cleaned = 0;
- int drop = 0;
+ bool drop = false;
+ bool wake_ci = 0;
+ bool wake_mdsc = 0;
- n = rb_first(&ci->i_cap_flush_tree);
- while (n) {
- cf = rb_entry(n, struct ceph_cap_flush, i_node);
- n = rb_next(&cf->i_node);
+ list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
if (cf->tid == flush_tid)
cleaned = cf->caps;
+ if (cf->caps == 0) /* capsnap */
+ continue;
if (cf->tid <= flush_tid) {
- rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
- list_add_tail(&cf->list, &to_remove);
+ if (__finish_cap_flush(NULL, ci, cf))
+ wake_ci = true;
+ list_add_tail(&cf->i_list, &to_remove);
} else {
cleaned &= ~cf->caps;
if (!cleaned)
@@ -3050,31 +3088,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
spin_lock(&mdsc->cap_dirty_lock);
- if (!list_empty(&to_remove)) {
- list_for_each_entry(cf, &to_remove, list)
- rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
-
- n = rb_first(&mdsc->cap_flush_tree);
- cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
- if (!cf || cf->tid > flush_tid)
- wake_up_all(&mdsc->cap_flushing_wq);
+ list_for_each_entry(cf, &to_remove, i_list) {
+ if (__finish_cap_flush(mdsc, NULL, cf))
+ wake_mdsc = true;
}
if (ci->i_flushing_caps == 0) {
- list_del_init(&ci->i_flushing_item);
- if (!list_empty(&session->s_cap_flushing))
- dout(" mds%d still flushing cap on %p\n",
- session->s_mds,
- &list_entry(session->s_cap_flushing.next,
- struct ceph_inode_info,
- i_flushing_item)->vfs_inode);
+ if (list_empty(&ci->i_cap_flush_list)) {
+ list_del_init(&ci->i_flushing_item);
+ if (!list_empty(&session->s_cap_flushing)) {
+ dout(" mds%d still flushing cap on %p\n",
+ session->s_mds,
+ &list_first_entry(&session->s_cap_flushing,
+ struct ceph_inode_info,
+ i_flushing_item)->vfs_inode);
+ }
+ }
mdsc->num_cap_flushing--;
dout(" inode %p now !flushing\n", inode);
if (ci->i_dirty_caps == 0) {
dout(" inode %p now clean\n", inode);
BUG_ON(!list_empty(&ci->i_dirty_item));
- drop = 1;
+ drop = true;
if (ci->i_wr_ref == 0 &&
ci->i_wrbuffer_ref_head == 0) {
BUG_ON(!ci->i_head_snapc);
@@ -3086,17 +3122,21 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
}
}
spin_unlock(&mdsc->cap_dirty_lock);
- wake_up_all(&ci->i_cap_wq);
out:
spin_unlock(&ci->i_ceph_lock);
while (!list_empty(&to_remove)) {
cf = list_first_entry(&to_remove,
- struct ceph_cap_flush, list);
- list_del(&cf->list);
+ struct ceph_cap_flush, i_list);
+ list_del(&cf->i_list);
ceph_free_cap_flush(cf);
}
+
+ if (wake_ci)
+ wake_up_all(&ci->i_cap_wq);
+ if (wake_mdsc)
+ wake_up_all(&mdsc->cap_flushing_wq);
if (drop)
iput(inode);
}
@@ -3115,7 +3155,9 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
u64 follows = le64_to_cpu(m->snap_follows);
struct ceph_cap_snap *capsnap;
- int drop = 0;
+ bool flushed = false;
+ bool wake_ci = false;
+ bool wake_mdsc = false;
dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
inode, ci, session->s_mds, follows);
@@ -3123,30 +3165,47 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
spin_lock(&ci->i_ceph_lock);
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
if (capsnap->follows == follows) {
- if (capsnap->flush_tid != flush_tid) {
+ if (capsnap->cap_flush.tid != flush_tid) {
dout(" cap_snap %p follows %lld tid %lld !="
" %lld\n", capsnap, follows,
- flush_tid, capsnap->flush_tid);
+ flush_tid, capsnap->cap_flush.tid);
break;
}
- WARN_ON(capsnap->dirty_pages || capsnap->writing);
- dout(" removing %p cap_snap %p follows %lld\n",
- inode, capsnap, follows);
- ceph_put_snap_context(capsnap->context);
- list_del(&capsnap->ci_item);
- list_del(&capsnap->flushing_item);
- ceph_put_cap_snap(capsnap);
- wake_up_all(&mdsc->cap_flushing_wq);
- drop = 1;
+ flushed = true;
break;
} else {
dout(" skipping cap_snap %p follows %lld\n",
capsnap, capsnap->follows);
}
}
+ if (flushed) {
+ WARN_ON(capsnap->dirty_pages || capsnap->writing);
+ dout(" removing %p cap_snap %p follows %lld\n",
+ inode, capsnap, follows);
+ list_del(&capsnap->ci_item);
+ if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
+ wake_ci = true;
+
+ spin_lock(&mdsc->cap_dirty_lock);
+
+ if (list_empty(&ci->i_cap_flush_list))
+ list_del_init(&ci->i_flushing_item);
+
+ if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
+ wake_mdsc = true;
+
+ spin_unlock(&mdsc->cap_dirty_lock);
+ }
spin_unlock(&ci->i_ceph_lock);
- if (drop)
+ if (flushed) {
+ ceph_put_snap_context(capsnap->context);
+ ceph_put_cap_snap(capsnap);
+ if (wake_ci)
+ wake_up_all(&ci->i_cap_wq);
+ if (wake_mdsc)
+ wake_up_all(&mdsc->cap_flushing_wq);
iput(inode);
+ }
}
/*
@@ -3178,10 +3237,8 @@ static void handle_cap_trunc(struct inode *inode,
truncate_seq, truncate_size, size);
spin_unlock(&ci->i_ceph_lock);
- if (queue_trunc) {
+ if (queue_trunc)
ceph_queue_vmtruncate(inode);
- ceph_fscache_invalidate(inode);
- }
}
/*
@@ -3226,6 +3283,8 @@ retry:
if (target < 0) {
__ceph_remove_cap(cap, false);
+ if (!ci->i_auth_cap)
+ ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
goto out_unlock;
}
@@ -3251,7 +3310,8 @@ retry:
tcap->implemented |= issued;
if (cap == ci->i_auth_cap)
ci->i_auth_cap = tcap;
- if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
+ if (!list_empty(&ci->i_cap_flush_list) &&
+ ci->i_auth_cap == tcap) {
spin_lock(&mdsc->cap_dirty_lock);
list_move_tail(&ci->i_flushing_item,
&tcap->session->s_cap_flushing);
@@ -3404,20 +3464,18 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_cap *cap;
struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL;
- struct ceph_snap_realm *realm;
+ struct ceph_snap_realm *realm = NULL;
+ struct ceph_string *pool_ns = NULL;
int mds = session->s_mds;
int op, issued;
u32 seq, mseq;
struct ceph_vino vino;
- u64 cap_id;
- u64 size, max_size;
u64 tid;
u64 inline_version = 0;
void *inline_data = NULL;
u32 inline_len = 0;
void *snaptrace;
size_t snaptrace_len;
- u32 pool_ns_len = 0;
void *p, *end;
dout("handle_caps from mds%d\n", mds);
@@ -3431,11 +3489,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
- cap_id = le64_to_cpu(h->cap_id);
seq = le32_to_cpu(h->seq);
mseq = le32_to_cpu(h->migrate_seq);
- size = le64_to_cpu(h->size);
- max_size = le64_to_cpu(h->max_size);
snaptrace = h + 1;
snaptrace_len = le32_to_cpu(h->snap_trace_len);
@@ -3474,6 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 flush_tid;
u32 caller_uid, caller_gid;
u32 osd_epoch_barrier;
+ u32 pool_ns_len;
/* version >= 5 */
ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
/* version >= 6 */
@@ -3483,6 +3539,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_decode_32_safe(&p, end, caller_gid, bad);
/* version >= 8 */
ceph_decode_32_safe(&p, end, pool_ns_len, bad);
+ if (pool_ns_len > 0) {
+ ceph_decode_need(&p, end, pool_ns_len, bad);
+ pool_ns = ceph_find_or_create_string(p, pool_ns_len);
+ p += pool_ns_len;
+ }
}
/* lookup ino */
@@ -3503,7 +3564,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap = ceph_get_cap(mdsc, NULL);
cap->cap_ino = vino.ino;
cap->queue_release = 1;
- cap->cap_id = cap_id;
+ cap->cap_id = le64_to_cpu(h->cap_id);
cap->mseq = mseq;
cap->seq = seq;
spin_lock(&session->s_cap_lock);
@@ -3538,10 +3599,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
}
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
- handle_cap_grant(mdsc, inode, h,
+ handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
- msg->middle, session, cap, issued,
- pool_ns_len);
+ msg->middle, session, cap, issued);
if (realm)
ceph_put_snap_realm(mdsc, realm);
goto done_unlocked;
@@ -3563,10 +3623,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
- handle_cap_grant(mdsc, inode, h,
+ handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
- msg->middle, session, cap, issued,
- pool_ns_len);
+ msg->middle, session, cap, issued);
goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK:
@@ -3597,6 +3656,7 @@ done:
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
+ ceph_put_string(pool_ns);
return;
bad:
@@ -3657,6 +3717,16 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
dout("flush_dirty_caps done\n");
}
+void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
+{
+ int i;
+ int bits = (fmode << 1) | 1;
+ for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
+ if (bits & (1 << i))
+ ci->i_nr_by_mode[i]++;
+ }
+}
+
/*
* Drop open file reference. If we were the last open file,
* we may need to release capabilities to the MDS (or schedule
@@ -3664,15 +3734,20 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
*/
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
{
- struct inode *inode = &ci->vfs_inode;
- int last = 0;
-
+ int i, last = 0;
+ int bits = (fmode << 1) | 1;
spin_lock(&ci->i_ceph_lock);
- dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode,
- ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1);
- BUG_ON(ci->i_nr_by_mode[fmode] == 0);
- if (--ci->i_nr_by_mode[fmode] == 0)
- last++;
+ for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
+ if (bits & (1 << i)) {
+ BUG_ON(ci->i_nr_by_mode[i] == 0);
+ if (--ci->i_nr_by_mode[i] == 0)
+ last++;
+ }
+ }
+ dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
+ &ci->vfs_inode, fmode,
+ ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
+ ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
spin_unlock(&ci->i_ceph_lock);
if (last && ci->i_vino.snap == CEPH_NOSNAP)
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 31f831471ed2..39ff678e567f 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -109,7 +109,7 @@ static int mdsc_show(struct seq_file *s, void *p)
path ? path : "");
spin_unlock(&req->r_old_dentry->d_lock);
kfree(path);
- } else if (req->r_path2) {
+ } else if (req->r_path2 && req->r_op != CEPH_MDS_OP_SYMLINK) {
if (req->r_ino2.ino)
seq_printf(s, " #%llx/%s", req->r_ino2.ino,
req->r_path2);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index fadc243dfb28..c64a0b794d49 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -5,6 +5,7 @@
#include <linux/namei.h>
#include <linux/slab.h>
#include <linux/sched.h>
+#include <linux/xattr.h>
#include "super.h"
#include "mds_client.h"
@@ -58,7 +59,7 @@ int ceph_init_dentry(struct dentry *dentry)
di->dentry = dentry;
di->lease_session = NULL;
- dentry->d_time = jiffies;
+ di->time = jiffies;
/* avoid reordering d_fsdata setup so that the check above is safe */
smp_mb();
dentry->d_fsdata = di;
@@ -69,16 +70,42 @@ out_unlock:
}
/*
- * for readdir, we encode the directory frag and offset within that
- * frag into f_pos.
+ * for f_pos for readdir:
+ * - hash order:
+ * (0xff << 52) | ((24 bits hash) << 28) |
+ * (the nth entry has hash collision);
+ * - frag+name order;
+ * ((frag value) << 28) | (the nth entry in frag);
*/
+#define OFFSET_BITS 28
+#define OFFSET_MASK ((1 << OFFSET_BITS) - 1)
+#define HASH_ORDER (0xffull << (OFFSET_BITS + 24))
+loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
+{
+ loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
+ if (hash_order)
+ fpos |= HASH_ORDER;
+ return fpos;
+}
+
+static bool is_hash_order(loff_t p)
+{
+ return (p & HASH_ORDER) == HASH_ORDER;
+}
+
static unsigned fpos_frag(loff_t p)
{
- return p >> 32;
+ return p >> OFFSET_BITS;
}
+
+static unsigned fpos_hash(loff_t p)
+{
+ return ceph_frag_value(fpos_frag(p));
+}
+
static unsigned fpos_off(loff_t p)
{
- return p & 0xffffffff;
+ return p & OFFSET_MASK;
}
static int fpos_cmp(loff_t l, loff_t r)
@@ -110,6 +137,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,
return 0;
}
+
+static struct dentry *
+__dcache_find_get_entry(struct dentry *parent, u64 idx,
+ struct ceph_readdir_cache_control *cache_ctl)
+{
+ struct inode *dir = d_inode(parent);
+ struct dentry *dentry;
+ unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
+ loff_t ptr_pos = idx * sizeof(struct dentry *);
+ pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
+
+ if (ptr_pos >= i_size_read(dir))
+ return NULL;
+
+ if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
+ ceph_readdir_cache_release(cache_ctl);
+ cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
+ if (!cache_ctl->page) {
+ dout(" page %lu not found\n", ptr_pgoff);
+ return ERR_PTR(-EAGAIN);
+ }
+ /* reading/filling the cache are serialized by
+ i_mutex, no need to use page lock */
+ unlock_page(cache_ctl->page);
+ cache_ctl->dentries = kmap(cache_ctl->page);
+ }
+
+ cache_ctl->index = idx & idx_mask;
+
+ rcu_read_lock();
+ spin_lock(&parent->d_lock);
+ /* check i_size again here, because empty directory can be
+ * marked as complete while not holding the i_mutex. */
+ if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
+ dentry = cache_ctl->dentries[cache_ctl->index];
+ else
+ dentry = NULL;
+ spin_unlock(&parent->d_lock);
+ if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+ dentry = NULL;
+ rcu_read_unlock();
+ return dentry ? : ERR_PTR(-EAGAIN);
+}
+
/*
* When possible, we try to satisfy a readdir by peeking at the
* dcache. We make this work by carefully ordering dentries on
@@ -129,75 +200,68 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
struct inode *dir = d_inode(parent);
struct dentry *dentry, *last = NULL;
struct ceph_dentry_info *di;
- unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry *);
- int err = 0;
- loff_t ptr_pos = 0;
struct ceph_readdir_cache_control cache_ctl = {};
+ u64 idx = 0;
+ int err = 0;
- dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
+ dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
+
+ /* search start position */
+ if (ctx->pos > 2) {
+ u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
+ while (count > 0) {
+ u64 step = count >> 1;
+ dentry = __dcache_find_get_entry(parent, idx + step,
+ &cache_ctl);
+ if (!dentry) {
+ /* use linar search */
+ idx = 0;
+ break;
+ }
+ if (IS_ERR(dentry)) {
+ err = PTR_ERR(dentry);
+ goto out;
+ }
+ di = ceph_dentry(dentry);
+ spin_lock(&dentry->d_lock);
+ if (fpos_cmp(di->offset, ctx->pos) < 0) {
+ idx += step + 1;
+ count -= step + 1;
+ } else {
+ count = step;
+ }
+ spin_unlock(&dentry->d_lock);
+ dput(dentry);
+ }
- /* we can calculate cache index for the first dirfrag */
- if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
- cache_ctl.index = fpos_off(ctx->pos) - 2;
- BUG_ON(cache_ctl.index < 0);
- ptr_pos = cache_ctl.index * sizeof(struct dentry *);
+ dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
}
- while (true) {
- pgoff_t pgoff;
- bool emit_dentry;
- if (ptr_pos >= i_size_read(dir)) {
+ for (;;) {
+ bool emit_dentry = false;
+ dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
+ if (!dentry) {
fi->flags |= CEPH_F_ATEND;
err = 0;
break;
}
-
- err = -EAGAIN;
- pgoff = ptr_pos >> PAGE_CACHE_SHIFT;
- if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
- ceph_readdir_cache_release(&cache_ctl);
- cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
- if (!cache_ctl.page) {
- dout(" page %lu not found\n", pgoff);
- break;
- }
- /* reading/filling the cache are serialized by
- * i_mutex, no need to use page lock */
- unlock_page(cache_ctl.page);
- cache_ctl.dentries = kmap(cache_ctl.page);
+ if (IS_ERR(dentry)) {
+ err = PTR_ERR(dentry);
+ goto out;
}
- rcu_read_lock();
- spin_lock(&parent->d_lock);
- /* check i_size again here, because empty directory can be
- * marked as complete while not holding the i_mutex. */
- if (ceph_dir_is_complete_ordered(dir) &&
- ptr_pos < i_size_read(dir))
- dentry = cache_ctl.dentries[cache_ctl.index % nsize];
- else
- dentry = NULL;
- spin_unlock(&parent->d_lock);
- if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
- dentry = NULL;
- rcu_read_unlock();
- if (!dentry)
- break;
-
- emit_dentry = false;
di = ceph_dentry(dentry);
spin_lock(&dentry->d_lock);
if (di->lease_shared_gen == shared_gen &&
d_really_is_positive(dentry) &&
- ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
- ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
fpos_cmp(ctx->pos, di->offset) <= 0) {
emit_dentry = true;
}
spin_unlock(&dentry->d_lock);
if (emit_dentry) {
- dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+ dout(" %llx dentry %p %pd %p\n", di->offset,
dentry, dentry, d_inode(dentry));
ctx->pos = di->offset;
if (!dir_emit(ctx, dentry->d_name.name,
@@ -217,10 +281,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
} else {
dput(dentry);
}
-
- cache_ctl.index++;
- ptr_pos += sizeof(struct dentry *);
}
+out:
ceph_readdir_cache_release(&cache_ctl);
if (last) {
int ret;
@@ -234,6 +296,16 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
return err;
}
+static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+{
+ if (!fi->last_readdir)
+ return true;
+ if (is_hash_order(pos))
+ return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+ else
+ return fi->frag != fpos_frag(pos);
+}
+
static int ceph_readdir(struct file *file, struct dir_context *ctx)
{
struct ceph_file_info *fi = file->private_data;
@@ -241,13 +313,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_mds_client *mdsc = fsc->mdsc;
- unsigned frag = fpos_frag(ctx->pos);
- int off = fpos_off(ctx->pos);
+ int i;
int err;
u32 ftype;
struct ceph_mds_reply_info_parsed *rinfo;
- dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
+ dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
if (fi->flags & CEPH_F_ATEND)
return 0;
@@ -259,7 +330,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
inode->i_mode >> 12))
return 0;
ctx->pos = 1;
- off = 1;
}
if (ctx->pos == 1) {
ino_t ino = parent_ino(file->f_path.dentry);
@@ -269,7 +339,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
inode->i_mode >> 12))
return 0;
ctx->pos = 2;
- off = 2;
}
/* can we use the dcache? */
@@ -284,8 +353,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
err = __dcache_readdir(file, ctx, shared_gen);
if (err != -EAGAIN)
return err;
- frag = fpos_frag(ctx->pos);
- off = fpos_off(ctx->pos);
} else {
spin_unlock(&ci->i_ceph_lock);
}
@@ -293,8 +360,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* proceed with a normal readdir */
more:
/* do we have the correct frag content buffered? */
- if (fi->frag != frag || fi->last_readdir == NULL) {
+ if (need_send_readdir(fi, ctx->pos)) {
struct ceph_mds_request *req;
+ unsigned frag;
int op = ceph_snap(inode) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
@@ -304,6 +372,13 @@ more:
fi->last_readdir = NULL;
}
+ if (is_hash_order(ctx->pos)) {
+ frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+ NULL, NULL);
+ } else {
+ frag = fpos_frag(ctx->pos);
+ }
+
dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
ceph_vinop(inode), frag, fi->last_name);
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -330,6 +405,8 @@ more:
req->r_readdir_cache_idx = fi->readdir_cache_idx;
req->r_readdir_offset = fi->next_offset;
req->r_args.readdir.frag = cpu_to_le32(frag);
+ req->r_args.readdir.flags =
+ cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
req->r_inode = inode;
ihold(inode);
@@ -339,22 +416,26 @@ more:
ceph_mdsc_put_request(req);
return err;
}
- dout("readdir got and parsed readdir result=%d"
- " on frag %x, end=%d, complete=%d\n", err, frag,
+ dout("readdir got and parsed readdir result=%d on "
+ "frag %x, end=%d, complete=%d, hash_order=%d\n",
+ err, frag,
(int)req->r_reply_info.dir_end,
- (int)req->r_reply_info.dir_complete);
-
+ (int)req->r_reply_info.dir_complete,
+ (int)req->r_reply_info.hash_order);
- /* note next offset and last dentry name */
rinfo = &req->r_reply_info;
if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
frag = le32_to_cpu(rinfo->dir_dir->frag);
- off = req->r_readdir_offset;
- fi->next_offset = off;
+ if (!rinfo->hash_order) {
+ fi->next_offset = req->r_readdir_offset;
+ /* adjust ctx->pos to beginning of frag */
+ ctx->pos = ceph_make_fpos(frag,
+ fi->next_offset,
+ false);
+ }
}
fi->frag = frag;
- fi->offset = fi->next_offset;
fi->last_readdir = req;
if (req->r_did_prepopulate) {
@@ -362,7 +443,8 @@ more:
if (fi->readdir_cache_idx < 0) {
/* preclude from marking dir ordered */
fi->dir_ordered_count = 0;
- } else if (ceph_frag_is_leftmost(frag) && off == 2) {
+ } else if (ceph_frag_is_leftmost(frag) &&
+ fi->next_offset == 2) {
/* note dir version at start of readdir so
* we can tell if any dentries get dropped */
fi->dir_release_count = req->r_dir_release_cnt;
@@ -376,65 +458,87 @@ more:
fi->dir_release_count = 0;
}
- if (req->r_reply_info.dir_end) {
- kfree(fi->last_name);
- fi->last_name = NULL;
- if (ceph_frag_is_rightmost(frag))
- fi->next_offset = 2;
- else
- fi->next_offset = 0;
- } else {
- err = note_last_dentry(fi,
- rinfo->dir_dname[rinfo->dir_nr-1],
- rinfo->dir_dname_len[rinfo->dir_nr-1],
- fi->next_offset + rinfo->dir_nr);
+ /* note next offset and last dentry name */
+ if (rinfo->dir_nr > 0) {
+ struct ceph_mds_reply_dir_entry *rde =
+ rinfo->dir_entries + (rinfo->dir_nr-1);
+ unsigned next_offset = req->r_reply_info.dir_end ?
+ 2 : (fpos_off(rde->offset) + 1);
+ err = note_last_dentry(fi, rde->name, rde->name_len,
+ next_offset);
if (err)
return err;
+ } else if (req->r_reply_info.dir_end) {
+ fi->next_offset = 2;
+ /* keep last name */
}
}
rinfo = &fi->last_readdir->r_reply_info;
- dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
- rinfo->dir_nr, off, fi->offset);
-
- ctx->pos = ceph_make_fpos(frag, off);
- while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
- struct ceph_mds_reply_inode *in =
- rinfo->dir_in[off - fi->offset].in;
+ dout("readdir frag %x num %d pos %llx chunk first %llx\n",
+ fi->frag, rinfo->dir_nr, ctx->pos,
+ rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
+
+ i = 0;
+ /* search start position */
+ if (rinfo->dir_nr > 0) {
+ int step, nr = rinfo->dir_nr;
+ while (nr > 0) {
+ step = nr >> 1;
+ if (rinfo->dir_entries[i + step].offset < ctx->pos) {
+ i += step + 1;
+ nr -= step + 1;
+ } else {
+ nr = step;
+ }
+ }
+ }
+ for (; i < rinfo->dir_nr; i++) {
+ struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
struct ceph_vino vino;
ino_t ino;
- dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
- off, off - fi->offset, rinfo->dir_nr, ctx->pos,
- rinfo->dir_dname_len[off - fi->offset],
- rinfo->dir_dname[off - fi->offset], in);
- BUG_ON(!in);
- ftype = le32_to_cpu(in->mode) >> 12;
- vino.ino = le64_to_cpu(in->ino);
- vino.snap = le64_to_cpu(in->snapid);
+ BUG_ON(rde->offset < ctx->pos);
+
+ ctx->pos = rde->offset;
+ dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
+ i, rinfo->dir_nr, ctx->pos,
+ rde->name_len, rde->name, &rde->inode.in);
+
+ BUG_ON(!rde->inode.in);
+ ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
+ vino.ino = le64_to_cpu(rde->inode.in->ino);
+ vino.snap = le64_to_cpu(rde->inode.in->snapid);
ino = ceph_vino_to_ino(vino);
- if (!dir_emit(ctx,
- rinfo->dir_dname[off - fi->offset],
- rinfo->dir_dname_len[off - fi->offset],
- ceph_translate_ino(inode->i_sb, ino), ftype)) {
+
+ if (!dir_emit(ctx, rde->name, rde->name_len,
+ ceph_translate_ino(inode->i_sb, ino), ftype)) {
dout("filldir stopping us...\n");
return 0;
}
- off++;
ctx->pos++;
}
- if (fi->last_name) {
+ if (fi->next_offset > 2) {
ceph_mdsc_put_request(fi->last_readdir);
fi->last_readdir = NULL;
goto more;
}
/* more frags? */
- if (!ceph_frag_is_rightmost(frag)) {
- frag = ceph_frag_next(frag);
- off = 0;
- ctx->pos = ceph_make_fpos(frag, off);
+ if (!ceph_frag_is_rightmost(fi->frag)) {
+ unsigned frag = ceph_frag_next(fi->frag);
+ if (is_hash_order(ctx->pos)) {
+ loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
+ fi->next_offset, true);
+ if (new_pos > ctx->pos)
+ ctx->pos = new_pos;
+ /* keep last_name */
+ } else {
+ ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
+ kfree(fi->last_name);
+ fi->last_name = NULL;
+ }
dout("readdir next frag is %x\n", frag);
goto more;
}
@@ -466,7 +570,7 @@ more:
return 0;
}
-static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
+static void reset_readdir(struct ceph_file_info *fi)
{
if (fi->last_readdir) {
ceph_mdsc_put_request(fi->last_readdir);
@@ -476,18 +580,38 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
fi->last_name = NULL;
fi->dir_release_count = 0;
fi->readdir_cache_idx = -1;
- if (ceph_frag_is_leftmost(frag))
- fi->next_offset = 2; /* compensate for . and .. */
- else
- fi->next_offset = 0;
+ fi->next_offset = 2; /* compensate for . and .. */
fi->flags &= ~CEPH_F_ATEND;
}
+/*
+ * discard buffered readdir content on seekdir(0), or seek to new frag,
+ * or seek prior to current chunk
+ */
+static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
+{
+ struct ceph_mds_reply_info_parsed *rinfo;
+ loff_t chunk_offset;
+ if (new_pos == 0)
+ return true;
+ if (is_hash_order(new_pos)) {
+ /* no need to reset last_name for a forward seek when
+ * dentries are sotred in hash order */
+ } else if (fi->frag |= fpos_frag(new_pos)) {
+ return true;
+ }
+ rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
+ if (!rinfo || !rinfo->dir_nr)
+ return true;
+ chunk_offset = rinfo->dir_entries[0].offset;
+ return new_pos < chunk_offset ||
+ is_hash_order(new_pos) != is_hash_order(chunk_offset);
+}
+
static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
{
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file->f_mapping->host;
- loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
loff_t retval;
inode_lock(inode);
@@ -504,25 +628,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
}
if (offset >= 0) {
+ if (need_reset_readdir(fi, offset)) {
+ dout("dir_llseek dropping %p content\n", file);
+ reset_readdir(fi);
+ } else if (is_hash_order(offset) && offset > file->f_pos) {
+ /* for hash offset, we don't know if a forward seek
+ * is within same frag */
+ fi->dir_release_count = 0;
+ fi->readdir_cache_idx = -1;
+ }
+
if (offset != file->f_pos) {
file->f_pos = offset;
file->f_version = 0;
fi->flags &= ~CEPH_F_ATEND;
}
retval = offset;
-
- if (offset == 0 ||
- fpos_frag(offset) != fi->frag ||
- fpos_off(offset) < fi->offset) {
- /* discard buffered readdir content on seekdir(0), or
- * seek to new frag, or seek prior to current chunk */
- dout("dir_llseek dropping %p content\n", file);
- reset_readdir(fi, fpos_frag(offset));
- } else if (fpos_cmp(offset, old_offset) > 0) {
- /* reset dir_release_count if we did a forward seek */
- fi->dir_release_count = 0;
- fi->readdir_cache_idx = -1;
- }
}
out:
inode_unlock(inode);
@@ -590,7 +711,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
return dentry;
}
-static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
+static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
{
return ceph_ino(inode) == CEPH_INO_ROOT &&
strncmp(dentry->d_name.name, ".ceph", 5) == 0;
@@ -1003,7 +1124,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
void ceph_invalidate_dentry_lease(struct dentry *dentry)
{
spin_lock(&dentry->d_lock);
- dentry->d_time = jiffies;
+ ceph_dentry(dentry)->time = jiffies;
ceph_dentry(dentry)->lease_shared_gen = 0;
spin_unlock(&dentry->d_lock);
}
@@ -1012,7 +1133,8 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
* Check if dentry lease is valid. If not, delete the lease. Try to
* renew if the least is more than half up.
*/
-static int dentry_lease_is_valid(struct dentry *dentry)
+static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
+ struct inode *dir)
{
struct ceph_dentry_info *di;
struct ceph_mds_session *s;
@@ -1020,12 +1142,11 @@ static int dentry_lease_is_valid(struct dentry *dentry)
u32 gen;
unsigned long ttl;
struct ceph_mds_session *session = NULL;
- struct inode *dir = NULL;
u32 seq = 0;
spin_lock(&dentry->d_lock);
di = ceph_dentry(dentry);
- if (di->lease_session) {
+ if (di && di->lease_session) {
s = di->lease_session;
spin_lock(&s->s_gen_ttl_lock);
gen = s->s_cap_gen;
@@ -1033,17 +1154,24 @@ static int dentry_lease_is_valid(struct dentry *dentry)
spin_unlock(&s->s_gen_ttl_lock);
if (di->lease_gen == gen &&
- time_before(jiffies, dentry->d_time) &&
+ time_before(jiffies, di->time) &&
time_before(jiffies, ttl)) {
valid = 1;
if (di->lease_renew_after &&
time_after(jiffies, di->lease_renew_after)) {
- /* we should renew */
- dir = d_inode(dentry->d_parent);
- session = ceph_get_mds_session(s);
- seq = di->lease_seq;
- di->lease_renew_after = 0;
- di->lease_renew_from = jiffies;
+ /*
+ * We should renew. If we're in RCU walk mode
+ * though, we can't do that so just return
+ * -ECHILD.
+ */
+ if (flags & LOOKUP_RCU) {
+ valid = -ECHILD;
+ } else {
+ session = ceph_get_mds_session(s);
+ seq = di->lease_seq;
+ di->lease_renew_after = 0;
+ di->lease_renew_from = jiffies;
+ }
}
}
}
@@ -1086,15 +1214,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
struct dentry *parent;
struct inode *dir;
- if (flags & LOOKUP_RCU)
- return -ECHILD;
+ if (flags & LOOKUP_RCU) {
+ parent = ACCESS_ONCE(dentry->d_parent);
+ dir = d_inode_rcu(parent);
+ if (!dir)
+ return -ECHILD;
+ } else {
+ parent = dget_parent(dentry);
+ dir = d_inode(parent);
+ }
dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
- parent = dget_parent(dentry);
- dir = d_inode(parent);
-
/* always trust cached snapped dentries, snapdir dentry */
if (ceph_snap(dir) != CEPH_NOSNAP) {
dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
@@ -1103,12 +1235,16 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
} else if (d_really_is_positive(dentry) &&
ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
valid = 1;
- } else if (dentry_lease_is_valid(dentry) ||
- dir_lease_is_valid(dir, dentry)) {
- if (d_really_is_positive(dentry))
- valid = ceph_is_any_caps(d_inode(dentry));
- else
- valid = 1;
+ } else {
+ valid = dentry_lease_is_valid(dentry, flags, dir);
+ if (valid == -ECHILD)
+ return valid;
+ if (valid || dir_lease_is_valid(dir, dentry)) {
+ if (d_really_is_positive(dentry))
+ valid = ceph_is_any_caps(d_inode(dentry));
+ else
+ valid = 1;
+ }
}
if (!valid) {
@@ -1117,6 +1253,9 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
struct ceph_mds_request *req;
int op, mask, err;
+ if (flags & LOOKUP_RCU)
+ return -ECHILD;
+
op = ceph_snap(dir) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -1152,7 +1291,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
ceph_dir_clear_complete(dir);
}
- dput(parent);
+ if (!(flags & LOOKUP_RCU))
+ dput(parent);
return valid;
}
@@ -1165,10 +1305,14 @@ static void ceph_d_release(struct dentry *dentry)
dout("d_release %p\n", dentry);
ceph_dentry_lru_del(dentry);
+
+ spin_lock(&dentry->d_lock);
+ dentry->d_fsdata = NULL;
+ spin_unlock(&dentry->d_lock);
+
if (di->lease_session)
ceph_put_mds_session(di->lease_session);
kmem_cache_free(ceph_dentry_cachep, di);
- dentry->d_fsdata = NULL;
}
static int ceph_snapdir_d_revalidate(struct dentry *dentry,
@@ -1342,10 +1486,10 @@ const struct inode_operations ceph_dir_iops = {
.permission = ceph_permission,
.getattr = ceph_getattr,
.setattr = ceph_setattr,
- .setxattr = ceph_setxattr,
- .getxattr = ceph_getxattr,
+ .setxattr = generic_setxattr,
+ .getxattr = generic_getxattr,
.listxattr = ceph_listxattr,
- .removexattr = ceph_removexattr,
+ .removexattr = generic_removexattr,
.get_acl = ceph_get_acl,
.set_acl = ceph_set_acl,
.mknod = ceph_mknod,
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 6e72c98162d5..1780218a48f0 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -95,10 +95,8 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
}
dentry = d_obtain_alias(inode);
- if (IS_ERR(dentry)) {
- iput(inode);
+ if (IS_ERR(dentry))
return dentry;
- }
err = ceph_init_dentry(dentry);
if (err < 0) {
dput(dentry);
@@ -167,10 +165,8 @@ static struct dentry *__get_parent(struct super_block *sb,
return ERR_PTR(-ENOENT);
dentry = d_obtain_alias(inode);
- if (IS_ERR(dentry)) {
- iput(inode);
+ if (IS_ERR(dentry))
return dentry;
- }
err = ceph_init_dentry(dentry);
if (err < 0) {
dput(dentry);
@@ -210,7 +206,7 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb,
dout("fh_to_parent %llx\n", cfh->parent_ino);
dentry = __get_parent(sb, NULL, cfh->ino);
- if (IS_ERR(dentry) && PTR_ERR(dentry) == -ENOENT)
+ if (unlikely(dentry == ERR_PTR(-ENOENT)))
dentry = __fh_to_dentry(sb, cfh->parent_ino);
return dentry;
}
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ef38f01c1795..0f5375d8e030 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -137,23 +137,11 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
struct ceph_file_info *cf;
int ret = 0;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
- struct ceph_mds_client *mdsc = fsc->mdsc;
switch (inode->i_mode & S_IFMT) {
case S_IFREG:
- /* First file open request creates the cookie, we want to keep
- * this cookie around for the filetime of the inode as not to
- * have to worry about fscache register / revoke / operation
- * races.
- *
- * Also, if we know the operation is going to invalidate data
- * (non readonly) just nuke the cache right away.
- */
- ceph_fscache_register_inode_cookie(mdsc->fsc, ci);
- if ((fmode & CEPH_FILE_MODE_WR))
- ceph_fscache_invalidate(inode);
+ ceph_fscache_register_inode_cookie(inode);
+ ceph_fscache_file_set_cookie(inode, file);
case S_IFDIR:
dout("init_file %p %p 0%o (regular)\n", inode, file,
inode->i_mode);
@@ -192,6 +180,59 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
}
/*
+ * try renew caps after session gets killed.
+ */
+int ceph_renew_caps(struct inode *inode)
+{
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_mds_request *req;
+ int err, flags, wanted;
+
+ spin_lock(&ci->i_ceph_lock);
+ wanted = __ceph_caps_file_wanted(ci);
+ if (__ceph_is_any_real_caps(ci) &&
+ (!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) {
+ int issued = __ceph_caps_issued(ci, NULL);
+ spin_unlock(&ci->i_ceph_lock);
+ dout("renew caps %p want %s issued %s updating mds_wanted\n",
+ inode, ceph_cap_string(wanted), ceph_cap_string(issued));
+ ceph_check_caps(ci, 0, NULL);
+ return 0;
+ }
+ spin_unlock(&ci->i_ceph_lock);
+
+ flags = 0;
+ if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
+ flags = O_RDWR;
+ else if (wanted & CEPH_CAP_FILE_RD)
+ flags = O_RDONLY;
+ else if (wanted & CEPH_CAP_FILE_WR)
+ flags = O_WRONLY;
+#ifdef O_LAZY
+ if (wanted & CEPH_CAP_FILE_LAZYIO)
+ flags |= O_LAZY;
+#endif
+
+ req = prepare_open_request(inode->i_sb, flags, 0);
+ if (IS_ERR(req)) {
+ err = PTR_ERR(req);
+ goto out;
+ }
+
+ req->r_inode = inode;
+ ihold(inode);
+ req->r_num_caps = 1;
+ req->r_fmode = -1;
+
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
+ ceph_mdsc_put_request(req);
+out:
+ dout("renew caps %p open result=%d\n", inode, err);
+ return err < 0 ? err : 0;
+}
+
+/*
* If we already have the requisite capabilities, we can satisfy
* the open request locally (no need to request new caps from the
* MDS). We do, however, need to inform the MDS (asynchronously)
@@ -353,7 +394,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
- if (d_unhashed(dentry)) {
+ if (d_in_lookup(dentry)) {
dn = ceph_finish_lookup(req, dentry, err);
if (IS_ERR(dn))
err = PTR_ERR(dn);
@@ -466,7 +507,7 @@ more:
ret += zlen;
}
- didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
+ didpages = (page_align + ret) >> PAGE_SHIFT;
pos += ret;
read = pos - off;
left -= ret;
@@ -616,8 +657,7 @@ static void ceph_aio_complete(struct inode *inode,
kfree(aio_req);
}
-static void ceph_aio_complete_req(struct ceph_osd_request *req,
- struct ceph_msg *msg)
+static void ceph_aio_complete_req(struct ceph_osd_request *req)
{
int rc = req->r_result;
struct inode *inode = req->r_inode;
@@ -668,7 +708,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req,
}
}
- ceph_put_page_vector(osd_data->pages, num_pages, false);
+ ceph_put_page_vector(osd_data->pages, num_pages, !aio_req->write);
ceph_osdc_put_request(req);
if (rc < 0)
@@ -714,14 +754,21 @@ static void ceph_aio_retry_work(struct work_struct *work)
req->r_flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK |
CEPH_OSD_FLAG_WRITE;
- req->r_base_oloc = orig_req->r_base_oloc;
- req->r_base_oid = orig_req->r_base_oid;
+ ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
+ ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
+
+ ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
+ if (ret) {
+ ceph_osdc_put_request(req);
+ req = orig_req;
+ goto out;
+ }
req->r_ops[0] = orig_req->r_ops[0];
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
- ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
- snapc, CEPH_NOSNAP, &aio_req->mtime);
+ req->r_mtime = aio_req->mtime;
+ req->r_data_offset = req->r_ops[0].extent.offset;
ceph_osdc_put_request(orig_req);
@@ -733,7 +780,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
out:
if (ret < 0) {
req->r_result = ret;
- ceph_aio_complete_req(req, NULL);
+ ceph_aio_complete_req(req);
}
ceph_put_snap_context(snapc);
@@ -764,6 +811,8 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
list_add_tail(&req->r_unsafe_item,
&ci->i_unsafe_writes);
spin_unlock(&ci->i_unsafe_lock);
+
+ complete_all(&req->r_completion);
} else {
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_item);
@@ -772,6 +821,54 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
}
}
+/*
+ * Wait on any unsafe replies for the given inode. First wait on the
+ * newest request, and make that the upper bound. Then, if there are
+ * more requests, keep waiting on the oldest as long as it is still older
+ * than the original request.
+ */
+void ceph_sync_write_wait(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct list_head *head = &ci->i_unsafe_writes;
+ struct ceph_osd_request *req;
+ u64 last_tid;
+
+ if (!S_ISREG(inode->i_mode))
+ return;
+
+ spin_lock(&ci->i_unsafe_lock);
+ if (list_empty(head))
+ goto out;
+
+ /* set upper bound as _last_ entry in chain */
+
+ req = list_last_entry(head, struct ceph_osd_request,
+ r_unsafe_item);
+ last_tid = req->r_tid;
+
+ do {
+ ceph_osdc_get_request(req);
+ spin_unlock(&ci->i_unsafe_lock);
+
+ dout("sync_write_wait on tid %llu (until %llu)\n",
+ req->r_tid, last_tid);
+ wait_for_completion(&req->r_safe_completion);
+ ceph_osdc_put_request(req);
+
+ spin_lock(&ci->i_unsafe_lock);
+ /*
+ * from here on look at first entry in chain, since we
+ * only want to wait for anything older than last_tid
+ */
+ if (list_empty(head))
+ break;
+ req = list_first_entry(head, struct ceph_osd_request,
+ r_unsafe_item);
+ } while (req->r_tid < last_tid);
+out:
+ spin_unlock(&ci->i_unsafe_lock);
+}
static ssize_t
ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
@@ -806,8 +903,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
if (write) {
ret = invalidate_inode_pages2_range(inode->i_mapping,
- pos >> PAGE_CACHE_SHIFT,
- (pos + count) >> PAGE_CACHE_SHIFT);
+ pos >> PAGE_SHIFT,
+ (pos + count) >> PAGE_SHIFT);
if (ret < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret);
@@ -872,17 +969,15 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
* may block.
*/
truncate_inode_pages_range(inode->i_mapping, pos,
- (pos+len) | (PAGE_CACHE_SIZE - 1));
+ (pos+len) | (PAGE_SIZE - 1));
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+ req->r_mtime = mtime;
}
-
osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
false, false);
- ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
-
if (aio_req) {
aio_req->total_len += len;
aio_req->num_reqs++;
@@ -917,7 +1012,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
len = ret;
}
- ceph_put_page_vector(pages, num_pages, false);
+ ceph_put_page_vector(pages, num_pages, !write);
ceph_osdc_put_request(req);
if (ret < 0)
@@ -938,6 +1033,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
}
if (aio_req) {
+ LIST_HEAD(osd_reqs);
+
if (aio_req->num_reqs == 0) {
kfree(aio_req);
return ret;
@@ -946,8 +1043,9 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
CEPH_CAP_FILE_RD);
- while (!list_empty(&aio_req->osd_reqs)) {
- req = list_first_entry(&aio_req->osd_reqs,
+ list_splice(&aio_req->osd_reqs, &osd_reqs);
+ while (!list_empty(&osd_reqs)) {
+ req = list_first_entry(&osd_reqs,
struct ceph_osd_request,
r_unsafe_item);
list_del_init(&req->r_unsafe_item);
@@ -956,7 +1054,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
req, false);
if (ret < 0) {
req->r_result = ret;
- ceph_aio_complete_req(req, NULL);
+ ceph_aio_complete_req(req);
}
}
return -EIOCBQUEUED;
@@ -1006,8 +1104,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
return ret;
ret = invalidate_inode_pages2_range(inode->i_mapping,
- pos >> PAGE_CACHE_SHIFT,
- (pos + count) >> PAGE_CACHE_SHIFT);
+ pos >> PAGE_SHIFT,
+ (pos + count) >> PAGE_SHIFT);
if (ret < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret);
@@ -1036,7 +1134,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
* write from beginning of first page,
* regardless of io alignment
*/
- num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+ num_pages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages)) {
@@ -1067,9 +1165,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
false, true);
- /* BUG_ON(vino.snap != CEPH_NOSNAP); */
- ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
-
+ req->r_mtime = mtime;
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1159,7 +1255,7 @@ again:
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
if (pinned_page) {
- page_cache_release(pinned_page);
+ put_page(pinned_page);
pinned_page = NULL;
}
ceph_put_cap_refs(ci, got);
@@ -1188,10 +1284,10 @@ again:
if (retry_op == READ_INLINE) {
BUG_ON(ret > 0 || read > 0);
if (iocb->ki_pos < i_size &&
- iocb->ki_pos < PAGE_CACHE_SIZE) {
+ iocb->ki_pos < PAGE_SIZE) {
loff_t end = min_t(loff_t, i_size,
iocb->ki_pos + len);
- end = min_t(loff_t, end, PAGE_CACHE_SIZE);
+ end = min_t(loff_t, end, PAGE_SIZE);
if (statret < end)
zero_user_segment(page, statret, end);
ret = copy_page_to_iter(page,
@@ -1292,7 +1388,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
}
retry_snap:
- if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
+ if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
err = -ENOSPC;
goto out;
}
@@ -1350,7 +1446,6 @@ retry_snap:
iov_iter_advance(from, written);
ceph_put_snap_context(snapc);
} else {
- loff_t old_size = i_size_read(inode);
/*
* No need to acquire the i_truncate_mutex. Because
* the MDS revokes Fwb caps before sending truncate
@@ -1361,8 +1456,6 @@ retry_snap:
written = generic_perform_write(file, from, pos);
if (likely(written >= 0))
iocb->ki_pos = pos + written;
- if (i_size_read(inode) > old_size)
- ceph_fscache_update_objectsize(inode);
inode_unlock(inode);
}
@@ -1382,12 +1475,11 @@ retry_snap:
ceph_cap_string(got));
ceph_put_cap_refs(ci, got);
- if (written >= 0 &&
- ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) ||
- ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
- err = vfs_fsync_range(file, pos, pos + written - 1, 1);
- if (err < 0)
- written = err;
+ if (written >= 0) {
+ if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_NEARFULL))
+ iocb->ki_flags |= IOCB_DSYNC;
+
+ written = generic_write_sync(iocb, written);
}
goto out_unlocked;
@@ -1407,16 +1499,14 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
{
struct inode *inode = file->f_mapping->host;
loff_t i_size;
- int ret;
+ loff_t ret;
inode_lock(inode);
if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
- if (ret < 0) {
- offset = ret;
+ if (ret < 0)
goto out;
- }
}
i_size = i_size_read(inode);
@@ -1432,7 +1522,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
* write() or lseek() might have altered it
*/
if (offset == 0) {
- offset = file->f_pos;
+ ret = file->f_pos;
goto out;
}
offset += file->f_pos;
@@ -1452,32 +1542,32 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
break;
}
- offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
+ ret = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
out:
inode_unlock(inode);
- return offset;
+ return ret;
}
static inline void ceph_zero_partial_page(
struct inode *inode, loff_t offset, unsigned size)
{
struct page *page;
- pgoff_t index = offset >> PAGE_CACHE_SHIFT;
+ pgoff_t index = offset >> PAGE_SHIFT;
page = find_lock_page(inode->i_mapping, index);
if (page) {
wait_on_page_writeback(page);
- zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size);
+ zero_user(page, offset & (PAGE_SIZE - 1), size);
unlock_page(page);
- page_cache_release(page);
+ put_page(page);
}
}
static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
loff_t length)
{
- loff_t nearly = round_up(offset, PAGE_CACHE_SIZE);
+ loff_t nearly = round_up(offset, PAGE_SIZE);
if (offset < nearly) {
loff_t size = nearly - offset;
if (length < size)
@@ -1486,8 +1576,8 @@ static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
offset += size;
length -= size;
}
- if (length >= PAGE_CACHE_SIZE) {
- loff_t size = round_down(length, PAGE_CACHE_SIZE);
+ if (length >= PAGE_SIZE) {
+ loff_t size = round_down(length, PAGE_SIZE);
truncate_pagecache_range(inode, offset, offset + size - 1);
offset += size;
length -= size;
@@ -1525,9 +1615,7 @@ static int ceph_zero_partial_object(struct inode *inode,
goto out;
}
- ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
- &inode->i_mtime);
-
+ req->r_mtime = inode->i_mtime;
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!ret) {
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1544,9 +1632,9 @@ static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
{
int ret = 0;
struct ceph_inode_info *ci = ceph_inode(inode);
- s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
- s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
- s32 object_size = ceph_file_layout_object_size(ci->i_layout);
+ s32 stripe_unit = ci->i_layout.stripe_unit;
+ s32 stripe_count = ci->i_layout.stripe_count;
+ s32 object_size = ci->i_layout.object_size;
u64 object_set_size = object_size * stripe_count;
u64 nearly, t;
@@ -1618,8 +1706,8 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock;
}
- if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
- !(mode & FALLOC_FL_PUNCH_HOLE)) {
+ if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) &&
+ !(mode & FALLOC_FL_PUNCH_HOLE)) {
ret = -ENOSPC;
goto unlock;
}
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ed58b168904a..dd3a6dbf71eb 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -8,8 +8,10 @@
#include <linux/kernel.h>
#include <linux/writeback.h>
#include <linux/vmalloc.h>
+#include <linux/xattr.h>
#include <linux/posix_acl.h>
#include <linux/random.h>
+#include <linux/sort.h>
#include "super.h"
#include "mds_client.h"
@@ -92,10 +94,10 @@ const struct inode_operations ceph_file_iops = {
.permission = ceph_permission,
.setattr = ceph_setattr,
.getattr = ceph_getattr,
- .setxattr = ceph_setxattr,
- .getxattr = ceph_getxattr,
+ .setxattr = generic_setxattr,
+ .getxattr = generic_getxattr,
.listxattr = ceph_listxattr,
- .removexattr = ceph_removexattr,
+ .removexattr = generic_removexattr,
.get_acl = ceph_get_acl,
.set_acl = ceph_set_acl,
};
@@ -253,6 +255,9 @@ static int ceph_fill_dirfrag(struct inode *inode,
diri_auth = ci->i_auth_cap->mds;
spin_unlock(&ci->i_ceph_lock);
+ if (mds == -1) /* CDIR_AUTH_PARENT */
+ mds = diri_auth;
+
mutex_lock(&ci->i_fragtree_mutex);
if (ndist == 0 && mds == diri_auth) {
/* no delegation info needed. */
@@ -299,20 +304,38 @@ out:
return err;
}
+static int frag_tree_split_cmp(const void *l, const void *r)
+{
+ struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l;
+ struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r;
+ return ceph_frag_compare(ls->frag, rs->frag);
+}
+
+static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
+{
+ if (!frag)
+ return f == ceph_frag_make(0, 0);
+ if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
+ return false;
+ return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
+}
+
static int ceph_fill_fragtree(struct inode *inode,
struct ceph_frag_tree_head *fragtree,
struct ceph_mds_reply_dirfrag *dirinfo)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_inode_frag *frag;
+ struct ceph_inode_frag *frag, *prev_frag = NULL;
struct rb_node *rb_node;
- int i;
- u32 id, nsplits;
+ unsigned i, split_by, nsplits;
+ u32 id;
bool update = false;
mutex_lock(&ci->i_fragtree_mutex);
nsplits = le32_to_cpu(fragtree->nsplits);
- if (nsplits) {
+ if (nsplits != ci->i_fragtree_nsplits) {
+ update = true;
+ } else if (nsplits) {
i = prandom_u32() % nsplits;
id = le32_to_cpu(fragtree->splits[i].frag);
if (!__ceph_find_frag(ci, id))
@@ -331,10 +354,22 @@ static int ceph_fill_fragtree(struct inode *inode,
if (!update)
goto out_unlock;
+ if (nsplits > 1) {
+ sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
+ frag_tree_split_cmp, NULL);
+ }
+
dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
rb_node = rb_first(&ci->i_fragtree);
for (i = 0; i < nsplits; i++) {
id = le32_to_cpu(fragtree->splits[i].frag);
+ split_by = le32_to_cpu(fragtree->splits[i].by);
+ if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
+ pr_err("fill_fragtree %llx.%llx invalid split %d/%u, "
+ "frag %x split by %d\n", ceph_vinop(inode),
+ i, nsplits, id, split_by);
+ continue;
+ }
frag = NULL;
while (rb_node) {
frag = rb_entry(rb_node, struct ceph_inode_frag, node);
@@ -346,8 +381,14 @@ static int ceph_fill_fragtree(struct inode *inode,
break;
}
rb_node = rb_next(rb_node);
- rb_erase(&frag->node, &ci->i_fragtree);
- kfree(frag);
+ /* delete stale split/leaf node */
+ if (frag->split_by > 0 ||
+ !is_frag_child(frag->frag, prev_frag)) {
+ rb_erase(&frag->node, &ci->i_fragtree);
+ if (frag->split_by > 0)
+ ci->i_fragtree_nsplits--;
+ kfree(frag);
+ }
frag = NULL;
}
if (!frag) {
@@ -355,14 +396,23 @@ static int ceph_fill_fragtree(struct inode *inode,
if (IS_ERR(frag))
continue;
}
- frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+ if (frag->split_by == 0)
+ ci->i_fragtree_nsplits++;
+ frag->split_by = split_by;
dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+ prev_frag = frag;
}
while (rb_node) {
frag = rb_entry(rb_node, struct ceph_inode_frag, node);
rb_node = rb_next(rb_node);
- rb_erase(&frag->node, &ci->i_fragtree);
- kfree(frag);
+ /* delete stale split/leaf node */
+ if (frag->split_by > 0 ||
+ !is_frag_child(frag->frag, prev_frag)) {
+ rb_erase(&frag->node, &ci->i_fragtree);
+ if (frag->split_by > 0)
+ ci->i_fragtree_nsplits--;
+ kfree(frag);
+ }
}
out_unlock:
mutex_unlock(&ci->i_fragtree_mutex);
@@ -396,7 +446,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_symlink = NULL;
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
- ci->i_pool_ns_len = 0;
+ RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
ci->i_fragtree = RB_ROOT;
mutex_init(&ci->i_fragtree_mutex);
@@ -418,7 +468,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_LIST_HEAD(&ci->i_dirty_item);
INIT_LIST_HEAD(&ci->i_flushing_item);
ci->i_prealloc_cap_flush = NULL;
- ci->i_cap_flush_tree = RB_ROOT;
+ INIT_LIST_HEAD(&ci->i_cap_flush_list);
init_waitqueue_head(&ci->i_cap_wq);
ci->i_hold_caps_min = 0;
ci->i_hold_caps_max = 0;
@@ -427,7 +477,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_head_snapc = NULL;
ci->i_snap_caps = 0;
- for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
+ for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
ci->i_nr_by_mode[i] = 0;
mutex_init(&ci->i_truncate_mutex);
@@ -512,6 +562,7 @@ void ceph_destroy_inode(struct inode *inode)
rb_erase(n, &ci->i_fragtree);
kfree(frag);
}
+ ci->i_fragtree_nsplits = 0;
__ceph_destroy_xattrs(ci);
if (ci->i_xattrs.blob)
@@ -519,6 +570,8 @@ void ceph_destroy_inode(struct inode *inode)
if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+ ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
+
call_rcu(&inode->i_rcu, ceph_i_callback);
}
@@ -532,6 +585,19 @@ int ceph_drop_inode(struct inode *inode)
return 1;
}
+void ceph_evict_inode(struct inode *inode)
+{
+ /* wait unsafe sync writes */
+ ceph_sync_write_wait(inode);
+ truncate_inode_pages_final(&inode->i_data);
+ clear_inode(inode);
+}
+
+static inline blkcnt_t calc_inode_blocks(u64 size)
+{
+ return (size + (1<<9) - 1) >> 9;
+}
+
/*
* Helpers to fill in size, ctime, mtime, and atime. We have to be
* careful because either the client or MDS may have more up to date
@@ -554,7 +620,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
size = 0;
}
i_size_write(inode, size);
- inode->i_blocks = (size + (1<<9) - 1) >> 9;
+ inode->i_blocks = calc_inode_blocks(size);
ci->i_reported_size = size;
if (truncate_seq != ci->i_truncate_seq) {
dout("truncate_seq %u -> %u\n",
@@ -677,6 +743,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
int issued = 0, implemented, new_issued;
struct timespec mtime, atime, ctime;
struct ceph_buffer *xattr_blob = NULL;
+ struct ceph_string *pool_ns = NULL;
struct ceph_cap *new_cap = NULL;
int err = 0;
bool wake = false;
@@ -704,6 +771,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
iinfo->xattr_len);
}
+ if (iinfo->pool_ns_len > 0)
+ pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
+ iinfo->pool_ns_len);
+
spin_lock(&ci->i_ceph_lock);
/*
@@ -758,10 +829,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (new_version ||
(new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
- if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
+ s64 old_pool = ci->i_layout.pool_id;
+ struct ceph_string *old_ns;
+
+ ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
+ old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+ lockdep_is_held(&ci->i_ceph_lock));
+ rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
+
+ if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
- ci->i_layout = info->layout;
- ci->i_pool_ns_len = iinfo->pool_ns_len;
+
+ pool_ns = old_ns;
queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(info->truncate_seq),
@@ -813,9 +892,13 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
spin_unlock(&ci->i_ceph_lock);
- err = -EINVAL;
- if (WARN_ON(symlen != i_size_read(inode)))
- goto out;
+ if (symlen != i_size_read(inode)) {
+ pr_err("fill_inode %llx.%llx BAD symlink "
+ "size %lld\n", ceph_vinop(inode),
+ i_size_read(inode));
+ i_size_write(inode, symlen);
+ inode->i_blocks = calc_inode_blocks(symlen);
+ }
err = -ENOMEM;
sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
@@ -925,6 +1008,7 @@ out:
ceph_put_cap(mdsc, new_cap);
if (xattr_blob)
ceph_buffer_put(xattr_blob);
+ ceph_put_string(pool_ns);
return err;
}
@@ -958,7 +1042,7 @@ static void update_dentry_lease(struct dentry *dentry,
goto out_unlock;
if (di->lease_gen == session->s_cap_gen &&
- time_before(ttl, dentry->d_time))
+ time_before(ttl, di->time))
goto out_unlock; /* we already have a newer lease. */
if (di->lease_session && di->lease_session != session)
@@ -972,7 +1056,7 @@ static void update_dentry_lease(struct dentry *dentry,
di->lease_seq = le32_to_cpu(lease->seq);
di->lease_renew_after = half_ttl;
di->lease_renew_from = 0;
- dentry->d_time = ttl;
+ di->time = ttl;
out_unlock:
spin_unlock(&dentry->d_lock);
return;
@@ -1104,7 +1188,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
dname.name = rinfo->dname;
dname.len = rinfo->dname_len;
- dname.hash = full_name_hash(dname.name, dname.len);
+ dname.hash = full_name_hash(parent, dname.name, dname.len);
vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
retry_lookup:
@@ -1308,12 +1392,13 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
int i, err = 0;
for (i = 0; i < rinfo->dir_nr; i++) {
+ struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
struct ceph_vino vino;
struct inode *in;
int rc;
- vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino);
- vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid);
+ vino.ino = le64_to_cpu(rde->inode.in->ino);
+ vino.snap = le64_to_cpu(rde->inode.in->snapid);
in = ceph_get_inode(req->r_dentry->d_sb, vino);
if (IS_ERR(in)) {
@@ -1321,14 +1406,14 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
dout("new_inode badness got %d\n", err);
continue;
}
- rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+ rc = fill_inode(in, NULL, &rde->inode, NULL, session,
req->r_request_started, -1,
&req->r_caps_reservation);
if (rc < 0) {
pr_err("fill_inode badness on %p got %d\n", in, rc);
err = rc;
- continue;
}
+ iput(in);
}
return err;
@@ -1338,7 +1423,7 @@ void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
{
if (ctl->page) {
kunmap(ctl->page);
- page_cache_release(ctl->page);
+ put_page(ctl->page);
ctl->page = NULL;
}
}
@@ -1348,7 +1433,7 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
struct ceph_mds_request *req)
{
struct ceph_inode_info *ci = ceph_inode(dir);
- unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*);
+ unsigned nsize = PAGE_SIZE / sizeof(struct dentry*);
unsigned idx = ctl->index % nsize;
pgoff_t pgoff = ctl->index / nsize;
@@ -1367,7 +1452,7 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
unlock_page(ctl->page);
ctl->dentries = kmap(ctl->page);
if (idx == 0)
- memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
+ memset(ctl->dentries, 0, PAGE_SIZE);
}
if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
@@ -1386,6 +1471,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
struct ceph_mds_session *session)
{
struct dentry *parent = req->r_dentry;
+ struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
struct qstr dname;
struct dentry *dn;
@@ -1393,22 +1479,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
int err = 0, skipped = 0, ret, i;
struct inode *snapdir = NULL;
struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
- struct ceph_dentry_info *di;
u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+ u32 last_hash = 0;
+ u32 fpos_offset;
struct ceph_readdir_cache_control cache_ctl = {};
if (req->r_aborted)
return readdir_prepopulate_inodes_only(req, session);
+ if (rinfo->hash_order && req->r_path2) {
+ last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+ req->r_path2, strlen(req->r_path2));
+ last_hash = ceph_frag_value(last_hash);
+ }
+
if (rinfo->dir_dir &&
le32_to_cpu(rinfo->dir_dir->frag) != frag) {
dout("readdir_prepopulate got new frag %x -> %x\n",
frag, le32_to_cpu(rinfo->dir_dir->frag));
frag = le32_to_cpu(rinfo->dir_dir->frag);
- if (ceph_frag_is_leftmost(frag))
+ if (!rinfo->hash_order)
req->r_readdir_offset = 2;
- else
- req->r_readdir_offset = 0;
}
if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
@@ -1426,24 +1517,37 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
/* note dir version at start of readdir so we can tell
* if any dentries get dropped */
- struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
req->r_readdir_cache_idx = 0;
}
cache_ctl.index = req->r_readdir_cache_idx;
+ fpos_offset = req->r_readdir_offset;
/* FIXME: release caps/leases if error occurs */
for (i = 0; i < rinfo->dir_nr; i++) {
+ struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
struct ceph_vino vino;
- dname.name = rinfo->dir_dname[i];
- dname.len = rinfo->dir_dname_len[i];
- dname.hash = full_name_hash(dname.name, dname.len);
-
- vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino);
- vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid);
+ dname.name = rde->name;
+ dname.len = rde->name_len;
+ dname.hash = full_name_hash(parent, dname.name, dname.len);
+
+ vino.ino = le64_to_cpu(rde->inode.in->ino);
+ vino.snap = le64_to_cpu(rde->inode.in->snapid);
+
+ if (rinfo->hash_order) {
+ u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+ rde->name, rde->name_len);
+ hash = ceph_frag_value(hash);
+ if (hash != last_hash)
+ fpos_offset = 2;
+ last_hash = hash;
+ rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
+ } else {
+ rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
+ }
retry_lookup:
dn = d_lookup(parent, &dname);
@@ -1489,7 +1593,7 @@ retry_lookup:
}
}
- ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+ ret = fill_inode(in, NULL, &rde->inode, NULL, session,
req->r_request_started, -1,
&req->r_caps_reservation);
if (ret < 0) {
@@ -1522,11 +1626,9 @@ retry_lookup:
dn = realdn;
}
- di = dn->d_fsdata;
- di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
+ ceph_dentry(dn)->offset = rde->offset;
- update_dentry_lease(dn, rinfo->dir_dlease[i],
- req->r_session,
+ update_dentry_lease(dn, rde->lease, req->r_session,
req->r_request_started);
if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
@@ -1561,7 +1663,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
spin_lock(&ci->i_ceph_lock);
dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
i_size_write(inode, size);
- inode->i_blocks = (size + (1 << 9) - 1) >> 9;
+ inode->i_blocks = calc_inode_blocks(size);
/* tell the MDS if we are approaching max_size */
if ((size << 1) >= ci->i_max_size &&
@@ -1623,10 +1725,21 @@ static void ceph_invalidate_work(struct work_struct *work)
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_pg_inv_work);
struct inode *inode = &ci->vfs_inode;
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
u32 orig_gen;
int check = 0;
mutex_lock(&ci->i_truncate_mutex);
+
+ if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+ pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
+ inode, ceph_ino(inode));
+ mapping_set_error(inode->i_mapping, -EIO);
+ truncate_pagecache(inode, 0);
+ mutex_unlock(&ci->i_truncate_mutex);
+ goto out;
+ }
+
spin_lock(&ci->i_ceph_lock);
dout("invalidate_pages %p gen %d revoking %d\n", inode,
ci->i_rdcache_gen, ci->i_rdcache_revoking);
@@ -1640,7 +1753,9 @@ static void ceph_invalidate_work(struct work_struct *work)
orig_gen = ci->i_rdcache_gen;
spin_unlock(&ci->i_ceph_lock);
- truncate_pagecache(inode, 0);
+ if (invalidate_inode_pages2(inode->i_mapping) < 0) {
+ pr_err("invalidate_pages %p fails\n", inode);
+ }
spin_lock(&ci->i_ceph_lock);
if (orig_gen == ci->i_rdcache_gen &&
@@ -1770,22 +1885,18 @@ static const struct inode_operations ceph_symlink_iops = {
.get_link = simple_get_link,
.setattr = ceph_setattr,
.getattr = ceph_getattr,
- .setxattr = ceph_setxattr,
- .getxattr = ceph_getxattr,
+ .setxattr = generic_setxattr,
+ .getxattr = generic_getxattr,
.listxattr = ceph_listxattr,
- .removexattr = ceph_removexattr,
+ .removexattr = generic_removexattr,
};
-/*
- * setattr
- */
-int ceph_setattr(struct dentry *dentry, struct iattr *attr)
+int __ceph_setattr(struct inode *inode, struct iattr *attr)
{
- struct inode *inode = d_inode(dentry);
struct ceph_inode_info *ci = ceph_inode(inode);
const unsigned int ia_valid = attr->ia_valid;
struct ceph_mds_request *req;
- struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_cap_flush *prealloc_cf;
int issued;
int release = 0, dirtied = 0;
@@ -1923,8 +2034,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
if ((issued & CEPH_CAP_FILE_EXCL) &&
attr->ia_size > inode->i_size) {
i_size_write(inode, attr->ia_size);
- inode->i_blocks =
- (attr->ia_size + (1 << 9) - 1) >> 9;
+ inode->i_blocks = calc_inode_blocks(attr->ia_size);
inode->i_ctime = attr->ia_ctime;
ci->i_reported_size = attr->ia_size;
dirtied |= CEPH_CAP_FILE_EXCL;
@@ -2010,6 +2120,14 @@ out_put:
}
/*
+ * setattr
+ */
+int ceph_setattr(struct dentry *dentry, struct iattr *attr)
+{
+ return __ceph_setattr(d_inode(dentry), attr);
+}
+
+/*
* Verify that we have a lease on the given mask. If not,
* do a getattr against an mds.
*/
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index f851d8d70158..7d752d53353a 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -21,10 +21,10 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
if (!err) {
- l.stripe_unit = ceph_file_layout_su(ci->i_layout);
- l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
- l.object_size = ceph_file_layout_object_size(ci->i_layout);
- l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
+ l.stripe_unit = ci->i_layout.stripe_unit;
+ l.stripe_count = ci->i_layout.stripe_count;
+ l.object_size = ci->i_layout.object_size;
+ l.data_pool = ci->i_layout.pool_id;
l.preferred_osd = (s32)-1;
if (copy_to_user(arg, &l, sizeof(l)))
return -EFAULT;
@@ -82,19 +82,19 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
if (l.stripe_count)
nl.stripe_count = l.stripe_count;
else
- nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+ nl.stripe_count = ci->i_layout.stripe_count;
if (l.stripe_unit)
nl.stripe_unit = l.stripe_unit;
else
- nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
+ nl.stripe_unit = ci->i_layout.stripe_unit;
if (l.object_size)
nl.object_size = l.object_size;
else
- nl.object_size = ceph_file_layout_object_size(ci->i_layout);
+ nl.object_size = ci->i_layout.object_size;
if (l.data_pool)
nl.data_pool = l.data_pool;
else
- nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout);
+ nl.data_pool = ci->i_layout.pool_id;
/* this is obsolete, and always -1 */
nl.preferred_osd = le64_to_cpu(-1);
@@ -183,7 +183,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
struct ceph_osd_client *osdc =
&ceph_sb_to_client(inode->i_sb)->client->osdc;
struct ceph_object_locator oloc;
- struct ceph_object_id oid;
+ CEPH_DEFINE_OID_ONSTACK(oid);
u64 len = 1, olen;
u64 tmp;
struct ceph_pg pgid;
@@ -193,17 +193,17 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
if (copy_from_user(&dl, arg, sizeof(dl)))
return -EFAULT;
- down_read(&osdc->map_sem);
+ down_read(&osdc->lock);
r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
&dl.object_no, &dl.object_offset,
&olen);
if (r < 0) {
- up_read(&osdc->map_sem);
+ up_read(&osdc->lock);
return -EIO;
}
dl.file_offset -= dl.object_offset;
- dl.object_size = ceph_file_layout_object_size(ci->i_layout);
- dl.block_size = ceph_file_layout_su(ci->i_layout);
+ dl.object_size = ci->i_layout.object_size;
+ dl.block_size = ci->i_layout.stripe_unit;
/* block_offset = object_offset % block_size */
tmp = dl.object_offset;
@@ -212,16 +212,19 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
ceph_ino(inode), dl.object_no);
- oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
- ceph_oid_set_name(&oid, dl.object_name);
+ oloc.pool = ci->i_layout.pool_id;
+ oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
+ ceph_oid_printf(&oid, "%s", dl.object_name);
- r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid);
+ r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
+
+ ceph_oloc_destroy(&oloc);
if (r < 0) {
- up_read(&osdc->map_sem);
+ up_read(&osdc->lock);
return r;
}
- dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
+ dl.osd = ceph_pg_to_acting_primary(osdc->osdmap, &pgid);
if (dl.osd >= 0) {
struct ceph_entity_addr *a =
ceph_osd_addr(osdc->osdmap, dl.osd);
@@ -230,7 +233,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
} else {
memset(&dl.osd_addr, 0, sizeof(dl.osd_addr));
}
- up_read(&osdc->map_sem);
+ up_read(&osdc->lock);
/* send result back to user */
if (copy_to_user(arg, &dl, sizeof(dl)))
@@ -247,9 +250,8 @@ static long ceph_ioctl_lazyio(struct file *file)
if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
spin_lock(&ci->i_ceph_lock);
- ci->i_nr_by_mode[fi->fmode]--;
fi->fmode |= CEPH_FILE_MODE_LAZY;
- ci->i_nr_by_mode[fi->fmode]++;
+ ci->i_nr_by_mode[ffs(CEPH_FILE_MODE_LAZY)]++;
spin_unlock(&ci->i_ceph_lock);
dout("ioctl_layzio: file %p marked lazy\n", file);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 44852c3ae531..fa59a85226b2 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -48,7 +48,7 @@
struct ceph_reconnect_state {
int nr_caps;
struct ceph_pagelist *pagelist;
- bool flock;
+ unsigned msg_version;
};
static void __wake_requests(struct ceph_mds_client *mdsc,
@@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end,
} else
info->inline_version = CEPH_INLINE_NONE;
+ info->pool_ns_len = 0;
+ info->pool_ns_data = NULL;
if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
- ceph_decode_need(p, end, info->pool_ns_len, bad);
- *p += info->pool_ns_len;
- } else {
- info->pool_ns_len = 0;
+ if (info->pool_ns_len > 0) {
+ ceph_decode_need(p, end, info->pool_ns_len, bad);
+ info->pool_ns_data = *p;
+ *p += info->pool_ns_len;
+ }
}
return 0;
@@ -181,17 +184,18 @@ static int parse_reply_info_dir(void **p, void *end,
ceph_decode_need(p, end, sizeof(num) + 2, bad);
num = ceph_decode_32(p);
- info->dir_end = ceph_decode_8(p);
- info->dir_complete = ceph_decode_8(p);
+ {
+ u16 flags = ceph_decode_16(p);
+ info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
+ info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
+ info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
+ }
if (num == 0)
goto done;
- BUG_ON(!info->dir_in);
- info->dir_dname = (void *)(info->dir_in + num);
- info->dir_dname_len = (void *)(info->dir_dname + num);
- info->dir_dlease = (void *)(info->dir_dname_len + num);
- if ((unsigned long)(info->dir_dlease + num) >
- (unsigned long)info->dir_in + info->dir_buf_size) {
+ BUG_ON(!info->dir_entries);
+ if ((unsigned long)(info->dir_entries + num) >
+ (unsigned long)info->dir_entries + info->dir_buf_size) {
pr_err("dir contents are larger than expected\n");
WARN_ON(1);
goto bad;
@@ -199,21 +203,23 @@ static int parse_reply_info_dir(void **p, void *end,
info->dir_nr = num;
while (num) {
+ struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
/* dentry */
ceph_decode_need(p, end, sizeof(u32)*2, bad);
- info->dir_dname_len[i] = ceph_decode_32(p);
- ceph_decode_need(p, end, info->dir_dname_len[i], bad);
- info->dir_dname[i] = *p;
- *p += info->dir_dname_len[i];
- dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
- info->dir_dname[i]);
- info->dir_dlease[i] = *p;
+ rde->name_len = ceph_decode_32(p);
+ ceph_decode_need(p, end, rde->name_len, bad);
+ rde->name = *p;
+ *p += rde->name_len;
+ dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
+ rde->lease = *p;
*p += sizeof(struct ceph_mds_reply_lease);
/* inode */
- err = parse_reply_info_in(p, end, &info->dir_in[i], features);
+ err = parse_reply_info_in(p, end, &rde->inode, features);
if (err < 0)
goto out_bad;
+ /* ceph_readdir_prepopulate() will update it */
+ rde->offset = 0;
i++;
num--;
}
@@ -345,9 +351,9 @@ out_bad:
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
- if (!info->dir_in)
+ if (!info->dir_entries)
return;
- free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
+ free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}
@@ -386,9 +392,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
if (atomic_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer)
- ceph_auth_destroy_authorizer(
- s->s_mdsc->fsc->client->monc.auth,
- s->s_auth.authorizer);
+ ceph_auth_destroy_authorizer(s->s_auth.authorizer);
kfree(s);
}
}
@@ -468,7 +472,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_flushing);
- INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
dout("register_session mds%d\n", mds);
if (mds >= mdsc->max_sessions) {
@@ -569,51 +572,23 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req);
}
+DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
+
/*
* lookup session, bump ref if found.
*
* called under mdsc->mutex.
*/
-static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
- u64 tid)
+static struct ceph_mds_request *
+lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
struct ceph_mds_request *req;
- struct rb_node *n = mdsc->request_tree.rb_node;
-
- while (n) {
- req = rb_entry(n, struct ceph_mds_request, r_node);
- if (tid < req->r_tid)
- n = n->rb_left;
- else if (tid > req->r_tid)
- n = n->rb_right;
- else {
- ceph_mdsc_get_request(req);
- return req;
- }
- }
- return NULL;
-}
-
-static void __insert_request(struct ceph_mds_client *mdsc,
- struct ceph_mds_request *new)
-{
- struct rb_node **p = &mdsc->request_tree.rb_node;
- struct rb_node *parent = NULL;
- struct ceph_mds_request *req = NULL;
- while (*p) {
- parent = *p;
- req = rb_entry(parent, struct ceph_mds_request, r_node);
- if (new->r_tid < req->r_tid)
- p = &(*p)->rb_left;
- else if (new->r_tid > req->r_tid)
- p = &(*p)->rb_right;
- else
- BUG();
- }
+ req = lookup_request(&mdsc->request_tree, tid);
+ if (req)
+ ceph_mdsc_get_request(req);
- rb_link_node(&new->r_node, parent, p);
- rb_insert_color(&new->r_node, &mdsc->request_tree);
+ return req;
}
/*
@@ -632,7 +607,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
req->r_num_caps);
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
- __insert_request(mdsc, req);
+ insert_request(&mdsc->request_tree, req);
req->r_uid = current_fsuid();
req->r_gid = current_fsgid();
@@ -665,8 +640,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
}
}
- rb_erase(&req->r_node, &mdsc->request_tree);
- RB_CLEAR_NODE(&req->r_node);
+ erase_request(&mdsc->request_tree, req);
if (req->r_unsafe_dir && req->r_got_unsafe) {
struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
@@ -870,12 +844,14 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
int metadata_bytes = 0;
int metadata_key_count = 0;
struct ceph_options *opt = mdsc->fsc->client->options;
+ struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
void *p;
const char* metadata[][2] = {
{"hostname", utsname()->nodename},
{"kernel_version", utsname()->release},
- {"entity_id", opt->name ? opt->name : ""},
+ {"entity_id", opt->name ? : ""},
+ {"root", fsopt->server_path ? : "/"},
{NULL, NULL}
};
@@ -1151,9 +1127,11 @@ out:
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
+ struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
struct ceph_inode_info *ci = ceph_inode(inode);
LIST_HEAD(to_remove);
- int drop = 0;
+ bool drop = false;
+ bool invalidate = false;
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
@@ -1161,22 +1139,25 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf;
- struct ceph_mds_client *mdsc =
- ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_mds_client *mdsc = fsc->mdsc;
- while (true) {
- struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
- if (!n)
- break;
- cf = rb_entry(n, struct ceph_cap_flush, i_node);
- rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
- list_add(&cf->list, &to_remove);
+ ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
+
+ if (ci->i_wrbuffer_ref > 0 &&
+ ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
+ invalidate = true;
+
+ while (!list_empty(&ci->i_cap_flush_list)) {
+ cf = list_first_entry(&ci->i_cap_flush_list,
+ struct ceph_cap_flush, i_list);
+ list_del(&cf->i_list);
+ list_add(&cf->i_list, &to_remove);
}
spin_lock(&mdsc->cap_dirty_lock);
- list_for_each_entry(cf, &to_remove, list)
- rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+ list_for_each_entry(cf, &to_remove, i_list)
+ list_del(&cf->g_list);
if (!list_empty(&ci->i_dirty_item)) {
pr_warn_ratelimited(
@@ -1185,7 +1166,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
- drop = 1;
+ drop = true;
}
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
@@ -1195,12 +1176,12 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
- drop = 1;
+ drop = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
- list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+ list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
ci->i_prealloc_cap_flush = NULL;
}
}
@@ -1208,11 +1189,15 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
while (!list_empty(&to_remove)) {
struct ceph_cap_flush *cf;
cf = list_first_entry(&to_remove,
- struct ceph_cap_flush, list);
- list_del(&cf->list);
+ struct ceph_cap_flush, i_list);
+ list_del(&cf->i_list);
ceph_free_cap_flush(cf);
}
- while (drop--)
+
+ wake_up_all(&ci->i_cap_wq);
+ if (invalidate)
+ ceph_queue_invalidate(inode);
+ if (drop)
iput(inode);
return 0;
}
@@ -1222,12 +1207,15 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
*/
static void remove_session_caps(struct ceph_mds_session *session)
{
+ struct ceph_fs_client *fsc = session->s_mdsc->fsc;
+ struct super_block *sb = fsc->sb;
dout("remove_session_caps on %p\n", session);
- iterate_session_caps(session, remove_session_caps_cb, NULL);
+ iterate_session_caps(session, remove_session_caps_cb, fsc);
+
+ wake_up_all(&fsc->mdsc->cap_flushing_wq);
spin_lock(&session->s_cap_lock);
if (session->s_nr_caps > 0) {
- struct super_block *sb = session->s_mdsc->fsc->sb;
struct inode *inode;
struct ceph_cap *cap, *prev = NULL;
struct ceph_vino vino;
@@ -1272,13 +1260,13 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
{
struct ceph_inode_info *ci = ceph_inode(inode);
- wake_up_all(&ci->i_cap_wq);
if (arg) {
spin_lock(&ci->i_ceph_lock);
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock);
}
+ wake_up_all(&ci->i_cap_wq);
return 0;
}
@@ -1492,35 +1480,21 @@ static int trim_caps(struct ceph_mds_client *mdsc,
return 0;
}
-static int check_capsnap_flush(struct ceph_inode_info *ci,
- u64 want_snap_seq)
-{
- int ret = 1;
- spin_lock(&ci->i_ceph_lock);
- if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
- struct ceph_cap_snap *capsnap =
- list_first_entry(&ci->i_cap_snaps,
- struct ceph_cap_snap, ci_item);
- ret = capsnap->follows >= want_snap_seq;
- }
- spin_unlock(&ci->i_ceph_lock);
- return ret;
-}
-
static int check_caps_flush(struct ceph_mds_client *mdsc,
u64 want_flush_tid)
{
- struct rb_node *n;
- struct ceph_cap_flush *cf;
int ret = 1;
spin_lock(&mdsc->cap_dirty_lock);
- n = rb_first(&mdsc->cap_flush_tree);
- cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
- if (cf && cf->tid <= want_flush_tid) {
- dout("check_caps_flush still flushing tid %llu <= %llu\n",
- cf->tid, want_flush_tid);
- ret = 0;
+ if (!list_empty(&mdsc->cap_flush_list)) {
+ struct ceph_cap_flush *cf =
+ list_first_entry(&mdsc->cap_flush_list,
+ struct ceph_cap_flush, g_list);
+ if (cf->tid <= want_flush_tid) {
+ dout("check_caps_flush still flushing tid "
+ "%llu <= %llu\n", cf->tid, want_flush_tid);
+ ret = 0;
+ }
}
spin_unlock(&mdsc->cap_dirty_lock);
return ret;
@@ -1532,54 +1506,9 @@ static int check_caps_flush(struct ceph_mds_client *mdsc,
* returns true if we've flushed through want_flush_tid
*/
static void wait_caps_flush(struct ceph_mds_client *mdsc,
- u64 want_flush_tid, u64 want_snap_seq)
+ u64 want_flush_tid)
{
- int mds;
-
- dout("check_caps_flush want %llu snap want %llu\n",
- want_flush_tid, want_snap_seq);
- mutex_lock(&mdsc->mutex);
- for (mds = 0; mds < mdsc->max_sessions; ) {
- struct ceph_mds_session *session = mdsc->sessions[mds];
- struct inode *inode = NULL;
-
- if (!session) {
- mds++;
- continue;
- }
- get_session(session);
- mutex_unlock(&mdsc->mutex);
-
- mutex_lock(&session->s_mutex);
- if (!list_empty(&session->s_cap_snaps_flushing)) {
- struct ceph_cap_snap *capsnap =
- list_first_entry(&session->s_cap_snaps_flushing,
- struct ceph_cap_snap,
- flushing_item);
- struct ceph_inode_info *ci = capsnap->ci;
- if (!check_capsnap_flush(ci, want_snap_seq)) {
- dout("check_cap_flush still flushing snap %p "
- "follows %lld <= %lld to mds%d\n",
- &ci->vfs_inode, capsnap->follows,
- want_snap_seq, mds);
- inode = igrab(&ci->vfs_inode);
- }
- }
- mutex_unlock(&session->s_mutex);
- ceph_put_mds_session(session);
-
- if (inode) {
- wait_event(mdsc->cap_flushing_wq,
- check_capsnap_flush(ceph_inode(inode),
- want_snap_seq));
- iput(inode);
- } else {
- mds++;
- }
-
- mutex_lock(&mdsc->mutex);
- }
- mutex_unlock(&mdsc->mutex);
+ dout("check_caps_flush want %llu\n", want_flush_tid);
wait_event(mdsc->cap_flushing_wq,
check_caps_flush(mdsc, want_flush_tid));
@@ -1610,7 +1539,7 @@ again:
while (!list_empty(&tmp_list)) {
if (!msg) {
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
- PAGE_CACHE_SIZE, GFP_NOFS, false);
+ PAGE_SIZE, GFP_NOFS, false);
if (!msg)
goto out_err;
head = msg->front.iov_base;
@@ -1673,8 +1602,7 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct ceph_inode_info *ci = ceph_inode(dir);
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
- size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
- sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
+ size_t size = sizeof(struct ceph_mds_reply_dir_entry);
int order, num_entries;
spin_lock(&ci->i_ceph_lock);
@@ -1685,14 +1613,14 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
order = get_order(size * num_entries);
while (order >= 0) {
- rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
- __GFP_NOWARN,
- order);
- if (rinfo->dir_in)
+ rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
+ __GFP_NOWARN,
+ order);
+ if (rinfo->dir_entries)
break;
order--;
}
- if (!rinfo->dir_in)
+ if (!rinfo->dir_entries)
return -ENOMEM;
num_entries = (PAGE_SIZE << order) / size;
@@ -1724,6 +1652,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
INIT_LIST_HEAD(&req->r_unsafe_target_item);
req->r_fmode = -1;
kref_init(&req->r_kref);
+ RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_wait);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
@@ -2177,6 +2106,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
mds = __choose_mds(mdsc, req);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+ if (mdsc->mdsmap_err) {
+ err = mdsc->mdsmap_err;
+ dout("do_request mdsmap err %d\n", err);
+ goto finish;
+ }
dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map);
goto out;
@@ -2306,14 +2240,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- /* deny access to directories with pool_ns layouts */
- if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
- ceph_inode(req->r_inode)->i_pool_ns_len)
- return -EIO;
- if (req->r_locked_dir &&
- ceph_inode(req->r_locked_dir)->i_pool_ns_len)
- return -EIO;
-
/* issue */
mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir);
@@ -2416,7 +2342,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* get request, session */
tid = le64_to_cpu(msg->hdr.tid);
mutex_lock(&mdsc->mutex);
- req = __lookup_request(mdsc, tid);
+ req = lookup_get_request(mdsc, tid);
if (!req) {
dout("handle_reply on unknown tid %llu\n", tid);
mutex_unlock(&mdsc->mutex);
@@ -2606,7 +2532,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
fwd_seq = ceph_decode_32(&p);
mutex_lock(&mdsc->mutex);
- req = __lookup_request(mdsc, tid);
+ req = lookup_get_request(mdsc, tid);
if (!req) {
dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
goto out; /* dup reply? */
@@ -2805,13 +2731,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_mds_cap_reconnect v2;
struct ceph_mds_cap_reconnect_v1 v1;
} rec;
- size_t reclen;
struct ceph_inode_info *ci;
struct ceph_reconnect_state *recon_state = arg;
struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path;
int pathlen, err;
u64 pathbase;
+ u64 snap_follows;
struct dentry *dentry;
ci = cap->ci;
@@ -2834,9 +2760,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
path = NULL;
pathlen = 0;
}
- err = ceph_pagelist_encode_string(pagelist, path, pathlen);
- if (err)
- goto out_free;
spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */
@@ -2844,14 +2767,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
cap->mseq = 0; /* and migrate_seq */
cap->cap_gen = cap->session->s_cap_gen;
- if (recon_state->flock) {
+ if (recon_state->msg_version >= 2) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.v2.issued = cpu_to_le32(cap->issued);
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v2.pathbase = cpu_to_le64(pathbase);
rec.v2.flock_len = 0;
- reclen = sizeof(rec.v2);
} else {
rec.v1.cap_id = cpu_to_le64(cap->cap_id);
rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -2861,13 +2783,23 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v1.pathbase = cpu_to_le64(pathbase);
- reclen = sizeof(rec.v1);
+ }
+
+ if (list_empty(&ci->i_cap_snaps)) {
+ snap_follows = 0;
+ } else {
+ struct ceph_cap_snap *capsnap =
+ list_first_entry(&ci->i_cap_snaps,
+ struct ceph_cap_snap, ci_item);
+ snap_follows = capsnap->follows;
}
spin_unlock(&ci->i_ceph_lock);
- if (recon_state->flock) {
+ if (recon_state->msg_version >= 2) {
int num_fcntl_locks, num_flock_locks;
struct ceph_filelock *flocks;
+ size_t struct_len, total_len = 0;
+ u8 struct_v = 0;
encode_again:
ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
@@ -2886,20 +2818,51 @@ encode_again:
goto encode_again;
goto out_free;
}
+
+ if (recon_state->msg_version >= 3) {
+ /* version, compat_version and struct_len */
+ total_len = 2 * sizeof(u8) + sizeof(u32);
+ struct_v = 2;
+ }
/*
* number of encoded locks is stable, so copy to pagelist
*/
- rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
- (num_fcntl_locks+num_flock_locks) *
- sizeof(struct ceph_filelock));
- err = ceph_pagelist_append(pagelist, &rec, reclen);
- if (!err)
- err = ceph_locks_to_pagelist(flocks, pagelist,
- num_fcntl_locks,
- num_flock_locks);
+ struct_len = 2 * sizeof(u32) +
+ (num_fcntl_locks + num_flock_locks) *
+ sizeof(struct ceph_filelock);
+ rec.v2.flock_len = cpu_to_le32(struct_len);
+
+ struct_len += sizeof(rec.v2);
+ struct_len += sizeof(u32) + pathlen;
+
+ if (struct_v >= 2)
+ struct_len += sizeof(u64); /* snap_follows */
+
+ total_len += struct_len;
+ err = ceph_pagelist_reserve(pagelist, total_len);
+
+ if (!err) {
+ if (recon_state->msg_version >= 3) {
+ ceph_pagelist_encode_8(pagelist, struct_v);
+ ceph_pagelist_encode_8(pagelist, 1);
+ ceph_pagelist_encode_32(pagelist, struct_len);
+ }
+ ceph_pagelist_encode_string(pagelist, path, pathlen);
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+ ceph_locks_to_pagelist(flocks, pagelist,
+ num_fcntl_locks,
+ num_flock_locks);
+ if (struct_v >= 2)
+ ceph_pagelist_encode_64(pagelist, snap_follows);
+ }
kfree(flocks);
} else {
- err = ceph_pagelist_append(pagelist, &rec, reclen);
+ size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
+ err = ceph_pagelist_reserve(pagelist, size);
+ if (!err) {
+ ceph_pagelist_encode_string(pagelist, path, pathlen);
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+ }
}
recon_state->nr_caps++;
@@ -2990,7 +2953,12 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
recon_state.nr_caps = 0;
recon_state.pagelist = pagelist;
- recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+ if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+ recon_state.msg_version = 3;
+ else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
+ recon_state.msg_version = 2;
+ else
+ recon_state.msg_version = 1;
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
@@ -3019,8 +2987,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
goto fail;
}
- if (recon_state.flock)
- reply->hdr.version = cpu_to_le16(2);
+ reply->hdr.version = cpu_to_le16(recon_state.msg_version);
/* raced with cap release? */
if (s_nr_caps != recon_state.nr_caps) {
@@ -3218,7 +3185,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
WARN_ON(1);
goto release; /* hrm... */
}
- dname.hash = full_name_hash(dname.name, dname.len);
+ dname.hash = full_name_hash(parent, dname.name, dname.len);
dentry = d_lookup(parent, &dname);
dput(parent);
if (!dentry)
@@ -3245,7 +3212,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
msecs_to_jiffies(le32_to_cpu(h->duration_ms));
di->lease_seq = seq;
- dentry->d_time = di->lease_renew_from + duration;
+ di->time = di->lease_renew_from + duration;
di->lease_renew_after = di->lease_renew_from +
(duration >> 1);
di->lease_renew_from = 0;
@@ -3311,47 +3278,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
}
/*
- * Preemptively release a lease we expect to invalidate anyway.
- * Pass @inode always, @dentry is optional.
- */
-void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
- struct dentry *dentry)
-{
- struct ceph_dentry_info *di;
- struct ceph_mds_session *session;
- u32 seq;
-
- BUG_ON(inode == NULL);
- BUG_ON(dentry == NULL);
-
- /* is dentry lease valid? */
- spin_lock(&dentry->d_lock);
- di = ceph_dentry(dentry);
- if (!di || !di->lease_session ||
- di->lease_session->s_mds < 0 ||
- di->lease_gen != di->lease_session->s_cap_gen ||
- !time_before(jiffies, dentry->d_time)) {
- dout("lease_release inode %p dentry %p -- "
- "no lease\n",
- inode, dentry);
- spin_unlock(&dentry->d_lock);
- return;
- }
-
- /* we do have a lease on this dentry; note mds and seq */
- session = ceph_get_mds_session(di->lease_session);
- seq = di->lease_seq;
- __ceph_mdsc_drop_dentry_lease(dentry);
- spin_unlock(&dentry->d_lock);
-
- dout("lease_release inode %p dentry %p to mds%d\n",
- inode, dentry, session->s_mds);
- ceph_mdsc_lease_send_msg(session, inode, dentry,
- CEPH_MDS_LEASE_RELEASE, seq);
- ceph_put_mds_session(session);
-}
-
-/*
* drop all leases (and dentry refs) in preparation for umount
*/
static void drop_leases(struct ceph_mds_client *mdsc)
@@ -3484,7 +3410,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock);
mdsc->last_cap_flush_tid = 1;
- mdsc->cap_flush_tree = RB_ROOT;
+ INIT_LIST_HEAD(&mdsc->cap_flush_list);
INIT_LIST_HEAD(&mdsc->cap_dirty);
INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
mdsc->num_cap_flushing = 0;
@@ -3599,7 +3525,7 @@ restart:
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
- u64 want_tid, want_flush, want_snap;
+ u64 want_tid, want_flush;
if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
return;
@@ -3612,17 +3538,19 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
ceph_flush_dirty_caps(mdsc);
spin_lock(&mdsc->cap_dirty_lock);
want_flush = mdsc->last_cap_flush_tid;
+ if (!list_empty(&mdsc->cap_flush_list)) {
+ struct ceph_cap_flush *cf =
+ list_last_entry(&mdsc->cap_flush_list,
+ struct ceph_cap_flush, g_list);
+ cf->wake = true;
+ }
spin_unlock(&mdsc->cap_dirty_lock);
- down_read(&mdsc->snap_rwsem);
- want_snap = mdsc->last_snap_seq;
- up_read(&mdsc->snap_rwsem);
-
- dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
- want_tid, want_flush, want_snap);
+ dout("sync want tid %lld flush_seq %lld\n",
+ want_tid, want_flush);
wait_unsafe_requests(mdsc, want_tid);
- wait_caps_flush(mdsc, want_flush, want_snap);
+ wait_caps_flush(mdsc, want_flush);
}
/*
@@ -3743,11 +3671,86 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
dout("mdsc_destroy %p done\n", mdsc);
}
+void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+ struct ceph_fs_client *fsc = mdsc->fsc;
+ const char *mds_namespace = fsc->mount_options->mds_namespace;
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front.iov_len;
+ u32 epoch;
+ u32 map_len;
+ u32 num_fs;
+ u32 mount_fscid = (u32)-1;
+ u8 struct_v, struct_cv;
+ int err = -EINVAL;
+
+ ceph_decode_need(&p, end, sizeof(u32), bad);
+ epoch = ceph_decode_32(&p);
+
+ dout("handle_fsmap epoch %u\n", epoch);
+
+ ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+ struct_v = ceph_decode_8(&p);
+ struct_cv = ceph_decode_8(&p);
+ map_len = ceph_decode_32(&p);
+
+ ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
+ p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+
+ num_fs = ceph_decode_32(&p);
+ while (num_fs-- > 0) {
+ void *info_p, *info_end;
+ u32 info_len;
+ u8 info_v, info_cv;
+ u32 fscid, namelen;
+
+ ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+ info_v = ceph_decode_8(&p);
+ info_cv = ceph_decode_8(&p);
+ info_len = ceph_decode_32(&p);
+ ceph_decode_need(&p, end, info_len, bad);
+ info_p = p;
+ info_end = p + info_len;
+ p = info_end;
+
+ ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
+ fscid = ceph_decode_32(&info_p);
+ namelen = ceph_decode_32(&info_p);
+ ceph_decode_need(&info_p, info_end, namelen, bad);
+
+ if (mds_namespace &&
+ strlen(mds_namespace) == namelen &&
+ !strncmp(mds_namespace, (char *)info_p, namelen)) {
+ mount_fscid = fscid;
+ break;
+ }
+ }
+
+ ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
+ if (mount_fscid != (u32)-1) {
+ fsc->client->monc.fs_cluster_id = mount_fscid;
+ ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+ 0, true);
+ ceph_monc_renew_subs(&fsc->client->monc);
+ } else {
+ err = -ENOENT;
+ goto err_out;
+ }
+ return;
+bad:
+ pr_err("error decoding fsmap\n");
+err_out:
+ mutex_lock(&mdsc->mutex);
+ mdsc->mdsmap_err = -ENOENT;
+ __wake_requests(mdsc, &mdsc->waiting_for_map);
+ mutex_unlock(&mdsc->mutex);
+ return;
+}
/*
* handle mds map update.
*/
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
u32 epoch;
u32 maplen;
@@ -3854,7 +3857,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
switch (type) {
case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(mdsc, msg);
+ ceph_mdsc_handle_mdsmap(mdsc, msg);
+ break;
+ case CEPH_MSG_FS_MAP_USER:
+ ceph_mdsc_handle_fsmap(mdsc, msg);
break;
case CEPH_MSG_CLIENT_SESSION:
handle_session(s, msg);
@@ -3900,7 +3906,7 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
struct ceph_auth_handshake *auth = &s->s_auth;
if (force_new && auth->authorizer) {
- ceph_auth_destroy_authorizer(ac, auth->authorizer);
+ ceph_auth_destroy_authorizer(auth->authorizer);
auth->authorizer = NULL;
}
if (!auth->authorizer) {
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 37712ccffcc6..6b3679737d4a 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -45,6 +45,15 @@ struct ceph_mds_reply_info_in {
u32 inline_len;
char *inline_data;
u32 pool_ns_len;
+ char *pool_ns_data;
+};
+
+struct ceph_mds_reply_dir_entry {
+ char *name;
+ u32 name_len;
+ struct ceph_mds_reply_lease *lease;
+ struct ceph_mds_reply_info_in inode;
+ loff_t offset;
};
/*
@@ -73,11 +82,10 @@ struct ceph_mds_reply_info_parsed {
struct ceph_mds_reply_dirfrag *dir_dir;
size_t dir_buf_size;
int dir_nr;
- char **dir_dname;
- u32 *dir_dname_len;
- struct ceph_mds_reply_lease **dir_dlease;
- struct ceph_mds_reply_info_in *dir_in;
- u8 dir_complete, dir_end;
+ bool dir_complete;
+ bool dir_end;
+ bool hash_order;
+ struct ceph_mds_reply_dir_entry *dir_entries;
};
/* for create results */
@@ -97,7 +105,7 @@ struct ceph_mds_reply_info_parsed {
/*
* cap releases are batched and sent to the MDS en masse.
*/
-#define CEPH_CAPS_PER_RELEASE ((PAGE_CACHE_SIZE - \
+#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE - \
sizeof(struct ceph_mds_cap_release)) / \
sizeof(struct ceph_mds_cap_item))
@@ -144,7 +152,6 @@ struct ceph_mds_session {
/* protected by mutex */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */
- struct list_head s_cap_snaps_flushing;
unsigned long s_renew_requested; /* last time we sent a renew req */
u64 s_renew_seq;
@@ -268,8 +275,10 @@ struct ceph_mds_request {
struct ceph_pool_perm {
struct rb_node node;
- u32 pool;
int perm;
+ s64 pool;
+ size_t pool_ns_len;
+ char pool_ns[];
};
/*
@@ -283,6 +292,7 @@ struct ceph_mds_client {
struct completion safe_umount_waiters;
wait_queue_head_t session_close_wq;
struct list_head waiting_for_map;
+ int mdsmap_err;
struct ceph_mds_session **sessions; /* NULL for mds if no session */
atomic_t num_sessions;
@@ -314,7 +324,7 @@ struct ceph_mds_client {
spinlock_t snap_flush_lock;
u64 last_cap_flush_tid;
- struct rb_root cap_flush_tree;
+ struct list_head cap_flush_list;
struct list_head cap_dirty; /* inodes with dirty caps */
struct list_head cap_dirty_migrating; /* ...that are migration... */
int num_cap_flushing; /* # caps we are flushing */
@@ -375,10 +385,6 @@ extern void ceph_mdsc_destroy(struct ceph_fs_client *fsc);
extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
-extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
- struct inode *inode,
- struct dentry *dn);
-
extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct inode *dir);
@@ -413,8 +419,10 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
struct dentry *dentry, char action,
u32 seq);
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg);
+extern void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
+extern void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
extern struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 261531e55e9d..8c3591a7fbae 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -54,16 +54,21 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
const void *start = *p;
int i, j, n;
int err = -EINVAL;
- u16 version;
+ u8 mdsmap_v, mdsmap_cv;
m = kzalloc(sizeof(*m), GFP_NOFS);
if (m == NULL)
return ERR_PTR(-ENOMEM);
- ceph_decode_16_safe(p, end, version, bad);
- if (version > 3) {
- pr_warn("got mdsmap version %d > 3, failing", version);
- goto bad;
+ ceph_decode_need(p, end, 1 + 1, bad);
+ mdsmap_v = ceph_decode_8(p);
+ mdsmap_cv = ceph_decode_8(p);
+ if (mdsmap_v >= 4) {
+ u32 mdsmap_len;
+ ceph_decode_32_safe(p, end, mdsmap_len, bad);
+ if (end < *p + mdsmap_len)
+ goto bad;
+ end = *p + mdsmap_len;
}
ceph_decode_need(p, end, 8*sizeof(u32) + sizeof(u64), bad);
@@ -87,16 +92,29 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
u32 namelen;
s32 mds, inc, state;
u64 state_seq;
- u8 infoversion;
+ u8 info_v;
+ void *info_end = NULL;
struct ceph_entity_addr addr;
u32 num_export_targets;
void *pexport_targets = NULL;
struct ceph_timespec laggy_since;
struct ceph_mds_info *info;
- ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad);
+ ceph_decode_need(p, end, sizeof(u64) + 1, bad);
global_id = ceph_decode_64(p);
- infoversion = ceph_decode_8(p);
+ info_v= ceph_decode_8(p);
+ if (info_v >= 4) {
+ u32 info_len;
+ u8 info_cv;
+ ceph_decode_need(p, end, 1 + sizeof(u32), bad);
+ info_cv = ceph_decode_8(p);
+ info_len = ceph_decode_32(p);
+ info_end = *p + info_len;
+ if (info_end > end)
+ goto bad;
+ }
+
+ ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
*p += sizeof(u64);
namelen = ceph_decode_32(p); /* skip mds name */
*p += namelen;
@@ -115,7 +133,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
*p += sizeof(u32);
ceph_decode_32_safe(p, end, namelen, bad);
*p += namelen;
- if (infoversion >= 2) {
+ if (info_v >= 2) {
ceph_decode_32_safe(p, end, num_export_targets, bad);
pexport_targets = *p;
*p += num_export_targets * sizeof(u32);
@@ -123,6 +141,12 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
num_export_targets = 0;
}
+ if (info_end && *p != info_end) {
+ if (*p > info_end)
+ goto bad;
+ *p = info_end;
+ }
+
dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n",
i+1, n, global_id, mds, inc,
ceph_pr_addr(&addr.in_addr),
@@ -163,6 +187,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_cas_pg_pool = ceph_decode_64(p);
/* ok, we don't care about the rest. */
+ *p = end;
dout("mdsmap_decode success epoch %u\n", m->m_epoch);
return m;
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 9caaa7ffc93f..9ff5219d849e 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -520,9 +520,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
ihold(inode);
atomic_set(&capsnap->nref, 1);
- capsnap->ci = ci;
INIT_LIST_HEAD(&capsnap->ci_item);
- INIT_LIST_HEAD(&capsnap->flushing_item);
capsnap->follows = old_snapc->seq;
capsnap->issued = __ceph_caps_issued(ci, NULL);
@@ -551,7 +549,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
ci->i_wrbuffer_ref_head = 0;
capsnap->context = old_snapc;
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
- old_snapc = NULL;
if (used & CEPH_CAP_FILE_WR) {
dout("queue_cap_snap %p cap_snap %p snapc %p"
@@ -563,6 +560,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
__ceph_finish_cap_snap(ci, capsnap);
}
capsnap = NULL;
+ old_snapc = NULL;
update_snapc:
if (ci->i_head_snapc) {
@@ -603,6 +601,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->dirty_pages);
return 0;
}
+
+ ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
inode, capsnap, capsnap->context,
capsnap->context->seq, ceph_cap_string(capsnap->dirty),
@@ -799,9 +799,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
inode = &ci->vfs_inode;
ihold(inode);
spin_unlock(&mdsc->snap_flush_lock);
- spin_lock(&ci->i_ceph_lock);
- __ceph_flush_snaps(ci, &session, 0);
- spin_unlock(&ci->i_ceph_lock);
+ ceph_flush_snaps(ci, &session);
iput(inode);
spin_lock(&mdsc->snap_flush_lock);
}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index c973043deb0e..e247f6f0feb7 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -120,6 +120,7 @@ enum {
Opt_last_int,
/* int args above */
Opt_snapdirname,
+ Opt_mds_namespace,
Opt_last_string,
/* string args above */
Opt_dirstat,
@@ -154,6 +155,7 @@ static match_table_t fsopt_tokens = {
{Opt_congestion_kb, "write_congestion_kb=%d"},
/* int args above */
{Opt_snapdirname, "snapdirname=%s"},
+ {Opt_mds_namespace, "mds_namespace=%s"},
/* string args above */
{Opt_dirstat, "dirstat"},
{Opt_nodirstat, "nodirstat"},
@@ -210,7 +212,13 @@ static int parse_fsopt_token(char *c, void *private)
if (!fsopt->snapdir_name)
return -ENOMEM;
break;
-
+ case Opt_mds_namespace:
+ fsopt->mds_namespace = kstrndup(argstr[0].from,
+ argstr[0].to-argstr[0].from,
+ GFP_KERNEL);
+ if (!fsopt->mds_namespace)
+ return -ENOMEM;
+ break;
/* misc */
case Opt_wsize:
fsopt->wsize = intval;
@@ -297,6 +305,8 @@ static void destroy_mount_options(struct ceph_mount_options *args)
{
dout("destroy_mount_options %p\n", args);
kfree(args->snapdir_name);
+ kfree(args->mds_namespace);
+ kfree(args->server_path);
kfree(args);
}
@@ -327,6 +337,13 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
if (ret)
return ret;
+ ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
+ if (ret)
+ return ret;
+
+ ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
+ if (ret)
+ return ret;
return ceph_compare_options(new_opt, fsc->client);
}
@@ -334,8 +351,7 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
static int parse_mount_options(struct ceph_mount_options **pfsopt,
struct ceph_options **popt,
int flags, char *options,
- const char *dev_name,
- const char **path)
+ const char *dev_name)
{
struct ceph_mount_options *fsopt;
const char *dev_name_end;
@@ -380,12 +396,13 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
*/
dev_name_end = strchr(dev_name, '/');
if (dev_name_end) {
- /* skip over leading '/' for path */
- *path = dev_name_end + 1;
+ fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
+ if (!fsopt->server_path) {
+ err = -ENOMEM;
+ goto out;
+ }
} else {
- /* path is empty */
dev_name_end = dev_name + strlen(dev_name);
- *path = dev_name_end;
}
err = -EINVAL;
dev_name_end--; /* back up to ':' separator */
@@ -395,7 +412,8 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
goto out;
}
dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
- dout("server path '%s'\n", *path);
+ if (fsopt->server_path)
+ dout("server path '%s'\n", fsopt->server_path);
*popt = ceph_parse_options(options, dev_name, dev_name_end,
parse_fsopt_token, (void *)fsopt);
@@ -457,6 +475,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noacl");
#endif
+ if (fsopt->mds_namespace)
+ seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
if (fsopt->wsize)
seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
@@ -495,9 +515,11 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg)
switch (type) {
case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(fsc->mdsc, msg);
+ ceph_mdsc_handle_mdsmap(fsc->mdsc, msg);
+ return 0;
+ case CEPH_MSG_FS_MAP_USER:
+ ceph_mdsc_handle_fsmap(fsc->mdsc, msg);
return 0;
-
default:
return -1;
}
@@ -511,9 +533,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
{
struct ceph_fs_client *fsc;
const u64 supported_features =
- CEPH_FEATURE_FLOCK |
- CEPH_FEATURE_DIRLAYOUTHASH |
- CEPH_FEATURE_MDS_INLINE_DATA;
+ CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH |
+ CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;
const u64 required_features = 0;
int page_count;
size_t size;
@@ -530,7 +551,14 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail;
}
fsc->client->extra_mon_dispatch = extra_mon_dispatch;
- ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
+
+ if (fsopt->mds_namespace == NULL) {
+ ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+ 0, true);
+ } else {
+ ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP,
+ 0, false);
+ }
fsc->mount_options = fsopt;
@@ -560,7 +588,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
/* set up mempools */
err = -ENOMEM;
- page_count = fsc->mount_options->wsize >> PAGE_CACHE_SHIFT;
+ page_count = fsc->mount_options->wsize >> PAGE_SHIFT;
size = sizeof (struct page *) * (page_count ? page_count : 1);
fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
if (!fsc->wb_pagevec_pool)
@@ -658,8 +686,8 @@ static int __init init_caches(void)
if (ceph_dentry_cachep == NULL)
goto bad_dentry;
- ceph_file_cachep = KMEM_CACHE(ceph_file_info,
- SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
+ ceph_file_cachep = KMEM_CACHE(ceph_file_info, SLAB_MEM_SPREAD);
+
if (ceph_file_cachep == NULL)
goto bad_file;
@@ -717,6 +745,7 @@ static const struct super_operations ceph_super_ops = {
.destroy_inode = ceph_destroy_inode,
.write_inode = ceph_write_inode,
.drop_inode = ceph_drop_inode,
+ .evict_inode = ceph_evict_inode,
.sync_fs = ceph_sync_fs,
.put_super = ceph_put_super,
.show_options = ceph_show_options,
@@ -785,8 +814,7 @@ out:
/*
* mount: join the ceph cluster, and open root directory.
*/
-static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
- const char *path)
+static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
{
int err;
unsigned long started = jiffies; /* note the start time */
@@ -815,11 +843,12 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
goto fail;
}
- if (path[0] == 0) {
+ if (!fsc->mount_options->server_path) {
root = fsc->sb->s_root;
dget(root);
} else {
- dout("mount opening base mountpoint\n");
+ const char *path = fsc->mount_options->server_path + 1;
+ dout("mount opening path %s\n", path);
root = open_root_dentry(fsc, path, started);
if (IS_ERR(root)) {
err = PTR_ERR(root);
@@ -912,13 +941,13 @@ static int ceph_register_bdi(struct super_block *sb,
int err;
/* set ra_pages based on rasize mount option? */
- if (fsc->mount_options->rasize >= PAGE_CACHE_SIZE)
+ if (fsc->mount_options->rasize >= PAGE_SIZE)
fsc->backing_dev_info.ra_pages =
- (fsc->mount_options->rasize + PAGE_CACHE_SIZE - 1)
+ (fsc->mount_options->rasize + PAGE_SIZE - 1)
>> PAGE_SHIFT;
else
fsc->backing_dev_info.ra_pages =
- VM_MAX_READAHEAD * 1024 / PAGE_CACHE_SIZE;
+ VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
atomic_long_inc_return(&bdi_seq));
@@ -935,7 +964,6 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
struct dentry *res;
int err;
int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
- const char *path = NULL;
struct ceph_mount_options *fsopt = NULL;
struct ceph_options *opt = NULL;
@@ -944,7 +972,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
#ifdef CONFIG_CEPH_FS_POSIX_ACL
flags |= MS_POSIXACL;
#endif
- err = parse_mount_options(&fsopt, &opt, flags, data, dev_name, &path);
+ err = parse_mount_options(&fsopt, &opt, flags, data, dev_name);
if (err < 0) {
res = ERR_PTR(err);
goto out_final;
@@ -987,7 +1015,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
}
}
- res = ceph_real_mount(fsc, path);
+ res = ceph_real_mount(fsc);
if (IS_ERR(res))
goto out_splat;
dout("root %p inode %p ino %llx.%llx\n", res,
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index e705c4d612d7..3e3fa9163059 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -69,6 +69,8 @@ struct ceph_mount_options {
*/
char *snapdir_name; /* default ".snap" */
+ char *mds_namespace; /* default NULL */
+ char *server_path; /* default "/" */
};
struct ceph_fs_client {
@@ -101,7 +103,6 @@ struct ceph_fs_client {
#ifdef CONFIG_CEPH_FSCACHE
struct fscache_cookie *fscache;
- struct workqueue_struct *revalidate_wq;
#endif
};
@@ -146,6 +147,14 @@ struct ceph_cap {
#define CHECK_CAPS_AUTHONLY 2 /* only check auth cap */
#define CHECK_CAPS_FLUSH 4 /* flush any dirty caps */
+struct ceph_cap_flush {
+ u64 tid;
+ int caps; /* 0 means capsnap */
+ bool wake; /* wake up flush waiters when finish ? */
+ struct list_head g_list; // global
+ struct list_head i_list; // per inode
+};
+
/*
* Snapped cap state that is pending flush to mds. When a snapshot occurs,
* we first complete any in-process sync writes and writeback any dirty
@@ -153,10 +162,11 @@ struct ceph_cap {
*/
struct ceph_cap_snap {
atomic_t nref;
- struct ceph_inode_info *ci;
- struct list_head ci_item, flushing_item;
+ struct list_head ci_item;
- u64 follows, flush_tid;
+ struct ceph_cap_flush cap_flush;
+
+ u64 follows;
int issued, dirty;
struct ceph_snap_context *context;
@@ -185,16 +195,6 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
}
}
-struct ceph_cap_flush {
- u64 tid;
- int caps;
- struct rb_node g_node; // global
- union {
- struct rb_node i_node; // inode
- struct list_head list;
- };
-};
-
/*
* The frag tree describes how a directory is fragmented, potentially across
* multiple metadata servers. It is also used to indicate points where
@@ -245,7 +245,7 @@ struct ceph_dentry_info {
unsigned long lease_renew_after, lease_renew_from;
struct list_head lru;
struct dentry *dentry;
- u64 time;
+ unsigned long time;
u64 offset;
};
@@ -286,7 +286,6 @@ struct ceph_inode_info {
struct ceph_dir_layout i_dir_layout;
struct ceph_file_layout i_layout;
- size_t i_pool_ns_len;
char *i_symlink;
/* for dirs */
@@ -295,6 +294,7 @@ struct ceph_inode_info {
u64 i_files, i_subdirs;
struct rb_root i_fragtree;
+ int i_fragtree_nsplits;
struct mutex i_fragtree_mutex;
struct ceph_inode_xattrs_info i_xattrs;
@@ -309,7 +309,7 @@ struct ceph_inode_info {
* overlapping, pipelined cap flushes to the mds. we can probably
* reduce the tid to 8 bits if we're concerned about inode size. */
struct ceph_cap_flush *i_prealloc_cap_flush;
- struct rb_root i_cap_flush_tree;
+ struct list_head i_cap_flush_list;
wait_queue_head_t i_cap_wq; /* threads waiting on a capability */
unsigned long i_hold_caps_min; /* jiffies */
unsigned long i_hold_caps_max; /* jiffies */
@@ -320,7 +320,7 @@ struct ceph_inode_info {
dirty|flushing caps */
unsigned i_snap_caps; /* cap bits for snapped files */
- int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
+ int i_nr_by_mode[CEPH_FILE_MODE_BITS]; /* open file counts */
struct mutex i_truncate_mutex;
u32 i_truncate_seq; /* last truncate to smaller size */
@@ -357,8 +357,7 @@ struct ceph_inode_info {
#ifdef CONFIG_CEPH_FSCACHE
struct fscache_cookie *fscache;
- u32 i_fscache_gen; /* sequence, for delayed fscache validate */
- struct work_struct i_revalidate_work;
+ u32 i_fscache_gen;
#endif
struct inode vfs_inode; /* at end */
};
@@ -469,6 +468,9 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
+#define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */
+#define CEPH_I_KICK_FLUSH (1 << 9) /* kick flushing caps */
+#define CEPH_I_FLUSH_SNAPS (1 << 10) /* need flush snapss */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
long long release_count,
@@ -537,11 +539,6 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
return (struct ceph_dentry_info *)dentry->d_fsdata;
}
-static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
-{
- return ((loff_t)frag << 32) | (loff_t)off;
-}
-
/*
* caps helpers
*/
@@ -632,7 +629,6 @@ struct ceph_file_info {
struct ceph_mds_request *last_readdir;
/* readdir: position within a frag */
- unsigned offset; /* offset of last chunk, adjusted for . and .. */
unsigned next_offset; /* offset of next chunk (last_name's + 1) */
char *last_name; /* last entry in previous chunk */
long long dir_release_count;
@@ -754,6 +750,7 @@ extern const struct inode_operations ceph_file_iops;
extern struct inode *ceph_alloc_inode(struct super_block *sb);
extern void ceph_destroy_inode(struct inode *inode);
extern int ceph_drop_inode(struct inode *inode);
+extern void ceph_evict_inode(struct inode *inode);
extern struct inode *ceph_get_inode(struct super_block *sb,
struct ceph_vino vino);
@@ -785,19 +782,15 @@ static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
return __ceph_do_getattr(inode, NULL, mask, force);
}
extern int ceph_permission(struct inode *inode, int mask);
+extern int __ceph_setattr(struct inode *inode, struct iattr *attr);
extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
struct kstat *stat);
/* xattr.c */
-extern int ceph_setxattr(struct dentry *, const char *, const void *,
- size_t, int);
-int __ceph_setxattr(struct dentry *, const char *, const void *, size_t, int);
+int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);
ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
-int __ceph_removexattr(struct dentry *, const char *);
-extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
-extern int ceph_removexattr(struct dentry *, const char *);
extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
extern void __init ceph_xattr_init(void);
@@ -898,9 +891,8 @@ extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc);
-extern void __ceph_flush_snaps(struct ceph_inode_info *ci,
- struct ceph_mds_session **psession,
- int again);
+extern void ceph_flush_snaps(struct ceph_inode_info *ci,
+ struct ceph_mds_session **psession);
extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session);
extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
@@ -915,10 +907,7 @@ extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page);
/* for counting open files by mode */
-static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
-{
- ci->i_nr_by_mode[mode]++;
-}
+extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
/* addr.c */
@@ -931,6 +920,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
/* file.c */
extern const struct file_operations ceph_file_fops;
+extern int ceph_renew_caps(struct inode *inode);
extern int ceph_open(struct inode *inode, struct file *file);
extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode,
@@ -938,6 +928,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
extern int ceph_release(struct inode *inode, struct file *filp);
extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
char *data, size_t len);
+extern void ceph_sync_write_wait(struct inode *inode);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
extern const struct file_operations ceph_snapdir_fops;
@@ -946,6 +937,7 @@ extern const struct inode_operations ceph_snapdir_iops;
extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
ceph_snapdir_dentry_ops;
+extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);
extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
extern int ceph_handle_snapdir(struct ceph_mds_request *req,
struct dentry *dentry, int err);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 9410abdef3ce..adc231892b0d 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -16,6 +16,8 @@
static int __remove_xattr(struct ceph_inode_info *ci,
struct ceph_inode_xattr *xattr);
+const struct xattr_handler ceph_other_xattr_handler;
+
/*
* List of handlers for synthetic system.* attributes. Other
* attributes are handled directly.
@@ -25,6 +27,7 @@ const struct xattr_handler *ceph_xattr_handlers[] = {
&posix_acl_access_xattr_handler,
&posix_acl_default_xattr_handler,
#endif
+ &ceph_other_xattr_handler,
NULL,
};
@@ -33,7 +36,6 @@ static bool ceph_is_valid_xattr(const char *name)
return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
!strncmp(name, XATTR_SECURITY_PREFIX,
XATTR_SECURITY_PREFIX_LEN) ||
- !strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN) ||
!strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
!strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
}
@@ -55,81 +57,88 @@ struct ceph_vxattr {
static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
{
- size_t s;
- char *p = (char *)&ci->i_layout;
-
- for (s = 0; s < sizeof(ci->i_layout); s++, p++)
- if (*p)
- return true;
- return false;
+ struct ceph_file_layout *fl = &ci->i_layout;
+ return (fl->stripe_unit > 0 || fl->stripe_count > 0 ||
+ fl->object_size > 0 || fl->pool_id >= 0 ||
+ rcu_dereference_raw(fl->pool_ns) != NULL);
}
static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
size_t size)
{
- int ret;
struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
struct ceph_osd_client *osdc = &fsc->client->osdc;
- s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+ struct ceph_string *pool_ns;
+ s64 pool = ci->i_layout.pool_id;
const char *pool_name;
+ const char *ns_field = " pool_namespace=";
char buf[128];
+ size_t len, total_len = 0;
+ int ret;
+
+ pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
- down_read(&osdc->map_sem);
+ down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name) {
- size_t len = strlen(pool_name);
- ret = snprintf(buf, sizeof(buf),
- "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=",
- (unsigned long long)ceph_file_layout_su(ci->i_layout),
- (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
- (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
- if (!size) {
- ret += len;
- } else if (ret + len > size) {
- ret = -ERANGE;
- } else {
- memcpy(val, buf, ret);
+ len = snprintf(buf, sizeof(buf),
+ "stripe_unit=%u stripe_count=%u object_size=%u pool=",
+ ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
+ ci->i_layout.object_size);
+ total_len = len + strlen(pool_name);
+ } else {
+ len = snprintf(buf, sizeof(buf),
+ "stripe_unit=%u stripe_count=%u object_size=%u pool=%lld",
+ ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
+ ci->i_layout.object_size, (unsigned long long)pool);
+ total_len = len;
+ }
+
+ if (pool_ns)
+ total_len += strlen(ns_field) + pool_ns->len;
+
+ if (!size) {
+ ret = total_len;
+ } else if (total_len > size) {
+ ret = -ERANGE;
+ } else {
+ memcpy(val, buf, len);
+ ret = len;
+ if (pool_name) {
+ len = strlen(pool_name);
memcpy(val + ret, pool_name, len);
ret += len;
}
- } else {
- ret = snprintf(buf, sizeof(buf),
- "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld",
- (unsigned long long)ceph_file_layout_su(ci->i_layout),
- (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
- (unsigned long long)ceph_file_layout_object_size(ci->i_layout),
- (unsigned long long)pool);
- if (size) {
- if (ret <= size)
- memcpy(val, buf, ret);
- else
- ret = -ERANGE;
+ if (pool_ns) {
+ len = strlen(ns_field);
+ memcpy(val + ret, ns_field, len);
+ ret += len;
+ memcpy(val + ret, pool_ns->str, pool_ns->len);
+ ret += pool_ns->len;
}
}
- up_read(&osdc->map_sem);
+ up_read(&osdc->lock);
+ ceph_put_string(pool_ns);
return ret;
}
static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci,
char *val, size_t size)
{
- return snprintf(val, size, "%lld",
- (unsigned long long)ceph_file_layout_su(ci->i_layout));
+ return snprintf(val, size, "%u", ci->i_layout.stripe_unit);
}
static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci,
char *val, size_t size)
{
- return snprintf(val, size, "%lld",
- (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout));
+ return snprintf(val, size, "%u", ci->i_layout.stripe_count);
}
static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci,
char *val, size_t size)
{
- return snprintf(val, size, "%lld",
- (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
+ return snprintf(val, size, "%u", ci->i_layout.object_size);
}
static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
@@ -138,16 +147,28 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
int ret;
struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
struct ceph_osd_client *osdc = &fsc->client->osdc;
- s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+ s64 pool = ci->i_layout.pool_id;
const char *pool_name;
- down_read(&osdc->map_sem);
+ down_read(&osdc->lock);
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
if (pool_name)
ret = snprintf(val, size, "%s", pool_name);
else
ret = snprintf(val, size, "%lld", (unsigned long long)pool);
- up_read(&osdc->map_sem);
+ up_read(&osdc->lock);
+ return ret;
+}
+
+static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
+ char *val, size_t size)
+{
+ int ret = 0;
+ struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);
+ if (ns) {
+ ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str);
+ ceph_put_string(ns);
+ }
return ret;
}
@@ -239,6 +260,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
XATTR_LAYOUT_FIELD(dir, layout, object_size),
XATTR_LAYOUT_FIELD(dir, layout, pool),
+ XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
XATTR_NAME_CEPH(dir, entries),
XATTR_NAME_CEPH(dir, files),
XATTR_NAME_CEPH(dir, subdirs),
@@ -266,6 +288,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
XATTR_LAYOUT_FIELD(file, layout, stripe_count),
XATTR_LAYOUT_FIELD(file, layout, object_size),
XATTR_LAYOUT_FIELD(file, layout, pool),
+ XATTR_LAYOUT_FIELD(file, layout, pool_namespace),
{ .name = NULL, 0 } /* Required table terminator */
};
static size_t ceph_file_vxattrs_name_size; /* total size of all names */
@@ -496,19 +519,6 @@ static int __remove_xattr(struct ceph_inode_info *ci,
return 0;
}
-static int __remove_xattr_by_name(struct ceph_inode_info *ci,
- const char *name)
-{
- struct rb_node **p;
- struct ceph_inode_xattr *xattr;
- int err;
-
- p = &ci->i_xattrs.index.rb_node;
- xattr = __get_xattr(ci, name);
- err = __remove_xattr(ci, xattr);
- return err;
-}
-
static char *__copy_xattr_names(struct ceph_inode_info *ci,
char *dest)
{
@@ -740,9 +750,6 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
int req_mask;
int err;
- if (!ceph_is_valid_xattr(name))
- return -ENODATA;
-
/* let's see if a virtual xattr was requested */
vxattr = ceph_match_vxattr(inode, name);
if (vxattr) {
@@ -804,15 +811,6 @@ out:
return err;
}
-ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
- size_t size)
-{
- if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
- return generic_getxattr(dentry, name, value, size);
-
- return __ceph_getxattr(d_inode(dentry), name, value, size);
-}
-
ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
{
struct inode *inode = d_inode(dentry);
@@ -877,15 +875,15 @@ out:
return err;
}
-static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
+static int ceph_sync_setxattr(struct inode *inode, const char *name,
const char *value, size_t size, int flags)
{
- struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
- struct inode *inode = d_inode(dentry);
+ struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_request *req;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_pagelist *pagelist = NULL;
+ int op = CEPH_MDS_OP_SETXATTR;
int err;
if (size > 0) {
@@ -899,20 +897,21 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
if (err)
goto out;
} else if (!value) {
- flags |= CEPH_XATTR_REMOVE;
+ if (flags & CEPH_XATTR_REPLACE)
+ op = CEPH_MDS_OP_RMXATTR;
+ else
+ flags |= CEPH_XATTR_REMOVE;
}
dout("setxattr value=%.*s\n", (int)size, value);
/* do request */
- req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
- USE_AUTH_MDS);
+ req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}
- req->r_args.setxattr.flags = cpu_to_le32(flags);
req->r_path2 = kstrdup(name, GFP_NOFS);
if (!req->r_path2) {
ceph_mdsc_put_request(req);
@@ -920,8 +919,11 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
goto out;
}
- req->r_pagelist = pagelist;
- pagelist = NULL;
+ if (op == CEPH_MDS_OP_SETXATTR) {
+ req->r_args.setxattr.flags = cpu_to_le32(flags);
+ req->r_pagelist = pagelist;
+ pagelist = NULL;
+ }
req->r_inode = inode;
ihold(inode);
@@ -939,13 +941,12 @@ out:
return err;
}
-int __ceph_setxattr(struct dentry *dentry, const char *name,
+int __ceph_setxattr(struct inode *inode, const char *name,
const void *value, size_t size, int flags)
{
- struct inode *inode = d_inode(dentry);
struct ceph_vxattr *vxattr;
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_cap_flush *prealloc_cf = NULL;
int issued;
int err;
@@ -958,8 +959,8 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
int required_blob_size;
bool lock_snap_rwsem = false;
- if (!ceph_is_valid_xattr(name))
- return -EOPNOTSUPP;
+ if (ceph_snap(inode) != CEPH_NOSNAP)
+ return -EROFS;
vxattr = ceph_match_vxattr(inode, name);
if (vxattr && vxattr->readonly)
@@ -1056,7 +1057,7 @@ do_sync_unlocked:
"during filling trace\n", inode);
err = -EBUSY;
} else {
- err = ceph_sync_setxattr(dentry, name, value, size, flags);
+ err = ceph_sync_setxattr(inode, name, value, size, flags);
}
out:
ceph_free_cap_flush(prealloc_cf);
@@ -1066,146 +1067,30 @@ out:
return err;
}
-int ceph_setxattr(struct dentry *dentry, const char *name,
- const void *value, size_t size, int flags)
+static int ceph_get_xattr_handler(const struct xattr_handler *handler,
+ struct dentry *dentry, struct inode *inode,
+ const char *name, void *value, size_t size)
{
- if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
- return -EROFS;
-
- if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
- return generic_setxattr(dentry, name, value, size, flags);
-
- if (size == 0)
- value = ""; /* empty EA, do not remove */
-
- return __ceph_setxattr(dentry, name, value, size, flags);
-}
-
-static int ceph_send_removexattr(struct dentry *dentry, const char *name)
-{
- struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
- struct ceph_mds_client *mdsc = fsc->mdsc;
- struct inode *inode = d_inode(dentry);
- struct ceph_mds_request *req;
- int err;
-
- req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RMXATTR,
- USE_AUTH_MDS);
- if (IS_ERR(req))
- return PTR_ERR(req);
- req->r_path2 = kstrdup(name, GFP_NOFS);
- if (!req->r_path2)
- return -ENOMEM;
-
- req->r_inode = inode;
- ihold(inode);
- req->r_num_caps = 1;
- req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
- err = ceph_mdsc_do_request(mdsc, NULL, req);
- ceph_mdsc_put_request(req);
- return err;
+ if (!ceph_is_valid_xattr(name))
+ return -EOPNOTSUPP;
+ return __ceph_getxattr(inode, name, value, size);
}
-int __ceph_removexattr(struct dentry *dentry, const char *name)
+static int ceph_set_xattr_handler(const struct xattr_handler *handler,
+ struct dentry *unused, struct inode *inode,
+ const char *name, const void *value,
+ size_t size, int flags)
{
- struct inode *inode = d_inode(dentry);
- struct ceph_vxattr *vxattr;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
- struct ceph_cap_flush *prealloc_cf = NULL;
- int issued;
- int err;
- int required_blob_size;
- int dirty;
- bool lock_snap_rwsem = false;
-
if (!ceph_is_valid_xattr(name))
return -EOPNOTSUPP;
-
- vxattr = ceph_match_vxattr(inode, name);
- if (vxattr && vxattr->readonly)
- return -EOPNOTSUPP;
-
- /* pass any unhandled ceph.* xattrs through to the MDS */
- if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
- goto do_sync_unlocked;
-
- prealloc_cf = ceph_alloc_cap_flush();
- if (!prealloc_cf)
- return -ENOMEM;
-
- err = -ENOMEM;
- spin_lock(&ci->i_ceph_lock);
-retry:
- issued = __ceph_caps_issued(ci, NULL);
- if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
- goto do_sync;
-
- if (!lock_snap_rwsem && !ci->i_head_snapc) {
- lock_snap_rwsem = true;
- if (!down_read_trylock(&mdsc->snap_rwsem)) {
- spin_unlock(&ci->i_ceph_lock);
- down_read(&mdsc->snap_rwsem);
- spin_lock(&ci->i_ceph_lock);
- goto retry;
- }
- }
-
- dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
-
- __build_xattrs(inode);
-
- required_blob_size = __get_required_blob_size(ci, 0, 0);
-
- if (!ci->i_xattrs.prealloc_blob ||
- required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
- struct ceph_buffer *blob;
-
- spin_unlock(&ci->i_ceph_lock);
- dout(" preaallocating new blob size=%d\n", required_blob_size);
- blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
- if (!blob)
- goto do_sync_unlocked;
- spin_lock(&ci->i_ceph_lock);
- if (ci->i_xattrs.prealloc_blob)
- ceph_buffer_put(ci->i_xattrs.prealloc_blob);
- ci->i_xattrs.prealloc_blob = blob;
- goto retry;
- }
-
- err = __remove_xattr_by_name(ceph_inode(inode), name);
-
- dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
- &prealloc_cf);
- ci->i_xattrs.dirty = true;
- inode->i_ctime = current_fs_time(inode->i_sb);
- spin_unlock(&ci->i_ceph_lock);
- if (lock_snap_rwsem)
- up_read(&mdsc->snap_rwsem);
- if (dirty)
- __mark_inode_dirty(inode, dirty);
- ceph_free_cap_flush(prealloc_cf);
- return err;
-do_sync:
- spin_unlock(&ci->i_ceph_lock);
-do_sync_unlocked:
- if (lock_snap_rwsem)
- up_read(&mdsc->snap_rwsem);
- ceph_free_cap_flush(prealloc_cf);
- err = ceph_send_removexattr(dentry, name);
- return err;
+ return __ceph_setxattr(inode, name, value, size, flags);
}
-int ceph_removexattr(struct dentry *dentry, const char *name)
-{
- if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
- return -EROFS;
-
- if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
- return generic_removexattr(dentry, name);
-
- return __ceph_removexattr(dentry, name);
-}
+const struct xattr_handler ceph_other_xattr_handler = {
+ .prefix = "", /* match any name => handlers called with full name */
+ .get = ceph_get_xattr_handler,
+ .set = ceph_set_xattr_handler,
+};
#ifdef CONFIG_SECURITY
bool ceph_security_xattr_wanted(struct inode *in)