From 83d5b5431a1571147e5188fb3ad4254660ef86ae Mon Sep 17 00:00:00 2001 From: Trumeet Date: Sun, 4 Apr 2021 13:50:09 -0700 Subject: feat(agent): persistent configuration --- .../java/moe/yuuta/dn42peering/agent/Deploy.java | 51 ++++++++++++++ .../java/moe/yuuta/dn42peering/agent/Main.java | 2 +- .../moe/yuuta/dn42peering/agent/Persistent.java | 77 ++++++++++++++++++++++ .../dn42peering/agent/grpc/AgentServiceImpl.java | 42 ++---------- .../yuuta/dn42peering/agent/grpc/RPCVerticle.java | 28 ++++++-- docs/agent/Configuration.md | 19 +++++- 6 files changed, 174 insertions(+), 45 deletions(-) create mode 100644 agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java create mode 100644 agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java new file mode 100644 index 0000000..94f614b --- /dev/null +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java @@ -0,0 +1,51 @@ +package moe.yuuta.dn42peering.agent; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import moe.yuuta.dn42peering.agent.proto.DeployResult; +import moe.yuuta.dn42peering.agent.proto.NodeConfig; +import moe.yuuta.dn42peering.agent.provision.BGPProvisioner; +import moe.yuuta.dn42peering.agent.provision.Change; +import moe.yuuta.dn42peering.agent.provision.WireGuardCleanupProvisioner; +import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner; + +import javax.annotation.Nonnull; +import java.util.List; + +public class Deploy { + private static final Logger logger = LoggerFactory.getLogger(Deploy.class.getSimpleName()); + private static Future chainChanges(@Nonnull Vertx vertx, @Nonnull List changes) { + if(changes.isEmpty()) { + return Future.succeededFuture(); + } + Future last = changes.get(0).execute(vertx); + for (int i = 1; i < changes.size(); i ++) { + final Change current = changes.get(i); + last = last.compose(_v -> current.execute(vertx)); + } + return last; + } + + public static Future deploy(@Nonnull Vertx vertx, @Nonnull NodeConfig config) { + logger.info("Deployment started"); + final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx); + final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx); + final WireGuardCleanupProvisioner wireGuardCleanupProvisioner = new WireGuardCleanupProvisioner(vertx); + + // TODO: Currently all provisioning operations are non-fault-tolering. This means that + // TODO: if one operation fails, the following will fail. This may be changed in later. + // Changes in each provisioners are executed in sequence. + // Two provisioners are executed in sequence. + return wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList()) + .compose(changes -> chainChanges(vertx, changes)) + .compose(_v -> bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList()) + .compose(changes -> chainChanges(vertx, changes))) + .compose(_v -> wireGuardCleanupProvisioner.calculateChanges(config.getNode(), config.getWgsList()) + .compose(changes -> chainChanges(vertx, changes))) + .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above.")) + .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err)) + .compose(compositeFuture -> Future.succeededFuture(null)); + } +} diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java index fb9eead..9efca46 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java @@ -29,7 +29,7 @@ public class Main { final Vertx vertx = Vertx.vertx(new VertxOptions()); final DeploymentOptions options = new DeploymentOptions() .setConfig(config) - .setInstances(Runtime.getRuntime().availableProcessors() * 2); + .setInstances(1); Logger logger = LoggerFactory.getLogger("Main"); CompositeFuture.all(Arrays.asList( Future.future(f -> vertx.deployVerticle(RPCVerticle.class.getName(), options, f)) diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java new file mode 100644 index 0000000..0f29da4 --- /dev/null +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java @@ -0,0 +1,77 @@ +package moe.yuuta.dn42peering.agent; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.OpenOptions; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import moe.yuuta.dn42peering.agent.proto.DeployResult; +import moe.yuuta.dn42peering.agent.proto.NodeConfig; + +import javax.annotation.Nonnull; +import java.io.*; + +public class Persistent { + private static final Logger logger = LoggerFactory.getLogger(Persistent.class.getSimpleName()); + + public static boolean enabled(@Nonnull Vertx vertx) { + return vertx.getOrCreateContext().config().getBoolean("persistent", false); + } + + @Nonnull + public static String getPath(@Nonnull Vertx vertx) { + return vertx.getOrCreateContext().config().getString("persistent_path", + "/var/lib/dn42peering/agent/config"); + } + + public static Future recover(@Nonnull Vertx vertx) { + if(!enabled(vertx)) { + logger.info("Persistent disabled."); + return Future.succeededFuture(null); + } + if(!new File(getPath(vertx)).exists()) { + logger.info("Persistent file is not found."); + return Future.succeededFuture(null); + } + return vertx.executeBlocking(f -> { + logger.info("Recovering from persistent state..."); + try(final InputStream in = new FileInputStream(getPath(vertx))) { + final NodeConfig config = NodeConfig.parseDelimitedFrom(in); + f.complete(config); + } catch (IOException e) { + f.fail(e); + } + }).compose(config -> Deploy.deploy(vertx, config)) + .onSuccess(res -> + logger.info("Recovered from persistent state.")); + } + + @Nonnull + public static Future persistent(@Nonnull Vertx vertx, @Nonnull NodeConfig config) { + if (!enabled(vertx)) return Future.succeededFuture(); + return vertx.fileSystem() + .open(getPath(vertx), + new OpenOptions() + .setWrite(true) + .setCreate(true)) + .compose(file -> { + return vertx.executeBlocking(f -> { + try { + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + config.writeDelimitedTo(stream); + file.write(Buffer.buffer(stream.toByteArray())); + stream.close(); + f.complete(file); + } catch (IOException e) { + f.fail(e); + } + }); + }) + .compose(AsyncFile::close) + .compose(file -> + vertx.fileSystem().chmod(getPath(vertx), "rw-------")) + .onFailure(err -> logger.error("Cannot persistent node configuration", err)); + } +} diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java index 1e91020..3ceb9fb 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java @@ -4,16 +4,13 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; +import moe.yuuta.dn42peering.agent.Deploy; +import moe.yuuta.dn42peering.agent.Persistent; import moe.yuuta.dn42peering.agent.proto.DeployResult; import moe.yuuta.dn42peering.agent.proto.NodeConfig; import moe.yuuta.dn42peering.agent.proto.VertxAgentGrpc; -import moe.yuuta.dn42peering.agent.provision.BGPProvisioner; -import moe.yuuta.dn42peering.agent.provision.Change; -import moe.yuuta.dn42peering.agent.provision.WireGuardCleanupProvisioner; -import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner; import javax.annotation.Nonnull; -import java.util.List; class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName()); @@ -24,37 +21,12 @@ class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { this.vertx = vertx; } - private Future chainChanges(@Nonnull List changes) { - if(changes.isEmpty()) { - return Future.succeededFuture(); - } - Future last = changes.get(0).execute(vertx); - for (int i = 1; i < changes.size(); i ++) { - final Change current = changes.get(i); - last = last.compose(_v -> current.execute(vertx)); - } - return last; - } - @Override public Future deploy(NodeConfig config) { - logger.info("Deployment started"); - final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx); - final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx); - final WireGuardCleanupProvisioner wireGuardCleanupProvisioner = new WireGuardCleanupProvisioner(vertx); - - // TODO: Currently all provisioning operations are non-fault-tolering. This means that - // TODO: if one operation fails, the following will fail. This may be changed in later. - // Changes in each provisioners are executed in sequence. - // Two provisioners are executed in sequence. - return wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList()) - .compose(this::chainChanges) - .compose(_v -> bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList()) - .compose(this::chainChanges)) - .compose(_v -> wireGuardCleanupProvisioner.calculateChanges(config.getNode(), config.getWgsList()) - .compose(this::chainChanges)) - .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above.")) - .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err)) - .compose(compositeFuture -> Future.succeededFuture(null)); + return Deploy.deploy(vertx, config) + .compose(_v -> Future.future(f -> { + Persistent.persistent(vertx, config) + .onComplete(res -> f.complete(_v)); // Ignore errors + })); } } diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java index 3390b47..c24c29a 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java @@ -2,25 +2,39 @@ package moe.yuuta.dn42peering.agent.grpc; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.grpc.VertxServer; import io.vertx.grpc.VertxServerBuilder; import moe.yuuta.dn42peering.RPC; +import moe.yuuta.dn42peering.agent.Persistent; public class RPCVerticle extends AbstractVerticle { + private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName()); + private VertxServer server; @Override public void start(Promise startPromise) throws Exception { - server = VertxServerBuilder - .forAddress(vertx, vertx.getOrCreateContext().config().getString("internal_ip"), - RPC.AGENT_PORT) - .addService(new AgentServiceImpl(vertx)) - .build() - .start(startPromise); + Persistent.recover(vertx) + .onComplete(ar -> { + if(ar.succeeded()) { + server = VertxServerBuilder + .forAddress(vertx, vertx.getOrCreateContext().config().getString("internal_ip"), + RPC.AGENT_PORT) + .addService(new AgentServiceImpl(vertx)) + .build() + .start(startPromise); + } else { + logger.error("Cannot recover from persistent state, aborting.", ar.cause()); + startPromise.fail("Recover failed."); + } + }); } @Override public void stop(Promise stopPromise) throws Exception { - server.shutdown(stopPromise); + if(server != null) + server.shutdown(stopPromise); } } diff --git a/docs/agent/Configuration.md b/docs/agent/Configuration.md index 9d9af8f..ee3caaf 100644 --- a/docs/agent/Configuration.md +++ b/docs/agent/Configuration.md @@ -6,7 +6,9 @@ The configuration format of agent is JSON. ```json { - "internal_ip": "" + "internal_ip": "", + "persistent": false, + "persistent_path": "/var/lib/dn42peering/agent/config" } ``` @@ -18,4 +20,17 @@ The central communicates with agents using gRPC. An internal IP address is requi The only requirement for internal IP address is that the agent can communicate with central. Agents are not required to communicate with each other. -Though is it OK to use the dn42 IP address, it is strongly recommended to create a separete VPN tunnel between each agent and central combination, as some provision failure could cause the agent to disconnected. \ No newline at end of file +Though is it OK to use the dn42 IP address, it is strongly recommended to create a separete VPN tunnel between each agent and central combination, as some provision failure could cause the agent to disconnected. + +# Persistent + +Even though the agent is supposed to be stateless, it is inevitable to store the configuration for recovery. WireGuard interfaces will not be saved by the operating system across reboots, so +it is necessary to redeploy after rebooting. + +If `persistent` is set to true, the agent stores incoming change after deployments are finished and successful to the path given by `persistent_path`. + +Whether the saving process is successful or not, it will not affect the deployment result. In other words, persistent failures are not fetal. + +The persistent file will be created if not existing. Because it contains sensitive data (e.g. WireGuard keys), it must be saved with care. The agent will chmod 600 each time after closing the file so make sure the agent has ownership with it. + +Upon starting, the agent will reads the persistent file. If it does not exist or if `persistent` is set to false, the agent will start as normal. However, if the persistent file is corrupt or cannot be read, the agent will refuse to start. In this case, you may manually delete it and redeploy from the administrative panel. \ No newline at end of file -- cgit v1.2.3