From b4afa06e383325f4a0c751a64ca896d769db07a8 Mon Sep 17 00:00:00 2001 From: Trumeet Date: Wed, 20 Jul 2022 18:12:22 -0700 Subject: libac: First Commit --- client/libacron/net.c | 306 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 306 insertions(+) create mode 100644 client/libacron/net.c (limited to 'client/libacron/net.c') diff --git a/client/libacron/net.c b/client/libacron/net.c new file mode 100644 index 0000000..51a99af --- /dev/null +++ b/client/libacron/net.c @@ -0,0 +1,306 @@ +/* + * Created by yuuta on 7/13/22. + */ + +#include "config.h" +#include "net.h" +#include "connection.h" +#include "log.h" +#include "helpers.h" +#include "serializer.h" + +/* TODO: wic logging is not yet supported */ +#include "wic.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +static void on_open_handler(struct wic_inst *inst) { + struct ac_connection *conn = wic_get_app(inst); + conn->established = true; +} + +static void on_send_handler(struct wic_inst *inst, + const void *data, + size_t size, + enum wic_buffer type) { + struct ac_connection *conn = wic_get_app(inst); + size_t pos; + ssize_t retval; + + for (pos = 0U; pos < size; pos += retval) { + size_t n = size - pos; + LOGDV("write(%d, %p[%u, %u])", conn->fd, data, pos, n); + retval = write(conn->fd, &data[pos], (int) n); + if (retval <= 0) { + /* There's no way to abort the process. */ + int e = errno; + LOGEV("Cannot write to socket: %s (%d).", + strerror2(errno), + e); + break; + } + } + free((void *) data); +} + +static void *on_buffer_handler(struct wic_inst *inst, + size_t min_size, + enum wic_buffer type, + size_t *max_size) { + void *buf; + if ((alloc(NULL, + 1, + *max_size = (min_size ? min_size : 1024), + false, + &buf))) { + return NULL; + } + return buf; +} + +static bool on_message_handler(struct wic_inst *inst, + enum wic_encoding encoding, + bool fin, + const char *data, + uint16_t size) { + struct ac_connection *conn = wic_get_app(inst); + if (encoding != WIC_ENCODING_UTF8) { + LOGE("Invalid encoding received from server. Only text frames are supported."); + return true; + } + /* TODO: Only parse when fin = true? */ + struct ac_result *res = &conn->result; + int r; + if ((r = deserialize(data, size, &res->obj))) { + if (r == AC_E_SERIALIZER_CONTINUE) { + if (fin) { + serializer_finalize(); + LOGE("Server returned an incomplete response."); + res->res = AC_E_INVALID_RESPONSE; + } else { + res->res = AC_E_OK; + } + } else { + res->res = r; + } + } else { + res->has_result = true; + res->res = r; + } + LOGDV("Post deserializing: { has_result = %d, res = %d, obj = %p }", + res->has_result, + res->res, + res->obj); + + return true; +} + +int ac_connect(ac_connection_parameters_t parameters, void **out) { + AC_CHECK_INIT; + struct ac_connection *conn; + int r; + if ((r = SALLOC(struct ac_connection, &conn))) { + return r; + } + int s; + static uint8_t rx[1000]; + struct wic_inst *inst = &conn->inst; + struct wic_init_arg arg = {0}; + + arg.rx = rx; + arg.rx_max = sizeof(rx); + + arg.on_send = on_send_handler; + arg.on_open = on_open_handler; + arg.on_message = on_message_handler; + arg.on_buffer = on_buffer_handler; + + arg.app = &s; + arg.url = parameters.url; + arg.role = WIC_ROLE_CLIENT; + + if (!wic_init(inst, &arg)) { + LOGE("wic_init"); + free(conn); + return AC_E_INTERNAL; + } + inst->app = conn; + + struct addrinfo *res; + char service[6]; + + const enum wic_schema schema = wic_get_url_schema(inst); + const uint16_t port = wic_get_url_port(inst); + snprintf(service, 6, "%u", port); + const char *host = wic_get_url_hostname(inst); + + switch (schema) { + case WIC_SCHEMA_WS: + break; + case WIC_SCHEMA_WSS: + LOGE("WSS is not supported yet."); + free(conn); + return AC_E_INVALID_REQUEST; + default: + LOGE("Unsupported protocol. The URL must be ws://"); + free(conn); + return AC_E_INVALID_REQUEST; + } + + if ((r = getaddrinfo(host, service, NULL, &res))) { + LOGEV("Resolve host: %s.", gai_strerror(r)); + r = AC_E_NET; + free(conn); + return r; + } + + int fd; + if ((fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) <= 0) { + const int e = errno; + LOGEV("Cannot create socket: %s (%d).", + strerror2(e), + e); + freeaddrinfo(res); + free(conn); + return AC_E_NET; + } + if (connect(fd, res->ai_addr, res->ai_addrlen) < 0) { + const int e = errno; + LOGEV("Cannot connect to socket: %s (%d).", + strerror2(e), + e); + close(fd); + freeaddrinfo(res); + free(conn); + return AC_E_NET; + } + freeaddrinfo(res); + + conn->fd = fd; + if (wic_start(inst) != WIC_STATUS_SUCCESS) { + LOGE("Cannot start the WIC client."); + close(fd); + free(conn); + return AC_E_NET; + } + + /* Wait until established (ready to send). */ + while (!conn->established) { + ac_obj_t *obj = NULL; + r = ac_receive(conn, &obj); + if (obj) { + LOGW("Received an object before connection is established. Dropping."); + ac_object_free(obj); + } + if (r) { + ac_disconnect(conn); + return r; + } + } + *out = conn; + return AC_E_OK; +} + +int ac_disconnect(void *connection) { + AC_CHECK_INIT; + struct ac_connection *conn = connection; + if (!conn->fd) { + LOGE("Trying to disconnect an already disconnected connection."); + return AC_E_INVALID_REQUEST; + } + LOGD("Disconnecting..."); + wic_close(&conn->inst); + close(conn->fd); + free(conn); + return AC_E_OK; +} + +int ac_receive(void *connection, ac_obj_t **response) { + AC_CHECK_INIT; + struct ac_connection *conn = connection; + struct wic_inst *inst = &conn->inst; + + static uint8_t buffer[1000U]; + ssize_t bytes; + size_t retval, pos; + + if ((bytes = recv(conn->fd, buffer, sizeof(buffer), 0)) <= 0) { + LOGDV("recv(%d) = %d", conn->fd, bytes); + if (bytes < 0) { + const int e = errno; + LOGEV("Failed to receive from socket : %s (%d).", + strerror2(e), + e); + } else { + LOGI("Peer abnormally shutdown."); + } + wic_close_with_reason(inst, WIC_CLOSE_ABNORMAL_2, NULL, 0); + return AC_E_NET; + } + LOGDV("recv(%d) = %d", conn->fd, bytes); + + /* In case the deserializer does not run at all. */ + memset(&conn->result, 0, sizeof(struct ac_result)); + for (pos = 0U; pos < bytes; pos += retval) { + retval = wic_parse(inst, &buffer[pos], bytes - pos); + if (wic_get_state(inst) == WIC_STATE_CLOSED) { + LOGE("Connection closed."); + return AC_E_NET; + } + } + + struct ac_result *res = &conn->result; + LOGDV("res { has_result = %d, res = %d, obj = %p }", + res->has_result, + res->res, + res->obj); + *response = NULL; + if (res->has_result) { + if (res->res) { + return res->res; + } else { + *response = res->obj; + return AC_E_OK; + } + } else { + *response = NULL; + } + + return AC_E_OK; +} + +int ac_request(void *connection, const ac_request_t *request) { + AC_CHECK_INIT; + struct ac_connection *conn = connection; + struct wic_inst *inst = &conn->inst; + if (wic_get_state(inst) != WIC_STATE_OPEN) { + LOGE("Invalid state."); + return AC_E_INVALID_REQUEST; + } + int r; + json_object *obj; + if ((r = serialize_request(request, &obj))) { + return r; + } + const char *str; + size_t len; + if (!(str = json_object_to_json_string_length(obj, JSON_C_TO_STRING_PLAIN, &len))) { + json_object_put(obj); + LOGE("Cannot serialize JSON."); + return AC_E_INTERNAL; + } + if (wic_send_text(inst, true, str, len) != WIC_STATUS_SUCCESS) { + LOGE("Cannot send."); + json_object_put(obj); + return AC_E_NET; + } + json_object_put(obj); + return AC_E_OK; +} -- cgit v1.2.3