From 281dbe5db81c6137def9757e07a7aea14b1ed86e Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 26 Jul 2016 15:22:35 +0200 Subject: libceph: add an ONSTACK initializer for oids An on-stack oid in ceph_ioctl_get_dataloc() is not initialized, resulting in a WARN and a NULL pointer dereference later on. We will have more of these on-stack in the future, so fix it with a convenience macro. Fixes: d30291b985d1 ("libceph: variable-sized ceph_object_id") Signed-off-by: Ilya Dryomov --- fs/ceph/ioctl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs/ceph') diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index be6b1657b1af..0946f2d4a81f 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -183,7 +183,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; struct ceph_object_locator oloc; - struct ceph_object_id oid; + CEPH_DEFINE_OID_ONSTACK(oid); u64 len = 1, olen; u64 tmp; struct ceph_pg pgid; -- cgit v1.2.3 From 7627151ea30bce2051e3cb27d7bb2c30083f86a5 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Wed, 3 Feb 2016 21:24:49 +0800 Subject: libceph: define new ceph_file_layout structure Define new ceph_file_layout structure and rename old ceph_file_layout to ceph_file_layout_legacy. This is preparation for adding namespace to ceph_file_layout structure. Signed-off-by: Yan, Zheng --- drivers/block/rbd.c | 14 ++++++------- fs/ceph/addr.c | 18 ++++++++-------- fs/ceph/caps.c | 5 ++++- fs/ceph/file.c | 6 +++--- fs/ceph/inode.c | 7 ++++--- fs/ceph/ioctl.c | 22 +++++++++---------- fs/ceph/mds_client.h | 2 +- fs/ceph/xattr.c | 28 ++++++++++--------------- include/linux/ceph/ceph_fs.h | 50 +++++++++++++++++++------------------------- net/ceph/ceph_fs.c | 30 +++++++++++++++++++++++--- net/ceph/osd_client.c | 4 ++-- net/ceph/osdmap.c | 6 +++--- 12 files changed, 103 insertions(+), 89 deletions(-) (limited to 'fs/ceph') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 81666a56415e..cc272ed46cfd 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1937,7 +1937,7 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; - osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id; if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", obj_request->object_name)) goto fail; @@ -1991,7 +1991,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; - osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id; if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", obj_request->object_name)) goto fail; @@ -3995,10 +3995,10 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, /* Initialize the layout used for all rbd requests */ - rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - rbd_dev->layout.fl_stripe_count = cpu_to_le32(1); - rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); + rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; + rbd_dev->layout.stripe_count = 1; + rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER; + rbd_dev->layout.pool_id = spec->pool_id; /* * If this is a mapping rbd_dev (as opposed to a parent one), @@ -5187,7 +5187,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); - rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id; if (rbd_dev->image_format == 1) ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", spec->image_name, RBD_SUFFIX); diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 26a9d10d75e9..3f8efd866fec 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1730,7 +1730,7 @@ enum { POOL_WRITE = 2, }; -static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) +static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) { struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); struct ceph_mds_client *mdsc = fsc->mdsc; @@ -1757,7 +1757,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) if (*p) goto out; - dout("__ceph_pool_perm_get pool %u no perm cached\n", pool); + dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool); down_write(&mdsc->pool_perm_rwsem); parent = NULL; @@ -1860,13 +1860,13 @@ out_unlock: out: if (!err) err = have; - dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err); + dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err); return err; } int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) { - u32 pool; + s64 pool; int ret, flags; /* does not support pool namespace yet */ @@ -1879,17 +1879,17 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) spin_lock(&ci->i_ceph_lock); flags = ci->i_ceph_flags; - pool = ceph_file_layout_pg_pool(ci->i_layout); + pool = ci->i_layout.pool_id; spin_unlock(&ci->i_ceph_lock); check: if (flags & CEPH_I_POOL_PERM) { if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) { - dout("ceph_pool_perm_check pool %u no read perm\n", + dout("ceph_pool_perm_check pool %lld no read perm\n", pool); return -EPERM; } if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) { - dout("ceph_pool_perm_check pool %u no write perm\n", + dout("ceph_pool_perm_check pool %lld no write perm\n", pool); return -EPERM; } @@ -1907,10 +1907,10 @@ check: flags |= CEPH_I_POOL_WR; spin_lock(&ci->i_ceph_lock); - if (pool == ceph_file_layout_pg_pool(ci->i_layout)) { + if (pool == ci->i_layout.pool_id) { ci->i_ceph_flags = flags; } else { - pool = ceph_file_layout_pg_pool(ci->i_layout); + pool = ci->i_layout.pool_id; flags = ci->i_ceph_flags; } spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 6f60d0a3d0f9..f24722dce167 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2895,8 +2895,11 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { /* file layout may have changed */ - ci->i_layout = grant->layout; + s64 old_pool = ci->i_layout.pool_id; + ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); ci->i_pool_ns_len = pool_ns_len; + if (ci->i_layout.pool_id != old_pool) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; /* size/truncate_seq? */ queue_trunc = ceph_fill_file_size(inode, issued, diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0daaf7ceedc5..cba5dcf49a65 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1583,9 +1583,9 @@ static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length) { int ret = 0; struct ceph_inode_info *ci = ceph_inode(inode); - s32 stripe_unit = ceph_file_layout_su(ci->i_layout); - s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout); - s32 object_size = ceph_file_layout_object_size(ci->i_layout); + s32 stripe_unit = ci->i_layout.stripe_unit; + s32 stripe_count = ci->i_layout.stripe_count; + s32 object_size = ci->i_layout.object_size; u64 object_set_size = object_size * stripe_count; u64 nearly, t; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index f059b5997072..6c5903e0a976 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -814,10 +814,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { - if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool) - ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; - ci->i_layout = info->layout; + s64 old_pool = ci->i_layout.pool_id; + ceph_file_layout_from_legacy(&ci->i_layout, &info->layout); ci->i_pool_ns_len = iinfo->pool_ns_len; + if (ci->i_layout.pool_id != old_pool) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 0946f2d4a81f..843dd31a02cd 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -21,10 +21,10 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false); if (!err) { - l.stripe_unit = ceph_file_layout_su(ci->i_layout); - l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); - l.object_size = ceph_file_layout_object_size(ci->i_layout); - l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); + l.stripe_unit = ci->i_layout.stripe_unit; + l.stripe_count = ci->i_layout.stripe_count; + l.object_size = ci->i_layout.object_size; + l.data_pool = ci->i_layout.pool_id; l.preferred_osd = (s32)-1; if (copy_to_user(arg, &l, sizeof(l))) return -EFAULT; @@ -82,19 +82,19 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) if (l.stripe_count) nl.stripe_count = l.stripe_count; else - nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); + nl.stripe_count = ci->i_layout.stripe_count; if (l.stripe_unit) nl.stripe_unit = l.stripe_unit; else - nl.stripe_unit = ceph_file_layout_su(ci->i_layout); + nl.stripe_unit = ci->i_layout.stripe_unit; if (l.object_size) nl.object_size = l.object_size; else - nl.object_size = ceph_file_layout_object_size(ci->i_layout); + nl.object_size = ci->i_layout.object_size; if (l.data_pool) nl.data_pool = l.data_pool; else - nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout); + nl.data_pool = ci->i_layout.pool_id; /* this is obsolete, and always -1 */ nl.preferred_osd = le64_to_cpu(-1); @@ -202,8 +202,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) return -EIO; } dl.file_offset -= dl.object_offset; - dl.object_size = ceph_file_layout_object_size(ci->i_layout); - dl.block_size = ceph_file_layout_su(ci->i_layout); + dl.object_size = ci->i_layout.object_size; + dl.block_size = ci->i_layout.stripe_unit; /* block_offset = object_offset % block_size */ tmp = dl.object_offset; @@ -212,7 +212,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", ceph_ino(inode), dl.object_no); - oloc.pool = ceph_file_layout_pg_pool(ci->i_layout); + oloc.pool = ci->i_layout.pool_id; ceph_oid_printf(&oid, "%s", dl.object_name); r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index e7d38aac7109..75ecf967d0b7 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -275,8 +275,8 @@ struct ceph_mds_request { struct ceph_pool_perm { struct rb_node node; - u32 pool; int perm; + s64 pool; }; /* diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 4870b29df224..5377c9c7a0c5 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -72,7 +72,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, int ret; struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_osd_client *osdc = &fsc->client->osdc; - s64 pool = ceph_file_layout_pg_pool(ci->i_layout); + s64 pool = ci->i_layout.pool_id; const char *pool_name; char buf[128]; @@ -82,10 +82,9 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, if (pool_name) { size_t len = strlen(pool_name); ret = snprintf(buf, sizeof(buf), - "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=", - (unsigned long long)ceph_file_layout_su(ci->i_layout), - (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), - (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); + "stripe_unit=%u stripe_count=%u object_size=%u pool=", + ci->i_layout.stripe_unit, ci->i_layout.stripe_count, + ci->i_layout.object_size); if (!size) { ret += len; } else if (ret + len > size) { @@ -97,11 +96,9 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, } } else { ret = snprintf(buf, sizeof(buf), - "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld", - (unsigned long long)ceph_file_layout_su(ci->i_layout), - (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), - (unsigned long long)ceph_file_layout_object_size(ci->i_layout), - (unsigned long long)pool); + "stripe_unit=%u stripe_count=%u object_size=%u pool=%lld", + ci->i_layout.stripe_unit, ci->i_layout.stripe_count, + ci->i_layout.object_size, (unsigned long long)pool); if (size) { if (ret <= size) memcpy(val, buf, ret); @@ -116,22 +113,19 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci, char *val, size_t size) { - return snprintf(val, size, "%lld", - (unsigned long long)ceph_file_layout_su(ci->i_layout)); + return snprintf(val, size, "%u", ci->i_layout.stripe_unit); } static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci, char *val, size_t size) { - return snprintf(val, size, "%lld", - (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout)); + return snprintf(val, size, "%u", ci->i_layout.stripe_count); } static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci, char *val, size_t size) { - return snprintf(val, size, "%lld", - (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); + return snprintf(val, size, "%u", ci->i_layout.object_size); } static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, @@ -140,7 +134,7 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, int ret; struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_osd_client *osdc = &fsc->client->osdc; - s64 pool = ceph_file_layout_pg_pool(ci->i_layout); + s64 pool = ci->i_layout.pool_id; const char *pool_name; down_read(&osdc->lock); diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index dfce616002ad..e5a5fb9ca3f5 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -34,9 +34,9 @@ #define CEPH_MAX_MON 31 /* - * ceph_file_layout - describe data layout for a file/inode + * legacy ceph_file_layoute */ -struct ceph_file_layout { +struct ceph_file_layout_legacy { /* file -> object mapping */ __le32 fl_stripe_unit; /* stripe unit, in bytes. must be multiple of page size. */ @@ -53,33 +53,25 @@ struct ceph_file_layout { __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); -#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit)) -#define ceph_file_layout_stripe_count(l) \ - ((__s32)le32_to_cpu((l).fl_stripe_count)) -#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size)) -#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) -#define ceph_file_layout_object_su(l) \ - ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) -#define ceph_file_layout_pg_pool(l) \ - ((__s32)le32_to_cpu((l).fl_pg_pool)) - -static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l) -{ - return le32_to_cpu(l->fl_stripe_unit) * - le32_to_cpu(l->fl_stripe_count); -} - -/* "period" == bytes before i start on a new set of objects */ -static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l) -{ - return le32_to_cpu(l->fl_object_size) * - le32_to_cpu(l->fl_stripe_count); -} +/* + * ceph_file_layout - describe data layout for a file/inode + */ +struct ceph_file_layout { + /* file -> object mapping */ + u32 stripe_unit; /* stripe unit, in bytes */ + u32 stripe_count; /* over this many objects */ + u32 object_size; /* until objects are this big */ + s64 pool_id; /* rados pool id */ +}; + +extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); +extern void ceph_file_layout_from_legacy(struct ceph_file_layout *fl, + struct ceph_file_layout_legacy *legacy); +extern void ceph_file_layout_to_legacy(struct ceph_file_layout *fl, + struct ceph_file_layout_legacy *legacy); #define CEPH_MIN_STRIPE_UNIT 65536 -int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); - struct ceph_dir_layout { __u8 dl_dir_hash; /* see ceph_hash.h for ids */ __u8 dl_unused1; @@ -399,7 +391,7 @@ union ceph_mds_request_args { __le32 flags; } __attribute__ ((packed)) setxattr; struct { - struct ceph_file_layout layout; + struct ceph_file_layout_legacy layout; } __attribute__ ((packed)) setlayout; struct { __u8 rule; /* currently fcntl or flock */ @@ -478,7 +470,7 @@ struct ceph_mds_reply_inode { __le64 version; /* inode version */ __le64 xattr_version; /* version for xattr blob */ struct ceph_mds_reply_cap cap; /* caps issued for this inode */ - struct ceph_file_layout layout; + struct ceph_file_layout_legacy layout; struct ceph_timespec ctime, mtime, atime; __le32 time_warp_seq; __le64 size, max_size, truncate_size; @@ -673,7 +665,7 @@ struct ceph_mds_caps { __le64 size, max_size, truncate_size; __le32 truncate_seq; struct ceph_timespec mtime, atime, ctime; - struct ceph_file_layout layout; + struct ceph_file_layout_legacy layout; __le32 time_warp_seq; } __attribute__ ((packed)); diff --git a/net/ceph/ceph_fs.c b/net/ceph/ceph_fs.c index 41466ccb972a..7d54e944de5e 100644 --- a/net/ceph/ceph_fs.c +++ b/net/ceph/ceph_fs.c @@ -9,9 +9,9 @@ */ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout) { - __u32 su = le32_to_cpu(layout->fl_stripe_unit); - __u32 sc = le32_to_cpu(layout->fl_stripe_count); - __u32 os = le32_to_cpu(layout->fl_object_size); + __u32 su = layout->stripe_unit; + __u32 sc = layout->stripe_count; + __u32 os = layout->object_size; /* stripe unit, object size must be non-zero, 64k increment */ if (!su || (su & (CEPH_MIN_STRIPE_UNIT-1))) @@ -27,6 +27,30 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout) return 1; } +void ceph_file_layout_from_legacy(struct ceph_file_layout *fl, + struct ceph_file_layout_legacy *legacy) +{ + fl->stripe_unit = le32_to_cpu(legacy->fl_stripe_unit); + fl->stripe_count = le32_to_cpu(legacy->fl_stripe_count); + fl->object_size = le32_to_cpu(legacy->fl_object_size); + fl->pool_id = le32_to_cpu(legacy->fl_pg_pool); + if (fl->pool_id == 0) + fl->pool_id = -1; +} +EXPORT_SYMBOL(ceph_file_layout_from_legacy); + +void ceph_file_layout_to_legacy(struct ceph_file_layout *fl, + struct ceph_file_layout_legacy *legacy) +{ + legacy->fl_stripe_unit = cpu_to_le32(fl->stripe_unit); + legacy->fl_stripe_count = cpu_to_le32(fl->stripe_count); + legacy->fl_object_size = cpu_to_le32(fl->object_size); + if (fl->pool_id >= 0) + legacy->fl_pg_pool = cpu_to_le32(fl->pool_id); + else + legacy->fl_pg_pool = 0; +} +EXPORT_SYMBOL(ceph_file_layout_to_legacy); int ceph_flags_to_mode(int flags) { diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 89469592076c..23efcac80072 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -932,7 +932,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { osd_req_op_init(req, which, opcode, 0); } else { - u32 object_size = le32_to_cpu(layout->fl_object_size); + u32 object_size = layout->object_size; u32 object_base = off - objoff; if (!(truncate_seq == 1 && truncate_size == -1ULL)) { if (truncate_size <= object_base) { @@ -948,7 +948,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } req->r_flags = flags; - req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); + req->r_base_oloc.pool = layout->pool_id; ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); req->r_snapid = vino.snap; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 7e480bf75bcf..5947b2e9fb8e 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1770,9 +1770,9 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, u64 *ono, u64 *oxoff, u64 *oxlen) { - u32 osize = le32_to_cpu(layout->fl_object_size); - u32 su = le32_to_cpu(layout->fl_stripe_unit); - u32 sc = le32_to_cpu(layout->fl_stripe_count); + u32 osize = layout->object_size; + u32 su = layout->stripe_unit; + u32 sc = layout->stripe_count; u32 bl, stripeno, stripepos, objsetno; u32 su_per_object; u64 t, su_offset; -- cgit v1.2.3 From 30c156d9951e0aa88202707d80c583b0a09d3167 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Sun, 14 Feb 2016 11:24:31 +0800 Subject: libceph: rados pool namespace support Add pool namesapce pointer to struct ceph_file_layout and struct ceph_object_locator. Pool namespace is used by when mapping object to PG, it's also used when composing OSD request. The namespace pointer in struct ceph_file_layout is RCU protected. So libceph can read namespace without taking lock. Signed-off-by: Yan, Zheng [idryomov@gmail.com: ceph_oloc_destroy(), misc minor changes] Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 1 + fs/ceph/inode.c | 3 +++ include/linux/ceph/ceph_fs.h | 2 ++ include/linux/ceph/osdmap.h | 10 ++++----- net/ceph/debugfs.c | 12 ++++++++-- net/ceph/osd_client.c | 32 +++++++++++++++++++++------ net/ceph/osdmap.c | 52 +++++++++++++++++++++++++++++++++++++++----- 7 files changed, 92 insertions(+), 20 deletions(-) (limited to 'fs/ceph') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cc272ed46cfd..58fd02d4e534 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3999,6 +3999,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->layout.stripe_count = 1; rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER; rbd_dev->layout.pool_id = spec->pool_id; + RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); /* * If this is a mapping rbd_dev (as opposed to a parent one), diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 6c5903e0a976..d035e0ab6029 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -446,6 +446,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_symlink = NULL; memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); + RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL); ci->i_pool_ns_len = 0; ci->i_fragtree = RB_ROOT; @@ -570,6 +571,8 @@ void ceph_destroy_inode(struct inode *inode) if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); + ceph_put_string(ci->i_layout.pool_ns); + call_rcu(&inode->i_rcu, ceph_i_callback); } diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index e5a5fb9ca3f5..08b8fd72261e 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -53,6 +53,7 @@ struct ceph_file_layout_legacy { __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); +struct ceph_string; /* * ceph_file_layout - describe data layout for a file/inode */ @@ -62,6 +63,7 @@ struct ceph_file_layout { u32 stripe_count; /* over this many objects */ u32 object_size; /* until objects are this big */ s64 pool_id; /* rados pool id */ + struct ceph_string __rcu *pool_ns; /* rados pool namespace */ }; extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 21d7f048959f..9a9041784dcf 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -63,11 +63,13 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool) struct ceph_object_locator { s64 pool; + struct ceph_string *pool_ns; }; static inline void ceph_oloc_init(struct ceph_object_locator *oloc) { oloc->pool = -1; + oloc->pool_ns = NULL; } static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc) @@ -75,11 +77,9 @@ static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc) return oloc->pool == -1; } -static inline void ceph_oloc_copy(struct ceph_object_locator *dest, - const struct ceph_object_locator *src) -{ - dest->pool = src->pool; -} +void ceph_oloc_copy(struct ceph_object_locator *dest, + const struct ceph_object_locator *src); +void ceph_oloc_destroy(struct ceph_object_locator *oloc); /* * Maximum supported by kernel client object name length diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index e77b04ca7802..c62b2b029a6e 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -156,8 +156,16 @@ static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t) seq_printf(s, "]/%d\t[", t->up.primary); for (i = 0; i < t->acting.size; i++) seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]); - seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary, - t->target_oid.name_len, t->target_oid.name, t->flags); + seq_printf(s, "]/%d\t", t->acting.primary); + if (t->target_oloc.pool_ns) { + seq_printf(s, "%*pE/%*pE\t0x%x", + (int)t->target_oloc.pool_ns->len, + t->target_oloc.pool_ns->str, + t->target_oid.name_len, t->target_oid.name, t->flags); + } else { + seq_printf(s, "%*pE\t0x%x", t->target_oid.name_len, + t->target_oid.name, t->flags); + } if (t->paused) seq_puts(s, "\tP"); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 23efcac80072..b68cc15e9cdc 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -387,7 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest, static void target_destroy(struct ceph_osd_request_target *t) { ceph_oid_destroy(&t->base_oid); + ceph_oloc_destroy(&t->base_oloc); ceph_oid_destroy(&t->target_oid); + ceph_oloc_destroy(&t->target_oloc); } /* @@ -533,6 +535,11 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } EXPORT_SYMBOL(ceph_osdc_alloc_request); +static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc) +{ + return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); +} + int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) { struct ceph_osd_client *osdc = req->r_osdc; @@ -540,11 +547,13 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) int msg_size; WARN_ON(ceph_oid_empty(&req->r_base_oid)); + WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); /* create request message */ msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ - msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ + msg_size += CEPH_ENCODING_START_BLK_LEN + + ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pgid */ msg_size += 4 + req->r_base_oid.name_len; /* oid */ msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op); @@ -949,6 +958,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_flags = flags; req->r_base_oloc.pool = layout->pool_id; + req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); req->r_snapid = vino.snap; @@ -1489,12 +1499,16 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg) p += sizeof(req->r_replay_version); /* oloc */ - ceph_encode_8(&p, 4); - ceph_encode_8(&p, 4); - ceph_encode_32(&p, 8 + 4 + 4); + ceph_start_encoding(&p, 5, 4, + ceph_oloc_encoding_size(&req->r_t.target_oloc)); ceph_encode_64(&p, req->r_t.target_oloc.pool); ceph_encode_32(&p, -1); /* preferred */ ceph_encode_32(&p, 0); /* key len */ + if (req->r_t.target_oloc.pool_ns) + ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str, + req->r_t.target_oloc.pool_ns->len); + else + ceph_encode_32(&p, 0); /* pgid */ ceph_encode_8(&p, 1); @@ -2596,8 +2610,8 @@ static int ceph_oloc_decode(void **p, void *end, if (struct_v >= 5) { len = ceph_decode_32(p); if (len > 0) { - pr_warn("ceph_object_locator::nspace is set\n"); - goto e_inval; + ceph_decode_need(p, end, len, e_inval); + *p += len; } } @@ -2835,7 +2849,11 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) unlink_request(osd, req); mutex_unlock(&osd->lock); - ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc); + /* + * Not ceph_oloc_copy() - changing pool_ns is not + * supported. + */ + req->r_t.target_oloc.pool = m.redirect.oloc.pool; req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; req->r_tid = 0; __submit_request(req, false); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 5947b2e9fb8e..d2436880b305 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1510,6 +1510,24 @@ bad: return ERR_PTR(err); } +void ceph_oloc_copy(struct ceph_object_locator *dest, + const struct ceph_object_locator *src) +{ + WARN_ON(!ceph_oloc_empty(dest)); + WARN_ON(dest->pool_ns); /* empty() only covers ->pool */ + + dest->pool = src->pool; + if (src->pool_ns) + dest->pool_ns = ceph_get_string(src->pool_ns); +} +EXPORT_SYMBOL(ceph_oloc_copy); + +void ceph_oloc_destroy(struct ceph_object_locator *oloc) +{ + ceph_put_string(oloc->pool_ns); +} +EXPORT_SYMBOL(ceph_oloc_destroy); + void ceph_oid_copy(struct ceph_object_id *dest, const struct ceph_object_id *src) { @@ -1844,12 +1862,34 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, if (!pi) return -ENOENT; - raw_pgid->pool = oloc->pool; - raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name, - oid->name_len); - - dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name, - raw_pgid->pool, raw_pgid->seed); + if (!oloc->pool_ns) { + raw_pgid->pool = oloc->pool; + raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name, + oid->name_len); + dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name, + raw_pgid->pool, raw_pgid->seed); + } else { + char stack_buf[256]; + char *buf = stack_buf; + int nsl = oloc->pool_ns->len; + size_t total = nsl + 1 + oid->name_len; + + if (total > sizeof(stack_buf)) { + buf = kmalloc(total, GFP_NOIO); + if (!buf) + return -ENOMEM; + } + memcpy(buf, oloc->pool_ns->str, nsl); + buf[nsl] = '\037'; + memcpy(buf + nsl + 1, oid->name, oid->name_len); + raw_pgid->pool = oloc->pool; + raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total); + if (buf != stack_buf) + kfree(buf); + dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__, + oid->name, nsl, oloc->pool_ns->str, + raw_pgid->pool, raw_pgid->seed); + } return 0; } EXPORT_SYMBOL(ceph_object_locator_to_pg); -- cgit v1.2.3 From 779fe0fb8e1883d5c479ac6bd85fbd237deed1f7 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Mon, 7 Mar 2016 09:35:06 +0800 Subject: ceph: rados pool namespace support This patch adds codes that decode pool namespace information in cap message and request reply. Pool namespace is saved in i_layout, it will be passed to libceph when doing read/write. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 67 +++++++++++++++++++++++++++++++++++---------- fs/ceph/caps.c | 46 +++++++++++++++++-------------- fs/ceph/inode.c | 20 +++++++++++--- fs/ceph/ioctl.c | 3 ++ fs/ceph/mds_client.c | 19 +++++-------- fs/ceph/mds_client.h | 3 ++ fs/ceph/super.h | 1 - fs/ceph/xattr.c | 77 +++++++++++++++++++++++++++++++++++----------------- 8 files changed, 159 insertions(+), 77 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 3f8efd866fec..d5b6f959a3c3 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1730,7 +1730,8 @@ enum { POOL_WRITE = 2, }; -static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) +static int __ceph_pool_perm_get(struct ceph_inode_info *ci, + s64 pool, struct ceph_string *pool_ns) { struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); struct ceph_mds_client *mdsc = fsc->mdsc; @@ -1738,6 +1739,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) struct rb_node **p, *parent; struct ceph_pool_perm *perm; struct page **pages; + size_t pool_ns_len; int err = 0, err2 = 0, have = 0; down_read(&mdsc->pool_perm_rwsem); @@ -1749,17 +1751,31 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) else if (pool > perm->pool) p = &(*p)->rb_right; else { - have = perm->perm; - break; + int ret = ceph_compare_string(pool_ns, + perm->pool_ns, + perm->pool_ns_len); + if (ret < 0) + p = &(*p)->rb_left; + else if (ret > 0) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } } } up_read(&mdsc->pool_perm_rwsem); if (*p) goto out; - dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool); + if (pool_ns) + dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n", + pool, (int)pool_ns->len, pool_ns->str); + else + dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool); down_write(&mdsc->pool_perm_rwsem); + p = &mdsc->pool_perm_tree.rb_node; parent = NULL; while (*p) { parent = *p; @@ -1769,8 +1785,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) else if (pool > perm->pool) p = &(*p)->rb_right; else { - have = perm->perm; - break; + int ret = ceph_compare_string(pool_ns, + perm->pool_ns, + perm->pool_ns_len); + if (ret < 0) + p = &(*p)->rb_left; + else if (ret > 0) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } } } if (*p) { @@ -1788,6 +1813,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) rd_req->r_flags = CEPH_OSD_FLAG_READ; osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); rd_req->r_base_oloc.pool = pool; + if (pool_ns) + rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns); ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino); err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS); @@ -1841,7 +1868,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) goto out_unlock; } - perm = kmalloc(sizeof(*perm), GFP_NOFS); + pool_ns_len = pool_ns ? pool_ns->len : 0; + perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS); if (!perm) { err = -ENOMEM; goto out_unlock; @@ -1849,6 +1877,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) perm->pool = pool; perm->perm = have; + perm->pool_ns_len = pool_ns_len; + if (pool_ns_len > 0) + memcpy(perm->pool_ns, pool_ns->str, pool_ns_len); + perm->pool_ns[pool_ns_len] = 0; + rb_link_node(&perm->node, parent, p); rb_insert_color(&perm->node, &mdsc->pool_perm_tree); err = 0; @@ -1860,19 +1893,20 @@ out_unlock: out: if (!err) err = have; - dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err); + if (pool_ns) + dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n", + pool, (int)pool_ns->len, pool_ns->str, err); + else + dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err); return err; } int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) { s64 pool; + struct ceph_string *pool_ns; int ret, flags; - /* does not support pool namespace yet */ - if (ci->i_pool_ns_len) - return -EIO; - if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), NOPOOLPERM)) return 0; @@ -1896,7 +1930,9 @@ check: return 0; } - ret = __ceph_pool_perm_get(ci, pool); + pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); + ret = __ceph_pool_perm_get(ci, pool, pool_ns); + ceph_put_string(pool_ns); if (ret < 0) return ret; @@ -1907,8 +1943,9 @@ check: flags |= CEPH_I_POOL_WR; spin_lock(&ci->i_ceph_lock); - if (pool == ci->i_layout.pool_id) { - ci->i_ceph_flags = flags; + if (pool == ci->i_layout.pool_id && + pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) { + ci->i_ceph_flags |= flags; } else { pool = ci->i_layout.pool_id; flags = ci->i_ceph_flags; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index f24722dce167..0a9406a8a794 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2779,12 +2779,11 @@ static void invalidate_aliases(struct inode *inode) */ static void handle_cap_grant(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *grant, - u64 inline_version, - void *inline_data, int inline_len, + struct ceph_string **pns, u64 inline_version, + void *inline_data, u32 inline_len, struct ceph_buffer *xattr_buf, struct ceph_mds_session *session, - struct ceph_cap *cap, int issued, - u32 pool_ns_len) + struct ceph_cap *cap, int issued) __releases(ci->i_ceph_lock) __releases(mdsc->snap_rwsem) { @@ -2896,11 +2895,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { /* file layout may have changed */ s64 old_pool = ci->i_layout.pool_id; + struct ceph_string *old_ns; + ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); - ci->i_pool_ns_len = pool_ns_len; - if (ci->i_layout.pool_id != old_pool) + old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, + lockdep_is_held(&ci->i_ceph_lock)); + rcu_assign_pointer(ci->i_layout.pool_ns, *pns); + + if (ci->i_layout.pool_id != old_pool || *pns != old_ns) ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; + *pns = old_ns; + /* size/truncate_seq? */ queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(grant->truncate_seq), @@ -3423,20 +3429,18 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_cap *cap; struct ceph_mds_caps *h; struct ceph_mds_cap_peer *peer = NULL; - struct ceph_snap_realm *realm; + struct ceph_snap_realm *realm = NULL; + struct ceph_string *pool_ns = NULL; int mds = session->s_mds; int op, issued; u32 seq, mseq; struct ceph_vino vino; - u64 cap_id; - u64 size, max_size; u64 tid; u64 inline_version = 0; void *inline_data = NULL; u32 inline_len = 0; void *snaptrace; size_t snaptrace_len; - u32 pool_ns_len = 0; void *p, *end; dout("handle_caps from mds%d\n", mds); @@ -3450,11 +3454,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, op = le32_to_cpu(h->op); vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; - cap_id = le64_to_cpu(h->cap_id); seq = le32_to_cpu(h->seq); mseq = le32_to_cpu(h->migrate_seq); - size = le64_to_cpu(h->size); - max_size = le64_to_cpu(h->max_size); snaptrace = h + 1; snaptrace_len = le32_to_cpu(h->snap_trace_len); @@ -3493,6 +3494,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, u64 flush_tid; u32 caller_uid, caller_gid; u32 osd_epoch_barrier; + u32 pool_ns_len; /* version >= 5 */ ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad); /* version >= 6 */ @@ -3502,6 +3504,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_decode_32_safe(&p, end, caller_gid, bad); /* version >= 8 */ ceph_decode_32_safe(&p, end, pool_ns_len, bad); + if (pool_ns_len > 0) { + ceph_decode_need(&p, end, pool_ns_len, bad); + pool_ns = ceph_find_or_create_string(p, pool_ns_len); + p += pool_ns_len; + } } /* lookup ino */ @@ -3522,7 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap = ceph_get_cap(mdsc, NULL); cap->cap_ino = vino.ino; cap->queue_release = 1; - cap->cap_id = cap_id; + cap->cap_id = le64_to_cpu(h->cap_id); cap->mseq = mseq; cap->seq = seq; spin_lock(&session->s_cap_lock); @@ -3557,10 +3564,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, } handle_cap_import(mdsc, inode, h, peer, session, &cap, &issued); - handle_cap_grant(mdsc, inode, h, + handle_cap_grant(mdsc, inode, h, &pool_ns, inline_version, inline_data, inline_len, - msg->middle, session, cap, issued, - pool_ns_len); + msg->middle, session, cap, issued); if (realm) ceph_put_snap_realm(mdsc, realm); goto done_unlocked; @@ -3582,10 +3588,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_GRANT: __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); - handle_cap_grant(mdsc, inode, h, + handle_cap_grant(mdsc, inode, h, &pool_ns, inline_version, inline_data, inline_len, - msg->middle, session, cap, issued, - pool_ns_len); + msg->middle, session, cap, issued); goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: @@ -3616,6 +3621,7 @@ done: mutex_unlock(&session->s_mutex); done_unlocked: iput(inode); + ceph_put_string(pool_ns); return; bad: diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index d035e0ab6029..dc032566ed71 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -447,7 +447,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL); - ci->i_pool_ns_len = 0; ci->i_fragtree = RB_ROOT; mutex_init(&ci->i_fragtree_mutex); @@ -571,7 +570,7 @@ void ceph_destroy_inode(struct inode *inode) if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); - ceph_put_string(ci->i_layout.pool_ns); + ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns)); call_rcu(&inode->i_rcu, ceph_i_callback); } @@ -736,6 +735,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, int issued = 0, implemented, new_issued; struct timespec mtime, atime, ctime; struct ceph_buffer *xattr_blob = NULL; + struct ceph_string *pool_ns = NULL; struct ceph_cap *new_cap = NULL; int err = 0; bool wake = false; @@ -763,6 +763,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, iinfo->xattr_len); } + if (iinfo->pool_ns_len > 0) + pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data, + iinfo->pool_ns_len); + spin_lock(&ci->i_ceph_lock); /* @@ -818,11 +822,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { s64 old_pool = ci->i_layout.pool_id; + struct ceph_string *old_ns; + ceph_file_layout_from_legacy(&ci->i_layout, &info->layout); - ci->i_pool_ns_len = iinfo->pool_ns_len; - if (ci->i_layout.pool_id != old_pool) + old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, + lockdep_is_held(&ci->i_ceph_lock)); + rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns); + + if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns) ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; + pool_ns = old_ns; + queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), le64_to_cpu(info->truncate_size), @@ -989,6 +1000,7 @@ out: ceph_put_cap(mdsc, new_cap); if (xattr_blob) ceph_buffer_put(xattr_blob); + ceph_put_string(pool_ns); return err; } diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 843dd31a02cd..6a30101b55ef 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -213,9 +213,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) ceph_ino(inode), dl.object_no); oloc.pool = ci->i_layout.pool_id; + oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); ceph_oid_printf(&oid, "%s", dl.object_name); r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid); + + ceph_oloc_destroy(&oloc); if (r < 0) { up_read(&osdc->lock); return r; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 2103b823bec0..46641bbc8056 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end, } else info->inline_version = CEPH_INLINE_NONE; + info->pool_ns_len = 0; + info->pool_ns_data = NULL; if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { ceph_decode_32_safe(p, end, info->pool_ns_len, bad); - ceph_decode_need(p, end, info->pool_ns_len, bad); - *p += info->pool_ns_len; - } else { - info->pool_ns_len = 0; + if (info->pool_ns_len > 0) { + ceph_decode_need(p, end, info->pool_ns_len, bad); + info->pool_ns_data = *p; + *p += info->pool_ns_len; + } } return 0; @@ -2292,14 +2295,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - /* deny access to directories with pool_ns layouts */ - if (req->r_inode && S_ISDIR(req->r_inode->i_mode) && - ceph_inode(req->r_inode)->i_pool_ns_len) - return -EIO; - if (req->r_locked_dir && - ceph_inode(req->r_locked_dir)->i_pool_ns_len) - return -EIO; - /* issue */ mutex_lock(&mdsc->mutex); __register_request(mdsc, req, dir); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 75ecf967d0b7..2ce8e9f9bfc9 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -45,6 +45,7 @@ struct ceph_mds_reply_info_in { u32 inline_len; char *inline_data; u32 pool_ns_len; + char *pool_ns_data; }; struct ceph_mds_reply_dir_entry { @@ -277,6 +278,8 @@ struct ceph_pool_perm { struct rb_node node; int perm; s64 pool; + size_t pool_ns_len; + char pool_ns[]; }; /* diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 0168b49fb6ad..7ceab18c8ee2 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -287,7 +287,6 @@ struct ceph_inode_info { struct ceph_dir_layout i_dir_layout; struct ceph_file_layout i_layout; - size_t i_pool_ns_len; char *i_symlink; /* for dirs */ diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 5377c9c7a0c5..adc231892b0d 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -57,56 +57,69 @@ struct ceph_vxattr { static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) { - size_t s; - char *p = (char *)&ci->i_layout; - - for (s = 0; s < sizeof(ci->i_layout); s++, p++) - if (*p) - return true; - return false; + struct ceph_file_layout *fl = &ci->i_layout; + return (fl->stripe_unit > 0 || fl->stripe_count > 0 || + fl->object_size > 0 || fl->pool_id >= 0 || + rcu_dereference_raw(fl->pool_ns) != NULL); } static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, size_t size) { - int ret; struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_osd_client *osdc = &fsc->client->osdc; + struct ceph_string *pool_ns; s64 pool = ci->i_layout.pool_id; const char *pool_name; + const char *ns_field = " pool_namespace="; char buf[128]; + size_t len, total_len = 0; + int ret; + + pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); down_read(&osdc->lock); pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); if (pool_name) { - size_t len = strlen(pool_name); - ret = snprintf(buf, sizeof(buf), + len = snprintf(buf, sizeof(buf), "stripe_unit=%u stripe_count=%u object_size=%u pool=", ci->i_layout.stripe_unit, ci->i_layout.stripe_count, ci->i_layout.object_size); - if (!size) { - ret += len; - } else if (ret + len > size) { - ret = -ERANGE; - } else { - memcpy(val, buf, ret); - memcpy(val + ret, pool_name, len); - ret += len; - } + total_len = len + strlen(pool_name); } else { - ret = snprintf(buf, sizeof(buf), + len = snprintf(buf, sizeof(buf), "stripe_unit=%u stripe_count=%u object_size=%u pool=%lld", ci->i_layout.stripe_unit, ci->i_layout.stripe_count, ci->i_layout.object_size, (unsigned long long)pool); - if (size) { - if (ret <= size) - memcpy(val, buf, ret); - else - ret = -ERANGE; + total_len = len; + } + + if (pool_ns) + total_len += strlen(ns_field) + pool_ns->len; + + if (!size) { + ret = total_len; + } else if (total_len > size) { + ret = -ERANGE; + } else { + memcpy(val, buf, len); + ret = len; + if (pool_name) { + len = strlen(pool_name); + memcpy(val + ret, pool_name, len); + ret += len; + } + if (pool_ns) { + len = strlen(ns_field); + memcpy(val + ret, ns_field, len); + ret += len; + memcpy(val + ret, pool_ns->str, pool_ns->len); + ret += pool_ns->len; } } up_read(&osdc->lock); + ceph_put_string(pool_ns); return ret; } @@ -147,6 +160,18 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, return ret; } +static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci, + char *val, size_t size) +{ + int ret = 0; + struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns); + if (ns) { + ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str); + ceph_put_string(ns); + } + return ret; +} + /* directories */ static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val, @@ -235,6 +260,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { XATTR_LAYOUT_FIELD(dir, layout, stripe_count), XATTR_LAYOUT_FIELD(dir, layout, object_size), XATTR_LAYOUT_FIELD(dir, layout, pool), + XATTR_LAYOUT_FIELD(dir, layout, pool_namespace), XATTR_NAME_CEPH(dir, entries), XATTR_NAME_CEPH(dir, files), XATTR_NAME_CEPH(dir, subdirs), @@ -262,6 +288,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = { XATTR_LAYOUT_FIELD(file, layout, stripe_count), XATTR_LAYOUT_FIELD(file, layout, object_size), XATTR_LAYOUT_FIELD(file, layout, pool), + XATTR_LAYOUT_FIELD(file, layout, pool_namespace), { .name = NULL, 0 } /* Required table terminator */ }; static size_t ceph_file_vxattrs_name_size; /* total size of all names */ -- cgit v1.2.3 From a22bd5ffae2d22c054c832fe0d60976ed9e4a49d Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Thu, 26 May 2016 10:30:13 +0800 Subject: ceph: set user pages dirty after direct IO read Signed-off-by: Yan, Zheng --- fs/ceph/file.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/file.c b/fs/ceph/file.c index cba5dcf49a65..ac75fa9fd858 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -708,7 +708,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) } } - ceph_put_page_vector(osd_data->pages, num_pages, false); + ceph_put_page_vector(osd_data->pages, num_pages, !aio_req->write); ceph_osdc_put_request(req); if (rc < 0) @@ -964,7 +964,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, len = ret; } - ceph_put_page_vector(pages, num_pages, false); + ceph_put_page_vector(pages, num_pages, !write); ceph_osdc_put_request(req); if (ret < 0) -- cgit v1.2.3 From 774a6a118c70f8c11fcfe636032b5016ad71a746 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Mon, 6 Jun 2016 16:01:39 +0800 Subject: ceph: reduce i_nr_by_mode array size Track usage count for individual fmode bit. This can reduce the array size by half. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 45 ++++++++++++++++++++++++++++++-------------- fs/ceph/inode.c | 2 +- fs/ceph/ioctl.c | 3 +-- fs/ceph/super.h | 7 ++----- include/linux/ceph/ceph_fs.h | 2 +- 5 files changed, 36 insertions(+), 23 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0a9406a8a794..a08d245f16f5 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -849,12 +849,14 @@ int __ceph_caps_used(struct ceph_inode_info *ci) */ int __ceph_caps_file_wanted(struct ceph_inode_info *ci) { - int want = 0; - int mode; - for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++) - if (ci->i_nr_by_mode[mode]) - want |= ceph_caps_for_mode(mode); - return want; + int i, bits = 0; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (ci->i_nr_by_mode[i]) + bits |= 1 << i; + } + if (bits == 0) + return 0; + return ceph_caps_for_mode(bits >> 1); } /* @@ -3682,6 +3684,16 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) dout("flush_dirty_caps done\n"); } +void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode) +{ + int i; + int bits = (fmode << 1) | 1; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (bits & (1 << i)) + ci->i_nr_by_mode[i]++; + } +} + /* * Drop open file reference. If we were the last open file, * we may need to release capabilities to the MDS (or schedule @@ -3689,15 +3701,20 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) */ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) { - struct inode *inode = &ci->vfs_inode; - int last = 0; - + int i, last = 0; + int bits = (fmode << 1) | 1; spin_lock(&ci->i_ceph_lock); - dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode, - ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1); - BUG_ON(ci->i_nr_by_mode[fmode] == 0); - if (--ci->i_nr_by_mode[fmode] == 0) - last++; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (bits & (1 << i)) { + BUG_ON(ci->i_nr_by_mode[i] == 0); + if (--ci->i_nr_by_mode[i] == 0) + last++; + } + } + dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n", + &ci->vfs_inode, fmode, + ci->i_nr_by_mode[0], ci->i_nr_by_mode[1], + ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]); spin_unlock(&ci->i_ceph_lock); if (last && ci->i_vino.snap == CEPH_NOSNAP) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index dc032566ed71..8ca843371d4b 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -477,7 +477,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_head_snapc = NULL; ci->i_snap_caps = 0; - for (i = 0; i < CEPH_FILE_MODE_NUM; i++) + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) ci->i_nr_by_mode[i] = 0; mutex_init(&ci->i_truncate_mutex); diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 6a30101b55ef..7d752d53353a 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -250,9 +250,8 @@ static long ceph_ioctl_lazyio(struct file *file) if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) { spin_lock(&ci->i_ceph_lock); - ci->i_nr_by_mode[fi->fmode]--; fi->fmode |= CEPH_FILE_MODE_LAZY; - ci->i_nr_by_mode[fi->fmode]++; + ci->i_nr_by_mode[ffs(CEPH_FILE_MODE_LAZY)]++; spin_unlock(&ci->i_ceph_lock); dout("ioctl_layzio: file %p marked lazy\n", file); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 7ceab18c8ee2..50846e6f6a8c 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -321,7 +321,7 @@ struct ceph_inode_info { dirty|flushing caps */ unsigned i_snap_caps; /* cap bits for snapped files */ - int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ + int i_nr_by_mode[CEPH_FILE_MODE_BITS]; /* open file counts */ struct mutex i_truncate_mutex; u32 i_truncate_seq; /* last truncate to smaller size */ @@ -906,10 +906,7 @@ extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page); /* for counting open files by mode */ -static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode) -{ - ci->i_nr_by_mode[mode]++; -} +extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode); extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode); /* addr.c */ diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 08b8fd72261e..6bc7730d22ac 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -525,7 +525,7 @@ struct ceph_filelock { #define CEPH_FILE_MODE_WR 2 #define CEPH_FILE_MODE_RDWR 3 /* RD | WR */ #define CEPH_FILE_MODE_LAZY 4 /* lazy io */ -#define CEPH_FILE_MODE_NUM 8 /* bc these are bit fields.. mostly */ +#define CEPH_FILE_MODE_BITS 4 int ceph_flags_to_mode(int flags); -- cgit v1.2.3 From fc8c3892f30c39f28fdb835f7c8598ac4cf5ed1e Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Tue, 14 Jun 2016 11:13:59 +0800 Subject: ceph: fix use-after-free bug in ceph_direct_read_write() ceph_aio_complete() can free the ceph_aio_request struct before the code exits the while loop. Signed-off-by: Yan, Zheng --- fs/ceph/file.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/file.c b/fs/ceph/file.c index ac75fa9fd858..033e88753875 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -985,6 +985,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, } if (aio_req) { + LIST_HEAD(osd_reqs); + if (aio_req->num_reqs == 0) { kfree(aio_req); return ret; @@ -993,8 +995,9 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR : CEPH_CAP_FILE_RD); - while (!list_empty(&aio_req->osd_reqs)) { - req = list_first_entry(&aio_req->osd_reqs, + list_splice(&aio_req->osd_reqs, &osd_reqs); + while (!list_empty(&osd_reqs)) { + req = list_first_entry(&osd_reqs, struct ceph_osd_request, r_unsafe_item); list_del_init(&req->r_unsafe_item); -- cgit v1.2.3 From 9a5530c63889ac928a45c4645ab0bc23b4fbfcb8 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Wed, 15 Jun 2016 16:29:18 +0800 Subject: ceph: wait unsafe sync writes for evicting inode Otherwise ceph_sync_write_unsafe() may access/modify freed inode. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 50 ++------------------------------------------------ fs/ceph/file.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ fs/ceph/inode.c | 8 ++++++++ fs/ceph/super.c | 1 + fs/ceph/super.h | 2 ++ 5 files changed, 61 insertions(+), 48 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index a08d245f16f5..1e48377f18a7 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1927,53 +1927,6 @@ static int caps_are_flushed(struct inode *inode, u64 flush_tid) return ret; } -/* - * Wait on any unsafe replies for the given inode. First wait on the - * newest request, and make that the upper bound. Then, if there are - * more requests, keep waiting on the oldest as long as it is still older - * than the original request. - */ -static void sync_write_wait(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_writes; - struct ceph_osd_request *req; - u64 last_tid; - - if (!S_ISREG(inode->i_mode)) - return; - - spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - /* set upper bound as _last_ entry in chain */ - req = list_last_entry(head, struct ceph_osd_request, - r_unsafe_item); - last_tid = req->r_tid; - - do { - ceph_osdc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); - dout("sync_write_wait on tid %llu (until %llu)\n", - req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); - spin_lock(&ci->i_unsafe_lock); - ceph_osdc_put_request(req); - - /* - * from here on look at first entry in chain, since we - * only want to wait for anything older than last_tid - */ - if (list_empty(head)) - break; - req = list_first_entry(head, struct ceph_osd_request, - r_unsafe_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); -} - /* * wait for any unsafe requests to complete. */ @@ -2026,7 +1979,8 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) int dirty; dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); - sync_write_wait(inode); + + ceph_sync_write_wait(inode); ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 033e88753875..7f2ef262cdf7 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -821,6 +821,54 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } } +/* + * Wait on any unsafe replies for the given inode. First wait on the + * newest request, and make that the upper bound. Then, if there are + * more requests, keep waiting on the oldest as long as it is still older + * than the original request. + */ +void ceph_sync_write_wait(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct list_head *head = &ci->i_unsafe_writes; + struct ceph_osd_request *req; + u64 last_tid; + + if (!S_ISREG(inode->i_mode)) + return; + + spin_lock(&ci->i_unsafe_lock); + if (list_empty(head)) + goto out; + + /* set upper bound as _last_ entry in chain */ + + req = list_last_entry(head, struct ceph_osd_request, + r_unsafe_item); + last_tid = req->r_tid; + + do { + ceph_osdc_get_request(req); + spin_unlock(&ci->i_unsafe_lock); + + dout("sync_write_wait on tid %llu (until %llu)\n", + req->r_tid, last_tid); + wait_for_completion(&req->r_safe_completion); + ceph_osdc_put_request(req); + + spin_lock(&ci->i_unsafe_lock); + /* + * from here on look at first entry in chain, since we + * only want to wait for anything older than last_tid + */ + if (list_empty(head)) + break; + req = list_first_entry(head, struct ceph_osd_request, + r_unsafe_item); + } while (req->r_tid < last_tid); +out: + spin_unlock(&ci->i_unsafe_lock); +} static ssize_t ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 8ca843371d4b..6e16269277bd 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -585,6 +585,14 @@ int ceph_drop_inode(struct inode *inode) return 1; } +void ceph_evict_inode(struct inode *inode) +{ + /* wait unsafe sync writes */ + ceph_sync_write_wait(inode); + truncate_inode_pages_final(&inode->i_data); + clear_inode(inode); +} + static inline blkcnt_t calc_inode_blocks(u64 size) { return (size + (1<<9) - 1) >> 9; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 91e02481ce06..a5b2275e1573 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -731,6 +731,7 @@ static const struct super_operations ceph_super_ops = { .destroy_inode = ceph_destroy_inode, .write_inode = ceph_write_inode, .drop_inode = ceph_drop_inode, + .evict_inode = ceph_evict_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .show_options = ceph_show_options, diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 50846e6f6a8c..d5b9077467a4 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -749,6 +749,7 @@ extern const struct inode_operations ceph_file_iops; extern struct inode *ceph_alloc_inode(struct super_block *sb); extern void ceph_destroy_inode(struct inode *inode); extern int ceph_drop_inode(struct inode *inode); +extern void ceph_evict_inode(struct inode *inode); extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); @@ -927,6 +928,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, extern int ceph_release(struct inode *inode, struct file *filp); extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, char *data, size_t len); +extern void ceph_sync_write_wait(struct inode *inode); /* dir.c */ extern const struct file_operations ceph_dir_fops; extern const struct file_operations ceph_snapdir_fops; -- cgit v1.2.3 From fce8515741dfb6a233927262555295788ad22ca7 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Wed, 15 Jun 2016 20:51:22 +0800 Subject: ceph: fix NULL dereference in ceph_queue_cap_snap() old_snapc->seq is used in dout(...) Signed-off-by: Yan, Zheng --- fs/ceph/snap.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs/ceph') diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 9caaa7ffc93f..eadf2c33edc6 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -551,7 +551,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ci->i_wrbuffer_ref_head = 0; capsnap->context = old_snapc; list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); - old_snapc = NULL; if (used & CEPH_CAP_FILE_WR) { dout("queue_cap_snap %p cap_snap %p snapc %p" @@ -563,6 +562,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) __ceph_finish_cap_snap(ci, capsnap); } capsnap = NULL; + old_snapc = NULL; update_snapc: if (ci->i_head_snapc) { -- cgit v1.2.3 From 679f0b825d84f8c9a618730b00ae816976bc240f Mon Sep 17 00:00:00 2001 From: Colin Ian King Date: Thu, 23 Jun 2016 14:45:24 +0100 Subject: ceph: fix spelling mistake: "resgister" -> "register" trivial fix to spelling mistake in pr_err message Signed-off-by: Colin Ian King --- fs/ceph/cache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs/ceph') diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 238c55b01723..5bc5d37b1217 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -71,7 +71,7 @@ int ceph_fscache_register_fs(struct ceph_fs_client* fsc) &ceph_fscache_fsid_object_def, fsc, true); if (!fsc->fscache) - pr_err("Unable to resgister fsid: %p fscache cookie", fsc); + pr_err("Unable to register fsid: %p fscache cookie\n", fsc); return 0; } -- cgit v1.2.3 From 9b16f03c474d05b16cbd9eed1ec335c6e71cb57b Mon Sep 17 00:00:00 2001 From: Miklos Szeredi Date: Wed, 22 Jun 2016 16:35:04 +0200 Subject: ceph: don't use ->d_time Pretty simple: just use ceph_dentry_info.time instead (which was already there, unused). Signed-off-by: Miklos Szeredi --- fs/ceph/dir.c | 6 +++--- fs/ceph/inode.c | 4 ++-- fs/ceph/mds_client.c | 4 ++-- fs/ceph/super.h | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 6e0fedf6713b..8ff7bcc7fc88 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -59,7 +59,7 @@ int ceph_init_dentry(struct dentry *dentry) di->dentry = dentry; di->lease_session = NULL; - dentry->d_time = jiffies; + di->time = jiffies; /* avoid reordering d_fsdata setup so that the check above is safe */ smp_mb(); dentry->d_fsdata = di; @@ -1124,7 +1124,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, void ceph_invalidate_dentry_lease(struct dentry *dentry) { spin_lock(&dentry->d_lock); - dentry->d_time = jiffies; + ceph_dentry(dentry)->time = jiffies; ceph_dentry(dentry)->lease_shared_gen = 0; spin_unlock(&dentry->d_lock); } @@ -1154,7 +1154,7 @@ static int dentry_lease_is_valid(struct dentry *dentry) spin_unlock(&s->s_gen_ttl_lock); if (di->lease_gen == gen && - time_before(jiffies, dentry->d_time) && + time_before(jiffies, di->time) && time_before(jiffies, ttl)) { valid = 1; if (di->lease_renew_after && diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 6e16269277bd..a38b768bc158 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1042,7 +1042,7 @@ static void update_dentry_lease(struct dentry *dentry, goto out_unlock; if (di->lease_gen == session->s_cap_gen && - time_before(ttl, dentry->d_time)) + time_before(ttl, di->time)) goto out_unlock; /* we already have a newer lease. */ if (di->lease_session && di->lease_session != session) @@ -1056,7 +1056,7 @@ static void update_dentry_lease(struct dentry *dentry, di->lease_seq = le32_to_cpu(lease->seq); di->lease_renew_after = half_ttl; di->lease_renew_from = 0; - dentry->d_time = ttl; + di->time = ttl; out_unlock: spin_unlock(&dentry->d_lock); return; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 46641bbc8056..0d4bb24c670a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3226,7 +3226,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, msecs_to_jiffies(le32_to_cpu(h->duration_ms)); di->lease_seq = seq; - dentry->d_time = di->lease_renew_from + duration; + di->time = di->lease_renew_from + duration; di->lease_renew_after = di->lease_renew_from + (duration >> 1); di->lease_renew_from = 0; @@ -3311,7 +3311,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, if (!di || !di->lease_session || di->lease_session->s_mds < 0 || di->lease_gen != di->lease_session->s_cap_gen || - !time_before(jiffies, dentry->d_time)) { + !time_before(jiffies, di->time)) { dout("lease_release inode %p dentry %p -- " "no lease\n", inode, dentry); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index d5b9077467a4..5aa3158e8611 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -246,7 +246,7 @@ struct ceph_dentry_info { unsigned long lease_renew_after, lease_renew_from; struct list_head lru; struct dentry *dentry; - u64 time; + unsigned long time; u64 offset; }; -- cgit v1.2.3 From 8aa152c77890abd0731f119e4e6662375503e288 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 1 Jul 2016 09:39:20 -0400 Subject: ceph: remove ceph_mdsc_lease_release Nothing calls it. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng --- fs/ceph/mds_client.c | 41 ----------------------------------------- fs/ceph/mds_client.h | 4 ---- 2 files changed, 45 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 0d4bb24c670a..78a3495a11be 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3291,47 +3291,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, ceph_con_send(&session->s_con, msg); } -/* - * Preemptively release a lease we expect to invalidate anyway. - * Pass @inode always, @dentry is optional. - */ -void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, - struct dentry *dentry) -{ - struct ceph_dentry_info *di; - struct ceph_mds_session *session; - u32 seq; - - BUG_ON(inode == NULL); - BUG_ON(dentry == NULL); - - /* is dentry lease valid? */ - spin_lock(&dentry->d_lock); - di = ceph_dentry(dentry); - if (!di || !di->lease_session || - di->lease_session->s_mds < 0 || - di->lease_gen != di->lease_session->s_cap_gen || - !time_before(jiffies, di->time)) { - dout("lease_release inode %p dentry %p -- " - "no lease\n", - inode, dentry); - spin_unlock(&dentry->d_lock); - return; - } - - /* we do have a lease on this dentry; note mds and seq */ - session = ceph_get_mds_session(di->lease_session); - seq = di->lease_seq; - __ceph_mdsc_drop_dentry_lease(dentry); - spin_unlock(&dentry->d_lock); - - dout("lease_release inode %p dentry %p to mds%d\n", - inode, dentry, session->s_mds); - ceph_mdsc_lease_send_msg(session, inode, dentry, - CEPH_MDS_LEASE_RELEASE, seq); - ceph_put_mds_session(session); -} - /* * drop all leases (and dentry refs) in preparation for umount */ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 2ce8e9f9bfc9..9dd2c82379f8 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -385,10 +385,6 @@ extern void ceph_mdsc_destroy(struct ceph_fs_client *fsc); extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc); -extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, - struct inode *inode, - struct dentry *dn); - extern void ceph_invalidate_dir_request(struct ceph_mds_request *req); extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, struct inode *dir); -- cgit v1.2.3 From 5b484a513149f53613d376a9d1cd0391de099fb4 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 1 Jul 2016 09:39:20 -0400 Subject: ceph: clear d_fsinfo pointer under d_lock To check for a valid dentry lease, we need to get at the ceph_dentry_info. Under rcuwalk though, we may end up with a dentry that is on its way to destruction. Since we need to take the d_lock in dentry_lease_is_valid already, we can just ensure that we clear the d_fsinfo pointer out under the same lock before destroying it. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng --- fs/ceph/dir.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'fs/ceph') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 8ff7bcc7fc88..904dc671580f 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1286,10 +1286,14 @@ static void ceph_d_release(struct dentry *dentry) dout("d_release %p\n", dentry); ceph_dentry_lru_del(dentry); + + spin_lock(&dentry->d_lock); + dentry->d_fsdata = NULL; + spin_unlock(&dentry->d_lock); + if (di->lease_session) ceph_put_mds_session(di->lease_session); kmem_cache_free(ceph_dentry_cachep, di); - dentry->d_fsdata = NULL; } static int ceph_snapdir_d_revalidate(struct dentry *dentry, -- cgit v1.2.3 From 14fb9c9efe3570459b6bddd76a990140917237ad Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 1 Jul 2016 09:39:20 -0400 Subject: ceph: allow dentry_lease_is_valid to work under RCU walk Under rcuwalk, we need to take extra care when dereferencing d_parent. We want to do that once and pass a pointer to dentry_lease_is_valid. Also, we must ensure that that function can handle the case where we're racing with d_release. Check whether "di" is NULL under the d_lock, and just return 0 if so. Finally, we still need to kick off a renewal job if the lease is getting close to expiration. If that's the case, then just drop out of rcuwalk mode since that could block. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng --- fs/ceph/dir.c | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 904dc671580f..60dcca163ba0 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1133,7 +1133,8 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry) * Check if dentry lease is valid. If not, delete the lease. Try to * renew if the least is more than half up. */ -static int dentry_lease_is_valid(struct dentry *dentry) +static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags, + struct inode *dir) { struct ceph_dentry_info *di; struct ceph_mds_session *s; @@ -1141,12 +1142,11 @@ static int dentry_lease_is_valid(struct dentry *dentry) u32 gen; unsigned long ttl; struct ceph_mds_session *session = NULL; - struct inode *dir = NULL; u32 seq = 0; spin_lock(&dentry->d_lock); di = ceph_dentry(dentry); - if (di->lease_session) { + if (di && di->lease_session) { s = di->lease_session; spin_lock(&s->s_gen_ttl_lock); gen = s->s_cap_gen; @@ -1159,12 +1159,19 @@ static int dentry_lease_is_valid(struct dentry *dentry) valid = 1; if (di->lease_renew_after && time_after(jiffies, di->lease_renew_after)) { - /* we should renew */ - dir = d_inode(dentry->d_parent); - session = ceph_get_mds_session(s); - seq = di->lease_seq; - di->lease_renew_after = 0; - di->lease_renew_from = jiffies; + /* + * We should renew. If we're in RCU walk mode + * though, we can't do that so just return + * -ECHILD. + */ + if (flags & LOOKUP_RCU) { + valid = -ECHILD; + } else { + session = ceph_get_mds_session(s); + seq = di->lease_seq; + di->lease_renew_after = 0; + di->lease_renew_from = jiffies; + } } } } @@ -1224,12 +1231,16 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) } else if (d_really_is_positive(dentry) && ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) { valid = 1; - } else if (dentry_lease_is_valid(dentry) || - dir_lease_is_valid(dir, dentry)) { - if (d_really_is_positive(dentry)) - valid = ceph_is_any_caps(d_inode(dentry)); - else - valid = 1; + } else { + valid = dentry_lease_is_valid(dentry, flags, dir); + if (valid == -ECHILD) + return valid; + if (valid || dir_lease_is_valid(dir, dentry)) { + if (d_really_is_positive(dentry)) + valid = ceph_is_any_caps(d_inode(dentry)); + else + valid = 1; + } } if (!valid) { -- cgit v1.2.3 From f49d1e058d23a5fabeb1499be739af8b3db52164 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 1 Jul 2016 09:39:21 -0400 Subject: ceph: handle LOOKUP_RCU in ceph_d_revalidate We can now handle the snapshot cases under RCU, as well as the non-snapshot case when we don't need to queue up a lease renewal allow LOOKUP_RCU walks to proceed under those conditions. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng --- fs/ceph/dir.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 60dcca163ba0..c64a0b794d49 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1214,15 +1214,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) struct dentry *parent; struct inode *dir; - if (flags & LOOKUP_RCU) - return -ECHILD; + if (flags & LOOKUP_RCU) { + parent = ACCESS_ONCE(dentry->d_parent); + dir = d_inode_rcu(parent); + if (!dir) + return -ECHILD; + } else { + parent = dget_parent(dentry); + dir = d_inode(parent); + } dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry, dentry, d_inode(dentry), ceph_dentry(dentry)->offset); - parent = dget_parent(dentry); - dir = d_inode(parent); - /* always trust cached snapped dentries, snapdir dentry */ if (ceph_snap(dir) != CEPH_NOSNAP) { dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry, @@ -1249,6 +1253,9 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) struct ceph_mds_request *req; int op, mask, err; + if (flags & LOOKUP_RCU) + return -ECHILD; + op = ceph_snap(dir) == CEPH_SNAPDIR ? CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); @@ -1284,7 +1291,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) ceph_dir_clear_complete(dir); } - dput(parent); + if (!(flags & LOOKUP_RCU)) + dput(parent); return valid; } -- cgit v1.2.3 From 430afbadd6c885557ef2fb8c454bd5bba23a9850 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Fri, 8 Jul 2016 11:25:38 +0800 Subject: ceph: mount non-default filesystem by name To mount non-default filesytem, user currently needs to provide mds namespace ID. This is inconvenience. This patch makes user be able to mount filesystem by name. If user wants to mount non-default filesystem. Client first subscribes to fsmap.user. Subscribe to mdsmap. after getting ID of filesystem. Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++-- fs/ceph/mds_client.h | 7 +++-- fs/ceph/super.c | 38 +++++++++++++++-------- fs/ceph/super.h | 2 +- 4 files changed, 117 insertions(+), 17 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 78a3495a11be..e555745883da 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2166,6 +2166,11 @@ static int __do_request(struct ceph_mds_client *mdsc, mds = __choose_mds(mdsc, req); if (mds < 0 || ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { + if (mdsc->mdsmap_err) { + err = mdsc->mdsmap_err; + dout("do_request mdsmap err %d\n", err); + goto finish; + } dout("do_request no mds or not active, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); goto out; @@ -3683,11 +3688,86 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc) dout("mdsc_destroy %p done\n", mdsc); } +void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +{ + struct ceph_fs_client *fsc = mdsc->fsc; + const char *mds_namespace = fsc->mount_options->mds_namespace; + void *p = msg->front.iov_base; + void *end = p + msg->front.iov_len; + u32 epoch; + u32 map_len; + u32 num_fs; + u32 mount_fscid = (u32)-1; + u8 struct_v, struct_cv; + int err = -EINVAL; + + ceph_decode_need(&p, end, sizeof(u32), bad); + epoch = ceph_decode_32(&p); + + dout("handle_fsmap epoch %u\n", epoch); + + ceph_decode_need(&p, end, 2 + sizeof(u32), bad); + struct_v = ceph_decode_8(&p); + struct_cv = ceph_decode_8(&p); + map_len = ceph_decode_32(&p); + + ceph_decode_need(&p, end, sizeof(u32) * 3, bad); + p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ + + num_fs = ceph_decode_32(&p); + while (num_fs-- > 0) { + void *info_p, *info_end; + u32 info_len; + u8 info_v, info_cv; + u32 fscid, namelen; + + ceph_decode_need(&p, end, 2 + sizeof(u32), bad); + info_v = ceph_decode_8(&p); + info_cv = ceph_decode_8(&p); + info_len = ceph_decode_32(&p); + ceph_decode_need(&p, end, info_len, bad); + info_p = p; + info_end = p + info_len; + p = info_end; + + ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); + fscid = ceph_decode_32(&info_p); + namelen = ceph_decode_32(&info_p); + ceph_decode_need(&info_p, info_end, namelen, bad); + + if (mds_namespace && + strlen(mds_namespace) == namelen && + !strncmp(mds_namespace, (char *)info_p, namelen)) { + mount_fscid = fscid; + break; + } + } + + ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); + if (mount_fscid != (u32)-1) { + fsc->client->monc.fs_cluster_id = mount_fscid; + ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, + 0, true); + ceph_monc_renew_subs(&fsc->client->monc); + } else { + err = -ENOENT; + goto err_out; + } + return; +bad: + pr_err("error decoding fsmap\n"); +err_out: + mutex_lock(&mdsc->mutex); + mdsc->mdsmap_err = -ENOENT; + __wake_requests(mdsc, &mdsc->waiting_for_map); + mutex_unlock(&mdsc->mutex); + return; +} /* * handle mds map update. */ -void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { u32 epoch; u32 maplen; @@ -3794,7 +3874,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) switch (type) { case CEPH_MSG_MDS_MAP: - ceph_mdsc_handle_map(mdsc, msg); + ceph_mdsc_handle_mdsmap(mdsc, msg); + break; + case CEPH_MSG_FS_MAP_USER: + ceph_mdsc_handle_fsmap(mdsc, msg); break; case CEPH_MSG_CLIENT_SESSION: handle_session(s, msg); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 9dd2c82379f8..3c154b8d49bf 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -293,6 +293,7 @@ struct ceph_mds_client { struct completion safe_umount_waiters; wait_queue_head_t session_close_wq; struct list_head waiting_for_map; + int mdsmap_err; struct ceph_mds_session **sessions; /* NULL for mds if no session */ atomic_t num_sessions; @@ -419,8 +420,10 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, struct dentry *dentry, char action, u32 seq); -extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, - struct ceph_msg *msg); +extern void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, + struct ceph_msg *msg); +extern void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, + struct ceph_msg *msg); extern struct ceph_mds_session * ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target); diff --git a/fs/ceph/super.c b/fs/ceph/super.c index a5b2275e1573..7736a931376e 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -108,7 +108,6 @@ static int ceph_sync_fs(struct super_block *sb, int wait) * mount options */ enum { - Opt_mds_namespace, Opt_wsize, Opt_rsize, Opt_rasize, @@ -121,6 +120,7 @@ enum { Opt_last_int, /* int args above */ Opt_snapdirname, + Opt_mds_namespace, Opt_last_string, /* string args above */ Opt_dirstat, @@ -144,7 +144,6 @@ enum { }; static match_table_t fsopt_tokens = { - {Opt_mds_namespace, "mds_namespace=%d"}, {Opt_wsize, "wsize=%d"}, {Opt_rsize, "rsize=%d"}, {Opt_rasize, "rasize=%d"}, @@ -156,6 +155,7 @@ static match_table_t fsopt_tokens = { {Opt_congestion_kb, "write_congestion_kb=%d"}, /* int args above */ {Opt_snapdirname, "snapdirname=%s"}, + {Opt_mds_namespace, "mds_namespace=%s"}, /* string args above */ {Opt_dirstat, "dirstat"}, {Opt_nodirstat, "nodirstat"}, @@ -212,11 +212,14 @@ static int parse_fsopt_token(char *c, void *private) if (!fsopt->snapdir_name) return -ENOMEM; break; - - /* misc */ case Opt_mds_namespace: - fsopt->mds_namespace = intval; + fsopt->mds_namespace = kstrndup(argstr[0].from, + argstr[0].to-argstr[0].from, + GFP_KERNEL); + if (!fsopt->mds_namespace) + return -ENOMEM; break; + /* misc */ case Opt_wsize: fsopt->wsize = intval; break; @@ -302,6 +305,7 @@ static void destroy_mount_options(struct ceph_mount_options *args) { dout("destroy_mount_options %p\n", args); kfree(args->snapdir_name); + kfree(args->mds_namespace); kfree(args->server_path); kfree(args); } @@ -331,6 +335,9 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt, return ret; ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name); + if (ret) + return ret; + ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace); if (ret) return ret; @@ -376,7 +383,6 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT; fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; fsopt->congestion_kb = default_congestion_kb(); - fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE; /* * Distinguish the server list from the path in "dev_name". @@ -469,8 +475,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",noacl"); #endif - if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE) - seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace); + if (fsopt->mds_namespace) + seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace); if (fsopt->wsize) seq_printf(m, ",wsize=%d", fsopt->wsize); if (fsopt->rsize != CEPH_RSIZE_DEFAULT) @@ -509,9 +515,11 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg) switch (type) { case CEPH_MSG_MDS_MAP: - ceph_mdsc_handle_map(fsc->mdsc, msg); + ceph_mdsc_handle_mdsmap(fsc->mdsc, msg); + return 0; + case CEPH_MSG_FS_MAP_USER: + ceph_mdsc_handle_fsmap(fsc->mdsc, msg); return 0; - default: return -1; } @@ -543,8 +551,14 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, goto fail; } fsc->client->extra_mon_dispatch = extra_mon_dispatch; - fsc->client->monc.fs_cluster_id = fsopt->mds_namespace; - ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true); + + if (fsopt->mds_namespace == NULL) { + ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, + 0, true); + } else { + ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP, + 0, false); + } fsc->mount_options = fsopt; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 5aa3158e8611..9e82e29f86a1 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -62,7 +62,6 @@ struct ceph_mount_options { int cap_release_safety; int max_readdir; /* max readdir result (entires) */ int max_readdir_bytes; /* max readdir result (bytes) */ - int mds_namespace; /* * everything above this point can be memcmp'd; everything below @@ -70,6 +69,7 @@ struct ceph_mount_options { */ char *snapdir_name; /* default ".snap" */ + char *mds_namespace; /* default NULL */ char *server_path; /* default "/" */ }; -- cgit v1.2.3 From 121f22a19a30574abff68f2a22716c34cfae0b05 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Mon, 4 Jul 2016 22:05:18 +0800 Subject: ceph: update cap reconnect message to version 3 Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 68 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 47 insertions(+), 21 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index e555745883da..131a61f17d8a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -48,7 +48,7 @@ struct ceph_reconnect_state { int nr_caps; struct ceph_pagelist *pagelist; - bool flock; + unsigned msg_version; }; static void __wake_requests(struct ceph_mds_client *mdsc, @@ -2791,7 +2791,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, struct ceph_mds_cap_reconnect v2; struct ceph_mds_cap_reconnect_v1 v1; } rec; - size_t reclen; struct ceph_inode_info *ci; struct ceph_reconnect_state *recon_state = arg; struct ceph_pagelist *pagelist = recon_state->pagelist; @@ -2820,9 +2819,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, path = NULL; pathlen = 0; } - err = ceph_pagelist_encode_string(pagelist, path, pathlen); - if (err) - goto out_free; spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ @@ -2830,14 +2826,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, cap->mseq = 0; /* and migrate_seq */ cap->cap_gen = cap->session->s_cap_gen; - if (recon_state->flock) { + if (recon_state->msg_version >= 2) { rec.v2.cap_id = cpu_to_le64(cap->cap_id); rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); rec.v2.issued = cpu_to_le32(cap->issued); rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); rec.v2.pathbase = cpu_to_le64(pathbase); rec.v2.flock_len = 0; - reclen = sizeof(rec.v2); } else { rec.v1.cap_id = cpu_to_le64(cap->cap_id); rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); @@ -2847,13 +2842,14 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); rec.v1.pathbase = cpu_to_le64(pathbase); - reclen = sizeof(rec.v1); } spin_unlock(&ci->i_ceph_lock); - if (recon_state->flock) { + if (recon_state->msg_version >= 2) { int num_fcntl_locks, num_flock_locks; struct ceph_filelock *flocks; + size_t struct_len, total_len = 0; + u8 struct_v = 0; encode_again: ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); @@ -2872,20 +2868,46 @@ encode_again: goto encode_again; goto out_free; } + + if (recon_state->msg_version >= 3) { + /* version, compat_version and struct_len */ + total_len = 2 * sizeof(u8) + sizeof(u32); + struct_v = 1; + } /* * number of encoded locks is stable, so copy to pagelist */ - rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) + - (num_fcntl_locks+num_flock_locks) * - sizeof(struct ceph_filelock)); - err = ceph_pagelist_append(pagelist, &rec, reclen); - if (!err) - err = ceph_locks_to_pagelist(flocks, pagelist, - num_fcntl_locks, - num_flock_locks); + struct_len = 2 * sizeof(u32) + + (num_fcntl_locks + num_flock_locks) * + sizeof(struct ceph_filelock); + rec.v2.flock_len = cpu_to_le32(struct_len); + + struct_len += sizeof(rec.v2); + struct_len += sizeof(u32) + pathlen; + + total_len += struct_len; + err = ceph_pagelist_reserve(pagelist, total_len); + + if (!err) { + if (recon_state->msg_version >= 3) { + ceph_pagelist_encode_8(pagelist, struct_v); + ceph_pagelist_encode_8(pagelist, 1); + ceph_pagelist_encode_32(pagelist, struct_len); + } + ceph_pagelist_encode_string(pagelist, path, pathlen); + ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); + ceph_locks_to_pagelist(flocks, pagelist, + num_fcntl_locks, + num_flock_locks); + } kfree(flocks); } else { - err = ceph_pagelist_append(pagelist, &rec, reclen); + size_t size = sizeof(u32) + pathlen + sizeof(rec.v1); + err = ceph_pagelist_reserve(pagelist, size); + if (!err) { + ceph_pagelist_encode_string(pagelist, path, pathlen); + ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); + } } recon_state->nr_caps++; @@ -2976,7 +2998,12 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, recon_state.nr_caps = 0; recon_state.pagelist = pagelist; - recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; + if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) + recon_state.msg_version = 3; + else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK) + recon_state.msg_version = 2; + else + recon_state.msg_version = 1; err = iterate_session_caps(session, encode_caps_cb, &recon_state); if (err < 0) goto fail; @@ -3005,8 +3032,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, goto fail; } - if (recon_state.flock) - reply->hdr.version = cpu_to_le16(2); + reply->hdr.version = cpu_to_le16(recon_state.msg_version); /* raced with cap release? */ if (s_nr_caps != recon_state.nr_caps) { -- cgit v1.2.3 From 3469ed0d149ba7066e8fef72be55f67ee7de196d Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Tue, 5 Jul 2016 09:32:31 +0800 Subject: ceph: include 'follows' of pending snapflush in cap reconnect message This helps the recovering MDS to reconstruct the internal states that tracking pending snapflush. Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'fs/ceph') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 131a61f17d8a..bcf20344d904 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2797,6 +2797,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, char *path; int pathlen, err; u64 pathbase; + u64 snap_follows; struct dentry *dentry; ci = cap->ci; @@ -2843,6 +2844,15 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); rec.v1.pathbase = cpu_to_le64(pathbase); } + + if (list_empty(&ci->i_cap_snaps)) { + snap_follows = 0; + } else { + struct ceph_cap_snap *capsnap = + list_first_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, ci_item); + snap_follows = capsnap->follows; + } spin_unlock(&ci->i_ceph_lock); if (recon_state->msg_version >= 2) { @@ -2872,7 +2882,7 @@ encode_again: if (recon_state->msg_version >= 3) { /* version, compat_version and struct_len */ total_len = 2 * sizeof(u8) + sizeof(u32); - struct_v = 1; + struct_v = 2; } /* * number of encoded locks is stable, so copy to pagelist @@ -2885,6 +2895,9 @@ encode_again: struct_len += sizeof(rec.v2); struct_len += sizeof(u32) + pathlen; + if (struct_v >= 2) + struct_len += sizeof(u64); /* snap_follows */ + total_len += struct_len; err = ceph_pagelist_reserve(pagelist, total_len); @@ -2899,6 +2912,8 @@ encode_again: ceph_locks_to_pagelist(flocks, pagelist, num_fcntl_locks, num_flock_locks); + if (struct_v >= 2) + ceph_pagelist_encode_64(pagelist, snap_follows); } kfree(flocks); } else { -- cgit v1.2.3 From 3609404f8c782c01fe626a03949641dddbc65c34 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Wed, 6 Jul 2016 15:37:42 +0800 Subject: ceph: update types of some local varibles Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 1e48377f18a7..698cf002d3f1 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -993,7 +993,7 @@ static int send_cap_msg(struct ceph_mds_session *session, u32 seq, u64 flush_tid, u64 oldest_flush_tid, u32 issue_seq, u32 mseq, u64 size, u64 max_size, struct timespec *mtime, struct timespec *atime, - struct timespec *ctime, u64 time_warp_seq, + struct timespec *ctime, u32 time_warp_seq, kuid_t uid, kgid_t gid, umode_t mode, u64 xattr_version, struct ceph_buffer *xattrs_buf, @@ -1118,8 +1118,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, struct inode *inode = &ci->vfs_inode; u64 cap_id = cap->cap_id; int held, revoking, dropping, keep; - u64 seq, issue_seq, mseq, time_warp_seq, follows; - u64 size, max_size; + u64 follows, size, max_size; + u32 seq, issue_seq, mseq, time_warp_seq; struct timespec mtime, atime, ctime; int wake = 0; umode_t mode; @@ -1585,10 +1585,11 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, int mds = -1; /* keep track of how far we've gone through i_caps list to avoid an infinite loop on retry */ struct rb_node *p; - int tried_invalidate = 0; - int delayed = 0, sent = 0, force_requeue = 0, num; - int queue_invalidate = 0; - int is_delayed = flags & CHECK_CAPS_NODELAY; + int delayed = 0, sent = 0, num; + bool is_delayed = flags & CHECK_CAPS_NODELAY; + bool queue_invalidate = false; + bool force_requeue = false; + bool tried_invalidate = false; /* if we are unmounting, flush any unused caps immediately. */ if (mdsc->stopping) @@ -1668,17 +1669,17 @@ retry_locked: if (revoking & (CEPH_CAP_FILE_CACHE| CEPH_CAP_FILE_LAZYIO)) { dout("check_caps queuing invalidate\n"); - queue_invalidate = 1; + queue_invalidate = true; ci->i_rdcache_revoking = ci->i_rdcache_gen; } else { dout("check_caps failed to invalidate pages\n"); /* we failed to invalidate pages. check these caps again later. */ - force_requeue = 1; + force_requeue = true; __cap_set_timeouts(mdsc, ci); } } - tried_invalidate = 1; + tried_invalidate = true; goto retry_locked; } @@ -1824,7 +1825,7 @@ ack: * otherwise cancel. */ if (delayed && is_delayed) - force_requeue = 1; /* __send_cap delayed release; requeue */ + force_requeue = true; /* __send_cap delayed release; requeue */ if (!delayed && !is_delayed) __cap_delay_cancel(mdsc, ci); else if (!is_delayed || force_requeue) -- cgit v1.2.3 From e4500b5e35c213e0f97be7cb69328c0877203a79 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Wed, 6 Jul 2016 11:12:56 +0800 Subject: ceph: use list instead of rbtree to track cap flushes We don't have requirement of searching cap flush by TID. In most cases, we just need to know TID of the oldest cap flush. List is ideal for this usage. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 120 +++++++++++++-------------------------------------- fs/ceph/inode.c | 2 +- fs/ceph/mds_client.c | 41 +++++++++--------- fs/ceph/mds_client.h | 2 +- fs/ceph/super.h | 9 ++-- 5 files changed, 56 insertions(+), 118 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 698cf002d3f1..e0efa75a1b98 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1413,52 +1413,6 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, return dirty; } -static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, - struct ceph_cap_flush *cf) -{ - struct rb_node **p = &ci->i_cap_flush_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_cap_flush *other = NULL; - - while (*p) { - parent = *p; - other = rb_entry(parent, struct ceph_cap_flush, i_node); - - if (cf->tid < other->tid) - p = &(*p)->rb_left; - else if (cf->tid > other->tid) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&cf->i_node, parent, p); - rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); -} - -static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, - struct ceph_cap_flush *cf) -{ - struct rb_node **p = &mdsc->cap_flush_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_cap_flush *other = NULL; - - while (*p) { - parent = *p; - other = rb_entry(parent, struct ceph_cap_flush, g_node); - - if (cf->tid < other->tid) - p = &(*p)->rb_left; - else if (cf->tid > other->tid) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&cf->g_node, parent, p); - rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); -} - struct ceph_cap_flush *ceph_alloc_cap_flush(void) { return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); @@ -1472,10 +1426,10 @@ void ceph_free_cap_flush(struct ceph_cap_flush *cf) static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) { - struct rb_node *n = rb_first(&mdsc->cap_flush_tree); - if (n) { + if (!list_empty(&mdsc->cap_flush_list)) { struct ceph_cap_flush *cf = - rb_entry(n, struct ceph_cap_flush, g_node); + list_first_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); return cf->tid; } return 0; @@ -1516,7 +1470,7 @@ static int __mark_caps_flushing(struct inode *inode, list_del_init(&ci->i_dirty_item); cf->tid = ++mdsc->last_cap_flush_tid; - __add_cap_flushing_to_mdsc(mdsc, cf); + list_add_tail(&cf->g_list, &mdsc->cap_flush_list); *oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (list_empty(&ci->i_flushing_item)) { @@ -1530,7 +1484,7 @@ static int __mark_caps_flushing(struct inode *inode, } spin_unlock(&mdsc->cap_dirty_lock); - __add_cap_flushing_to_inode(ci, cf); + list_add_tail(&cf->i_list, &ci->i_cap_flush_list); *flush_tid = cf->tid; return flushing; @@ -1890,10 +1844,10 @@ retry: spin_unlock(&ci->i_ceph_lock); } } else { - struct rb_node *n = rb_last(&ci->i_cap_flush_tree); - if (n) { + if (!list_empty(&ci->i_cap_flush_list)) { struct ceph_cap_flush *cf = - rb_entry(n, struct ceph_cap_flush, i_node); + list_last_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); flush_tid = cf->tid; } flushing = ci->i_flushing_caps; @@ -1913,14 +1867,13 @@ out: static int caps_are_flushed(struct inode *inode, u64 flush_tid) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_cap_flush *cf; - struct rb_node *n; int ret = 1; spin_lock(&ci->i_ceph_lock); - n = rb_first(&ci->i_cap_flush_tree); - if (n) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (!list_empty(&ci->i_cap_flush_list)) { + struct ceph_cap_flush * cf = + list_first_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); if (cf->tid <= flush_tid) ret = 0; } @@ -2083,7 +2036,6 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; struct ceph_cap_flush *cf; - struct rb_node *n; int delayed = 0; u64 first_tid = 0; u64 oldest_flush_tid; @@ -2092,8 +2044,11 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, oldest_flush_tid = __get_oldest_flush_tid(mdsc); spin_unlock(&mdsc->cap_dirty_lock); - while (true) { - spin_lock(&ci->i_ceph_lock); + spin_lock(&ci->i_ceph_lock); + list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { + if (cf->tid < first_tid) + continue; + cap = ci->i_auth_cap; if (!(cap && cap->session == session)) { pr_err("%p auth cap %p not mds%d ???\n", inode, @@ -2102,18 +2057,6 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, break; } - for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); - if (cf->tid >= first_tid) - break; - } - if (!n) { - spin_unlock(&ci->i_ceph_lock); - break; - } - - cf = rb_entry(n, struct ceph_cap_flush, i_node); - first_tid = cf->tid + 1; dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, @@ -2123,7 +2066,10 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, __ceph_caps_wanted(ci), cap->issued | cap->implemented, cf->caps, cf->tid, oldest_flush_tid); + + spin_lock(&ci->i_ceph_lock); } + spin_unlock(&ci->i_ceph_lock); return delayed; } @@ -2995,23 +2941,19 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; - struct ceph_cap_flush *cf; - struct rb_node *n; + struct ceph_cap_flush *cf, *tmp_cf; LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; int drop = 0; - n = rb_first(&ci->i_cap_flush_tree); - while (n) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); - n = rb_next(&cf->i_node); + list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { if (cf->tid == flush_tid) cleaned = cf->caps; if (cf->tid <= flush_tid) { - rb_erase(&cf->i_node, &ci->i_cap_flush_tree); - list_add_tail(&cf->list, &to_remove); + list_del(&cf->i_list); + list_add_tail(&cf->i_list, &to_remove); } else { cleaned &= ~cf->caps; if (!cleaned) @@ -3033,12 +2975,12 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, spin_lock(&mdsc->cap_dirty_lock); if (!list_empty(&to_remove)) { - list_for_each_entry(cf, &to_remove, list) - rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + u64 oldest_flush_tid; + list_for_each_entry(cf, &to_remove, i_list) + list_del(&cf->g_list); - n = rb_first(&mdsc->cap_flush_tree); - cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; - if (!cf || cf->tid > flush_tid) + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid) wake_up_all(&mdsc->cap_flushing_wq); } @@ -3075,8 +3017,8 @@ out: while (!list_empty(&to_remove)) { cf = list_first_entry(&to_remove, - struct ceph_cap_flush, list); - list_del(&cf->list); + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); ceph_free_cap_flush(cf); } if (drop) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index a38b768bc158..fd85b3c58960 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -468,7 +468,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); ci->i_prealloc_cap_flush = NULL; - ci->i_cap_flush_tree = RB_ROOT; + INIT_LIST_HEAD(&ci->i_cap_flush_list); init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index bcf20344d904..7cd6b861c2f3 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1148,19 +1148,17 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) invalidate = true; - while (true) { - struct rb_node *n = rb_first(&ci->i_cap_flush_tree); - if (!n) - break; - cf = rb_entry(n, struct ceph_cap_flush, i_node); - rb_erase(&cf->i_node, &ci->i_cap_flush_tree); - list_add(&cf->list, &to_remove); + while (!list_empty(&ci->i_cap_flush_list)) { + cf = list_first_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); + list_add(&cf->i_list, &to_remove); } spin_lock(&mdsc->cap_dirty_lock); - list_for_each_entry(cf, &to_remove, list) - rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + list_for_each_entry(cf, &to_remove, i_list) + list_del(&cf->g_list); if (!list_empty(&ci->i_dirty_item)) { pr_warn_ratelimited( @@ -1184,7 +1182,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_unlock(&mdsc->cap_dirty_lock); if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { - list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); ci->i_prealloc_cap_flush = NULL; } } @@ -1192,8 +1190,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, while (!list_empty(&to_remove)) { struct ceph_cap_flush *cf; cf = list_first_entry(&to_remove, - struct ceph_cap_flush, list); - list_del(&cf->list); + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); ceph_free_cap_flush(cf); } @@ -1499,17 +1497,18 @@ static int check_capsnap_flush(struct ceph_inode_info *ci, static int check_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_tid) { - struct rb_node *n; - struct ceph_cap_flush *cf; int ret = 1; spin_lock(&mdsc->cap_dirty_lock); - n = rb_first(&mdsc->cap_flush_tree); - cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; - if (cf && cf->tid <= want_flush_tid) { - dout("check_caps_flush still flushing tid %llu <= %llu\n", - cf->tid, want_flush_tid); - ret = 0; + if (!list_empty(&mdsc->cap_flush_list)) { + struct ceph_cap_flush *cf = + list_first_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); + if (cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid " + "%llu <= %llu\n", cf->tid, want_flush_tid); + ret = 0; + } } spin_unlock(&mdsc->cap_dirty_lock); return ret; @@ -3470,7 +3469,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); mdsc->last_cap_flush_tid = 1; - mdsc->cap_flush_tree = RB_ROOT; + INIT_LIST_HEAD(&mdsc->cap_flush_list); INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 3c154b8d49bf..93170b4b5d75 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -325,7 +325,7 @@ struct ceph_mds_client { spinlock_t snap_flush_lock; u64 last_cap_flush_tid; - struct rb_root cap_flush_tree; + struct list_head cap_flush_list; struct list_head cap_dirty; /* inodes with dirty caps */ struct list_head cap_dirty_migrating; /* ...that are migration... */ int num_cap_flushing; /* # caps we are flushing */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 9e82e29f86a1..29e8b7bd9413 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -189,11 +189,8 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) struct ceph_cap_flush { u64 tid; int caps; - struct rb_node g_node; // global - union { - struct rb_node i_node; // inode - struct list_head list; - }; + struct list_head g_list; // global + struct list_head i_list; // per inode }; /* @@ -310,7 +307,7 @@ struct ceph_inode_info { * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ struct ceph_cap_flush *i_prealloc_cap_flush; - struct rb_root i_cap_flush_tree; + struct list_head i_cap_flush_list; wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ -- cgit v1.2.3 From 0e2943878942aee7100c94d0d40c49087dac12cb Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Mon, 4 Jul 2016 18:06:41 +0800 Subject: ceph: unify cap flush and snapcap flush This patch includes following changes - Assign flush tid to snapcap flush - Remove session's s_cap_snaps_flushing list. Add inode to session's s_cap_flushing list instead. Inode is removed from the list when there is no pending snapcap flush or cap flush. - make __kick_flushing_caps() re-send both snapcap flushes and cap flushes. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 291 +++++++++++++++++++++++++++------------------------ fs/ceph/mds_client.c | 77 ++------------ fs/ceph/mds_client.h | 1 - fs/ceph/snap.c | 4 +- fs/ceph/super.h | 24 ++--- 5 files changed, 175 insertions(+), 222 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index e0efa75a1b98..0ac604719663 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -40,6 +40,7 @@ * cluster to release server state. */ +static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc); /* * Generate readable cap strings for debugging output. @@ -1217,6 +1218,22 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, return delayed; } +static inline int __send_flush_snap(struct inode *inode, + struct ceph_mds_session *session, + struct ceph_cap_snap *capsnap, + u32 mseq, u64 oldest_flush_tid) +{ + return send_cap_msg(session, ceph_vino(inode).ino, 0, + CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, + capsnap->dirty, 0, capsnap->cap_flush.tid, + oldest_flush_tid, 0, mseq, capsnap->size, 0, + &capsnap->mtime, &capsnap->atime, + &capsnap->ctime, capsnap->time_warp_seq, + capsnap->uid, capsnap->gid, capsnap->mode, + capsnap->xattr_version, capsnap->xattr_blob, + capsnap->follows, capsnap->inline_data); +} + /* * When a snapshot is taken, clients accumulate dirty metadata on * inodes with capabilities in ceph_cap_snaps to describe the file @@ -1224,14 +1241,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * - * Unless @kick is true, skip cap_snaps that were already sent to - * the MDS (i.e., during this session). - * * Called under i_ceph_lock. Takes s_mutex as needed. */ void __ceph_flush_snaps(struct ceph_inode_info *ci, - struct ceph_mds_session **psession, - int kick) + struct ceph_mds_session **psession) __releases(ci->i_ceph_lock) __acquires(ci->i_ceph_lock) { @@ -1242,6 +1255,7 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_session *session = NULL; /* if session != NULL, we hold session->s_mutex */ + u64 oldest_flush_tid; u64 next_follows = 0; /* keep track of how far we've gotten through the i_cap_snaps list, and skip these entries next time around to avoid an infinite loop */ @@ -1272,7 +1286,7 @@ retry: } /* only flush each capsnap once */ - if (!kick && !list_empty(&capsnap->flushing_item)) { + if (capsnap->cap_flush.tid > 0) { dout("already flushed %p, skipping\n", capsnap); continue; } @@ -1282,8 +1296,6 @@ retry: if (session && session->s_mds != mds) { dout("oops, wrong session %p mutex\n", session); - if (kick) - goto out; mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); @@ -1309,26 +1321,27 @@ retry: } spin_lock(&mdsc->cap_dirty_lock); - capsnap->flush_tid = ++mdsc->last_cap_flush_tid; + capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid; + list_add_tail(&capsnap->cap_flush.g_list, + &mdsc->cap_flush_list); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + + if (list_empty(&ci->i_flushing_item)) { + list_add_tail(&ci->i_flushing_item, + &session->s_cap_flushing); + } spin_unlock(&mdsc->cap_dirty_lock); + list_add_tail(&capsnap->cap_flush.i_list, + &ci->i_cap_flush_list); + atomic_inc(&capsnap->nref); - if (list_empty(&capsnap->flushing_item)) - list_add_tail(&capsnap->flushing_item, - &session->s_cap_snaps_flushing); spin_unlock(&ci->i_ceph_lock); dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", - inode, capsnap, capsnap->follows, capsnap->flush_tid); - send_cap_msg(session, ceph_vino(inode).ino, 0, - CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->flush_tid, 0, - 0, mseq, capsnap->size, 0, - &capsnap->mtime, &capsnap->atime, - &capsnap->ctime, capsnap->time_warp_seq, - capsnap->uid, capsnap->gid, capsnap->mode, - capsnap->xattr_version, capsnap->xattr_blob, - capsnap->follows, capsnap->inline_data); + inode, capsnap, capsnap->follows, capsnap->cap_flush.tid); + __send_flush_snap(inode, session, capsnap, mseq, + oldest_flush_tid); next_follows = capsnap->follows + 1; ceph_put_cap_snap(capsnap); @@ -1354,7 +1367,7 @@ out: static void ceph_flush_snaps(struct ceph_inode_info *ci) { spin_lock(&ci->i_ceph_lock); - __ceph_flush_snaps(ci, NULL, 0); + __ceph_flush_snaps(ci, NULL); spin_unlock(&ci->i_ceph_lock); } @@ -1476,11 +1489,6 @@ static int __mark_caps_flushing(struct inode *inode, if (list_empty(&ci->i_flushing_item)) { list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); mdsc->num_cap_flushing++; - dout(" inode %p now flushing tid %llu\n", inode, cf->tid); - } else { - list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing); - dout(" inode %p now flushing (more) tid %llu\n", - inode, cf->tid); } spin_unlock(&mdsc->cap_dirty_lock); @@ -1556,7 +1564,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, /* flush snaps first time around only */ if (!list_empty(&ci->i_cap_snaps)) - __ceph_flush_snaps(ci, &session, 0); + __ceph_flush_snaps(ci, &session); goto retry_locked; retry: spin_lock(&ci->i_ceph_lock); @@ -1997,80 +2005,74 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) return err; } -/* - * After a recovering MDS goes active, we need to resend any caps - * we were flushing. - * - * Caller holds session->s_mutex. - */ -static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_cap_snap *capsnap; - - dout("kick_flushing_capsnaps mds%d\n", session->s_mds); - list_for_each_entry(capsnap, &session->s_cap_snaps_flushing, - flushing_item) { - struct ceph_inode_info *ci = capsnap->ci; - struct inode *inode = &ci->vfs_inode; - struct ceph_cap *cap; - - spin_lock(&ci->i_ceph_lock); - cap = ci->i_auth_cap; - if (cap && cap->session == session) { - dout("kick_flushing_caps %p cap %p capsnap %p\n", inode, - cap, capsnap); - __ceph_flush_snaps(ci, &session, 1); - } else { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); - } - spin_unlock(&ci->i_ceph_lock); - } -} - -static int __kick_flushing_caps(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - struct ceph_inode_info *ci) +static void __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci, + u64 oldest_flush_tid) + __releases(ci->i_ceph_lock) + __acquires(ci->i_ceph_lock) { struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; struct ceph_cap_flush *cf; - int delayed = 0; + int ret; u64 first_tid = 0; - u64 oldest_flush_tid; - - spin_lock(&mdsc->cap_dirty_lock); - oldest_flush_tid = __get_oldest_flush_tid(mdsc); - spin_unlock(&mdsc->cap_dirty_lock); - spin_lock(&ci->i_ceph_lock); list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { if (cf->tid < first_tid) continue; cap = ci->i_auth_cap; if (!(cap && cap->session == session)) { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); - spin_unlock(&ci->i_ceph_lock); + pr_err("%p auth cap %p not mds%d ???\n", + inode, cap, session->s_mds); break; } first_tid = cf->tid + 1; - dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, - cap, cf->tid, ceph_cap_string(cf->caps)); - delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - cf->caps, cf->tid, oldest_flush_tid); + if (cf->caps) { + dout("kick_flushing_caps %p cap %p tid %llu %s\n", + inode, cap, cf->tid, ceph_cap_string(cf->caps)); + ci->i_ceph_flags |= CEPH_I_NODELAY; + ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, + __ceph_caps_used(ci), + __ceph_caps_wanted(ci), + cap->issued | cap->implemented, + cf->caps, cf->tid, oldest_flush_tid); + if (ret) { + pr_err("kick_flushing_caps: error sending " + "cap flush, ino (%llx.%llx) " + "tid %llu flushing %s\n", + ceph_vinop(inode), cf->tid, + ceph_cap_string(cf->caps)); + } + } else { + struct ceph_cap_snap *capsnap = + container_of(cf, struct ceph_cap_snap, + cap_flush); + dout("kick_flushing_caps %p capsnap %p tid %llu %s\n", + inode, capsnap, cf->tid, + ceph_cap_string(capsnap->dirty)); + + atomic_inc(&capsnap->nref); + spin_unlock(&ci->i_ceph_lock); + + ret = __send_flush_snap(inode, session, capsnap, cap->mseq, + oldest_flush_tid); + if (ret < 0) { + pr_err("kick_flushing_caps: error sending " + "cap flushsnap, ino (%llx.%llx) " + "tid %llu follows %llu\n", + ceph_vinop(inode), cf->tid, + capsnap->follows); + } + + ceph_put_cap_snap(capsnap); + } spin_lock(&ci->i_ceph_lock); } - spin_unlock(&ci->i_ceph_lock); - return delayed; } void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, @@ -2078,8 +2080,14 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, { struct ceph_inode_info *ci; struct ceph_cap *cap; + u64 oldest_flush_tid; dout("early_kick_flushing_caps mds%d\n", session->s_mds); + + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; @@ -2099,10 +2107,8 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, */ if ((cap->issued & ci->i_flushing_caps) != ci->i_flushing_caps) { - spin_unlock(&ci->i_ceph_lock); - if (!__kick_flushing_caps(mdsc, session, ci)) - continue; - spin_lock(&ci->i_ceph_lock); + __kick_flushing_caps(mdsc, session, ci, + oldest_flush_tid); } spin_unlock(&ci->i_ceph_lock); @@ -2113,50 +2119,43 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_inode_info *ci; - - kick_flushing_capsnaps(mdsc, session); + u64 oldest_flush_tid; dout("kick_flushing_caps mds%d\n", session->s_mds); + + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - int delayed = __kick_flushing_caps(mdsc, session, ci); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); - } + spin_lock(&ci->i_ceph_lock); + __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); + spin_unlock(&ci->i_ceph_lock); } } static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, struct inode *inode) + __releases(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; - spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; dout("kick_flushing_inode_caps %p flushing %s\n", inode, ceph_cap_string(ci->i_flushing_caps)); - __ceph_flush_snaps(ci, &session, 1); - - if (ci->i_flushing_caps) { - int delayed; - + if (!list_empty(&ci->i_cap_flush_list)) { + u64 oldest_flush_tid; spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &cap->session->s_cap_flushing); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); spin_unlock(&mdsc->cap_dirty_lock); + __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); spin_unlock(&ci->i_ceph_lock); - - delayed = __kick_flushing_caps(mdsc, session, ci); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); - } } else { spin_unlock(&ci->i_ceph_lock); } @@ -2487,12 +2486,11 @@ static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap) { if (!capsnap->need_flush && !capsnap->writing && !capsnap->dirty_pages) { - dout("dropping cap_snap %p follows %llu\n", capsnap, capsnap->follows); + BUG_ON(capsnap->cap_flush.tid > 0); ceph_put_snap_context(capsnap->context); list_del(&capsnap->ci_item); - list_del(&capsnap->flushing_item); ceph_put_cap_snap(capsnap); return 1; } @@ -2891,13 +2889,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, fill_inline = true; } - spin_unlock(&ci->i_ceph_lock); - if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { - kick_flushing_inode_caps(mdsc, session, inode); - up_read(&mdsc->snap_rwsem); if (newcaps & ~issued) wake = true; + kick_flushing_inode_caps(mdsc, session, inode); + up_read(&mdsc->snap_rwsem); + } else { + spin_unlock(&ci->i_ceph_lock); } if (fill_inline) @@ -2951,6 +2949,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { if (cf->tid == flush_tid) cleaned = cf->caps; + if (cf->caps == 0) /* capsnap */ + continue; if (cf->tid <= flush_tid) { list_del(&cf->i_list); list_add_tail(&cf->i_list, &to_remove); @@ -2985,13 +2985,16 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, } if (ci->i_flushing_caps == 0) { - list_del_init(&ci->i_flushing_item); - if (!list_empty(&session->s_cap_flushing)) - dout(" mds%d still flushing cap on %p\n", - session->s_mds, - &list_entry(session->s_cap_flushing.next, - struct ceph_inode_info, - i_flushing_item)->vfs_inode); + if (list_empty(&ci->i_cap_flush_list)) { + list_del_init(&ci->i_flushing_item); + if (!list_empty(&session->s_cap_flushing)) { + dout(" mds%d still flushing cap on %p\n", + session->s_mds, + &list_first_entry(&session->s_cap_flushing, + struct ceph_inode_info, + i_flushing_item)->vfs_inode); + } + } mdsc->num_cap_flushing--; dout(" inode %p now !flushing\n", inode); @@ -3039,7 +3042,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; u64 follows = le64_to_cpu(m->snap_follows); struct ceph_cap_snap *capsnap; - int drop = 0; + int flushed = 0; dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", inode, ci, session->s_mds, follows); @@ -3047,30 +3050,47 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, spin_lock(&ci->i_ceph_lock); list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { if (capsnap->follows == follows) { - if (capsnap->flush_tid != flush_tid) { + if (capsnap->cap_flush.tid != flush_tid) { dout(" cap_snap %p follows %lld tid %lld !=" " %lld\n", capsnap, follows, - flush_tid, capsnap->flush_tid); + flush_tid, capsnap->cap_flush.tid); break; } - WARN_ON(capsnap->dirty_pages || capsnap->writing); - dout(" removing %p cap_snap %p follows %lld\n", - inode, capsnap, follows); - ceph_put_snap_context(capsnap->context); - list_del(&capsnap->ci_item); - list_del(&capsnap->flushing_item); - ceph_put_cap_snap(capsnap); - wake_up_all(&mdsc->cap_flushing_wq); - drop = 1; + flushed = 1; break; } else { dout(" skipping cap_snap %p follows %lld\n", capsnap, capsnap->follows); } } + if (flushed) { + u64 oldest_flush_tid; + WARN_ON(capsnap->dirty_pages || capsnap->writing); + dout(" removing %p cap_snap %p follows %lld\n", + inode, capsnap, follows); + list_del(&capsnap->ci_item); + list_del(&capsnap->cap_flush.i_list); + + spin_lock(&mdsc->cap_dirty_lock); + + if (list_empty(&ci->i_cap_flush_list)) + list_del_init(&ci->i_flushing_item); + + list_del(&capsnap->cap_flush.g_list); + + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid) + wake_up_all(&mdsc->cap_flushing_wq); + + spin_unlock(&mdsc->cap_dirty_lock); + wake_up_all(&ci->i_cap_wq); + } spin_unlock(&ci->i_ceph_lock); - if (drop) + if (flushed) { + ceph_put_snap_context(capsnap->context); + ceph_put_cap_snap(capsnap); iput(inode); + } } /* @@ -3175,7 +3195,8 @@ retry: tcap->implemented |= issued; if (cap == ci->i_auth_cap) ci->i_auth_cap = tcap; - if (ci->i_flushing_caps && ci->i_auth_cap == tcap) { + if (!list_empty(&ci->i_cap_flush_list) && + ci->i_auth_cap == tcap) { spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &tcap->session->s_cap_flushing); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 7cd6b861c2f3..fa9036af5445 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -472,7 +472,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); INIT_LIST_HEAD(&s->s_cap_flushing); - INIT_LIST_HEAD(&s->s_cap_snaps_flushing); dout("register_session mds%d\n", mds); if (mds >= mdsc->max_sessions) { @@ -1479,21 +1478,6 @@ static int trim_caps(struct ceph_mds_client *mdsc, return 0; } -static int check_capsnap_flush(struct ceph_inode_info *ci, - u64 want_snap_seq) -{ - int ret = 1; - spin_lock(&ci->i_ceph_lock); - if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { - struct ceph_cap_snap *capsnap = - list_first_entry(&ci->i_cap_snaps, - struct ceph_cap_snap, ci_item); - ret = capsnap->follows >= want_snap_seq; - } - spin_unlock(&ci->i_ceph_lock); - return ret; -} - static int check_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_tid) { @@ -1520,54 +1504,9 @@ static int check_caps_flush(struct ceph_mds_client *mdsc, * returns true if we've flushed through want_flush_tid */ static void wait_caps_flush(struct ceph_mds_client *mdsc, - u64 want_flush_tid, u64 want_snap_seq) + u64 want_flush_tid) { - int mds; - - dout("check_caps_flush want %llu snap want %llu\n", - want_flush_tid, want_snap_seq); - mutex_lock(&mdsc->mutex); - for (mds = 0; mds < mdsc->max_sessions; ) { - struct ceph_mds_session *session = mdsc->sessions[mds]; - struct inode *inode = NULL; - - if (!session) { - mds++; - continue; - } - get_session(session); - mutex_unlock(&mdsc->mutex); - - mutex_lock(&session->s_mutex); - if (!list_empty(&session->s_cap_snaps_flushing)) { - struct ceph_cap_snap *capsnap = - list_first_entry(&session->s_cap_snaps_flushing, - struct ceph_cap_snap, - flushing_item); - struct ceph_inode_info *ci = capsnap->ci; - if (!check_capsnap_flush(ci, want_snap_seq)) { - dout("check_cap_flush still flushing snap %p " - "follows %lld <= %lld to mds%d\n", - &ci->vfs_inode, capsnap->follows, - want_snap_seq, mds); - inode = igrab(&ci->vfs_inode); - } - } - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - - if (inode) { - wait_event(mdsc->cap_flushing_wq, - check_capsnap_flush(ceph_inode(inode), - want_snap_seq)); - iput(inode); - } else { - mds++; - } - - mutex_lock(&mdsc->mutex); - } - mutex_unlock(&mdsc->mutex); + dout("check_caps_flush want %llu\n", want_flush_tid); wait_event(mdsc->cap_flushing_wq, check_caps_flush(mdsc, want_flush_tid)); @@ -3584,7 +3523,7 @@ restart: void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { - u64 want_tid, want_flush, want_snap; + u64 want_tid, want_flush; if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) return; @@ -3599,15 +3538,11 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) want_flush = mdsc->last_cap_flush_tid; spin_unlock(&mdsc->cap_dirty_lock); - down_read(&mdsc->snap_rwsem); - want_snap = mdsc->last_snap_seq; - up_read(&mdsc->snap_rwsem); - - dout("sync want tid %lld flush_seq %lld snap_seq %lld\n", - want_tid, want_flush, want_snap); + dout("sync want tid %lld flush_seq %lld\n", + want_tid, want_flush); wait_unsafe_requests(mdsc, want_tid); - wait_caps_flush(mdsc, want_flush, want_snap); + wait_caps_flush(mdsc, want_flush); } /* diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 93170b4b5d75..6b3679737d4a 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -152,7 +152,6 @@ struct ceph_mds_session { /* protected by mutex */ struct list_head s_cap_flushing; /* inodes w/ flushing caps */ - struct list_head s_cap_snaps_flushing; unsigned long s_renew_requested; /* last time we sent a renew req */ u64 s_renew_seq; diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index eadf2c33edc6..20d5b0cdf655 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -520,9 +520,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ihold(inode); atomic_set(&capsnap->nref, 1); - capsnap->ci = ci; INIT_LIST_HEAD(&capsnap->ci_item); - INIT_LIST_HEAD(&capsnap->flushing_item); capsnap->follows = old_snapc->seq; capsnap->issued = __ceph_caps_issued(ci, NULL); @@ -800,7 +798,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc) ihold(inode); spin_unlock(&mdsc->snap_flush_lock); spin_lock(&ci->i_ceph_lock); - __ceph_flush_snaps(ci, &session, 0); + __ceph_flush_snaps(ci, &session); spin_unlock(&ci->i_ceph_lock); iput(inode); spin_lock(&mdsc->snap_flush_lock); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 29e8b7bd9413..08ed51299f9f 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -147,6 +147,13 @@ struct ceph_cap { #define CHECK_CAPS_AUTHONLY 2 /* only check auth cap */ #define CHECK_CAPS_FLUSH 4 /* flush any dirty caps */ +struct ceph_cap_flush { + u64 tid; + int caps; /* 0 means capsnap */ + struct list_head g_list; // global + struct list_head i_list; // per inode +}; + /* * Snapped cap state that is pending flush to mds. When a snapshot occurs, * we first complete any in-process sync writes and writeback any dirty @@ -154,10 +161,11 @@ struct ceph_cap { */ struct ceph_cap_snap { atomic_t nref; - struct ceph_inode_info *ci; - struct list_head ci_item, flushing_item; + struct list_head ci_item; - u64 follows, flush_tid; + struct ceph_cap_flush cap_flush; + + u64 follows; int issued, dirty; struct ceph_snap_context *context; @@ -186,13 +194,6 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) } } -struct ceph_cap_flush { - u64 tid; - int caps; - struct list_head g_list; // global - struct list_head i_list; // per inode -}; - /* * The frag tree describes how a directory is fragmented, potentially across * multiple metadata servers. It is also used to indicate points where @@ -888,8 +889,7 @@ extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had); extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc); extern void __ceph_flush_snaps(struct ceph_inode_info *ci, - struct ceph_mds_session **psession, - int again); + struct ceph_mds_session **psession); extern void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_session *session); extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc); -- cgit v1.2.3 From 13c2b57d81ec27716b9c943fd4077264b9804e55 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Tue, 5 Jul 2016 16:45:21 +0800 Subject: ceph: avoid sending duplicated cap flush message make ceph_kick_flushing_caps() ignore inodes whose cap flushes have already been re-sent by ceph_early_kick_flushing_caps() Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 18 +++++++++++++++++- fs/ceph/super.h | 1 + 2 files changed, 18 insertions(+), 1 deletion(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0ac604719663..f12d59d26a04 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2107,8 +2107,11 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, */ if ((cap->issued & ci->i_flushing_caps) != ci->i_flushing_caps) { + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); + } else { + ci->i_ceph_flags |= CEPH_I_KICK_FLUSH; } spin_unlock(&ci->i_ceph_lock); @@ -2119,6 +2122,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_inode_info *ci; + struct ceph_cap *cap; u64 oldest_flush_tid; dout("kick_flushing_caps mds%d\n", session->s_mds); @@ -2129,7 +2133,18 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { spin_lock(&ci->i_ceph_lock); - __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", + &ci->vfs_inode, cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + continue; + } + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + __kick_flushing_caps(mdsc, session, ci, + oldest_flush_tid); + } spin_unlock(&ci->i_ceph_lock); } } @@ -2154,6 +2169,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, oldest_flush_tid = __get_oldest_flush_tid(mdsc); spin_unlock(&mdsc->cap_dirty_lock); + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); spin_unlock(&ci->i_ceph_lock); } else { diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 08ed51299f9f..16068787afb4 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -468,6 +468,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, #define CEPH_I_POOL_WR (1 << 6) /* can write to pool */ #define CEPH_I_SEC_INITED (1 << 7) /* security initialized */ #define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */ +#define CEPH_I_KICK_FLUSH (1 << 9) /* kick flushing caps */ static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, long long release_count, -- cgit v1.2.3 From 70220ac8c220495b2a335868293be80a31dfdd4a Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Wed, 6 Jul 2016 16:21:30 +0800 Subject: ceph: introduce an inode flag to indicates if snapflush is needed Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 48 +++++++++++++++++++++++++++++++++--------------- fs/ceph/snap.c | 2 ++ fs/ceph/super.h | 1 + 3 files changed, 36 insertions(+), 15 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index f12d59d26a04..45fe7a3658dc 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1368,6 +1368,7 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) { spin_lock(&ci->i_ceph_lock); __ceph_flush_snaps(ci, NULL); + ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; spin_unlock(&ci->i_ceph_lock); } @@ -1563,8 +1564,10 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, flags |= CHECK_CAPS_FLUSH; /* flush snaps first time around only */ - if (!list_empty(&ci->i_cap_snaps)) + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { __ceph_flush_snaps(ci, &session); + ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; + } goto retry_locked; retry: spin_lock(&ci->i_ceph_lock); @@ -2498,7 +2501,8 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) * drop cap_snap that is not associated with any snapshot. * we don't need to send FLUSHSNAP message for it. */ -static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap) +static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci, + struct ceph_cap_snap *capsnap) { if (!capsnap->need_flush && !capsnap->writing && !capsnap->dirty_pages) { @@ -2506,6 +2510,9 @@ static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap) capsnap, capsnap->follows); BUG_ON(capsnap->cap_flush.tid > 0); ceph_put_snap_context(capsnap->context); + if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps)) + ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; + list_del(&capsnap->ci_item); ceph_put_cap_snap(capsnap); return 1; @@ -2553,7 +2560,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) struct ceph_cap_snap, ci_item); capsnap->writing = 0; - if (ceph_try_drop_cap_snap(capsnap)) + if (ceph_try_drop_cap_snap(ci, capsnap)) put++; else if (__ceph_finish_cap_snap(ci, capsnap)) flushsnaps = 1; @@ -2596,15 +2603,19 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc) { struct inode *inode = &ci->vfs_inode; - int last = 0; - int complete_capsnap = 0; - int drop_capsnap = 0; - int found = 0; struct ceph_cap_snap *capsnap = NULL; + int put = 0; + bool last = false; + bool found = false; + bool flush_snaps = false; + bool complete_capsnap = false; spin_lock(&ci->i_ceph_lock); ci->i_wrbuffer_ref -= nr; - last = !ci->i_wrbuffer_ref; + if (ci->i_wrbuffer_ref == 0) { + last = true; + put++; + } if (ci->i_head_snapc == snapc) { ci->i_wrbuffer_ref_head -= nr; @@ -2624,15 +2635,22 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, } else { list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { if (capsnap->context == snapc) { - found = 1; + found = true; break; } } BUG_ON(!found); capsnap->dirty_pages -= nr; if (capsnap->dirty_pages == 0) { - complete_capsnap = 1; - drop_capsnap = ceph_try_drop_cap_snap(capsnap); + complete_capsnap = true; + if (!capsnap->writing) { + if (ceph_try_drop_cap_snap(ci, capsnap)) { + put++; + } else { + ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; + flush_snaps = true; + } + } } dout("put_wrbuffer_cap_refs on %p cap_snap %p " " snap %lld %d/%d -> %d/%d %s%s\n", @@ -2647,12 +2665,12 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (last) { ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); - iput(inode); - } else if (complete_capsnap) { + } else if (flush_snaps) { ceph_flush_snaps(ci); - wake_up_all(&ci->i_cap_wq); } - if (drop_capsnap) + if (complete_capsnap) + wake_up_all(&ci->i_cap_wq); + while (put-- > 0) iput(inode); } diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 20d5b0cdf655..c3b03ae1976c 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -601,6 +601,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, capsnap->dirty_pages); return 0; } + + ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n", inode, capsnap, capsnap->context, capsnap->context->seq, ceph_cap_string(capsnap->dirty), diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 16068787afb4..63fdb57606fe 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -469,6 +469,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, #define CEPH_I_SEC_INITED (1 << 7) /* security initialized */ #define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */ #define CEPH_I_KICK_FLUSH (1 << 9) /* kick flushing caps */ +#define CEPH_I_FLUSH_SNAPS (1 << 10) /* need flush snapss */ static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, long long release_count, -- cgit v1.2.3 From 7bc00fddb9de7f78f742bc24d95e15abde15c078 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Thu, 7 Jul 2016 18:34:45 +0800 Subject: ceph: kick cap flushes before sending other cap message If ceph_check_caps() wants to send cap message to a recovering MDS, make sure it kicks cap flushes first. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 45fe7a3658dc..39e471d0aa50 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -41,6 +41,10 @@ */ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc); +static void __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci, + u64 oldest_flush_tid); /* * Generate readable cap strings for debugging output. @@ -1563,11 +1567,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, if (ci->i_ceph_flags & CEPH_I_FLUSH) flags |= CHECK_CAPS_FLUSH; - /* flush snaps first time around only */ - if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { - __ceph_flush_snaps(ci, &session); - ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; - } goto retry_locked; retry: spin_lock(&ci->i_ceph_lock); @@ -1688,10 +1687,15 @@ retry_locked: } } /* flush anything dirty? */ - if (cap == ci->i_auth_cap && (flags & CHECK_CAPS_FLUSH) && - ci->i_dirty_caps) { - dout("flushing dirty caps\n"); - goto ack; + if (cap == ci->i_auth_cap) { + if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) { + dout("flushing dirty caps\n"); + goto ack; + } + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { + dout("flushing snap caps\n"); + goto ack; + } } /* completed revocation? going down and there are no caps? */ @@ -1750,6 +1754,27 @@ ack: goto retry; } } + + /* kick flushing and flush snaps before sending normal + * cap message */ + if (cap == ci->i_auth_cap && + (ci->i_ceph_flags & + (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + __kick_flushing_caps(mdsc, session, ci, + oldest_flush_tid); + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + } + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { + __ceph_flush_snaps(ci, &session); + ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; + } + goto retry_locked; + } + /* take snap_rwsem after session mutex */ if (!took_snap_rwsem) { if (down_read_trylock(&mdsc->snap_rwsem) == 0) { -- cgit v1.2.3 From ed9b430c9ba99e70e8ddd7e08429c4c2a620ba74 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Tue, 5 Jul 2016 21:08:07 +0800 Subject: ceph: cleanup ceph_flush_snaps() This patch devide __ceph_flush_snaps() into two stags. In the first stage, __ceph_flush_snaps() assign snapcaps flush TIDs and add them to cap flush lists. __ceph_flush_snaps() keeps holding the i_ceph_lock in this stagge. So inode's auth cap can not change. In the second stage, __ceph_flush_snaps() send flushsnap cap messages. i_ceph_lock is unlocked before sending each cap message. If auth cap changes in the middle, __ceph_flush_snaps() just stops. This is OK because kick_flushing_inode_caps() will re-send flushsnap cap messages to inode's new auth MDS. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 185 +++++++++++++++++++++++++++++++------------------------- fs/ceph/snap.c | 4 +- fs/ceph/super.h | 4 +- 3 files changed, 105 insertions(+), 88 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 39e471d0aa50..736e1c86bcf3 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1247,32 +1247,20 @@ static inline int __send_flush_snap(struct inode *inode, * * Called under i_ceph_lock. Takes s_mutex as needed. */ -void __ceph_flush_snaps(struct ceph_inode_info *ci, - struct ceph_mds_session **psession) +static void __ceph_flush_snaps(struct ceph_inode_info *ci, + struct ceph_mds_session *session) __releases(ci->i_ceph_lock) __acquires(ci->i_ceph_lock) { struct inode *inode = &ci->vfs_inode; - int mds; + struct ceph_mds_client *mdsc = session->s_mdsc; struct ceph_cap_snap *capsnap; - u32 mseq; - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; - struct ceph_mds_session *session = NULL; /* if session != NULL, we hold - session->s_mutex */ - u64 oldest_flush_tid; - u64 next_follows = 0; /* keep track of how far we've gotten through the - i_cap_snaps list, and skip these entries next time - around to avoid an infinite loop */ + u64 oldest_flush_tid = 0; + u64 first_tid = 1, last_tid = 0; - if (psession) - session = *psession; + dout("__flush_snaps %p session %p\n", inode, session); - dout("__flush_snaps %p\n", inode); -retry: list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { - /* avoid an infiniute loop after retry */ - if (capsnap->follows < next_follows) - continue; /* * we need to wait for sync writes to complete and for dirty * pages to be written out. @@ -1283,53 +1271,18 @@ retry: /* should be removed by ceph_try_drop_cap_snap() */ BUG_ON(!capsnap->need_flush); - /* pick mds, take s_mutex */ - if (ci->i_auth_cap == NULL) { - dout("no auth cap (migrating?), doing nothing\n"); - goto out; - } - /* only flush each capsnap once */ if (capsnap->cap_flush.tid > 0) { - dout("already flushed %p, skipping\n", capsnap); + dout(" already flushed %p, skipping\n", capsnap); continue; } - mds = ci->i_auth_cap->session->s_mds; - mseq = ci->i_auth_cap->mseq; - - if (session && session->s_mds != mds) { - dout("oops, wrong session %p mutex\n", session); - - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - session = NULL; - } - if (!session) { - spin_unlock(&ci->i_ceph_lock); - mutex_lock(&mdsc->mutex); - session = __ceph_lookup_mds_session(mdsc, mds); - mutex_unlock(&mdsc->mutex); - if (session) { - dout("inverting session/ino locks on %p\n", - session); - mutex_lock(&session->s_mutex); - } - /* - * if session == NULL, we raced against a cap - * deletion or migration. retry, and we'll - * get a better @mds value next time. - */ - spin_lock(&ci->i_ceph_lock); - goto retry; - } - spin_lock(&mdsc->cap_dirty_lock); capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid; list_add_tail(&capsnap->cap_flush.g_list, &mdsc->cap_flush_list); - oldest_flush_tid = __get_oldest_flush_tid(mdsc); - + if (oldest_flush_tid == 0) + oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (list_empty(&ci->i_flushing_item)) { list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); @@ -1339,41 +1292,108 @@ retry: list_add_tail(&capsnap->cap_flush.i_list, &ci->i_cap_flush_list); + if (first_tid == 1) + first_tid = capsnap->cap_flush.tid; + last_tid = capsnap->cap_flush.tid; + } + + ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; + + while (first_tid <= last_tid) { + struct ceph_cap *cap = ci->i_auth_cap; + struct ceph_cap_flush *cf; + int ret; + + if (!(cap && cap->session == session)) { + dout("__flush_snaps %p auth cap %p not mds%d, " + "stop\n", inode, cap, session->s_mds); + break; + } + + ret = -ENOENT; + list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { + if (cf->tid >= first_tid) { + ret = 0; + break; + } + } + if (ret < 0) + break; + + first_tid = cf->tid + 1; + + capsnap = container_of(cf, struct ceph_cap_snap, cap_flush); atomic_inc(&capsnap->nref); spin_unlock(&ci->i_ceph_lock); - dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", - inode, capsnap, capsnap->follows, capsnap->cap_flush.tid); - __send_flush_snap(inode, session, capsnap, mseq, - oldest_flush_tid); + dout("__flush_snaps %p capsnap %p tid %llu %s\n", + inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty)); - next_follows = capsnap->follows + 1; - ceph_put_cap_snap(capsnap); + ret = __send_flush_snap(inode, session, capsnap, cap->mseq, + oldest_flush_tid); + if (ret < 0) { + pr_err("__flush_snaps: error sending cap flushsnap, " + "ino (%llx.%llx) tid %llu follows %llu\n", + ceph_vinop(inode), cf->tid, capsnap->follows); + } + ceph_put_cap_snap(capsnap); spin_lock(&ci->i_ceph_lock); - goto retry; } +} - /* we flushed them all; remove this inode from the queue */ - spin_lock(&mdsc->snap_flush_lock); - list_del_init(&ci->i_snap_flush_item); - spin_unlock(&mdsc->snap_flush_lock); +void ceph_flush_snaps(struct ceph_inode_info *ci, + struct ceph_mds_session **psession) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_session *session = *psession; + int mds; + dout("ceph_flush_snaps %p\n", inode); +retry: + spin_lock(&ci->i_ceph_lock); + if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) { + dout(" no capsnap needs flush, doing nothing\n"); + goto out; + } + if (!ci->i_auth_cap) { + dout(" no auth cap (migrating?), doing nothing\n"); + goto out; + } -out: - if (psession) - *psession = session; - else if (session) { + mds = ci->i_auth_cap->session->s_mds; + if (session && session->s_mds != mds) { + dout(" oops, wrong session %p mutex\n", session); mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); + session = NULL; + } + if (!session) { + spin_unlock(&ci->i_ceph_lock); + mutex_lock(&mdsc->mutex); + session = __ceph_lookup_mds_session(mdsc, mds); + mutex_unlock(&mdsc->mutex); + if (session) { + dout(" inverting session/ino locks on %p\n", session); + mutex_lock(&session->s_mutex); + } + goto retry; } -} -static void ceph_flush_snaps(struct ceph_inode_info *ci) -{ - spin_lock(&ci->i_ceph_lock); - __ceph_flush_snaps(ci, NULL); - ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; + __ceph_flush_snaps(ci, session); +out: spin_unlock(&ci->i_ceph_lock); + + if (psession) { + *psession = session; + } else { + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); + } + /* we flushed them all; remove this inode from the queue */ + spin_lock(&mdsc->snap_flush_lock); + list_del_init(&ci->i_snap_flush_item); + spin_unlock(&mdsc->snap_flush_lock); } /* @@ -1768,10 +1788,9 @@ ack: oldest_flush_tid); ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; } - if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { - __ceph_flush_snaps(ci, &session); - ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; - } + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) + __ceph_flush_snaps(ci, session); + goto retry_locked; } @@ -2610,7 +2629,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) if (last && !flushsnaps) ceph_check_caps(ci, 0, NULL); else if (flushsnaps) - ceph_flush_snaps(ci); + ceph_flush_snaps(ci, NULL); if (wake) wake_up_all(&ci->i_cap_wq); while (put-- > 0) @@ -2691,7 +2710,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (last) { ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } else if (flush_snaps) { - ceph_flush_snaps(ci); + ceph_flush_snaps(ci, NULL); } if (complete_capsnap) wake_up_all(&ci->i_cap_wq); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index c3b03ae1976c..9ff5219d849e 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -799,9 +799,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc) inode = &ci->vfs_inode; ihold(inode); spin_unlock(&mdsc->snap_flush_lock); - spin_lock(&ci->i_ceph_lock); - __ceph_flush_snaps(ci, &session); - spin_unlock(&ci->i_ceph_lock); + ceph_flush_snaps(ci, &session); iput(inode); spin_lock(&mdsc->snap_flush_lock); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 63fdb57606fe..b097d474f888 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -890,8 +890,8 @@ extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps); extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had); extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc); -extern void __ceph_flush_snaps(struct ceph_inode_info *ci, - struct ceph_mds_session **psession); +extern void ceph_flush_snaps(struct ceph_inode_info *ci, + struct ceph_mds_session **psession); extern void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_session *session); extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc); -- cgit v1.2.3 From c8799fc4674fe5bb9b9391f9eac202250b8370e1 Mon Sep 17 00:00:00 2001 From: Yan, Zheng Date: Thu, 7 Jul 2016 15:22:38 +0800 Subject: ceph: optimize cap flush waiting Add a 'wake' flag to ceph_cap_flush struct, which indicates if there is someone waiting for it to finish. When getting flush ack message, we check the 'wake' flag in corresponding ceph_cap_flush struct to decide if we should wake up waiters. One corner case is that the acked cap flush has 'wake' flags is set, but it is not the first one on the flushing list. We do not wake up waiters in this case, set 'wake' flags of preceding ceph_cap_flush struct instead Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 91 ++++++++++++++++++++++++++++++++++++---------------- fs/ceph/mds_client.c | 8 +++++ fs/ceph/super.h | 1 + 3 files changed, 73 insertions(+), 27 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 736e1c86bcf3..99115cae1652 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1473,6 +1473,37 @@ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) return 0; } +/* + * Remove cap_flush from the mdsc's or inode's flushing cap list. + * Return true if caller needs to wake up flush waiters. + */ +static bool __finish_cap_flush(struct ceph_mds_client *mdsc, + struct ceph_inode_info *ci, + struct ceph_cap_flush *cf) +{ + struct ceph_cap_flush *prev; + bool wake = cf->wake; + if (mdsc) { + /* are there older pending cap flushes? */ + if (wake && cf->g_list.prev != &mdsc->cap_flush_list) { + prev = list_prev_entry(cf, g_list); + prev->wake = true; + wake = false; + } + list_del(&cf->g_list); + } else if (ci) { + if (wake && cf->i_list.prev != &ci->i_cap_flush_list) { + prev = list_prev_entry(cf, i_list); + prev->wake = true; + wake = false; + } + list_del(&cf->i_list); + } else { + BUG_ON(1); + } + return wake; +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1480,7 +1511,7 @@ static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) * Called under i_ceph_lock. */ static int __mark_caps_flushing(struct inode *inode, - struct ceph_mds_session *session, + struct ceph_mds_session *session, bool wake, u64 *flush_tid, u64 *oldest_flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; @@ -1503,6 +1534,7 @@ static int __mark_caps_flushing(struct inode *inode, swap(cf, ci->i_prealloc_cap_flush); cf->caps = flushing; + cf->wake = wake; spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); @@ -1808,7 +1840,7 @@ ack: } if (cap == ci->i_auth_cap && ci->i_dirty_caps) { - flushing = __mark_caps_flushing(inode, session, + flushing = __mark_caps_flushing(inode, session, false, &flush_tid, &oldest_flush_tid); } else { @@ -1885,8 +1917,8 @@ retry: if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; - flushing = __mark_caps_flushing(inode, session, &flush_tid, - &oldest_flush_tid); + flushing = __mark_caps_flushing(inode, session, true, + &flush_tid, &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, @@ -1902,7 +1934,8 @@ retry: if (!list_empty(&ci->i_cap_flush_list)) { struct ceph_cap_flush *cf = list_last_entry(&ci->i_cap_flush_list, - struct ceph_cap_flush, i_list); + struct ceph_cap_flush, i_list); + cf->wake = true; flush_tid = cf->tid; } flushing = ci->i_flushing_caps; @@ -3022,7 +3055,9 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; - int drop = 0; + bool drop = false; + bool wake_ci = 0; + bool wake_mdsc = 0; list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { if (cf->tid == flush_tid) @@ -3030,7 +3065,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, if (cf->caps == 0) /* capsnap */ continue; if (cf->tid <= flush_tid) { - list_del(&cf->i_list); + if (__finish_cap_flush(NULL, ci, cf)) + wake_ci = true; list_add_tail(&cf->i_list, &to_remove); } else { cleaned &= ~cf->caps; @@ -3052,14 +3088,9 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&to_remove)) { - u64 oldest_flush_tid; - list_for_each_entry(cf, &to_remove, i_list) - list_del(&cf->g_list); - - oldest_flush_tid = __get_oldest_flush_tid(mdsc); - if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid) - wake_up_all(&mdsc->cap_flushing_wq); + list_for_each_entry(cf, &to_remove, i_list) { + if (__finish_cap_flush(mdsc, NULL, cf)) + wake_mdsc = true; } if (ci->i_flushing_caps == 0) { @@ -3079,7 +3110,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, if (ci->i_dirty_caps == 0) { dout(" inode %p now clean\n", inode); BUG_ON(!list_empty(&ci->i_dirty_item)); - drop = 1; + drop = true; if (ci->i_wr_ref == 0 && ci->i_wrbuffer_ref_head == 0) { BUG_ON(!ci->i_head_snapc); @@ -3091,7 +3122,6 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, } } spin_unlock(&mdsc->cap_dirty_lock); - wake_up_all(&ci->i_cap_wq); out: spin_unlock(&ci->i_ceph_lock); @@ -3102,6 +3132,11 @@ out: list_del(&cf->i_list); ceph_free_cap_flush(cf); } + + if (wake_ci) + wake_up_all(&ci->i_cap_wq); + if (wake_mdsc) + wake_up_all(&mdsc->cap_flushing_wq); if (drop) iput(inode); } @@ -3120,7 +3155,9 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; u64 follows = le64_to_cpu(m->snap_follows); struct ceph_cap_snap *capsnap; - int flushed = 0; + bool flushed = false; + bool wake_ci = false; + bool wake_mdsc = false; dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", inode, ci, session->s_mds, follows); @@ -3134,7 +3171,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, flush_tid, capsnap->cap_flush.tid); break; } - flushed = 1; + flushed = true; break; } else { dout(" skipping cap_snap %p follows %lld\n", @@ -3142,31 +3179,31 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, } } if (flushed) { - u64 oldest_flush_tid; WARN_ON(capsnap->dirty_pages || capsnap->writing); dout(" removing %p cap_snap %p follows %lld\n", inode, capsnap, follows); list_del(&capsnap->ci_item); - list_del(&capsnap->cap_flush.i_list); + if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush)) + wake_ci = true; spin_lock(&mdsc->cap_dirty_lock); if (list_empty(&ci->i_cap_flush_list)) list_del_init(&ci->i_flushing_item); - list_del(&capsnap->cap_flush.g_list); - - oldest_flush_tid = __get_oldest_flush_tid(mdsc); - if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid) - wake_up_all(&mdsc->cap_flushing_wq); + if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush)) + wake_mdsc = true; spin_unlock(&mdsc->cap_dirty_lock); - wake_up_all(&ci->i_cap_wq); } spin_unlock(&ci->i_ceph_lock); if (flushed) { ceph_put_snap_context(capsnap->context); ceph_put_cap_snap(capsnap); + if (wake_ci) + wake_up_all(&ci->i_cap_wq); + if (wake_mdsc) + wake_up_all(&mdsc->cap_flushing_wq); iput(inode); } } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index fa9036af5445..cdc6a17f5867 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1212,6 +1212,8 @@ static void remove_session_caps(struct ceph_mds_session *session) dout("remove_session_caps on %p\n", session); iterate_session_caps(session, remove_session_caps_cb, fsc); + wake_up_all(&fsc->mdsc->cap_flushing_wq); + spin_lock(&session->s_cap_lock); if (session->s_nr_caps > 0) { struct inode *inode; @@ -3536,6 +3538,12 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ceph_flush_dirty_caps(mdsc); spin_lock(&mdsc->cap_dirty_lock); want_flush = mdsc->last_cap_flush_tid; + if (!list_empty(&mdsc->cap_flush_list)) { + struct ceph_cap_flush *cf = + list_last_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); + cf->wake = true; + } spin_unlock(&mdsc->cap_dirty_lock); dout("sync want tid %lld flush_seq %lld\n", diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b097d474f888..3e3fa9163059 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -150,6 +150,7 @@ struct ceph_cap { struct ceph_cap_flush { u64 tid; int caps; /* 0 means capsnap */ + bool wake; /* wake up flush waiters when finish ? */ struct list_head g_list; // global struct list_head i_list; // per inode }; -- cgit v1.2.3 From 6b1a9a6c54122dade800fd61e90d441e58de19eb Mon Sep 17 00:00:00 2001 From: Nikolay Borisov Date: Mon, 25 Jul 2016 20:12:13 +0300 Subject: ceph: Mark the file cache as unreclaimable Ceph creates multiple caches with the SLAB_RECLAIMABLE flag set, so that it can satisfy its internal needs. Inspecting the code shows that most of the caches are indeed reclaimable since they are directly related to the generic inode/dentry shrinkers. However, one of the cache used to satisfy struct file is not reclaimable since its entries are freed only when the last reference to the file is dropped. If a heavily loaded node opens a lot of files it can introduce non-trivial discrepancies between memory shown as reclaimable and what is actually reclaimed when drop_caches is used. Fix this by removing the reclaimable flag for the file's cache. Signed-off-by: Nikolay Borisov Signed-off-by: Yan, Zheng --- fs/ceph/super.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 7736a931376e..e247f6f0feb7 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -686,8 +686,8 @@ static int __init init_caches(void) if (ceph_dentry_cachep == NULL) goto bad_dentry; - ceph_file_cachep = KMEM_CACHE(ceph_file_info, - SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); + ceph_file_cachep = KMEM_CACHE(ceph_file_info, SLAB_MEM_SPREAD); + if (ceph_file_cachep == NULL) goto bad_file; -- cgit v1.2.3 From 955818cd5b6c4b58ea574ace4573e7afa4c19c1e Mon Sep 17 00:00:00 2001 From: Phil Turnbull Date: Thu, 21 Jul 2016 13:43:09 -0400 Subject: ceph: Correctly return NXIO errors from ceph_llseek ceph_llseek does not correctly return NXIO errors because the 'out' path always returns 'offset'. Fixes: 06222e491e66 ("fs: handle SEEK_HOLE/SEEK_DATA properly in all fs's that define their own llseek") Signed-off-by: Phil Turnbull Signed-off-by: Yan, Zheng --- fs/ceph/file.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'fs/ceph') diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 7f2ef262cdf7..0f5375d8e030 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1499,16 +1499,14 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) { struct inode *inode = file->f_mapping->host; loff_t i_size; - int ret; + loff_t ret; inode_lock(inode); if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); - if (ret < 0) { - offset = ret; + if (ret < 0) goto out; - } } i_size = i_size_read(inode); @@ -1524,7 +1522,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) * write() or lseek() might have altered it */ if (offset == 0) { - offset = file->f_pos; + ret = file->f_pos; goto out; } offset += file->f_pos; @@ -1544,11 +1542,11 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) break; } - offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes); + ret = vfs_setpos(file, offset, inode->i_sb->s_maxbytes); out: inode_unlock(inode); - return offset; + return ret; } static inline void ceph_zero_partial_page( -- cgit v1.2.3