rbd: there is really only one op

Throughout the rbd code there are spots where it appears we can
handle an osd request containing more than one osd request op.

But that is only the way it appears.  In fact, currently only one
operation at a time can be supported, and supporting more than
one will require much more than fleshing out the support that's
there now.

This patch changes names to make it perfectly clear that anywhere
we're dealing with a block of ops, we're in fact dealing with
exactly one of them.  We'll be able to simplify some things as
a result.

When multiple op support is implemented, we can update things again
accordingly.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Alex Elder 2012-11-13 21:11:15 -06:00
parent ae7ca4a35b
commit 139b4318ad
1 changed files with 56 additions and 62 deletions

View File

@ -1023,32 +1023,26 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
return NULL;
}
/*
* helpers for osd request op vectors.
*/
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_op,
int opcode, u32 payload_len)
static struct ceph_osd_req_op *rbd_create_rw_op(int opcode, u32 payload_len)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_req_op *op;
ops = kzalloc(num_op * sizeof (*ops), GFP_NOIO);
if (!ops)
op = kzalloc(sizeof (*op), GFP_NOIO);
if (!op)
return NULL;
ops[0].op = opcode;
/*
* op extent offset and length will be set later on
* in calc_raw_layout()
*/
ops[0].payload_len = payload_len;
op->op = opcode;
op->payload_len = payload_len;
return ops;
return op;
}
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
static void rbd_destroy_op(struct ceph_osd_req_op *op)
{
kfree(ops);
kfree(op);
}
static void rbd_coll_end_req_index(struct request *rq,
@ -1314,7 +1308,7 @@ static int rbd_do_op(struct request *rq,
u64 seg_ofs;
u64 seg_len;
int ret;
struct ceph_osd_req_op *ops;
struct ceph_osd_req_op *op;
u32 payload_len;
int opcode;
int flags;
@ -1340,8 +1334,8 @@ static int rbd_do_op(struct request *rq,
}
ret = -ENOMEM;
ops = rbd_create_rw_ops(1, opcode, payload_len);
if (!ops)
op = rbd_create_rw_op(opcode, payload_len);
if (!op)
goto done;
/* we've taken care of segment sizes earlier when we
@ -1354,13 +1348,13 @@ static int rbd_do_op(struct request *rq,
bio,
NULL, 0,
flags,
1, ops,
1, op,
coll, coll_index,
rbd_req_cb, 0, NULL);
if (ret < 0)
rbd_coll_end_req_index(rq, coll, coll_index,
(s32)ret, seg_len);
rbd_destroy_ops(ops);
rbd_destroy_op(op);
done:
kfree(seg_name);
return ret;
@ -1375,16 +1369,16 @@ static int rbd_req_sync_read(struct rbd_device *rbd_dev,
char *buf,
u64 *ver)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_req_op *op;
int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
if (!ops)
op = rbd_create_rw_op(CEPH_OSD_OP_READ, 0);
if (!op)
return -ENOMEM;
ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
1, ops, object_name, ofs, len, buf, NULL, ver);
rbd_destroy_ops(ops);
1, op, object_name, ofs, len, buf, NULL, ver);
rbd_destroy_op(op);
return ret;
}
@ -1396,26 +1390,26 @@ static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
u64 ver,
u64 notify_id)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_req_op *op;
int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
if (!ops)
op = rbd_create_rw_op(CEPH_OSD_OP_NOTIFY_ACK, 0);
if (!op)
return -ENOMEM;
ops[0].watch.ver = cpu_to_le64(ver);
ops[0].watch.cookie = notify_id;
ops[0].watch.flag = 0;
op->watch.ver = cpu_to_le64(ver);
op->watch.cookie = notify_id;
op->watch.flag = 0;
ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
rbd_dev->header_name, 0, 0, NULL,
NULL, 0,
CEPH_OSD_FLAG_READ,
1, ops,
1, op,
NULL, 0,
rbd_simple_req_cb, 0, NULL);
rbd_destroy_ops(ops);
rbd_destroy_op(op);
return ret;
}
@ -1444,12 +1438,12 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
*/
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_req_op *op;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
if (!ops)
op = rbd_create_rw_op(CEPH_OSD_OP_WATCH, 0);
if (!op)
return -ENOMEM;
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
@ -1457,13 +1451,13 @@ static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
if (ret < 0)
goto fail;
ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
ops[0].watch.flag = 1;
op->watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
op->watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
op->watch.flag = 1;
ret = rbd_req_sync_op(rbd_dev,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1, ops,
1, op,
rbd_dev->header_name,
0, 0, NULL,
&rbd_dev->watch_request, NULL);
@ -1471,14 +1465,14 @@ static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
if (ret < 0)
goto fail_event;
rbd_destroy_ops(ops);
rbd_destroy_op(op);
return 0;
fail_event:
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
fail:
rbd_destroy_ops(ops);
rbd_destroy_op(op);
return ret;
}
@ -1487,25 +1481,25 @@ static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
*/
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_req_op *op;
int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
if (!ops)
op = rbd_create_rw_op(CEPH_OSD_OP_WATCH, 0);
if (!op)
return -ENOMEM;
ops[0].watch.ver = 0;
ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
ops[0].watch.flag = 0;
op->watch.ver = 0;
op->watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
op->watch.flag = 0;
ret = rbd_req_sync_op(rbd_dev,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1, ops,
1, op,
rbd_dev->header_name,
0, 0, NULL, NULL, NULL);
rbd_destroy_ops(ops);
rbd_destroy_op(op);
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
return ret;
@ -1524,7 +1518,7 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
size_t inbound_size,
u64 *ver)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_req_op *op;
int class_name_len = strlen(class_name);
int method_name_len = strlen(method_name);
int payload_size;
@ -1539,23 +1533,23 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
* operation.
*/
payload_size = class_name_len + method_name_len + outbound_size;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
if (!ops)
op = rbd_create_rw_op(CEPH_OSD_OP_CALL, payload_size);
if (!op)
return -ENOMEM;
ops[0].cls.class_name = class_name;
ops[0].cls.class_len = (__u8) class_name_len;
ops[0].cls.method_name = method_name;
ops[0].cls.method_len = (__u8) method_name_len;
ops[0].cls.argc = 0;
ops[0].cls.indata = outbound;
ops[0].cls.indata_len = outbound_size;
op->cls.class_name = class_name;
op->cls.class_len = (__u8) class_name_len;
op->cls.method_name = method_name;
op->cls.method_len = (__u8) method_name_len;
op->cls.argc = 0;
op->cls.indata = outbound;
op->cls.indata_len = outbound_size;
ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, 1, ops,
ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, 1, op,
object_name, 0, inbound_size, inbound,
NULL, ver);
rbd_destroy_ops(ops);
rbd_destroy_op(op);
dout("cls_exec returned %d\n", ret);
return ret;