/*
- * Copyright (c) 2009 Nicira Networks.
+ * Copyright (c) 2009, 2010 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include "jsonrpc.h"
+#include <assert.h>
#include <errno.h>
#include "byteq.h"
return;
}
+ stream_run(rpc->stream);
while (!queue_is_empty(&rpc->output)) {
struct ofpbuf *buf = rpc->output.head;
int retval;
void
jsonrpc_wait(struct jsonrpc *rpc)
{
- if (!rpc->status && !queue_is_empty(&rpc->output)) {
- stream_send_wait(rpc->stream);
+ if (!rpc->status) {
+ stream_run_wait(rpc->stream);
+ if (!queue_is_empty(&rpc->output)) {
+ stream_send_wait(rpc->stream);
+ }
}
}
}
if (msg->params) {
ds_put_cstr(&s, ", params=");
- ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
+ json_to_ds(msg->params, 0, &s);
}
if (msg->result) {
ds_put_cstr(&s, ", result=");
- ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
+ json_to_ds(msg->result, 0, &s);
}
if (msg->error) {
ds_put_cstr(&s, ", error=");
- ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
+ json_to_ds(msg->error, 0, &s);
}
if (msg->id) {
ds_put_cstr(&s, ", id=");
- ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
+ json_to_ds(msg->id, 0, &s);
}
VLOG_DBG("%s: %s %s%s", rpc->name, title,
jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
}
}
+/* Always takes ownership of 'msg', regardless of success. */
int
jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
{
}
}
+/* Always takes ownership of 'msg', regardless of success. */
int
jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
{
return error;
}
- while (!queue_is_empty(&rpc->output) && !rpc->status) {
+ for (;;) {
jsonrpc_run(rpc);
+ if (queue_is_empty(&rpc->output) || rpc->status) {
+ return rpc->status;
+ }
jsonrpc_wait(rpc);
poll_block();
}
- return rpc->status;
}
int
}
}
+/* Always takes ownership of 'request', regardless of success. */
int
jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
struct jsonrpc_msg **replyp)
static void
jsonrpc_session_disconnect(struct jsonrpc_session *s)
{
- reconnect_disconnected(s->reconnect, time_msec(), 0);
if (s->rpc) {
jsonrpc_error(s->rpc, EOF);
jsonrpc_close(s->rpc);
jsonrpc_run(s->rpc);
error = jsonrpc_get_status(s->rpc);
if (error) {
+ reconnect_disconnected(s->reconnect, time_msec(), 0);
jsonrpc_session_disconnect(s);
}
} else if (s->stream) {
- int error = stream_connect(s->stream);
+ int error;
+
+ stream_run(s->stream);
+ error = stream_connect(s->stream);
if (!error) {
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(s->stream);
break;
case RECONNECT_DISCONNECT:
+ reconnect_disconnected(s->reconnect, time_msec(), 0);
jsonrpc_session_disconnect(s);
break;
if (s->rpc) {
jsonrpc_wait(s->rpc);
} else if (s->stream) {
+ stream_run_wait(s->stream);
stream_connect_wait(s->stream);
}
reconnect_wait(s->reconnect, time_msec());
return reconnect_get_name(s->reconnect);
}
+/* Always takes ownership of 'msg', regardless of success. */
int
jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
{
- return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
+ if (s->rpc) {
+ return jsonrpc_send(s->rpc, msg);
+ } else {
+ jsonrpc_msg_destroy(msg);
+ return ENOTCONN;
+ }
}
struct jsonrpc_msg *