aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/acronc/handler_socket.c66
-rw-r--r--client/helloworld/main.c92
-rw-r--r--client/libacron/README.md58
-rw-r--r--client/libacron/include/common.h2
-rw-r--r--client/libacron/include/net.h9
-rw-r--r--client/libacron/net.c66
-rw-r--r--client/libacron/private/connection.h1
7 files changed, 206 insertions, 88 deletions
diff --git a/client/acronc/handler_socket.c b/client/acronc/handler_socket.c
index 2dfda5e..c0a9923 100644
--- a/client/acronc/handler_socket.c
+++ b/client/acronc/handler_socket.c
@@ -125,50 +125,50 @@ static void on_read(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) {
}
int r;
ac_obj_t *obj = NULL;
- if ((r = ac_receive(ac_conn, buf->base, nread, &obj))) {
- LOGEV("Cannot parse the response (%d).", r);
- /* libac error. Socket is working. */
- ex(false);
- return;
- }
-
- if (!ready) {
- enum ac_connection_state state;
- if ((r = ac_get_state(ac_conn, &state))) {
- LOGEV("Cannot get state (%d).", r);
+ size_t pos = 0;
+ size_t read = 0;
+ bool again = false;
+ while (read < nread || again) {
+ if (again) {
+ LOGD("Clearing backlog.");
+ r = ac_receive(ac_conn, NULL, 0, 0, &obj, NULL);
+ again = false;
+ } else {
+ r = ac_receive(ac_conn, buf->base, pos, nread, &obj, &read);
+ pos += read;
+ }
+ if (r == AC_E_AGAIN) {
+ again = true;
+ } else if (r) {
+ LOGEV("Cannot parse the response (%d).", r);
/* libac error. Socket is working. */
ex(false);
+ free(buf->base);
return;
}
- if (state == AC_STATE_READY) {
- ready = true;
- if ((cb_ready())) {
- /* acronc error. Socket is working. */
- ex(false);
- return;
- }
- }
- }
- if (obj) {
- LOGDV("Got object: %p", obj);
- /* uv_async_send is unreliable, and missed messages will cause memory leak. */
- if (cb_recv(obj)) {
- /* acronc error. Socket is working. */
- ex(false);
- }
- LOGD("Clearing backlog.");
- while (true) {
- if ((r = ac_receive(ac_conn, NULL, 0, &obj))) {
- LOGEV("Cannot clear backlog (%d).", r);
+ if (!ready) {
+ enum ac_connection_state state;
+ if ((r = ac_get_state(ac_conn, &state))) {
+ LOGEV("Cannot get state (%d).", r);
/* libac error. Socket is working. */
free(buf->base);
ex(false);
return;
}
- if (!obj) {
- break;
+ if (state == AC_STATE_READY) {
+ ready = true;
+ if ((cb_ready())) {
+ /* acronc error. Socket is working. */
+ free(buf->base);
+ ex(false);
+ return;
+ }
}
+ }
+ if (obj) {
+ LOGDV("Got object: %p", obj);
+ /* uv_async_send is unreliable, and missed messages will cause memory leak. */
if (cb_recv(obj)) {
/* acronc error. Socket is working. */
ex(false);
diff --git a/client/helloworld/main.c b/client/helloworld/main.c
index 8b3486b..079b957 100644
--- a/client/helloworld/main.c
+++ b/client/helloworld/main.c
@@ -74,6 +74,8 @@ return 0; \
#endif
+static bool ready = false;
+
static const char *world_name(const enum ac_world world) {
switch (world) {
case overworld:
@@ -227,6 +229,39 @@ static int say(void *connection) {
return 0;
}
+static int process(void *connection, ac_obj_t *obj) {
+ int r;
+ if (!ready) {
+ /* Wait until ready. */
+ enum ac_connection_state state;
+ if ((r = ac_get_state(connection, &state))) {
+ return r;
+ }
+ switch (state) {
+ case AC_STATE_INIT:
+ return 0;
+ case AC_STATE_READY:
+ printf("Connection is established.\n");
+ ready = true;
+ say(connection);
+ return 0;
+ default:
+ fprintf(stderr, "Unexpected state.\n");
+ return 1;
+ }
+ }
+ if (!obj) {
+ return 0;
+ }
+ if (AC_IS_EVENT(obj->type)) {
+ handle_event((ac_event_t *) obj);
+ } else if (AC_IS_RESPONSE(obj->type)) {
+ handle_response((ac_response_t *) obj);
+ }
+ ac_object_free(obj);
+ return 0;
+}
+
int main(int argc, char **argv) {
int r;
if ((r = net_init())) {
@@ -299,11 +334,12 @@ int main(int argc, char **argv) {
ac_obj_t *obj;
int nr;
uint8_t buffer[1000U];
- size_t bytes;
- enum ac_connection_state state;
printf("Waiting until the connection is established.\n");
- bool ready = false;
+ bool again = false;
while (1) {
+ size_t read = 0;
+ size_t pos = 0;
+ size_t bytes;
if ((nr = net_read(&sock, buffer, sizeof(buffer), &bytes, 10))) {
if (nr == NET_TIMEOUT) {
printf("Receive timeout.\n");
@@ -313,42 +349,30 @@ int main(int argc, char **argv) {
}
}
if ((r = lock())) { goto end; }
- if ((r = ac_receive(connection, buffer, bytes, &obj))) {
- if ((r = unlock())) { goto end; }
- goto end;
- }
- if (!ready) {
- /* Wait until ready. */
- if ((r = ac_get_state(connection, &state))) {
- if ((r = unlock())) { goto end; }
+ if ((r = ac_receive(connection, buffer, pos, bytes, &obj, &read))) {
+ bool again = r == AC_E_AGAIN;
+ if (!again) {
+ unlock();
goto end;
}
- switch (state) {
- case AC_STATE_INIT:
- continue;
- case AC_STATE_READY:
- printf("Connection is established.\n");
- ready = true;
- say(connection);
- if ((r = unlock())) { goto end; }
- continue;
- default:
- fprintf(stderr, "Unexpected state.\n");
- if ((r = unlock())) { goto end; }
- goto end;
- }
}
+ r = process(connection, obj);
if ((r = unlock())) { goto end; }
-
- if (!obj) {
- continue;
- }
- if (AC_IS_EVENT(obj->type)) {
- handle_event((ac_event_t *) obj);
- } else if (AC_IS_RESPONSE(obj->type)) {
- handle_response((ac_response_t *) obj);
+ if (r) { goto end; }
+ while (!again) {
+ if ((r = lock())) { goto end; }
+ if ((r = ac_receive(connection, NULL, 0, 0, &obj, NULL))) {
+ bool again = r == AC_E_AGAIN;
+ if (!again) {
+ unlock();
+ goto end;
+ }
+ }
+ again = false;
+ r = process(connection, obj);
+ if ((r = unlock())) { goto end; }
+ if (r) { goto end; }
}
- ac_object_free(obj);
}
end:
if (nr) {
diff --git a/client/libacron/README.md b/client/libacron/README.md
index 14d37ad..736b44a 100644
--- a/client/libacron/README.md
+++ b/client/libacron/README.md
@@ -202,12 +202,22 @@ Then, the program can listen for responses or events.
The client should do its own magic to receive from socket and
supply `ac_receive` with a buffer and size.
+Function `ac_receive` may only parse part of the buffer, so it is the client's responsibility to keep
+track of the bytes read:
+
```c
ac_obj_t *obj = NULL;
uint8_t buffer[1000U];
-size_t bytes;
-/* TODO: recv() */
-while (!(r = ac_receive(connection, buffer, bytes, &obj))) {
+size_t len;
+size_t pos = 0;
+size_t read;
+/* TODO: recv(): reset pos to 0 and set len. */
+while (1) {
+ if ((r = ac_receive(connection, buffer, pos, len, &obj, &read))) {
+ /* Handle error. */
+ break;
+ }
+ pos += read;
/* The obj is now referencing to a library allocated event or response. */
/* Do things with the event or response. */
ac_obj_free(obj);
@@ -220,9 +230,45 @@ while (!(r = ac_receive(connection, buffer, bytes, &obj))) {
> `ac_receive` will backlog the additional parsed messages, and they will return the second time
> calling `ac_receive` with buffer = bytes = 0.
>
-> This will happen only `ac_receive` returns zero with a non-NULL obj output. If that is true,
-> the client must additionally call `ac_receive` infinite times with buffer = bytes = 0, until either
-> `ac_receive` returns an error or obj output is NULL.
+> This will happen only `ac_receive` returns `AC_E_AGAIN`. If that is true, the client must additionally
+> call `ac_receive` infinite times with buffer = bytes = 0, until either `ac_receive` returns an error or
+> returns `AC_E_OK`. See the code below:
+
+```c
+ac_obj_t *obj = NULL;
+uint8_t buffer[1000U];
+size_t len;
+size_t pos = 0;
+size_t read;
+/* TODO: recv(): reset pos to 0 and set len. */
+bool again = false;
+while (1) {
+ if ((r = ac_receive(connection, buffer, pos, len, &obj, &read))) {
+ if (r == AC_E_AGAIN) {
+ again = true;
+ } else {
+ /* Handle error. */
+ break;
+ }
+ }
+ pos += read;
+ /* The obj is now referencing to a library allocated event or response. */
+ /* Do things with the event or response. */
+ ac_obj_free(obj);
+ /* Clear backlog */
+ while (again) {
+ if ((r = ac_receive(connection, NULL, 0, 0, &obj, NULL))) {
+ if (r == AC_E_AGAIN) {
+ again = true;
+ } else {
+ /* Handle error. */
+ break;
+ }
+ }
+ ac_obj_free(obj);
+ }
+}
+```
The program can make requests using `ac_request()`:
diff --git a/client/libacron/include/common.h b/client/libacron/include/common.h
index 79dd29b..a263f7e 100644
--- a/client/libacron/include/common.h
+++ b/client/libacron/include/common.h
@@ -22,6 +22,8 @@
#define AC_E_INVALID_REQUEST 6
/* Network error */
#define AC_E_NET 7
+/* Call ac_receive again */
+#define AC_E_AGAIN 8
/* Remote errors */
#define AC_ER_BAD_REQUEST 400
diff --git a/client/libacron/include/net.h b/client/libacron/include/net.h
index 6f5619a..7b28df2 100644
--- a/client/libacron/include/net.h
+++ b/client/libacron/include/net.h
@@ -76,15 +76,22 @@ LIBAC_EXPORT int ac_disconnect(void *connection, bool force);
* Blocks the current thread until a new response or event arrives.
* @param connection A non-NULL and connected connection passed as-is from ac_connect.
* @param buffer Buffer of data read from the socket.
+ * @param pos Read the buffer starting at pos.
* @param len Length to buffer.
* @param response Output response of either an event or a response. May be NULL even if it succeeds.
+ * @param len_read Output the bytes read. Must be <= len. The caller must call it again with pos = *len_read until
+ * the sum of *len_read == len or when return value == AC_E_AGAIN, except when return value != AC_E_OK.
+ * Note: if return value == AC_E_AGAIN, call ac_receive with buffer = pos = len = 0 once, regardless of *len_read.
+ * In this case, len_read is ignored.
* @return AC_E_OK or an error code. When failed, *response is NULL.
* Notes: if the state is changed to CLOSED after receiving, this function will return AC_E_NET.
*/
LIBAC_EXPORT int ac_receive(void *connection,
const void *buffer,
+ size_t pos,
size_t len,
- ac_obj_t **response);
+ ac_obj_t **response,
+ size_t *len_read);
/**
* Get the current state of the connection.
diff --git a/client/libacron/net.c b/client/libacron/net.c
index 55b10ce..cbbe113 100644
--- a/client/libacron/net.c
+++ b/client/libacron/net.c
@@ -70,7 +70,14 @@ static bool on_message_handler(struct wic_inst *inst,
* To prevent the new message from being overwritten (and thus causing
* memory leaks), we return false here, making wic backlog the new message
* until the next wic_parse. */
+ /* wic_parse will return the bytes it read. This includes the bytes of the
+ * frame that caused us to return false here. Thus, relying on its return value
+ * to determine if we are blocked here is unreliable.
+ * We are using our own field to cause ac_receive to know that we are blocked.
+ * However, we still cannot know where the pos exactly is before this blocked call.
+ */
LOGD("Two or more frames arrived before wic_parse returns. Keeping the message.");
+ res->blocked = true;
return false;
}
int r;
@@ -202,34 +209,65 @@ int ac_disconnect(void *connection,
int ac_receive(void *connection,
const void *buffer,
+ size_t pos,
const size_t len,
- ac_obj_t **response) {
+ ac_obj_t **response,
+ size_t *len_read) {
AC_CHECK_INIT;
+ LOGDV("ac_receive(buffer = %p, pos = %u, len = %u)",
+ buffer,
+ pos,
+ len);
struct ac_connection *conn = connection;
struct wic_inst *inst = &conn->inst;
const uint8_t *ptr = buffer;
- size_t retval, pos;
+ size_t retval;
+ struct ac_result *res = &conn->result;
+ bool blocked = false;
/* In case the deserializer does not run at all. */
- memset(&conn->result, 0, sizeof(struct ac_result));
- if (!len) {
- retval = wic_parse(inst, NULL, 0);
- if (wic_get_state(inst) == WIC_STATE_CLOSED) {
- LOGE("Connection closed.");
- return AC_E_NET;
- }
- } else {
- for (pos = 0U; pos < len; pos += retval) {
+ memset(res, 0, sizeof(struct ac_result));
+ if (len) {
+ for (; pos < len; pos += retval) {
retval = wic_parse(inst, &ptr[pos], len - pos);
if (wic_get_state(inst) == WIC_STATE_CLOSED) {
LOGE("Connection closed.");
return AC_E_NET;
}
+ if (!retval) {
+ /* Blocked. Ask the client to call again. */
+ LOGD("Blocked.");
+ blocked = true;
+ break;
+ }
}
+ *len_read = pos;
+ } else {
+ retval = wic_parse(inst, &ptr[pos], len - pos);
+ if (wic_get_state(inst) == WIC_STATE_CLOSED) {
+ LOGE("Connection closed.");
+ return AC_E_NET;
+ }
+ /* retval is always 0. We should rely on res->blocked to see if
+ * there are more than one messages blocked (unlikely). */
}
+ if (!blocked) {
+ /* In case we reached pos >= len but there is still a frame
+ * blocked. Calling wic_parse(NULL, 0) won't help because
+ * it will return 0 even if nothing is blocking.
+ * Thus we use the custom 'blocked' field in res to check
+ * if there is still a blocked message. Then, call wic_parse
+ * again with NULL and 0 to parse it.
+ * The assumption is that there is only one possible blocked
+ * message.
+ */
+ if (res->blocked) blocked = true;
+ }
+ LOGDV("Done parsing with %u / %u bytes read.",
+ pos,
+ len);
- struct ac_result *res = &conn->result;
LOGDV("res { has_result = %d, res = %d, obj = %p }",
res->has_result,
res->res,
@@ -240,13 +278,13 @@ int ac_receive(void *connection,
return res->res;
} else {
*response = res->obj;
- return AC_E_OK;
+ return blocked ? AC_E_AGAIN : AC_E_OK;
}
} else {
*response = NULL;
}
- return AC_E_OK;
+ return blocked ? AC_E_AGAIN : AC_E_OK;
}
int ac_request(void *connection, const ac_request_t *request) {
diff --git a/client/libacron/private/connection.h b/client/libacron/private/connection.h
index a2bb62f..34c2fbe 100644
--- a/client/libacron/private/connection.h
+++ b/client/libacron/private/connection.h
@@ -14,6 +14,7 @@
* Used to transfer deserialization result from receive handler to ac_receive().
*/
struct ac_result {
+ bool blocked;
bool has_result;
int res;
ac_obj_t *obj;