From 4a018c6f27a70a6723f1bb835560d83322cc374f Mon Sep 17 00:00:00 2001 From: Trumeet Date: Sat, 3 Apr 2021 15:57:22 -0700 Subject: fix(agent): some tasks cannot execute in parallel --- .../dn42peering/agent/grpc/AgentServiceImpl.java | 37 ++++++++++++++-------- .../agent/provision/BGPProvisioner.java | 37 ++++++++++++++-------- 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java index 4c4deb2..97daf61 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java @@ -15,7 +15,6 @@ import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner; import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName()); @@ -26,24 +25,34 @@ class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase { this.vertx = vertx; } + private Future chainChanges(@Nonnull List changes) { + if(changes.isEmpty()) { + return Future.succeededFuture(); + } + Future last = changes.get(0).execute(vertx); + for (int i = 1; i < changes.size(); i ++) { + final Change current = changes.get(i); + last = last.compose(_v -> current.execute(vertx)); + } + return last; + } + @Override public Future deploy(NodeConfig config) { logger.info("Deployment started"); final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx); final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx); - final List calcFutures = new ArrayList<>(); - calcFutures.add(bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList())); - calcFutures.add(wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList())); - - return CompositeFuture.all(calcFutures) - .compose(compositeFuture -> { - final List changes = new ArrayList<>(calcFutures.size()); - for (int i = 0; i < calcFutures.size(); i ++) { - final List list = compositeFuture.resultAt(i); - changes.addAll(list.stream().map(change -> change.execute(vertx)).collect(Collectors.toList())); - } - return CompositeFuture.all(changes); - }) + + final List execFutures = new ArrayList<>(2); + execFutures.add(bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList()) + .compose(this::chainChanges)); + execFutures.add(wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList()) + .compose(this::chainChanges)); + + // Changes in each provisioners are executed in sequence. + // Two provisioners are executed in parallel. + // We must wait all calculations done. + return CompositeFuture.join(execFutures) .onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above.")) .onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err)) .compose(compositeFuture -> Future.succeededFuture(null)); diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java index 913f109..74cf49c 100644 --- a/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java +++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java @@ -121,25 +121,34 @@ public class BGPProvisioner implements IProvisioner { @Nonnull @Override public Future> calculateChanges(@Nonnull Node node, @Nonnull List allDesired) { - final List>> addOrModifyChanges = - allDesired.stream().map(this::calculateSingleChange).collect(Collectors.toList()); + // All of these calculations can be done in parallel but we must wait all of them to finish. + final Future> addOrModifyChanges = + CompositeFuture.join(allDesired.stream() + .map(this::calculateSingleChange) + .collect(Collectors.toList())) + .compose(compositeFuture -> { + final List changes = new ArrayList<>(); + for(int i = 0; i < compositeFuture.size(); i ++) + changes.addAll(compositeFuture.resultAt(i)); + return Future.succeededFuture(changes); + }); final Future> deleteChanges = calculateDeleteChanges(allDesired); final Future> reloadChange = Future.succeededFuture(Collections.singletonList( new CommandChange(new String[] { "birdc", "configure" }))); - final List futures = new ArrayList<>(addOrModifyChanges.size() + 2); - futures.addAll(addOrModifyChanges); - futures.add(deleteChanges); - futures.add(reloadChange); - return CompositeFuture.all(futures) - .compose(compositeFuture -> { - final List changes = new ArrayList<>(futures.size()); - for(int i = 0; i < futures.size(); i ++) { - changes.addAll(compositeFuture.resultAt(i)); - } - return Future.succeededFuture(changes); - }); + // The three major steps above must be done in sequence. + return addOrModifyChanges.compose(changes -> { + return deleteChanges.compose(deleteChangeList -> { + changes.addAll(deleteChangeList); + return Future.succeededFuture(changes); + }); + }).compose(changes -> { + return reloadChange.compose(reloadChangeList -> { + changes.addAll(reloadChangeList); + return Future.succeededFuture(changes); + }); + }); } } -- cgit v1.2.3