aboutsummaryrefslogtreecommitdiff
path: root/agent
diff options
context:
space:
mode:
authorTrumeet <yuuta@yuuta.moe>2021-04-04 13:50:09 -0700
committerTrumeet <yuuta@yuuta.moe>2021-04-04 13:50:09 -0700
commit83d5b5431a1571147e5188fb3ad4254660ef86ae (patch)
tree072391981c36e0e063baffb758f4f3b5d2d66913 /agent
parente97ab628c39a8650e73897b3156ae3f7d1b43d67 (diff)
downloaddn42peering-83d5b5431a1571147e5188fb3ad4254660ef86ae.tar
dn42peering-83d5b5431a1571147e5188fb3ad4254660ef86ae.tar.gz
dn42peering-83d5b5431a1571147e5188fb3ad4254660ef86ae.tar.bz2
dn42peering-83d5b5431a1571147e5188fb3ad4254660ef86ae.zip
feat(agent): persistent configuration
Diffstat (limited to 'agent')
-rw-r--r--agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java51
-rw-r--r--agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java2
-rw-r--r--agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java77
-rw-r--r--agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java42
-rw-r--r--agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java28
5 files changed, 157 insertions, 43 deletions
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java
new file mode 100644
index 0000000..94f614b
--- /dev/null
+++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Deploy.java
@@ -0,0 +1,51 @@
+package moe.yuuta.dn42peering.agent;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import moe.yuuta.dn42peering.agent.proto.DeployResult;
+import moe.yuuta.dn42peering.agent.proto.NodeConfig;
+import moe.yuuta.dn42peering.agent.provision.BGPProvisioner;
+import moe.yuuta.dn42peering.agent.provision.Change;
+import moe.yuuta.dn42peering.agent.provision.WireGuardCleanupProvisioner;
+import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner;
+
+import javax.annotation.Nonnull;
+import java.util.List;
+
+public class Deploy {
+ private static final Logger logger = LoggerFactory.getLogger(Deploy.class.getSimpleName());
+ private static Future<Void> chainChanges(@Nonnull Vertx vertx, @Nonnull List<Change> changes) {
+ if(changes.isEmpty()) {
+ return Future.succeededFuture();
+ }
+ Future<Void> last = changes.get(0).execute(vertx);
+ for (int i = 1; i < changes.size(); i ++) {
+ final Change current = changes.get(i);
+ last = last.compose(_v -> current.execute(vertx));
+ }
+ return last;
+ }
+
+ public static Future<DeployResult> deploy(@Nonnull Vertx vertx, @Nonnull NodeConfig config) {
+ logger.info("Deployment started");
+ final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx);
+ final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx);
+ final WireGuardCleanupProvisioner wireGuardCleanupProvisioner = new WireGuardCleanupProvisioner(vertx);
+
+ // TODO: Currently all provisioning operations are non-fault-tolering. This means that
+ // TODO: if one operation fails, the following will fail. This may be changed in later.
+ // Changes in each provisioners are executed in sequence.
+ // Two provisioners are executed in sequence.
+ return wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList())
+ .compose(changes -> chainChanges(vertx, changes))
+ .compose(_v -> bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList())
+ .compose(changes -> chainChanges(vertx, changes)))
+ .compose(_v -> wireGuardCleanupProvisioner.calculateChanges(config.getNode(), config.getWgsList())
+ .compose(changes -> chainChanges(vertx, changes)))
+ .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above."))
+ .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err))
+ .compose(compositeFuture -> Future.succeededFuture(null));
+ }
+}
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java
index fb9eead..9efca46 100644
--- a/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java
+++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Main.java
@@ -29,7 +29,7 @@ public class Main {
final Vertx vertx = Vertx.vertx(new VertxOptions());
final DeploymentOptions options = new DeploymentOptions()
.setConfig(config)
- .setInstances(Runtime.getRuntime().availableProcessors() * 2);
+ .setInstances(1);
Logger logger = LoggerFactory.getLogger("Main");
CompositeFuture.all(Arrays.asList(
Future.<String>future(f -> vertx.deployVerticle(RPCVerticle.class.getName(), options, f))
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java
new file mode 100644
index 0000000..0f29da4
--- /dev/null
+++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/Persistent.java
@@ -0,0 +1,77 @@
+package moe.yuuta.dn42peering.agent;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import moe.yuuta.dn42peering.agent.proto.DeployResult;
+import moe.yuuta.dn42peering.agent.proto.NodeConfig;
+
+import javax.annotation.Nonnull;
+import java.io.*;
+
+public class Persistent {
+ private static final Logger logger = LoggerFactory.getLogger(Persistent.class.getSimpleName());
+
+ public static boolean enabled(@Nonnull Vertx vertx) {
+ return vertx.getOrCreateContext().config().getBoolean("persistent", false);
+ }
+
+ @Nonnull
+ public static String getPath(@Nonnull Vertx vertx) {
+ return vertx.getOrCreateContext().config().getString("persistent_path",
+ "/var/lib/dn42peering/agent/config");
+ }
+
+ public static Future<DeployResult> recover(@Nonnull Vertx vertx) {
+ if(!enabled(vertx)) {
+ logger.info("Persistent disabled.");
+ return Future.succeededFuture(null);
+ }
+ if(!new File(getPath(vertx)).exists()) {
+ logger.info("Persistent file is not found.");
+ return Future.succeededFuture(null);
+ }
+ return vertx.<NodeConfig>executeBlocking(f -> {
+ logger.info("Recovering from persistent state...");
+ try(final InputStream in = new FileInputStream(getPath(vertx))) {
+ final NodeConfig config = NodeConfig.parseDelimitedFrom(in);
+ f.complete(config);
+ } catch (IOException e) {
+ f.fail(e);
+ }
+ }).compose(config -> Deploy.deploy(vertx, config))
+ .onSuccess(res ->
+ logger.info("Recovered from persistent state."));
+ }
+
+ @Nonnull
+ public static Future<Void> persistent(@Nonnull Vertx vertx, @Nonnull NodeConfig config) {
+ if (!enabled(vertx)) return Future.succeededFuture();
+ return vertx.fileSystem()
+ .open(getPath(vertx),
+ new OpenOptions()
+ .setWrite(true)
+ .setCreate(true))
+ .<AsyncFile>compose(file -> {
+ return vertx.executeBlocking(f -> {
+ try {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ config.writeDelimitedTo(stream);
+ file.write(Buffer.buffer(stream.toByteArray()));
+ stream.close();
+ f.complete(file);
+ } catch (IOException e) {
+ f.fail(e);
+ }
+ });
+ })
+ .compose(AsyncFile::close)
+ .compose(file ->
+ vertx.fileSystem().chmod(getPath(vertx), "rw-------"))
+ .onFailure(err -> logger.error("Cannot persistent node configuration", err));
+ }
+}
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java
index 1e91020..3ceb9fb 100644
--- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java
+++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java
@@ -4,16 +4,13 @@ import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
+import moe.yuuta.dn42peering.agent.Deploy;
+import moe.yuuta.dn42peering.agent.Persistent;
import moe.yuuta.dn42peering.agent.proto.DeployResult;
import moe.yuuta.dn42peering.agent.proto.NodeConfig;
import moe.yuuta.dn42peering.agent.proto.VertxAgentGrpc;
-import moe.yuuta.dn42peering.agent.provision.BGPProvisioner;
-import moe.yuuta.dn42peering.agent.provision.Change;
-import moe.yuuta.dn42peering.agent.provision.WireGuardCleanupProvisioner;
-import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner;
import javax.annotation.Nonnull;
-import java.util.List;
class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase {
private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName());
@@ -24,37 +21,12 @@ class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase {
this.vertx = vertx;
}
- private Future<Void> chainChanges(@Nonnull List<Change> changes) {
- if(changes.isEmpty()) {
- return Future.succeededFuture();
- }
- Future<Void> last = changes.get(0).execute(vertx);
- for (int i = 1; i < changes.size(); i ++) {
- final Change current = changes.get(i);
- last = last.compose(_v -> current.execute(vertx));
- }
- return last;
- }
-
@Override
public Future<DeployResult> deploy(NodeConfig config) {
- logger.info("Deployment started");
- final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx);
- final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx);
- final WireGuardCleanupProvisioner wireGuardCleanupProvisioner = new WireGuardCleanupProvisioner(vertx);
-
- // TODO: Currently all provisioning operations are non-fault-tolering. This means that
- // TODO: if one operation fails, the following will fail. This may be changed in later.
- // Changes in each provisioners are executed in sequence.
- // Two provisioners are executed in sequence.
- return wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList())
- .compose(this::chainChanges)
- .compose(_v -> bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList())
- .compose(this::chainChanges))
- .compose(_v -> wireGuardCleanupProvisioner.calculateChanges(config.getNode(), config.getWgsList())
- .compose(this::chainChanges))
- .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above."))
- .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err))
- .compose(compositeFuture -> Future.succeededFuture(null));
+ return Deploy.deploy(vertx, config)
+ .compose(_v -> Future.future(f -> {
+ Persistent.persistent(vertx, config)
+ .onComplete(res -> f.complete(_v)); // Ignore errors
+ }));
}
}
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java
index 3390b47..c24c29a 100644
--- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java
+++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/RPCVerticle.java
@@ -2,25 +2,39 @@ package moe.yuuta.dn42peering.agent.grpc;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.grpc.VertxServer;
import io.vertx.grpc.VertxServerBuilder;
import moe.yuuta.dn42peering.RPC;
+import moe.yuuta.dn42peering.agent.Persistent;
public class RPCVerticle extends AbstractVerticle {
+ private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName());
+
private VertxServer server;
@Override
public void start(Promise<Void> startPromise) throws Exception {
- server = VertxServerBuilder
- .forAddress(vertx, vertx.getOrCreateContext().config().getString("internal_ip"),
- RPC.AGENT_PORT)
- .addService(new AgentServiceImpl(vertx))
- .build()
- .start(startPromise);
+ Persistent.recover(vertx)
+ .onComplete(ar -> {
+ if(ar.succeeded()) {
+ server = VertxServerBuilder
+ .forAddress(vertx, vertx.getOrCreateContext().config().getString("internal_ip"),
+ RPC.AGENT_PORT)
+ .addService(new AgentServiceImpl(vertx))
+ .build()
+ .start(startPromise);
+ } else {
+ logger.error("Cannot recover from persistent state, aborting.", ar.cause());
+ startPromise.fail("Recover failed.");
+ }
+ });
}
@Override
public void stop(Promise<Void> stopPromise) throws Exception {
- server.shutdown(stopPromise);
+ if(server != null)
+ server.shutdown(stopPromise);
}
}