From 7edaa14c2c822e5b776edaa6534753b258d29488 Mon Sep 17 00:00:00 2001 From: Trumeet Date: Tue, 9 Aug 2022 17:15:45 -0700 Subject: fix(libacron): infinite loop when wic_parse returns 0 This is a rather complicated way to fix backlogging issues. It asks the client to pass partial buffer, and the client also must retry with NULL buffer if ac_receive returns AC_E_AGAIN. --- client/libacron/README.md | 58 +++++++++++++++++++++++++++---- client/libacron/include/common.h | 2 ++ client/libacron/include/net.h | 9 ++++- client/libacron/net.c | 66 ++++++++++++++++++++++++++++-------- client/libacron/private/connection.h | 1 + 5 files changed, 115 insertions(+), 21 deletions(-) (limited to 'client/libacron') diff --git a/client/libacron/README.md b/client/libacron/README.md index 14d37ad..736b44a 100644 --- a/client/libacron/README.md +++ b/client/libacron/README.md @@ -202,12 +202,22 @@ Then, the program can listen for responses or events. The client should do its own magic to receive from socket and supply `ac_receive` with a buffer and size. +Function `ac_receive` may only parse part of the buffer, so it is the client's responsibility to keep +track of the bytes read: + ```c ac_obj_t *obj = NULL; uint8_t buffer[1000U]; -size_t bytes; -/* TODO: recv() */ -while (!(r = ac_receive(connection, buffer, bytes, &obj))) { +size_t len; +size_t pos = 0; +size_t read; +/* TODO: recv(): reset pos to 0 and set len. */ +while (1) { + if ((r = ac_receive(connection, buffer, pos, len, &obj, &read))) { + /* Handle error. */ + break; + } + pos += read; /* The obj is now referencing to a library allocated event or response. */ /* Do things with the event or response. */ ac_obj_free(obj); @@ -220,9 +230,45 @@ while (!(r = ac_receive(connection, buffer, bytes, &obj))) { > `ac_receive` will backlog the additional parsed messages, and they will return the second time > calling `ac_receive` with buffer = bytes = 0. > -> This will happen only `ac_receive` returns zero with a non-NULL obj output. If that is true, -> the client must additionally call `ac_receive` infinite times with buffer = bytes = 0, until either -> `ac_receive` returns an error or obj output is NULL. +> This will happen only `ac_receive` returns `AC_E_AGAIN`. If that is true, the client must additionally +> call `ac_receive` infinite times with buffer = bytes = 0, until either `ac_receive` returns an error or +> returns `AC_E_OK`. See the code below: + +```c +ac_obj_t *obj = NULL; +uint8_t buffer[1000U]; +size_t len; +size_t pos = 0; +size_t read; +/* TODO: recv(): reset pos to 0 and set len. */ +bool again = false; +while (1) { + if ((r = ac_receive(connection, buffer, pos, len, &obj, &read))) { + if (r == AC_E_AGAIN) { + again = true; + } else { + /* Handle error. */ + break; + } + } + pos += read; + /* The obj is now referencing to a library allocated event or response. */ + /* Do things with the event or response. */ + ac_obj_free(obj); + /* Clear backlog */ + while (again) { + if ((r = ac_receive(connection, NULL, 0, 0, &obj, NULL))) { + if (r == AC_E_AGAIN) { + again = true; + } else { + /* Handle error. */ + break; + } + } + ac_obj_free(obj); + } +} +``` The program can make requests using `ac_request()`: diff --git a/client/libacron/include/common.h b/client/libacron/include/common.h index 79dd29b..a263f7e 100644 --- a/client/libacron/include/common.h +++ b/client/libacron/include/common.h @@ -22,6 +22,8 @@ #define AC_E_INVALID_REQUEST 6 /* Network error */ #define AC_E_NET 7 +/* Call ac_receive again */ +#define AC_E_AGAIN 8 /* Remote errors */ #define AC_ER_BAD_REQUEST 400 diff --git a/client/libacron/include/net.h b/client/libacron/include/net.h index 6f5619a..7b28df2 100644 --- a/client/libacron/include/net.h +++ b/client/libacron/include/net.h @@ -76,15 +76,22 @@ LIBAC_EXPORT int ac_disconnect(void *connection, bool force); * Blocks the current thread until a new response or event arrives. * @param connection A non-NULL and connected connection passed as-is from ac_connect. * @param buffer Buffer of data read from the socket. + * @param pos Read the buffer starting at pos. * @param len Length to buffer. * @param response Output response of either an event or a response. May be NULL even if it succeeds. + * @param len_read Output the bytes read. Must be <= len. The caller must call it again with pos = *len_read until + * the sum of *len_read == len or when return value == AC_E_AGAIN, except when return value != AC_E_OK. + * Note: if return value == AC_E_AGAIN, call ac_receive with buffer = pos = len = 0 once, regardless of *len_read. + * In this case, len_read is ignored. * @return AC_E_OK or an error code. When failed, *response is NULL. * Notes: if the state is changed to CLOSED after receiving, this function will return AC_E_NET. */ LIBAC_EXPORT int ac_receive(void *connection, const void *buffer, + size_t pos, size_t len, - ac_obj_t **response); + ac_obj_t **response, + size_t *len_read); /** * Get the current state of the connection. diff --git a/client/libacron/net.c b/client/libacron/net.c index 55b10ce..cbbe113 100644 --- a/client/libacron/net.c +++ b/client/libacron/net.c @@ -70,7 +70,14 @@ static bool on_message_handler(struct wic_inst *inst, * To prevent the new message from being overwritten (and thus causing * memory leaks), we return false here, making wic backlog the new message * until the next wic_parse. */ + /* wic_parse will return the bytes it read. This includes the bytes of the + * frame that caused us to return false here. Thus, relying on its return value + * to determine if we are blocked here is unreliable. + * We are using our own field to cause ac_receive to know that we are blocked. + * However, we still cannot know where the pos exactly is before this blocked call. + */ LOGD("Two or more frames arrived before wic_parse returns. Keeping the message."); + res->blocked = true; return false; } int r; @@ -202,34 +209,65 @@ int ac_disconnect(void *connection, int ac_receive(void *connection, const void *buffer, + size_t pos, const size_t len, - ac_obj_t **response) { + ac_obj_t **response, + size_t *len_read) { AC_CHECK_INIT; + LOGDV("ac_receive(buffer = %p, pos = %u, len = %u)", + buffer, + pos, + len); struct ac_connection *conn = connection; struct wic_inst *inst = &conn->inst; const uint8_t *ptr = buffer; - size_t retval, pos; + size_t retval; + struct ac_result *res = &conn->result; + bool blocked = false; /* In case the deserializer does not run at all. */ - memset(&conn->result, 0, sizeof(struct ac_result)); - if (!len) { - retval = wic_parse(inst, NULL, 0); - if (wic_get_state(inst) == WIC_STATE_CLOSED) { - LOGE("Connection closed."); - return AC_E_NET; - } - } else { - for (pos = 0U; pos < len; pos += retval) { + memset(res, 0, sizeof(struct ac_result)); + if (len) { + for (; pos < len; pos += retval) { retval = wic_parse(inst, &ptr[pos], len - pos); if (wic_get_state(inst) == WIC_STATE_CLOSED) { LOGE("Connection closed."); return AC_E_NET; } + if (!retval) { + /* Blocked. Ask the client to call again. */ + LOGD("Blocked."); + blocked = true; + break; + } } + *len_read = pos; + } else { + retval = wic_parse(inst, &ptr[pos], len - pos); + if (wic_get_state(inst) == WIC_STATE_CLOSED) { + LOGE("Connection closed."); + return AC_E_NET; + } + /* retval is always 0. We should rely on res->blocked to see if + * there are more than one messages blocked (unlikely). */ } + if (!blocked) { + /* In case we reached pos >= len but there is still a frame + * blocked. Calling wic_parse(NULL, 0) won't help because + * it will return 0 even if nothing is blocking. + * Thus we use the custom 'blocked' field in res to check + * if there is still a blocked message. Then, call wic_parse + * again with NULL and 0 to parse it. + * The assumption is that there is only one possible blocked + * message. + */ + if (res->blocked) blocked = true; + } + LOGDV("Done parsing with %u / %u bytes read.", + pos, + len); - struct ac_result *res = &conn->result; LOGDV("res { has_result = %d, res = %d, obj = %p }", res->has_result, res->res, @@ -240,13 +278,13 @@ int ac_receive(void *connection, return res->res; } else { *response = res->obj; - return AC_E_OK; + return blocked ? AC_E_AGAIN : AC_E_OK; } } else { *response = NULL; } - return AC_E_OK; + return blocked ? AC_E_AGAIN : AC_E_OK; } int ac_request(void *connection, const ac_request_t *request) { diff --git a/client/libacron/private/connection.h b/client/libacron/private/connection.h index a2bb62f..34c2fbe 100644 --- a/client/libacron/private/connection.h +++ b/client/libacron/private/connection.h @@ -14,6 +14,7 @@ * Used to transfer deserialization result from receive handler to ac_receive(). */ struct ac_result { + bool blocked; bool has_result; int res; ac_obj_t *obj; -- cgit v1.2.3