diff options
author | Trumeet <yuuta@yuuta.moe> | 2021-04-03 15:57:22 -0700 |
---|---|---|
committer | Trumeet <yuuta@yuuta.moe> | 2021-04-03 15:57:22 -0700 |
commit | 4a018c6f27a70a6723f1bb835560d83322cc374f (patch) | |
tree | ed3506200bb06484e32193fe8ebc4ffc399a1529 | |
parent | fb9fb86002178f9e8b4693d2cf50cacb55978424 (diff) | |
download | dn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.tar dn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.tar.gz dn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.tar.bz2 dn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.zip |
fix(agent): some tasks cannot execute in parallel
-rw-r--r-- | agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java | 37 | ||||
-rw-r--r-- | agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java | 37 |
2 files changed, 46 insertions, 28 deletions
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java index 4c4deb2..97daf61 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java @@ -15,7 +15,6 @@ import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner; import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName()); @@ -26,24 +25,34 @@ class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { this.vertx = vertx; } + private Future<Void> chainChanges(@Nonnull List<Change> changes) { + if(changes.isEmpty()) { + return Future.succeededFuture(); + } + Future<Void> last = changes.get(0).execute(vertx); + for (int i = 1; i < changes.size(); i ++) { + final Change current = changes.get(i); + last = last.compose(_v -> current.execute(vertx)); + } + return last; + } + @Override public Future<DeployResult> deploy(NodeConfig config) { logger.info("Deployment started"); final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx); final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx); - final List<Future> calcFutures = new ArrayList<>(); - calcFutures.add(bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList())); - calcFutures.add(wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList())); - - return CompositeFuture.all(calcFutures) - .compose(compositeFuture -> { - final List<Future> changes = new ArrayList<>(calcFutures.size()); - for (int i = 0; i < calcFutures.size(); i ++) { - final List<Change> list = compositeFuture.resultAt(i); - changes.addAll(list.stream().map(change -> change.execute(vertx)).collect(Collectors.toList())); - } - return CompositeFuture.all(changes); - }) + + final List<Future> execFutures = new ArrayList<>(2); + execFutures.add(bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList()) + .compose(this::chainChanges)); + execFutures.add(wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList()) + .compose(this::chainChanges)); + + // Changes in each provisioners are executed in sequence. + // Two provisioners are executed in parallel. + // We must wait all calculations done. + return CompositeFuture.join(execFutures) .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above.")) .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err)) .compose(compositeFuture -> Future.succeededFuture(null)); diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java index 913f109..74cf49c 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java @@ -121,25 +121,34 @@ public class BGPProvisioner implements IProvisioner<BGPConfig> { @Nonnull @Override public Future<List<Change>> calculateChanges(@Nonnull Node node, @Nonnull List<BGPConfig> allDesired) { - final List<Future<List<Change>>> addOrModifyChanges = - allDesired.stream().map(this::calculateSingleChange).collect(Collectors.toList()); + // All of these calculations can be done in parallel but we must wait all of them to finish. + final Future<List<Change>> addOrModifyChanges = + CompositeFuture.join(allDesired.stream() + .map(this::calculateSingleChange) + .collect(Collectors.toList())) + .compose(compositeFuture -> { + final List<Change> changes = new ArrayList<>(); + for(int i = 0; i < compositeFuture.size(); i ++) + changes.addAll(compositeFuture.resultAt(i)); + return Future.succeededFuture(changes); + }); final Future<List<Change>> deleteChanges = calculateDeleteChanges(allDesired); final Future<List<Change>> reloadChange = Future.succeededFuture(Collections.singletonList( new CommandChange(new String[] { "birdc", "configure" }))); - final List<Future> futures = new ArrayList<>(addOrModifyChanges.size() + 2); - futures.addAll(addOrModifyChanges); - futures.add(deleteChanges); - futures.add(reloadChange); - return CompositeFuture.all(futures) - .compose(compositeFuture -> { - final List<Change> changes = new ArrayList<>(futures.size()); - for(int i = 0; i < futures.size(); i ++) { - changes.addAll(compositeFuture.resultAt(i)); - } - return Future.succeededFuture(changes); - }); + // The three major steps above must be done in sequence. + return addOrModifyChanges.compose(changes -> { + return deleteChanges.compose(deleteChangeList -> { + changes.addAll(deleteChangeList); + return Future.succeededFuture(changes); + }); + }).compose(changes -> { + return reloadChange.compose(reloadChangeList -> { + changes.addAll(reloadChangeList); + return Future.succeededFuture(changes); + }); + }); } } |