diff options
author | Alex Elder | 2013-03-11 23:34:24 -0500 |
---|---|---|
committer | Sage Weil | 2013-05-01 21:17:37 -0700 |
commit | 6644ed7b7e04f8e588aebdaa58cededb9416ab95 (patch) | |
tree | 2770308c0a48245b81754238f0c37e87605fde3b /net | |
parent | 8ea299bcbc85aeaf5348d99614b35433287bec6b (diff) |
libceph: make message data be a pointer
Begin the transition from a single message data item to a list of
them by replacing the "data" structure in a message with a pointer
to a ceph_msg_data structure.
A null pointer will indicate the message has no data; replace the
use of ceph_msg_has_data() with a simple check for a null pointer.
Create functions ceph_msg_data_create() and ceph_msg_data_destroy()
to dynamically allocate and free a data item structure of a given type.
When a message has its data item "set," allocate one of these to
hold the data description, and free it when the last reference to
the message is dropped.
This partially resolves:
http://tracker.ceph.com/issues/4429
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net')
-rw-r--r-- | net/ceph/messenger.c | 94 |
1 files changed, 61 insertions, 33 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index dd4b8226a48a..d4e46d8a088c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg) /* Initialize data cursor */ - ceph_msg_data_cursor_init(&msg->data, data_len); + ceph_msg_data_cursor_init(msg->data, data_len); } /* @@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page, static int write_partial_message_data(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; - struct ceph_msg_data_cursor *cursor = &msg->data.cursor; + struct ceph_msg_data_cursor *cursor = &msg->data->cursor; bool do_datacrc = !con->msgr->nocrc; u32 crc; dout("%s %p msg %p\n", __func__, con, msg); - if (WARN_ON(!ceph_msg_has_data(msg))) + if (WARN_ON(!msg->data)) return -EINVAL; /* @@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con) bool need_crc; int ret; - page = ceph_msg_data_next(&msg->data, &page_offset, &length, + page = ceph_msg_data_next(msg->data, &page_offset, &length, &last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, last_piece); @@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con) } if (do_datacrc && cursor->need_crc) crc = ceph_crc32c_page(crc, page, page_offset, length); - need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret); + need_crc = ceph_msg_data_advance(msg->data, (size_t)ret); } dout("%s %p msg %p done\n", __func__, con, msg); @@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con, static int read_partial_msg_data(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; - struct ceph_msg_data_cursor *cursor = &msg->data.cursor; + struct ceph_msg_data_cursor *cursor = &msg->data->cursor; const bool do_datacrc = !con->msgr->nocrc; struct page *page; size_t page_offset; @@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con) int ret; BUG_ON(!msg); - if (WARN_ON(!ceph_msg_has_data(msg))) + if (!msg->data) return -EIO; if (do_datacrc) crc = con->in_data_crc; while (cursor->resid) { - page = ceph_msg_data_next(&msg->data, &page_offset, &length, + page = ceph_msg_data_next(msg->data, &page_offset, &length, NULL); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); if (ret <= 0) { @@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = ceph_crc32c_page(crc, page, page_offset, ret); - (void) ceph_msg_data_advance(&msg->data, (size_t) ret); + (void) ceph_msg_data_advance(msg->data, (size_t)ret); } if (do_datacrc) con->in_data_crc = crc; @@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con) } EXPORT_SYMBOL(ceph_con_keepalive); -static void ceph_msg_data_init(struct ceph_msg_data *data) +static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) { - data->type = CEPH_MSG_DATA_NONE; + struct ceph_msg_data *data; + + if (WARN_ON(!ceph_msg_data_type_valid(type))) + return NULL; + + data = kzalloc(sizeof (*data), GFP_NOFS); + if (data) + data->type = type; + + return data; +} + +static void ceph_msg_data_destroy(struct ceph_msg_data *data) +{ + if (!data) + return; + + if (data->type == CEPH_MSG_DATA_PAGELIST) { + ceph_pagelist_release(data->pagelist); + kfree(data->pagelist); + } + kfree(data); } void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, size_t length, size_t alignment) { + struct ceph_msg_data *data; + BUG_ON(!pages); BUG_ON(!length); - BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); + BUG_ON(msg->data != NULL); + + data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); + BUG_ON(!data); + data->pages = pages; + data->length = length; + data->alignment = alignment & ~PAGE_MASK; - msg->data.type = CEPH_MSG_DATA_PAGES; - msg->data.pages = pages; - msg->data.length = length; - msg->data.alignment = alignment & ~PAGE_MASK; + msg->data = data; } EXPORT_SYMBOL(ceph_msg_data_set_pages); void ceph_msg_data_set_pagelist(struct ceph_msg *msg, struct ceph_pagelist *pagelist) { + struct ceph_msg_data *data; + BUG_ON(!pagelist); BUG_ON(!pagelist->length); - BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); + BUG_ON(msg->data != NULL); - msg->data.type = CEPH_MSG_DATA_PAGELIST; - msg->data.pagelist = pagelist; + data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); + BUG_ON(!data); + data->pagelist = pagelist; + + msg->data = data; } EXPORT_SYMBOL(ceph_msg_data_set_pagelist); void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) { + struct ceph_msg_data *data; + BUG_ON(!bio); - BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); + BUG_ON(msg->data != NULL); - msg->data.type = CEPH_MSG_DATA_BIO; - msg->data.bio = bio; + data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); + BUG_ON(!data); + data->bio = bio; + + msg->data = data; } EXPORT_SYMBOL(ceph_msg_data_set_bio); @@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->list_head); kref_init(&m->kref); - ceph_msg_data_init(&m->data); - /* front */ m->front_max = front_len; if (front_len) { @@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref) ceph_buffer_put(m->middle); m->middle = NULL; } - if (ceph_msg_has_data(m)) { - if (m->data.type == CEPH_MSG_DATA_PAGELIST) { - ceph_pagelist_release(m->data.pagelist); - kfree(m->data.pagelist); - } - memset(&m->data, 0, sizeof m->data); - ceph_msg_data_init(&m->data); - } + ceph_msg_data_destroy(m->data); + m->data = NULL; if (m->pool) ceph_msgpool_put(m->pool, m); @@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put); void ceph_msg_dump(struct ceph_msg *msg) { pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, - msg->front_max, msg->data.length); + msg->front_max, msg->data->length); print_hex_dump(KERN_DEBUG, "header: ", DUMP_PREFIX_OFFSET, 16, 1, &msg->hdr, sizeof(msg->hdr), true); |