diff options
Diffstat (limited to 'agent/src/main/java')
5 files changed, 157 insertions, 43 deletions
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java new file mode 100644 index 0000000..94f614b --- /dev/null +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java @@ -0,0 +1,51 @@ +package moe.yuuta.dn42peering.agent; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import moe.yuuta.dn42peering.agent.proto.DeployResult; +import moe.yuuta.dn42peering.agent.proto.NodeConfig; +import moe.yuuta.dn42peering.agent.provision.BGPProvisioner; +import moe.yuuta.dn42peering.agent.provision.Change; +import moe.yuuta.dn42peering.agent.provision.WireGuardCleanupProvisioner; +import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner; + +import javax.annotation.Nonnull; +import java.util.List; + +public class Deploy { + private static final Logger logger = LoggerFactory.getLogger(Deploy.class.getSimpleName()); + private static Future<Void> chainChanges(@Nonnull Vertx vertx, @Nonnull List<Change> changes) { + if(changes.isEmpty()) { + return Future.succeededFuture(); + } + Future<Void> last = changes.get(0).execute(vertx); + for (int i = 1; i < changes.size(); i ++) { + final Change current = changes.get(i); + last = last.compose(_v -> current.execute(vertx)); + } + return last; + } + + public static Future<DeployResult> deploy(@Nonnull Vertx vertx, @Nonnull NodeConfig config) { + logger.info("Deployment started"); + final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx); + final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx); + final WireGuardCleanupProvisioner wireGuardCleanupProvisioner = new WireGuardCleanupProvisioner(vertx); + + // TODO: Currently all provisioning operations are non-fault-tolering. This means that + // TODO: if one operation fails, the following will fail. This may be changed in later. + // Changes in each provisioners are executed in sequence. + // Two provisioners are executed in sequence. + return wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList()) + .compose(changes -> chainChanges(vertx, changes)) + .compose(_v -> bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList()) + .compose(changes -> chainChanges(vertx, changes))) + .compose(_v -> wireGuardCleanupProvisioner.calculateChanges(config.getNode(), config.getWgsList()) + .compose(changes -> chainChanges(vertx, changes))) + .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above.")) + .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err)) + .compose(compositeFuture -> Future.succeededFuture(null)); + } +} diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java index fb9eead..9efca46 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java @@ -29,7 +29,7 @@ public class Main { final Vertx vertx = Vertx.vertx(new VertxOptions()); final DeploymentOptions options = new DeploymentOptions() .setConfig(config) - .setInstances(Runtime.getRuntime().availableProcessors() * 2); + .setInstances(1); Logger logger = LoggerFactory.getLogger("Main"); CompositeFuture.all(Arrays.asList( Future.<String>future(f -> vertx.deployVerticle(RPCVerticle.class.getName(), options, f)) diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java new file mode 100644 index 0000000..0f29da4 --- /dev/null +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java @@ -0,0 +1,77 @@ +package moe.yuuta.dn42peering.agent; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.OpenOptions; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import moe.yuuta.dn42peering.agent.proto.DeployResult; +import moe.yuuta.dn42peering.agent.proto.NodeConfig; + +import javax.annotation.Nonnull; +import java.io.*; + +public class Persistent { + private static final Logger logger = LoggerFactory.getLogger(Persistent.class.getSimpleName()); + + public static boolean enabled(@Nonnull Vertx vertx) { + return vertx.getOrCreateContext().config().getBoolean("persistent", false); + } + + @Nonnull + public static String getPath(@Nonnull Vertx vertx) { + return vertx.getOrCreateContext().config().getString("persistent_path", + "/var/lib/dn42peering/agent/config"); + } + + public static Future<DeployResult> recover(@Nonnull Vertx vertx) { + if(!enabled(vertx)) { + logger.info("Persistent disabled."); + return Future.succeededFuture(null); + } + if(!new File(getPath(vertx)).exists()) { + logger.info("Persistent file is not found."); + return Future.succeededFuture(null); + } + return vertx.<NodeConfig>executeBlocking(f -> { + logger.info("Recovering from persistent state..."); + try(final InputStream in = new FileInputStream(getPath(vertx))) { + final NodeConfig config = NodeConfig.parseDelimitedFrom(in); + f.complete(config); + } catch (IOException e) { + f.fail(e); + } + }).compose(config -> Deploy.deploy(vertx, config)) + .onSuccess(res -> + logger.info("Recovered from persistent state.")); + } + + @Nonnull + public static Future<Void> persistent(@Nonnull Vertx vertx, @Nonnull NodeConfig config) { + if (!enabled(vertx)) return Future.succeededFuture(); + return vertx.fileSystem() + .open(getPath(vertx), + new OpenOptions() + .setWrite(true) + .setCreate(true)) + .<AsyncFile>compose(file -> { + return vertx.executeBlocking(f -> { + try { + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + config.writeDelimitedTo(stream); + file.write(Buffer.buffer(stream.toByteArray())); + stream.close(); + f.complete(file); + } catch (IOException e) { + f.fail(e); + } + }); + }) + .compose(AsyncFile::close) + .compose(file -> + vertx.fileSystem().chmod(getPath(vertx), "rw-------")) + .onFailure(err -> logger.error("Cannot persistent node configuration", err)); + } +} diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java index 1e91020..3ceb9fb 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java @@ -4,16 +4,13 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; +import moe.yuuta.dn42peering.agent.Deploy; +import moe.yuuta.dn42peering.agent.Persistent; import moe.yuuta.dn42peering.agent.proto.DeployResult; import moe.yuuta.dn42peering.agent.proto.NodeConfig; import moe.yuuta.dn42peering.agent.proto.VertxAgentGrpc; -import moe.yuuta.dn42peering.agent.provision.BGPProvisioner; -import moe.yuuta.dn42peering.agent.provision.Change; -import moe.yuuta.dn42peering.agent.provision.WireGuardCleanupProvisioner; -import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner; import javax.annotation.Nonnull; -import java.util.List; class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName()); @@ -24,37 +21,12 @@ class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { this.vertx = vertx; } - private Future<Void> chainChanges(@Nonnull List<Change> changes) { - if(changes.isEmpty()) { - return Future.succeededFuture(); - } - Future<Void> last = changes.get(0).execute(vertx); - for (int i = 1; i < changes.size(); i ++) { - final Change current = changes.get(i); - last = last.compose(_v -> current.execute(vertx)); - } - return last; - } - @Override public Future<DeployResult> deploy(NodeConfig config) { - logger.info("Deployment started"); - final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx); - final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx); - final WireGuardCleanupProvisioner wireGuardCleanupProvisioner = new WireGuardCleanupProvisioner(vertx); - - // TODO: Currently all provisioning operations are non-fault-tolering. This means that - // TODO: if one operation fails, the following will fail. This may be changed in later. - // Changes in each provisioners are executed in sequence. - // Two provisioners are executed in sequence. - return wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList()) - .compose(this::chainChanges) - .compose(_v -> bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList()) - .compose(this::chainChanges)) - .compose(_v -> wireGuardCleanupProvisioner.calculateChanges(config.getNode(), config.getWgsList()) - .compose(this::chainChanges)) - .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above.")) - .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err)) - .compose(compositeFuture -> Future.succeededFuture(null)); + return Deploy.deploy(vertx, config) + .compose(_v -> Future.future(f -> { + Persistent.persistent(vertx, config) + .onComplete(res -> f.complete(_v)); // Ignore errors + })); } } diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java index 3390b47..c24c29a 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java @@ -2,25 +2,39 @@ package moe.yuuta.dn42peering.agent.grpc; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.grpc.VertxServer; import io.vertx.grpc.VertxServerBuilder; import moe.yuuta.dn42peering.RPC; +import moe.yuuta.dn42peering.agent.Persistent; public class RPCVerticle extends AbstractVerticle { + private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName()); + private VertxServer server; @Override public void start(Promise<Void> startPromise) throws Exception { - server = VertxServerBuilder - .forAddress(vertx, vertx.getOrCreateContext().config().getString("internal_ip"), - RPC.AGENT_PORT) - .addService(new AgentServiceImpl(vertx)) - .build() - .start(startPromise); + Persistent.recover(vertx) + .onComplete(ar -> { + if(ar.succeeded()) { + server = VertxServerBuilder + .forAddress(vertx, vertx.getOrCreateContext().config().getString("internal_ip"), + RPC.AGENT_PORT) + .addService(new AgentServiceImpl(vertx)) + .build() + .start(startPromise); + } else { + logger.error("Cannot recover from persistent state, aborting.", ar.cause()); + startPromise.fail("Recover failed."); + } + }); } @Override public void stop(Promise<Void> stopPromise) throws Exception { - server.shutdown(stopPromise); + if(server != null) + server.shutdown(stopPromise); } } |