diff options
Diffstat (limited to 'client/acronc/handler_socket.c')
-rw-r--r-- | client/acronc/handler_socket.c | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/client/acronc/handler_socket.c b/client/acronc/handler_socket.c new file mode 100644 index 0000000..ff8eefb --- /dev/null +++ b/client/acronc/handler_socket.c @@ -0,0 +1,255 @@ +/* + * Created by yuuta on 7/24/22. + */ + +#include "handler.h" +#include "log.h" +#include "helpers.h" + +#include <uv.h> +#include <stdlib.h> +#include <string.h> + +static void (*cb_conn)(bool); + +static int (*cb_ready)(void); + +static int (*cb_recv)(ac_obj_t *obj); + +static void (*cb_close)(void); + +static ac_connection_parameters_t *params; + +static uv_tcp_t sock; +static uv_write_t write; + +static uv_connect_t conn; + +static bool ready = false; +static void *ac_conn = NULL; + +#define RUNNING !uv_is_closing((uv_handle_t *) &sock) + +static void on_close(uv_handle_t *handle) { + LOGDV("on_close(handle = %p)", + handle); + if (handle->data) + cb_close(); +} + +static void ex2(bool force, bool cb) { + LOGDV("Exiting socket handlers: Force WebSocket close: %s; Socket still running: %s.", + force ? "true" : "false", + RUNNING ? "true" : "false"); + if (ac_conn) { + ac_disconnect(ac_conn, + RUNNING ? force : true /* If sock is not running, always force */); + ac_conn = NULL; + } + if (RUNNING) { + sock.data = cb ? &ex2 /* Any non-NULL value */ : NULL; + uv_close((uv_handle_t *) &sock, on_close); + } else { + if (cb) { + cb_close(); + } + } +} + +static void ex(bool force) { + ex2(force, true); +} + +int sock_ext(bool trigger_callback) { + ex2(false, trigger_callback); + return 0; +} + +static void on_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf) { + LOGDV("on_alloc(handle = %p): size = %lu", + handle, + size); + void *b = malloc(size); + if (!b) { + LOGEV("Cannot allocate memory of %u bytes: %s.", + size, + strerror(errno)); + /* Socket is still working now. */ + ex(false); + *buf = uv_buf_init(NULL, 0); /* Just in case? */ + return; + } + *buf = uv_buf_init(b, size); +} + +static void on_read(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) { + LOGDV("on_read(stream = %p): nread = %ld, buf = %p, buf->base = %p, buf->len = %lu", + tcp, + nread, + buf, buf->base, buf->len); + if (!nread) { + /* EAGAIN */ + if (buf->base) free(buf->base); + return; + } + if (nread == UV_EOF) { + LOGW("Received EOF from server."); + free(buf->base); + /* The socket * should * be closed already? */ + ex(true); + return; + } + if (nread < 0) { + LOGEV("Encountered a failure while reading from the socket: ", uv_strerror(nread)); + if (buf->base) free(buf->base); + /* Docs: The callee is responsible for stopping/closing the stream when an error happens */ + ex(true); /* Force close libac connection here. */ + return; + } + int r; + ac_obj_t *obj = NULL; + if ((r = ac_receive(ac_conn, buf->base, nread, &obj))) { + LOGEV("Cannot parse the response (%d).", r); + /* libac error. Socket is working. */ + ex(false); + return; + } + + if (!ready) { + enum ac_connection_state state; + if ((r = ac_get_state(ac_conn, &state))) { + LOGEV("Cannot get state (%d).", r); + /* libac error. Socket is working. */ + ex(false); + return; + } + if (state == AC_STATE_READY) { + ready = true; + if ((cb_ready())) { + /* acronc error. Socket is working. */ + ex(false); + return; + } + } + } + if (obj) { + LOGDV("Got object: %p", obj); + /* uv_async_send is unreliable, and missed messages will cause memory leak. */ + if (cb_recv(obj)) { + /* acronc error. Socket is working. */ + ex(false); + } + } + + free(buf->base); +} + +static void on_write(uv_write_t *req, int status) { + LOGDV("on_write(req = %p): %d", + req, + status); + if (status) { + LOGEV("Cannot write to socket: %s", uv_strerror(status)); + /* Socket may be closed? Anyway writing again will definitely be problematic, but closing may not. */ + ex(true); + } +} + +static int on_send(const void *s, + const void *buf, + const size_t len) { + LOGDV("on_send(s = %p): len = %u", + s, + len); + const uv_stream_t *stream = s; + uv_buf_t buffer[] = { + {.base = (char *) buf, .len = len} + }; + int r; + if ((r = uv_write(&write, (uv_stream_t *) stream, buffer, 1, on_write))) { + LOGEV("Cannot write to socket: %s", uv_strerror(r)); + /* Socket may be closed? Anyway writing again will definitely be problematic, but closing may not. */ + ex(true); + } + return 0; +} + +static void on_retry_closed(uv_handle_t *handle) { + cb_conn(false); +} + +static void on_connect(uv_connect_t *req, int status) { + LOGDV("on_connect(req = %p): status = %d", + req, + status); + if (status) { + LOGEV("Cannot connect to the server: %s", uv_strerror(status)); + /* Close it for the next retry. */ + if (RUNNING) { + LOGD("Closing socket before retry."); + uv_close((uv_handle_t *) &sock, on_retry_closed); + } else { + cb_conn(false); + } + return; + } + LOGI("Connected."); + uv_stream_t *stream = req->handle; + int r; + params->sock = stream; + if ((r = ac_connect(*params, &ac_conn))) { + LOGEV("Cannot initialize connection: %d.", r); + cb_conn(false); + return; + } + if ((r = uv_read_start(stream, on_alloc, on_read))) { + LOGEV("Cannot read socket: %s", uv_strerror(r)); + cb_conn(false); + return; + } + cb_conn(true); +} + +int h_socket(ac_connection_parameters_t *p, + const struct addrinfo *ai, + void (*on_connect_result)(bool), + int (*on_ready)(void), + int (*on_received)(ac_obj_t *obj), + void (*on_closed)(void)) { + cb_conn = on_connect_result; + cb_ready = on_ready; + cb_recv = on_received; + cb_close = on_closed; + params = p; + params->on_send = on_send; + + struct sockaddr *sa = ai->ai_addr; + LOGIV("Connecting to %s...", ntop(sa)); + int r; + if ((r = uv_tcp_init(loop, &sock))) { + LOGEV("Cannot initialize the socket: %s", uv_strerror(r)); + return r; + } + if ((r = uv_tcp_connect(&conn, &sock, sa, on_connect))) { + LOGEV("Cannot initialize the connection: %s", uv_strerror(r)); + return r; + } + return 0; +} + +int sock_request(ac_request_t *req) { + if (!ac_conn) { + /* Just in case. */ + LOGE("sock_request() called on a closed socket."); + /* Ignore it. */ + return 0; + } + int r = ac_request(ac_conn, req); + if (r) { + LOGEV("Cannot send request: %d.", r); + /* libac error. Socket is working. */ + ex(false); + } + return r; +} + |