From 7edaa14c2c822e5b776edaa6534753b258d29488 Mon Sep 17 00:00:00 2001 From: Trumeet Date: Tue, 9 Aug 2022 17:15:45 -0700 Subject: fix(libacron): infinite loop when wic_parse returns 0 This is a rather complicated way to fix backlogging issues. It asks the client to pass partial buffer, and the client also must retry with NULL buffer if ac_receive returns AC_E_AGAIN. --- client/acronc/handler_socket.c | 66 +++++++++++++------------- client/helloworld/main.c | 92 +++++++++++++++++++++++------------- client/libacron/README.md | 58 ++++++++++++++++++++--- client/libacron/include/common.h | 2 + client/libacron/include/net.h | 9 +++- client/libacron/net.c | 66 ++++++++++++++++++++------ client/libacron/private/connection.h | 1 + 7 files changed, 206 insertions(+), 88 deletions(-) diff --git a/client/acronc/handler_socket.c b/client/acronc/handler_socket.c index 2dfda5e..c0a9923 100644 --- a/client/acronc/handler_socket.c +++ b/client/acronc/handler_socket.c @@ -125,50 +125,50 @@ static void on_read(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) { } int r; ac_obj_t *obj = NULL; - if ((r = ac_receive(ac_conn, buf->base, nread, &obj))) { - LOGEV("Cannot parse the response (%d).", r); - /* libac error. Socket is working. */ - ex(false); - return; - } - - if (!ready) { - enum ac_connection_state state; - if ((r = ac_get_state(ac_conn, &state))) { - LOGEV("Cannot get state (%d).", r); + size_t pos = 0; + size_t read = 0; + bool again = false; + while (read < nread || again) { + if (again) { + LOGD("Clearing backlog."); + r = ac_receive(ac_conn, NULL, 0, 0, &obj, NULL); + again = false; + } else { + r = ac_receive(ac_conn, buf->base, pos, nread, &obj, &read); + pos += read; + } + if (r == AC_E_AGAIN) { + again = true; + } else if (r) { + LOGEV("Cannot parse the response (%d).", r); /* libac error. Socket is working. */ ex(false); + free(buf->base); return; } - if (state == AC_STATE_READY) { - ready = true; - if ((cb_ready())) { - /* acronc error. Socket is working. */ - ex(false); - return; - } - } - } - if (obj) { - LOGDV("Got object: %p", obj); - /* uv_async_send is unreliable, and missed messages will cause memory leak. */ - if (cb_recv(obj)) { - /* acronc error. Socket is working. */ - ex(false); - } - LOGD("Clearing backlog."); - while (true) { - if ((r = ac_receive(ac_conn, NULL, 0, &obj))) { - LOGEV("Cannot clear backlog (%d).", r); + if (!ready) { + enum ac_connection_state state; + if ((r = ac_get_state(ac_conn, &state))) { + LOGEV("Cannot get state (%d).", r); /* libac error. Socket is working. */ free(buf->base); ex(false); return; } - if (!obj) { - break; + if (state == AC_STATE_READY) { + ready = true; + if ((cb_ready())) { + /* acronc error. Socket is working. */ + free(buf->base); + ex(false); + return; + } } + } + if (obj) { + LOGDV("Got object: %p", obj); + /* uv_async_send is unreliable, and missed messages will cause memory leak. */ if (cb_recv(obj)) { /* acronc error. Socket is working. */ ex(false); diff --git a/client/helloworld/main.c b/client/helloworld/main.c index 8b3486b..079b957 100644 --- a/client/helloworld/main.c +++ b/client/helloworld/main.c @@ -74,6 +74,8 @@ return 0; \ #endif +static bool ready = false; + static const char *world_name(const enum ac_world world) { switch (world) { case overworld: @@ -227,6 +229,39 @@ static int say(void *connection) { return 0; } +static int process(void *connection, ac_obj_t *obj) { + int r; + if (!ready) { + /* Wait until ready. */ + enum ac_connection_state state; + if ((r = ac_get_state(connection, &state))) { + return r; + } + switch (state) { + case AC_STATE_INIT: + return 0; + case AC_STATE_READY: + printf("Connection is established.\n"); + ready = true; + say(connection); + return 0; + default: + fprintf(stderr, "Unexpected state.\n"); + return 1; + } + } + if (!obj) { + return 0; + } + if (AC_IS_EVENT(obj->type)) { + handle_event((ac_event_t *) obj); + } else if (AC_IS_RESPONSE(obj->type)) { + handle_response((ac_response_t *) obj); + } + ac_object_free(obj); + return 0; +} + int main(int argc, char **argv) { int r; if ((r = net_init())) { @@ -299,11 +334,12 @@ int main(int argc, char **argv) { ac_obj_t *obj; int nr; uint8_t buffer[1000U]; - size_t bytes; - enum ac_connection_state state; printf("Waiting until the connection is established.\n"); - bool ready = false; + bool again = false; while (1) { + size_t read = 0; + size_t pos = 0; + size_t bytes; if ((nr = net_read(&sock, buffer, sizeof(buffer), &bytes, 10))) { if (nr == NET_TIMEOUT) { printf("Receive timeout.\n"); @@ -313,42 +349,30 @@ int main(int argc, char **argv) { } } if ((r = lock())) { goto end; } - if ((r = ac_receive(connection, buffer, bytes, &obj))) { - if ((r = unlock())) { goto end; } - goto end; - } - if (!ready) { - /* Wait until ready. */ - if ((r = ac_get_state(connection, &state))) { - if ((r = unlock())) { goto end; } + if ((r = ac_receive(connection, buffer, pos, bytes, &obj, &read))) { + bool again = r == AC_E_AGAIN; + if (!again) { + unlock(); goto end; } - switch (state) { - case AC_STATE_INIT: - continue; - case AC_STATE_READY: - printf("Connection is established.\n"); - ready = true; - say(connection); - if ((r = unlock())) { goto end; } - continue; - default: - fprintf(stderr, "Unexpected state.\n"); - if ((r = unlock())) { goto end; } - goto end; - } } + r = process(connection, obj); if ((r = unlock())) { goto end; } - - if (!obj) { - continue; - } - if (AC_IS_EVENT(obj->type)) { - handle_event((ac_event_t *) obj); - } else if (AC_IS_RESPONSE(obj->type)) { - handle_response((ac_response_t *) obj); + if (r) { goto end; } + while (!again) { + if ((r = lock())) { goto end; } + if ((r = ac_receive(connection, NULL, 0, 0, &obj, NULL))) { + bool again = r == AC_E_AGAIN; + if (!again) { + unlock(); + goto end; + } + } + again = false; + r = process(connection, obj); + if ((r = unlock())) { goto end; } + if (r) { goto end; } } - ac_object_free(obj); } end: if (nr) { diff --git a/client/libacron/README.md b/client/libacron/README.md index 14d37ad..736b44a 100644 --- a/client/libacron/README.md +++ b/client/libacron/README.md @@ -202,12 +202,22 @@ Then, the program can listen for responses or events. The client should do its own magic to receive from socket and supply `ac_receive` with a buffer and size. +Function `ac_receive` may only parse part of the buffer, so it is the client's responsibility to keep +track of the bytes read: + ```c ac_obj_t *obj = NULL; uint8_t buffer[1000U]; -size_t bytes; -/* TODO: recv() */ -while (!(r = ac_receive(connection, buffer, bytes, &obj))) { +size_t len; +size_t pos = 0; +size_t read; +/* TODO: recv(): reset pos to 0 and set len. */ +while (1) { + if ((r = ac_receive(connection, buffer, pos, len, &obj, &read))) { + /* Handle error. */ + break; + } + pos += read; /* The obj is now referencing to a library allocated event or response. */ /* Do things with the event or response. */ ac_obj_free(obj); @@ -220,9 +230,45 @@ while (!(r = ac_receive(connection, buffer, bytes, &obj))) { > `ac_receive` will backlog the additional parsed messages, and they will return the second time > calling `ac_receive` with buffer = bytes = 0. > -> This will happen only `ac_receive` returns zero with a non-NULL obj output. If that is true, -> the client must additionally call `ac_receive` infinite times with buffer = bytes = 0, until either -> `ac_receive` returns an error or obj output is NULL. +> This will happen only `ac_receive` returns `AC_E_AGAIN`. If that is true, the client must additionally +> call `ac_receive` infinite times with buffer = bytes = 0, until either `ac_receive` returns an error or +> returns `AC_E_OK`. See the code below: + +```c +ac_obj_t *obj = NULL; +uint8_t buffer[1000U]; +size_t len; +size_t pos = 0; +size_t read; +/* TODO: recv(): reset pos to 0 and set len. */ +bool again = false; +while (1) { + if ((r = ac_receive(connection, buffer, pos, len, &obj, &read))) { + if (r == AC_E_AGAIN) { + again = true; + } else { + /* Handle error. */ + break; + } + } + pos += read; + /* The obj is now referencing to a library allocated event or response. */ + /* Do things with the event or response. */ + ac_obj_free(obj); + /* Clear backlog */ + while (again) { + if ((r = ac_receive(connection, NULL, 0, 0, &obj, NULL))) { + if (r == AC_E_AGAIN) { + again = true; + } else { + /* Handle error. */ + break; + } + } + ac_obj_free(obj); + } +} +``` The program can make requests using `ac_request()`: diff --git a/client/libacron/include/common.h b/client/libacron/include/common.h index 79dd29b..a263f7e 100644 --- a/client/libacron/include/common.h +++ b/client/libacron/include/common.h @@ -22,6 +22,8 @@ #define AC_E_INVALID_REQUEST 6 /* Network error */ #define AC_E_NET 7 +/* Call ac_receive again */ +#define AC_E_AGAIN 8 /* Remote errors */ #define AC_ER_BAD_REQUEST 400 diff --git a/client/libacron/include/net.h b/client/libacron/include/net.h index 6f5619a..7b28df2 100644 --- a/client/libacron/include/net.h +++ b/client/libacron/include/net.h @@ -76,15 +76,22 @@ LIBAC_EXPORT int ac_disconnect(void *connection, bool force); * Blocks the current thread until a new response or event arrives. * @param connection A non-NULL and connected connection passed as-is from ac_connect. * @param buffer Buffer of data read from the socket. + * @param pos Read the buffer starting at pos. * @param len Length to buffer. * @param response Output response of either an event or a response. May be NULL even if it succeeds. + * @param len_read Output the bytes read. Must be <= len. The caller must call it again with pos = *len_read until + * the sum of *len_read == len or when return value == AC_E_AGAIN, except when return value != AC_E_OK. + * Note: if return value == AC_E_AGAIN, call ac_receive with buffer = pos = len = 0 once, regardless of *len_read. + * In this case, len_read is ignored. * @return AC_E_OK or an error code. When failed, *response is NULL. * Notes: if the state is changed to CLOSED after receiving, this function will return AC_E_NET. */ LIBAC_EXPORT int ac_receive(void *connection, const void *buffer, + size_t pos, size_t len, - ac_obj_t **response); + ac_obj_t **response, + size_t *len_read); /** * Get the current state of the connection. diff --git a/client/libacron/net.c b/client/libacron/net.c index 55b10ce..cbbe113 100644 --- a/client/libacron/net.c +++ b/client/libacron/net.c @@ -70,7 +70,14 @@ static bool on_message_handler(struct wic_inst *inst, * To prevent the new message from being overwritten (and thus causing * memory leaks), we return false here, making wic backlog the new message * until the next wic_parse. */ + /* wic_parse will return the bytes it read. This includes the bytes of the + * frame that caused us to return false here. Thus, relying on its return value + * to determine if we are blocked here is unreliable. + * We are using our own field to cause ac_receive to know that we are blocked. + * However, we still cannot know where the pos exactly is before this blocked call. + */ LOGD("Two or more frames arrived before wic_parse returns. Keeping the message."); + res->blocked = true; return false; } int r; @@ -202,34 +209,65 @@ int ac_disconnect(void *connection, int ac_receive(void *connection, const void *buffer, + size_t pos, const size_t len, - ac_obj_t **response) { + ac_obj_t **response, + size_t *len_read) { AC_CHECK_INIT; + LOGDV("ac_receive(buffer = %p, pos = %u, len = %u)", + buffer, + pos, + len); struct ac_connection *conn = connection; struct wic_inst *inst = &conn->inst; const uint8_t *ptr = buffer; - size_t retval, pos; + size_t retval; + struct ac_result *res = &conn->result; + bool blocked = false; /* In case the deserializer does not run at all. */ - memset(&conn->result, 0, sizeof(struct ac_result)); - if (!len) { - retval = wic_parse(inst, NULL, 0); - if (wic_get_state(inst) == WIC_STATE_CLOSED) { - LOGE("Connection closed."); - return AC_E_NET; - } - } else { - for (pos = 0U; pos < len; pos += retval) { + memset(res, 0, sizeof(struct ac_result)); + if (len) { + for (; pos < len; pos += retval) { retval = wic_parse(inst, &ptr[pos], len - pos); if (wic_get_state(inst) == WIC_STATE_CLOSED) { LOGE("Connection closed."); return AC_E_NET; } + if (!retval) { + /* Blocked. Ask the client to call again. */ + LOGD("Blocked."); + blocked = true; + break; + } } + *len_read = pos; + } else { + retval = wic_parse(inst, &ptr[pos], len - pos); + if (wic_get_state(inst) == WIC_STATE_CLOSED) { + LOGE("Connection closed."); + return AC_E_NET; + } + /* retval is always 0. We should rely on res->blocked to see if + * there are more than one messages blocked (unlikely). */ } + if (!blocked) { + /* In case we reached pos >= len but there is still a frame + * blocked. Calling wic_parse(NULL, 0) won't help because + * it will return 0 even if nothing is blocking. + * Thus we use the custom 'blocked' field in res to check + * if there is still a blocked message. Then, call wic_parse + * again with NULL and 0 to parse it. + * The assumption is that there is only one possible blocked + * message. + */ + if (res->blocked) blocked = true; + } + LOGDV("Done parsing with %u / %u bytes read.", + pos, + len); - struct ac_result *res = &conn->result; LOGDV("res { has_result = %d, res = %d, obj = %p }", res->has_result, res->res, @@ -240,13 +278,13 @@ int ac_receive(void *connection, return res->res; } else { *response = res->obj; - return AC_E_OK; + return blocked ? AC_E_AGAIN : AC_E_OK; } } else { *response = NULL; } - return AC_E_OK; + return blocked ? AC_E_AGAIN : AC_E_OK; } int ac_request(void *connection, const ac_request_t *request) { diff --git a/client/libacron/private/connection.h b/client/libacron/private/connection.h index a2bb62f..34c2fbe 100644 --- a/client/libacron/private/connection.h +++ b/client/libacron/private/connection.h @@ -14,6 +14,7 @@ * Used to transfer deserialization result from receive handler to ac_receive(). */ struct ac_result { + bool blocked; bool has_result; int res; ac_obj_t *obj; -- cgit v1.2.3