/* * Created by yuuta on 7/24/22. */ #include "handler.h" #include "log.h" #include "helpers.h" #include #include #include static void (*cb_conn)(bool); static int (*cb_ready)(void); static int (*cb_recv)(ac_obj_t *obj); static void (*cb_close)(void); static ac_connection_parameters_t *params; static uv_tcp_t sock; static uv_write_t write; static uv_connect_t conn; static bool ready = false; static void *ac_conn = NULL; #define RUNNING !uv_is_closing((uv_handle_t *) &sock) static void on_close(uv_handle_t *handle) { LOGDV("on_close(handle = %p)", handle); if (handle->data) cb_close(); } static void ex2(bool force, bool cb) { LOGDV("Exiting socket handlers: Force WebSocket close: %s; Socket still running: %s.", force ? "true" : "false", RUNNING ? "true" : "false"); if (ac_conn) { /* We do not want to read anything after. * We will try to read as much as we can until an error or cmd result is received. * Although it is possible that the server sends anything else with the same ID after, * we will discard it. * (The above applies to ad-hoc executions of acronc, e.g. echo cmd | acronc, where * an EOF immediately follows the command.) * If we do not wait, on_read will come AFTER ac_disconnect, thus calling ac_receive * on a dangling pointer. * If we simply stop reading the socket upon ac_disconnect, everything will be lost, including * the command outputs, which is undesirable for ad-hoc runs. * Therefore, the approach is to pause stdin until we believe that it is unlikely for any new * responses to come, then resume stdin, and exit. */ LOGD("Stopping socket input."); uv_read_stop((uv_stream_t *) &sock); ac_disconnect(ac_conn, RUNNING ? force : true /* If sock is not running, always force */); ac_conn = NULL; } if (RUNNING) { sock.data = cb ? &ex2 /* Any non-NULL value */ : NULL; uv_close((uv_handle_t *) &sock, on_close); } else { if (cb) { cb_close(); } } } static void ex(bool force) { ex2(force, true); } int sock_ext(bool trigger_callback) { ex2(false, trigger_callback); return 0; } static void on_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf) { LOGDV("on_alloc(handle = %p): size = %lu", handle, size); void *b = malloc(size); if (!b) { LOGEV("Cannot allocate memory of %u bytes: %s.", size, strerror(errno)); /* Socket is still working now. */ ex(false); *buf = uv_buf_init(NULL, 0); /* Just in case? */ return; } *buf = uv_buf_init(b, size); } static void on_read(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) { LOGDV("on_read(stream = %p): nread = %ld, buf = %p, buf->base = %p, buf->len = %lu", tcp, nread, buf, buf->base, buf->len); if (!nread) { /* EAGAIN */ if (buf->base) free(buf->base); return; } if (nread == UV_EOF) { LOGW("Received EOF from server."); free(buf->base); /* The socket * should * be closed already? */ ex(true); return; } if (nread < 0) { LOGEV("Encountered a failure while reading from the socket: ", uv_strerror(nread)); if (buf->base) free(buf->base); /* Docs: The callee is responsible for stopping/closing the stream when an error happens */ ex(true); /* Force close libac connection here. */ return; } if (!ac_conn) { LOGE("on_read called after ac_disconnect!"); } int r; ac_obj_t *obj = NULL; size_t pos = 0; size_t read = 0; bool again = false; while (read < nread || again) { if (again) { LOGD("Clearing backlog."); r = ac_receive(ac_conn, NULL, 0, 0, &obj, NULL); again = false; } else { r = ac_receive(ac_conn, buf->base, pos, nread, &obj, &read); pos += read; } if (r == AC_E_AGAIN) { again = true; } else if (r) { LOGEV("Cannot parse the response (%d).", r); /* libac error. Socket is working. */ ex(false); free(buf->base); return; } if (!ready) { enum ac_connection_state state; if ((r = ac_get_state(ac_conn, &state))) { LOGEV("Cannot get state (%d).", r); /* libac error. Socket is working. */ free(buf->base); ex(false); return; } if (state == AC_STATE_READY) { ready = true; if ((cb_ready())) { /* acronc error. Socket is working. */ free(buf->base); ex(false); return; } } } if (obj) { LOGDV("Got object: %p", obj); /* uv_async_send is unreliable, and missed messages will cause memory leak. */ if (cb_recv(obj)) { /* acronc error. Socket is working. */ ex(false); free(buf->base); return; } } } free(buf->base); } static void on_write(uv_write_t *req, int status) { LOGDV("on_write(req = %p): %d", req, status); if (status) { LOGEV("Cannot write to socket: %s", uv_strerror(status)); /* Socket may be closed? Anyway writing again will definitely be problematic, but closing may not. */ ex(true); } } static int on_send(const void *s, const void *buf, const size_t len) { LOGDV("on_send(s = %p): len = %u", s, len); const uv_stream_t *stream = s; uv_buf_t buffer[] = { {.base = (char *) buf, .len = len} }; int r; if ((r = uv_write(&write, (uv_stream_t *) stream, buffer, 1, on_write))) { LOGEV("Cannot write to socket: %s", uv_strerror(r)); /* Socket may be closed? Anyway writing again will definitely be problematic, but closing may not. */ ex(true); } return 0; } static void on_retry_closed(uv_handle_t *handle) { cb_conn(false); } static void on_connect(uv_connect_t *req, int status) { LOGDV("on_connect(req = %p): status = %d", req, status); if (status) { LOGEV("Cannot connect to the server: %s", uv_strerror(status)); /* Close it for the next retry. */ if (RUNNING) { LOGD("Closing socket before retry."); uv_close((uv_handle_t *) &sock, on_retry_closed); } else { cb_conn(false); } return; } LOGI("Connected."); uv_stream_t *stream = req->handle; int r; params->sock = stream; if ((r = ac_connect(*params, &ac_conn))) { LOGEV("Cannot initialize connection: %d.", r); cb_conn(false); return; } if ((r = uv_read_start(stream, on_alloc, on_read))) { LOGEV("Cannot read socket: %s", uv_strerror(r)); cb_conn(false); return; } cb_conn(true); } int h_socket(ac_connection_parameters_t *p, const struct addrinfo *ai, void (*on_connect_result)(bool), int (*on_ready)(void), int (*on_received)(ac_obj_t *obj), void (*on_closed)(void)) { cb_conn = on_connect_result; cb_ready = on_ready; cb_recv = on_received; cb_close = on_closed; params = p; params->on_send = on_send; struct sockaddr *sa = ai->ai_addr; LOGIV("Connecting to %s...", ntop(sa)); int r; if ((r = uv_tcp_init(loop, &sock))) { LOGEV("Cannot initialize the socket: %s", uv_strerror(r)); return r; } if ((r = uv_tcp_connect(&conn, &sock, sa, on_connect))) { LOGEV("Cannot initialize the connection: %s", uv_strerror(r)); return r; } return 0; } int sock_request(ac_request_t *req) { if (!ac_conn) { /* Just in case. */ LOGE("sock_request() called on a closed socket."); /* Ignore it. */ return 0; } int r = ac_request(ac_conn, req); if (r) { LOGEV("Cannot send request: %d.", r); /* libac error. Socket is working. */ ex(false); } return r; }