/* * Created by yuuta on 7/13/22. */ #include "config.h" #include "net.h" #include "connection.h" #include "log.h" #include "helpers.h" #include "serializer.h" /* TODO: wic logging is not yet supported */ #include "wic.h" #include #include #include #include #include static void conn_free(struct ac_connection *conn) { if (conn->url) free(conn->url); if (conn->parameters.id) free(conn->parameters.id); if (conn->parameters.host) free(conn->parameters.host); if (conn->parameters.token) free(conn->parameters.token); free(conn); } static void on_send_handler(struct wic_inst *inst, const void *data, size_t size, enum wic_buffer type) { struct ac_connection *conn = wic_get_app(inst); conn->parameters.on_send(conn->parameters.sock, data, size); free((void *) data); } static void *on_buffer_handler(struct wic_inst *inst, size_t min_size, enum wic_buffer type, size_t *max_size) { void *buf; if ((alloc(NULL, 1, *max_size = (min_size ? min_size : 1024), false, &buf))) { return NULL; } return buf; } static bool on_message_handler(struct wic_inst *inst, enum wic_encoding encoding, bool fin, const char *data, uint16_t size) { struct ac_connection *conn = wic_get_app(inst); if (encoding != WIC_ENCODING_UTF8) { LOGE("Invalid encoding received from server. Only text frames are supported."); return true; } /* TODO: Only parse when fin = true? */ struct ac_result *res = &conn->result; if (res->has_result) { /* This flag will be cleared upon ac_receive using memset(). * A positive flag indicates that on_message_handler was called * more than once before wic_parse returns. This could happen due * to more than one WebSocket frames were returned by a single read. * To prevent the new message from being overwritten (and thus causing * memory leaks), we return false here, making wic backlog the new message * until the next wic_parse. */ /* wic_parse will return the bytes it read. This includes the bytes of the * frame that caused us to return false here. Thus, relying on its return value * to determine if we are blocked here is unreliable. * We are using our own field to cause ac_receive to know that we are blocked. * However, we still cannot know where the pos exactly is before this blocked call. */ LOGD("Two or more frames arrived before wic_parse returns. Keeping the message."); res->blocked = true; return false; } int r; if ((r = deserialize(data, size, &res->obj))) { if (r == AC_E_SERIALIZER_CONTINUE) { if (fin) { serializer_finalize(); LOGE("Server returned an incomplete response."); res->res = AC_E_INVALID_RESPONSE; } else { res->res = AC_E_OK; } } else { res->res = r; } } else { res->has_result = true; res->res = r; } LOGDV("Post deserializing: { has_result = %d, res = %d, obj = %p }", res->has_result, res->res, res->obj); return true; } int ac_connect(ac_connection_parameters_t parameters, void **out) { AC_CHECK_INIT; if (!parameters.id || !parameters.host || !parameters.token || !parameters.sock || !parameters.on_send) { LOGE("Required parameters are missing."); return AC_E_INVALID_REQUEST; } struct ac_connection *conn; int r; if ((r = SALLOC(struct ac_connection, &conn))) { return r; } /* Do not use memcpy() here because we want to keep track of copied and not-copied strings. */ conn->parameters.port = parameters.port; conn->parameters.sock = parameters.sock; conn->parameters.version = parameters.version; conn->parameters.on_send = parameters.on_send; if ((r = strdup2(parameters.token, &conn->parameters.token)) || (r = strdup2(parameters.host, &conn->parameters.host)) || (r = strdup2(parameters.id, &conn->parameters.id))) { conn_free(conn); return r; } if ((r = alloc(NULL, 5 /* ws:// */ + strlen(parameters.host) + 1 /* : */ + /* Without the () the result seems to be strange. */ (parameters.port ? ((int) floor(log10(parameters.port)) + 1) : 1) + 3 /* /ws */ + 4 /* ?id= */ + strlen(parameters.id) + 7 /* &token = */ + strlen(parameters.token) + 9 /* &version= */ + /* Without the () the result seems to be strange. */ (parameters.version ? ((int) floor(log10(parameters.version)) + 1) : 1) + 1 /* NULL */, sizeof(char), false, (void **) &conn->url))) { conn_free(conn); return r; } sprintf(conn->url, "ws://%s:%u/ws?id=%s&token=%s&version=%u", parameters.host, parameters.port, parameters.id, parameters.token, parameters.version); static uint8_t rx[1000]; struct wic_inst *inst = &conn->inst; struct wic_init_arg arg = {0}; arg.rx = rx; arg.rx_max = sizeof(rx); arg.on_send = on_send_handler; arg.on_message = on_message_handler; arg.on_buffer = on_buffer_handler; arg.app = conn; arg.url = conn->url; arg.role = WIC_ROLE_CLIENT; if (!wic_init(inst, &arg)) { LOGE("wic_init"); conn_free(conn); return AC_E_INTERNAL; } if (wic_start(inst) != WIC_STATUS_SUCCESS) { LOGE("Cannot start the WIC client."); conn_free(conn); return AC_E_NET; } *out = conn; return AC_E_OK; } int ac_disconnect(void *connection, bool force) { AC_CHECK_INIT; struct ac_connection *conn = connection; LOGD("Disconnecting..."); if (force) { wic_close_with_reason(&conn->inst, WIC_CLOSE_ABNORMAL_1, NULL, 0U); } else { wic_close_with_reason(&conn->inst, WIC_CLOSE_NORMAL, NULL, 0U); } conn_free(conn); return AC_E_OK; } int ac_receive(void *connection, const void *buffer, size_t pos, const size_t len, ac_obj_t **response, size_t *len_read) { AC_CHECK_INIT; LOGDV("ac_receive(buffer = %p, pos = %u, len = %u)", buffer, pos, len); struct ac_connection *conn = connection; struct wic_inst *inst = &conn->inst; const uint8_t *ptr = buffer; size_t retval; struct ac_result *res = &conn->result; bool blocked = false; /* In case the deserializer does not run at all. */ memset(res, 0, sizeof(struct ac_result)); if (len) { size_t pos_starting = pos; for (; pos < len; pos += retval) { LOGDV("wic_parse[%lu]", pos); retval = wic_parse(inst, &ptr[pos], len - pos); if (wic_get_state(inst) == WIC_STATE_CLOSED) { LOGE("Connection closed."); return AC_E_NET; } if (!retval) { /* Blocked. Ask the client to call again. */ LOGD("Blocked."); blocked = true; break; } } *len_read = pos - pos_starting; } else { retval = wic_parse(inst, &ptr[pos], len - pos); if (wic_get_state(inst) == WIC_STATE_CLOSED) { LOGE("Connection closed."); return AC_E_NET; } /* retval is always 0. We should rely on res->blocked to see if * there are more than one messages blocked (unlikely). */ } if (!blocked) { /* In case we reached pos >= len but there is still a frame * blocked. Calling wic_parse(NULL, 0) won't help because * it will return 0 even if nothing is blocking. * Thus we use the custom 'blocked' field in res to check * if there is still a blocked message. Then, call wic_parse * again with NULL and 0 to parse it. * The assumption is that there is only one possible blocked * message. */ if (res->blocked) blocked = true; } LOGDV("Done parsing with %u / %u bytes read.", pos, len); LOGDV("res { has_result = %d, res = %d, obj = %p }", res->has_result, res->res, res->obj); *response = NULL; if (res->has_result) { if (res->res) { return res->res; } else { *response = res->obj; return blocked ? AC_E_AGAIN : AC_E_OK; } } else { *response = NULL; } return blocked ? AC_E_AGAIN : AC_E_OK; } int ac_request(void *connection, const ac_request_t *request) { AC_CHECK_INIT; struct ac_connection *conn = connection; struct wic_inst *inst = &conn->inst; if (wic_get_state(inst) != WIC_STATE_OPEN) { LOGE("Invalid state."); return AC_E_INVALID_REQUEST; } int r; json_object *obj; if ((r = serialize_request(request, &obj))) { return r; } const char *str; size_t len; if (!(str = json_object_to_json_string_length(obj, JSON_C_TO_STRING_PLAIN, &len))) { json_object_put(obj); LOGE("Cannot serialize JSON."); return AC_E_INTERNAL; } /** * Access: * * inst->role (ro, safe) * * inst->state (rw, unsafe) * * inst->utf8_tx (rw, safe) * * inst->frag (rw, safe) * * inst->on_buffer (ro, unsafe) * * inst->on_sand (ro, unsafe) * * conn->fd (ro, unsafe) * * write(ro, unsafe) */ if (wic_send_text(inst, true, str, len) != WIC_STATUS_SUCCESS) { LOGE("Cannot send."); json_object_put(obj); return AC_E_NET; } json_object_put(obj); return AC_E_OK; } int ac_get_state(void *connection, enum ac_connection_state *out) { AC_CHECK_INIT; struct ac_connection *conn = connection; switch (wic_get_state(&conn->inst)) { case WIC_STATE_CLOSED: { *out = AC_STATE_CLOSED; break; } case WIC_STATE_PARSE_HANDSHAKE: case WIC_STATE_INIT: { *out = AC_STATE_INIT; break; } case WIC_STATE_OPEN: case WIC_STATE_READY: { *out = AC_STATE_READY; break; } default: { *out = AC_STATE_INIT; break; } } return AC_E_OK; }