summaryrefslogtreecommitdiff
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c508
1 files changed, 257 insertions, 251 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 44852c3ae531..fa59a85226b2 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -48,7 +48,7 @@
struct ceph_reconnect_state {
int nr_caps;
struct ceph_pagelist *pagelist;
- bool flock;
+ unsigned msg_version;
};
static void __wake_requests(struct ceph_mds_client *mdsc,
@@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end,
} else
info->inline_version = CEPH_INLINE_NONE;
+ info->pool_ns_len = 0;
+ info->pool_ns_data = NULL;
if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
- ceph_decode_need(p, end, info->pool_ns_len, bad);
- *p += info->pool_ns_len;
- } else {
- info->pool_ns_len = 0;
+ if (info->pool_ns_len > 0) {
+ ceph_decode_need(p, end, info->pool_ns_len, bad);
+ info->pool_ns_data = *p;
+ *p += info->pool_ns_len;
+ }
}
return 0;
@@ -181,17 +184,18 @@ static int parse_reply_info_dir(void **p, void *end,
ceph_decode_need(p, end, sizeof(num) + 2, bad);
num = ceph_decode_32(p);
- info->dir_end = ceph_decode_8(p);
- info->dir_complete = ceph_decode_8(p);
+ {
+ u16 flags = ceph_decode_16(p);
+ info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
+ info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
+ info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
+ }
if (num == 0)
goto done;
- BUG_ON(!info->dir_in);
- info->dir_dname = (void *)(info->dir_in + num);
- info->dir_dname_len = (void *)(info->dir_dname + num);
- info->dir_dlease = (void *)(info->dir_dname_len + num);
- if ((unsigned long)(info->dir_dlease + num) >
- (unsigned long)info->dir_in + info->dir_buf_size) {
+ BUG_ON(!info->dir_entries);
+ if ((unsigned long)(info->dir_entries + num) >
+ (unsigned long)info->dir_entries + info->dir_buf_size) {
pr_err("dir contents are larger than expected\n");
WARN_ON(1);
goto bad;
@@ -199,21 +203,23 @@ static int parse_reply_info_dir(void **p, void *end,
info->dir_nr = num;
while (num) {
+ struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
/* dentry */
ceph_decode_need(p, end, sizeof(u32)*2, bad);
- info->dir_dname_len[i] = ceph_decode_32(p);
- ceph_decode_need(p, end, info->dir_dname_len[i], bad);
- info->dir_dname[i] = *p;
- *p += info->dir_dname_len[i];
- dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
- info->dir_dname[i]);
- info->dir_dlease[i] = *p;
+ rde->name_len = ceph_decode_32(p);
+ ceph_decode_need(p, end, rde->name_len, bad);
+ rde->name = *p;
+ *p += rde->name_len;
+ dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
+ rde->lease = *p;
*p += sizeof(struct ceph_mds_reply_lease);
/* inode */
- err = parse_reply_info_in(p, end, &info->dir_in[i], features);
+ err = parse_reply_info_in(p, end, &rde->inode, features);
if (err < 0)
goto out_bad;
+ /* ceph_readdir_prepopulate() will update it */
+ rde->offset = 0;
i++;
num--;
}
@@ -345,9 +351,9 @@ out_bad:
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
- if (!info->dir_in)
+ if (!info->dir_entries)
return;
- free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
+ free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}
@@ -386,9 +392,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
if (atomic_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer)
- ceph_auth_destroy_authorizer(
- s->s_mdsc->fsc->client->monc.auth,
- s->s_auth.authorizer);
+ ceph_auth_destroy_authorizer(s->s_auth.authorizer);
kfree(s);
}
}
@@ -468,7 +472,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_flushing);
- INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
dout("register_session mds%d\n", mds);
if (mds >= mdsc->max_sessions) {
@@ -569,51 +572,23 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req);
}
+DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
+
/*
* lookup session, bump ref if found.
*
* called under mdsc->mutex.
*/
-static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
- u64 tid)
+static struct ceph_mds_request *
+lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
struct ceph_mds_request *req;
- struct rb_node *n = mdsc->request_tree.rb_node;
-
- while (n) {
- req = rb_entry(n, struct ceph_mds_request, r_node);
- if (tid < req->r_tid)
- n = n->rb_left;
- else if (tid > req->r_tid)
- n = n->rb_right;
- else {
- ceph_mdsc_get_request(req);
- return req;
- }
- }
- return NULL;
-}
-
-static void __insert_request(struct ceph_mds_client *mdsc,
- struct ceph_mds_request *new)
-{
- struct rb_node **p = &mdsc->request_tree.rb_node;
- struct rb_node *parent = NULL;
- struct ceph_mds_request *req = NULL;
- while (*p) {
- parent = *p;
- req = rb_entry(parent, struct ceph_mds_request, r_node);
- if (new->r_tid < req->r_tid)
- p = &(*p)->rb_left;
- else if (new->r_tid > req->r_tid)
- p = &(*p)->rb_right;
- else
- BUG();
- }
+ req = lookup_request(&mdsc->request_tree, tid);
+ if (req)
+ ceph_mdsc_get_request(req);
- rb_link_node(&new->r_node, parent, p);
- rb_insert_color(&new->r_node, &mdsc->request_tree);
+ return req;
}
/*
@@ -632,7 +607,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
req->r_num_caps);
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
- __insert_request(mdsc, req);
+ insert_request(&mdsc->request_tree, req);
req->r_uid = current_fsuid();
req->r_gid = current_fsgid();
@@ -665,8 +640,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
}
}
- rb_erase(&req->r_node, &mdsc->request_tree);
- RB_CLEAR_NODE(&req->r_node);
+ erase_request(&mdsc->request_tree, req);
if (req->r_unsafe_dir && req->r_got_unsafe) {
struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
@@ -870,12 +844,14 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
int metadata_bytes = 0;
int metadata_key_count = 0;
struct ceph_options *opt = mdsc->fsc->client->options;
+ struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
void *p;
const char* metadata[][2] = {
{"hostname", utsname()->nodename},
{"kernel_version", utsname()->release},
- {"entity_id", opt->name ? opt->name : ""},
+ {"entity_id", opt->name ? : ""},
+ {"root", fsopt->server_path ? : "/"},
{NULL, NULL}
};
@@ -1151,9 +1127,11 @@ out:
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
+ struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
struct ceph_inode_info *ci = ceph_inode(inode);
LIST_HEAD(to_remove);
- int drop = 0;
+ bool drop = false;
+ bool invalidate = false;
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
@@ -1161,22 +1139,25 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf;
- struct ceph_mds_client *mdsc =
- ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_mds_client *mdsc = fsc->mdsc;
- while (true) {
- struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
- if (!n)
- break;
- cf = rb_entry(n, struct ceph_cap_flush, i_node);
- rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
- list_add(&cf->list, &to_remove);
+ ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
+
+ if (ci->i_wrbuffer_ref > 0 &&
+ ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
+ invalidate = true;
+
+ while (!list_empty(&ci->i_cap_flush_list)) {
+ cf = list_first_entry(&ci->i_cap_flush_list,
+ struct ceph_cap_flush, i_list);
+ list_del(&cf->i_list);
+ list_add(&cf->i_list, &to_remove);
}
spin_lock(&mdsc->cap_dirty_lock);
- list_for_each_entry(cf, &to_remove, list)
- rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+ list_for_each_entry(cf, &to_remove, i_list)
+ list_del(&cf->g_list);
if (!list_empty(&ci->i_dirty_item)) {
pr_warn_ratelimited(
@@ -1185,7 +1166,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
- drop = 1;
+ drop = true;
}
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
@@ -1195,12 +1176,12 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
- drop = 1;
+ drop = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
- list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+ list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
ci->i_prealloc_cap_flush = NULL;
}
}
@@ -1208,11 +1189,15 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
while (!list_empty(&to_remove)) {
struct ceph_cap_flush *cf;
cf = list_first_entry(&to_remove,
- struct ceph_cap_flush, list);
- list_del(&cf->list);
+ struct ceph_cap_flush, i_list);
+ list_del(&cf->i_list);
ceph_free_cap_flush(cf);
}
- while (drop--)
+
+ wake_up_all(&ci->i_cap_wq);
+ if (invalidate)
+ ceph_queue_invalidate(inode);
+ if (drop)
iput(inode);
return 0;
}
@@ -1222,12 +1207,15 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
*/
static void remove_session_caps(struct ceph_mds_session *session)
{
+ struct ceph_fs_client *fsc = session->s_mdsc->fsc;
+ struct super_block *sb = fsc->sb;
dout("remove_session_caps on %p\n", session);
- iterate_session_caps(session, remove_session_caps_cb, NULL);
+ iterate_session_caps(session, remove_session_caps_cb, fsc);
+
+ wake_up_all(&fsc->mdsc->cap_flushing_wq);
spin_lock(&session->s_cap_lock);
if (session->s_nr_caps > 0) {
- struct super_block *sb = session->s_mdsc->fsc->sb;
struct inode *inode;
struct ceph_cap *cap, *prev = NULL;
struct ceph_vino vino;
@@ -1272,13 +1260,13 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
{
struct ceph_inode_info *ci = ceph_inode(inode);
- wake_up_all(&ci->i_cap_wq);
if (arg) {
spin_lock(&ci->i_ceph_lock);
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock);
}
+ wake_up_all(&ci->i_cap_wq);
return 0;
}
@@ -1492,35 +1480,21 @@ static int trim_caps(struct ceph_mds_client *mdsc,
return 0;
}
-static int check_capsnap_flush(struct ceph_inode_info *ci,
- u64 want_snap_seq)
-{
- int ret = 1;
- spin_lock(&ci->i_ceph_lock);
- if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
- struct ceph_cap_snap *capsnap =
- list_first_entry(&ci->i_cap_snaps,
- struct ceph_cap_snap, ci_item);
- ret = capsnap->follows >= want_snap_seq;
- }
- spin_unlock(&ci->i_ceph_lock);
- return ret;
-}
-
static int check_caps_flush(struct ceph_mds_client *mdsc,
u64 want_flush_tid)
{
- struct rb_node *n;
- struct ceph_cap_flush *cf;
int ret = 1;
spin_lock(&mdsc->cap_dirty_lock);
- n = rb_first(&mdsc->cap_flush_tree);
- cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
- if (cf && cf->tid <= want_flush_tid) {
- dout("check_caps_flush still flushing tid %llu <= %llu\n",
- cf->tid, want_flush_tid);
- ret = 0;
+ if (!list_empty(&mdsc->cap_flush_list)) {
+ struct ceph_cap_flush *cf =
+ list_first_entry(&mdsc->cap_flush_list,
+ struct ceph_cap_flush, g_list);
+ if (cf->tid <= want_flush_tid) {
+ dout("check_caps_flush still flushing tid "
+ "%llu <= %llu\n", cf->tid, want_flush_tid);
+ ret = 0;
+ }
}
spin_unlock(&mdsc->cap_dirty_lock);
return ret;
@@ -1532,54 +1506,9 @@ static int check_caps_flush(struct ceph_mds_client *mdsc,
* returns true if we've flushed through want_flush_tid
*/
static void wait_caps_flush(struct ceph_mds_client *mdsc,
- u64 want_flush_tid, u64 want_snap_seq)
+ u64 want_flush_tid)
{
- int mds;
-
- dout("check_caps_flush want %llu snap want %llu\n",
- want_flush_tid, want_snap_seq);
- mutex_lock(&mdsc->mutex);
- for (mds = 0; mds < mdsc->max_sessions; ) {
- struct ceph_mds_session *session = mdsc->sessions[mds];
- struct inode *inode = NULL;
-
- if (!session) {
- mds++;
- continue;
- }
- get_session(session);
- mutex_unlock(&mdsc->mutex);
-
- mutex_lock(&session->s_mutex);
- if (!list_empty(&session->s_cap_snaps_flushing)) {
- struct ceph_cap_snap *capsnap =
- list_first_entry(&session->s_cap_snaps_flushing,
- struct ceph_cap_snap,
- flushing_item);
- struct ceph_inode_info *ci = capsnap->ci;
- if (!check_capsnap_flush(ci, want_snap_seq)) {
- dout("check_cap_flush still flushing snap %p "
- "follows %lld <= %lld to mds%d\n",
- &ci->vfs_inode, capsnap->follows,
- want_snap_seq, mds);
- inode = igrab(&ci->vfs_inode);
- }
- }
- mutex_unlock(&session->s_mutex);
- ceph_put_mds_session(session);
-
- if (inode) {
- wait_event(mdsc->cap_flushing_wq,
- check_capsnap_flush(ceph_inode(inode),
- want_snap_seq));
- iput(inode);
- } else {
- mds++;
- }
-
- mutex_lock(&mdsc->mutex);
- }
- mutex_unlock(&mdsc->mutex);
+ dout("check_caps_flush want %llu\n", want_flush_tid);
wait_event(mdsc->cap_flushing_wq,
check_caps_flush(mdsc, want_flush_tid));
@@ -1610,7 +1539,7 @@ again:
while (!list_empty(&tmp_list)) {
if (!msg) {
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
- PAGE_CACHE_SIZE, GFP_NOFS, false);
+ PAGE_SIZE, GFP_NOFS, false);
if (!msg)
goto out_err;
head = msg->front.iov_base;
@@ -1673,8 +1602,7 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct ceph_inode_info *ci = ceph_inode(dir);
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
- size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
- sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
+ size_t size = sizeof(struct ceph_mds_reply_dir_entry);
int order, num_entries;
spin_lock(&ci->i_ceph_lock);
@@ -1685,14 +1613,14 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
order = get_order(size * num_entries);
while (order >= 0) {
- rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
- __GFP_NOWARN,
- order);
- if (rinfo->dir_in)
+ rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
+ __GFP_NOWARN,
+ order);
+ if (rinfo->dir_entries)
break;
order--;
}
- if (!rinfo->dir_in)
+ if (!rinfo->dir_entries)
return -ENOMEM;
num_entries = (PAGE_SIZE << order) / size;
@@ -1724,6 +1652,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
INIT_LIST_HEAD(&req->r_unsafe_target_item);
req->r_fmode = -1;
kref_init(&req->r_kref);
+ RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_wait);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
@@ -2177,6 +2106,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
mds = __choose_mds(mdsc, req);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+ if (mdsc->mdsmap_err) {
+ err = mdsc->mdsmap_err;
+ dout("do_request mdsmap err %d\n", err);
+ goto finish;
+ }
dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map);
goto out;
@@ -2306,14 +2240,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- /* deny access to directories with pool_ns layouts */
- if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
- ceph_inode(req->r_inode)->i_pool_ns_len)
- return -EIO;
- if (req->r_locked_dir &&
- ceph_inode(req->r_locked_dir)->i_pool_ns_len)
- return -EIO;
-
/* issue */
mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir);
@@ -2416,7 +2342,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* get request, session */
tid = le64_to_cpu(msg->hdr.tid);
mutex_lock(&mdsc->mutex);
- req = __lookup_request(mdsc, tid);
+ req = lookup_get_request(mdsc, tid);
if (!req) {
dout("handle_reply on unknown tid %llu\n", tid);
mutex_unlock(&mdsc->mutex);
@@ -2606,7 +2532,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
fwd_seq = ceph_decode_32(&p);
mutex_lock(&mdsc->mutex);
- req = __lookup_request(mdsc, tid);
+ req = lookup_get_request(mdsc, tid);
if (!req) {
dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
goto out; /* dup reply? */
@@ -2805,13 +2731,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_mds_cap_reconnect v2;
struct ceph_mds_cap_reconnect_v1 v1;
} rec;
- size_t reclen;
struct ceph_inode_info *ci;
struct ceph_reconnect_state *recon_state = arg;
struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path;
int pathlen, err;
u64 pathbase;
+ u64 snap_follows;
struct dentry *dentry;
ci = cap->ci;
@@ -2834,9 +2760,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
path = NULL;
pathlen = 0;
}
- err = ceph_pagelist_encode_string(pagelist, path, pathlen);
- if (err)
- goto out_free;
spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */
@@ -2844,14 +2767,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
cap->mseq = 0; /* and migrate_seq */
cap->cap_gen = cap->session->s_cap_gen;
- if (recon_state->flock) {
+ if (recon_state->msg_version >= 2) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.v2.issued = cpu_to_le32(cap->issued);
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v2.pathbase = cpu_to_le64(pathbase);
rec.v2.flock_len = 0;
- reclen = sizeof(rec.v2);
} else {
rec.v1.cap_id = cpu_to_le64(cap->cap_id);
rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -2861,13 +2783,23 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v1.pathbase = cpu_to_le64(pathbase);
- reclen = sizeof(rec.v1);
+ }
+
+ if (list_empty(&ci->i_cap_snaps)) {
+ snap_follows = 0;
+ } else {
+ struct ceph_cap_snap *capsnap =
+ list_first_entry(&ci->i_cap_snaps,
+ struct ceph_cap_snap, ci_item);
+ snap_follows = capsnap->follows;
}
spin_unlock(&ci->i_ceph_lock);
- if (recon_state->flock) {
+ if (recon_state->msg_version >= 2) {
int num_fcntl_locks, num_flock_locks;
struct ceph_filelock *flocks;
+ size_t struct_len, total_len = 0;
+ u8 struct_v = 0;
encode_again:
ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
@@ -2886,20 +2818,51 @@ encode_again:
goto encode_again;
goto out_free;
}
+
+ if (recon_state->msg_version >= 3) {
+ /* version, compat_version and struct_len */
+ total_len = 2 * sizeof(u8) + sizeof(u32);
+ struct_v = 2;
+ }
/*
* number of encoded locks is stable, so copy to pagelist
*/
- rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
- (num_fcntl_locks+num_flock_locks) *
- sizeof(struct ceph_filelock));
- err = ceph_pagelist_append(pagelist, &rec, reclen);
- if (!err)
- err = ceph_locks_to_pagelist(flocks, pagelist,
- num_fcntl_locks,
- num_flock_locks);
+ struct_len = 2 * sizeof(u32) +
+ (num_fcntl_locks + num_flock_locks) *
+ sizeof(struct ceph_filelock);
+ rec.v2.flock_len = cpu_to_le32(struct_len);
+
+ struct_len += sizeof(rec.v2);
+ struct_len += sizeof(u32) + pathlen;
+
+ if (struct_v >= 2)
+ struct_len += sizeof(u64); /* snap_follows */
+
+ total_len += struct_len;
+ err = ceph_pagelist_reserve(pagelist, total_len);
+
+ if (!err) {
+ if (recon_state->msg_version >= 3) {
+ ceph_pagelist_encode_8(pagelist, struct_v);
+ ceph_pagelist_encode_8(pagelist, 1);
+ ceph_pagelist_encode_32(pagelist, struct_len);
+ }
+ ceph_pagelist_encode_string(pagelist, path, pathlen);
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+ ceph_locks_to_pagelist(flocks, pagelist,
+ num_fcntl_locks,
+ num_flock_locks);
+ if (struct_v >= 2)
+ ceph_pagelist_encode_64(pagelist, snap_follows);
+ }
kfree(flocks);
} else {
- err = ceph_pagelist_append(pagelist, &rec, reclen);
+ size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
+ err = ceph_pagelist_reserve(pagelist, size);
+ if (!err) {
+ ceph_pagelist_encode_string(pagelist, path, pathlen);
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+ }
}
recon_state->nr_caps++;
@@ -2990,7 +2953,12 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
recon_state.nr_caps = 0;
recon_state.pagelist = pagelist;
- recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+ if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+ recon_state.msg_version = 3;
+ else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
+ recon_state.msg_version = 2;
+ else
+ recon_state.msg_version = 1;
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
@@ -3019,8 +2987,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
goto fail;
}
- if (recon_state.flock)
- reply->hdr.version = cpu_to_le16(2);
+ reply->hdr.version = cpu_to_le16(recon_state.msg_version);
/* raced with cap release? */
if (s_nr_caps != recon_state.nr_caps) {
@@ -3218,7 +3185,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
WARN_ON(1);
goto release; /* hrm... */
}
- dname.hash = full_name_hash(dname.name, dname.len);
+ dname.hash = full_name_hash(parent, dname.name, dname.len);
dentry = d_lookup(parent, &dname);
dput(parent);
if (!dentry)
@@ -3245,7 +3212,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
msecs_to_jiffies(le32_to_cpu(h->duration_ms));
di->lease_seq = seq;
- dentry->d_time = di->lease_renew_from + duration;
+ di->time = di->lease_renew_from + duration;
di->lease_renew_after = di->lease_renew_from +
(duration >> 1);
di->lease_renew_from = 0;
@@ -3311,47 +3278,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
}
/*
- * Preemptively release a lease we expect to invalidate anyway.
- * Pass @inode always, @dentry is optional.
- */
-void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
- struct dentry *dentry)
-{
- struct ceph_dentry_info *di;
- struct ceph_mds_session *session;
- u32 seq;
-
- BUG_ON(inode == NULL);
- BUG_ON(dentry == NULL);
-
- /* is dentry lease valid? */
- spin_lock(&dentry->d_lock);
- di = ceph_dentry(dentry);
- if (!di || !di->lease_session ||
- di->lease_session->s_mds < 0 ||
- di->lease_gen != di->lease_session->s_cap_gen ||
- !time_before(jiffies, dentry->d_time)) {
- dout("lease_release inode %p dentry %p -- "
- "no lease\n",
- inode, dentry);
- spin_unlock(&dentry->d_lock);
- return;
- }
-
- /* we do have a lease on this dentry; note mds and seq */
- session = ceph_get_mds_session(di->lease_session);
- seq = di->lease_seq;
- __ceph_mdsc_drop_dentry_lease(dentry);
- spin_unlock(&dentry->d_lock);
-
- dout("lease_release inode %p dentry %p to mds%d\n",
- inode, dentry, session->s_mds);
- ceph_mdsc_lease_send_msg(session, inode, dentry,
- CEPH_MDS_LEASE_RELEASE, seq);
- ceph_put_mds_session(session);
-}
-
-/*
* drop all leases (and dentry refs) in preparation for umount
*/
static void drop_leases(struct ceph_mds_client *mdsc)
@@ -3484,7 +3410,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock);
mdsc->last_cap_flush_tid = 1;
- mdsc->cap_flush_tree = RB_ROOT;
+ INIT_LIST_HEAD(&mdsc->cap_flush_list);
INIT_LIST_HEAD(&mdsc->cap_dirty);
INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
mdsc->num_cap_flushing = 0;
@@ -3599,7 +3525,7 @@ restart:
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
- u64 want_tid, want_flush, want_snap;
+ u64 want_tid, want_flush;
if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
return;
@@ -3612,17 +3538,19 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
ceph_flush_dirty_caps(mdsc);
spin_lock(&mdsc->cap_dirty_lock);
want_flush = mdsc->last_cap_flush_tid;
+ if (!list_empty(&mdsc->cap_flush_list)) {
+ struct ceph_cap_flush *cf =
+ list_last_entry(&mdsc->cap_flush_list,
+ struct ceph_cap_flush, g_list);
+ cf->wake = true;
+ }
spin_unlock(&mdsc->cap_dirty_lock);
- down_read(&mdsc->snap_rwsem);
- want_snap = mdsc->last_snap_seq;
- up_read(&mdsc->snap_rwsem);
-
- dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
- want_tid, want_flush, want_snap);
+ dout("sync want tid %lld flush_seq %lld\n",
+ want_tid, want_flush);
wait_unsafe_requests(mdsc, want_tid);
- wait_caps_flush(mdsc, want_flush, want_snap);
+ wait_caps_flush(mdsc, want_flush);
}
/*
@@ -3743,11 +3671,86 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
dout("mdsc_destroy %p done\n", mdsc);
}
+void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+ struct ceph_fs_client *fsc = mdsc->fsc;
+ const char *mds_namespace = fsc->mount_options->mds_namespace;
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front.iov_len;
+ u32 epoch;
+ u32 map_len;
+ u32 num_fs;
+ u32 mount_fscid = (u32)-1;
+ u8 struct_v, struct_cv;
+ int err = -EINVAL;
+
+ ceph_decode_need(&p, end, sizeof(u32), bad);
+ epoch = ceph_decode_32(&p);
+
+ dout("handle_fsmap epoch %u\n", epoch);
+
+ ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+ struct_v = ceph_decode_8(&p);
+ struct_cv = ceph_decode_8(&p);
+ map_len = ceph_decode_32(&p);
+
+ ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
+ p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+
+ num_fs = ceph_decode_32(&p);
+ while (num_fs-- > 0) {
+ void *info_p, *info_end;
+ u32 info_len;
+ u8 info_v, info_cv;
+ u32 fscid, namelen;
+
+ ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+ info_v = ceph_decode_8(&p);
+ info_cv = ceph_decode_8(&p);
+ info_len = ceph_decode_32(&p);
+ ceph_decode_need(&p, end, info_len, bad);
+ info_p = p;
+ info_end = p + info_len;
+ p = info_end;
+
+ ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
+ fscid = ceph_decode_32(&info_p);
+ namelen = ceph_decode_32(&info_p);
+ ceph_decode_need(&info_p, info_end, namelen, bad);
+
+ if (mds_namespace &&
+ strlen(mds_namespace) == namelen &&
+ !strncmp(mds_namespace, (char *)info_p, namelen)) {
+ mount_fscid = fscid;
+ break;
+ }
+ }
+
+ ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
+ if (mount_fscid != (u32)-1) {
+ fsc->client->monc.fs_cluster_id = mount_fscid;
+ ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+ 0, true);
+ ceph_monc_renew_subs(&fsc->client->monc);
+ } else {
+ err = -ENOENT;
+ goto err_out;
+ }
+ return;
+bad:
+ pr_err("error decoding fsmap\n");
+err_out:
+ mutex_lock(&mdsc->mutex);
+ mdsc->mdsmap_err = -ENOENT;
+ __wake_requests(mdsc, &mdsc->waiting_for_map);
+ mutex_unlock(&mdsc->mutex);
+ return;
+}
/*
* handle mds map update.
*/
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
u32 epoch;
u32 maplen;
@@ -3854,7 +3857,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
switch (type) {
case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(mdsc, msg);
+ ceph_mdsc_handle_mdsmap(mdsc, msg);
+ break;
+ case CEPH_MSG_FS_MAP_USER:
+ ceph_mdsc_handle_fsmap(mdsc, msg);
break;
case CEPH_MSG_CLIENT_SESSION:
handle_session(s, msg);
@@ -3900,7 +3906,7 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
struct ceph_auth_handshake *auth = &s->s_auth;
if (force_new && auth->authorizer) {
- ceph_auth_destroy_authorizer(ac, auth->authorizer);
+ ceph_auth_destroy_authorizer(auth->authorizer);
auth->authorizer = NULL;
}
if (!auth->authorizer) {