aboutsummaryrefslogtreecommitdiff
path: root/client/acronc/handler_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'client/acronc/handler_socket.c')
-rw-r--r--client/acronc/handler_socket.c255
1 files changed, 255 insertions, 0 deletions
diff --git a/client/acronc/handler_socket.c b/client/acronc/handler_socket.c
new file mode 100644
index 0000000..ff8eefb
--- /dev/null
+++ b/client/acronc/handler_socket.c
@@ -0,0 +1,255 @@
+/*
+ * Created by yuuta on 7/24/22.
+ */
+
+#include "handler.h"
+#include "log.h"
+#include "helpers.h"
+
+#include <uv.h>
+#include <stdlib.h>
+#include <string.h>
+
+static void (*cb_conn)(bool);
+
+static int (*cb_ready)(void);
+
+static int (*cb_recv)(ac_obj_t *obj);
+
+static void (*cb_close)(void);
+
+static ac_connection_parameters_t *params;
+
+static uv_tcp_t sock;
+static uv_write_t write;
+
+static uv_connect_t conn;
+
+static bool ready = false;
+static void *ac_conn = NULL;
+
+#define RUNNING !uv_is_closing((uv_handle_t *) &sock)
+
+static void on_close(uv_handle_t *handle) {
+ LOGDV("on_close(handle = %p)",
+ handle);
+ if (handle->data)
+ cb_close();
+}
+
+static void ex2(bool force, bool cb) {
+ LOGDV("Exiting socket handlers: Force WebSocket close: %s; Socket still running: %s.",
+ force ? "true" : "false",
+ RUNNING ? "true" : "false");
+ if (ac_conn) {
+ ac_disconnect(ac_conn,
+ RUNNING ? force : true /* If sock is not running, always force */);
+ ac_conn = NULL;
+ }
+ if (RUNNING) {
+ sock.data = cb ? &ex2 /* Any non-NULL value */ : NULL;
+ uv_close((uv_handle_t *) &sock, on_close);
+ } else {
+ if (cb) {
+ cb_close();
+ }
+ }
+}
+
+static void ex(bool force) {
+ ex2(force, true);
+}
+
+int sock_ext(bool trigger_callback) {
+ ex2(false, trigger_callback);
+ return 0;
+}
+
+static void on_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
+ LOGDV("on_alloc(handle = %p): size = %lu",
+ handle,
+ size);
+ void *b = malloc(size);
+ if (!b) {
+ LOGEV("Cannot allocate memory of %u bytes: %s.",
+ size,
+ strerror(errno));
+ /* Socket is still working now. */
+ ex(false);
+ *buf = uv_buf_init(NULL, 0); /* Just in case? */
+ return;
+ }
+ *buf = uv_buf_init(b, size);
+}
+
+static void on_read(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) {
+ LOGDV("on_read(stream = %p): nread = %ld, buf = %p, buf->base = %p, buf->len = %lu",
+ tcp,
+ nread,
+ buf, buf->base, buf->len);
+ if (!nread) {
+ /* EAGAIN */
+ if (buf->base) free(buf->base);
+ return;
+ }
+ if (nread == UV_EOF) {
+ LOGW("Received EOF from server.");
+ free(buf->base);
+ /* The socket * should * be closed already? */
+ ex(true);
+ return;
+ }
+ if (nread < 0) {
+ LOGEV("Encountered a failure while reading from the socket: ", uv_strerror(nread));
+ if (buf->base) free(buf->base);
+ /* Docs: The callee is responsible for stopping/closing the stream when an error happens */
+ ex(true); /* Force close libac connection here. */
+ return;
+ }
+ int r;
+ ac_obj_t *obj = NULL;
+ if ((r = ac_receive(ac_conn, buf->base, nread, &obj))) {
+ LOGEV("Cannot parse the response (%d).", r);
+ /* libac error. Socket is working. */
+ ex(false);
+ return;
+ }
+
+ if (!ready) {
+ enum ac_connection_state state;
+ if ((r = ac_get_state(ac_conn, &state))) {
+ LOGEV("Cannot get state (%d).", r);
+ /* libac error. Socket is working. */
+ ex(false);
+ return;
+ }
+ if (state == AC_STATE_READY) {
+ ready = true;
+ if ((cb_ready())) {
+ /* acronc error. Socket is working. */
+ ex(false);
+ return;
+ }
+ }
+ }
+ if (obj) {
+ LOGDV("Got object: %p", obj);
+ /* uv_async_send is unreliable, and missed messages will cause memory leak. */
+ if (cb_recv(obj)) {
+ /* acronc error. Socket is working. */
+ ex(false);
+ }
+ }
+
+ free(buf->base);
+}
+
+static void on_write(uv_write_t *req, int status) {
+ LOGDV("on_write(req = %p): %d",
+ req,
+ status);
+ if (status) {
+ LOGEV("Cannot write to socket: %s", uv_strerror(status));
+ /* Socket may be closed? Anyway writing again will definitely be problematic, but closing may not. */
+ ex(true);
+ }
+}
+
+static int on_send(const void *s,
+ const void *buf,
+ const size_t len) {
+ LOGDV("on_send(s = %p): len = %u",
+ s,
+ len);
+ const uv_stream_t *stream = s;
+ uv_buf_t buffer[] = {
+ {.base = (char *) buf, .len = len}
+ };
+ int r;
+ if ((r = uv_write(&write, (uv_stream_t *) stream, buffer, 1, on_write))) {
+ LOGEV("Cannot write to socket: %s", uv_strerror(r));
+ /* Socket may be closed? Anyway writing again will definitely be problematic, but closing may not. */
+ ex(true);
+ }
+ return 0;
+}
+
+static void on_retry_closed(uv_handle_t *handle) {
+ cb_conn(false);
+}
+
+static void on_connect(uv_connect_t *req, int status) {
+ LOGDV("on_connect(req = %p): status = %d",
+ req,
+ status);
+ if (status) {
+ LOGEV("Cannot connect to the server: %s", uv_strerror(status));
+ /* Close it for the next retry. */
+ if (RUNNING) {
+ LOGD("Closing socket before retry.");
+ uv_close((uv_handle_t *) &sock, on_retry_closed);
+ } else {
+ cb_conn(false);
+ }
+ return;
+ }
+ LOGI("Connected.");
+ uv_stream_t *stream = req->handle;
+ int r;
+ params->sock = stream;
+ if ((r = ac_connect(*params, &ac_conn))) {
+ LOGEV("Cannot initialize connection: %d.", r);
+ cb_conn(false);
+ return;
+ }
+ if ((r = uv_read_start(stream, on_alloc, on_read))) {
+ LOGEV("Cannot read socket: %s", uv_strerror(r));
+ cb_conn(false);
+ return;
+ }
+ cb_conn(true);
+}
+
+int h_socket(ac_connection_parameters_t *p,
+ const struct addrinfo *ai,
+ void (*on_connect_result)(bool),
+ int (*on_ready)(void),
+ int (*on_received)(ac_obj_t *obj),
+ void (*on_closed)(void)) {
+ cb_conn = on_connect_result;
+ cb_ready = on_ready;
+ cb_recv = on_received;
+ cb_close = on_closed;
+ params = p;
+ params->on_send = on_send;
+
+ struct sockaddr *sa = ai->ai_addr;
+ LOGIV("Connecting to %s...", ntop(sa));
+ int r;
+ if ((r = uv_tcp_init(loop, &sock))) {
+ LOGEV("Cannot initialize the socket: %s", uv_strerror(r));
+ return r;
+ }
+ if ((r = uv_tcp_connect(&conn, &sock, sa, on_connect))) {
+ LOGEV("Cannot initialize the connection: %s", uv_strerror(r));
+ return r;
+ }
+ return 0;
+}
+
+int sock_request(ac_request_t *req) {
+ if (!ac_conn) {
+ /* Just in case. */
+ LOGE("sock_request() called on a closed socket.");
+ /* Ignore it. */
+ return 0;
+ }
+ int r = ac_request(ac_conn, req);
+ if (r) {
+ LOGEV("Cannot send request: %d.", r);
+ /* libac error. Socket is working. */
+ ex(false);
+ }
+ return r;
+}
+