aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrumeet <yuuta@yuuta.moe>2021-04-03 15:57:22 -0700
committerTrumeet <yuuta@yuuta.moe>2021-04-03 15:57:22 -0700
commit4a018c6f27a70a6723f1bb835560d83322cc374f (patch)
treeed3506200bb06484e32193fe8ebc4ffc399a1529
parentfb9fb86002178f9e8b4693d2cf50cacb55978424 (diff)
downloaddn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.tar
dn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.tar.gz
dn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.tar.bz2
dn42peering-4a018c6f27a70a6723f1bb835560d83322cc374f.zip
fix(agent): some tasks cannot execute in parallel
-rw-r--r--agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java37
-rw-r--r--agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java37
2 files changed, 46 insertions, 28 deletions
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java
index 4c4deb2..97daf61 100644
--- a/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java
+++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/grpc/AgentServiceImpl.java
@@ -15,7 +15,6 @@ import moe.yuuta.dn42peering.agent.provision.WireGuardProvisioner;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase {
private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName());
@@ -26,24 +25,34 @@ class AgentServiceImpl extends VertxAgentGrpc.AgentVertxImplBase {
this.vertx = vertx;
}
+ private Future<Void> chainChanges(@Nonnull List<Change> changes) {
+ if(changes.isEmpty()) {
+ return Future.succeededFuture();
+ }
+ Future<Void> last = changes.get(0).execute(vertx);
+ for (int i = 1; i < changes.size(); i ++) {
+ final Change current = changes.get(i);
+ last = last.compose(_v -> current.execute(vertx));
+ }
+ return last;
+ }
+
@Override
public Future<DeployResult> deploy(NodeConfig config) {
logger.info("Deployment started");
final BGPProvisioner bgpProvisioner = new BGPProvisioner(vertx);
final WireGuardProvisioner wireGuardProvisioner = new WireGuardProvisioner(vertx);
- final List<Future> calcFutures = new ArrayList<>();
- calcFutures.add(bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList()));
- calcFutures.add(wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList()));
-
- return CompositeFuture.all(calcFutures)
- .compose(compositeFuture -> {
- final List<Future> changes = new ArrayList<>(calcFutures.size());
- for (int i = 0; i < calcFutures.size(); i ++) {
- final List<Change> list = compositeFuture.resultAt(i);
- changes.addAll(list.stream().map(change -> change.execute(vertx)).collect(Collectors.toList()));
- }
- return CompositeFuture.all(changes);
- })
+
+ final List<Future> execFutures = new ArrayList<>(2);
+ execFutures.add(bgpProvisioner.calculateChanges(config.getNode(), config.getBgpsList())
+ .compose(this::chainChanges));
+ execFutures.add(wireGuardProvisioner.calculateChanges(config.getNode(), config.getWgsList())
+ .compose(this::chainChanges));
+
+ // Changes in each provisioners are executed in sequence.
+ // Two provisioners are executed in parallel.
+ // We must wait all calculations done.
+ return CompositeFuture.join(execFutures)
.onSuccess(res -> logger.info("Deployment finished. Detailed log can be traced above."))
.onFailure(err -> logger.error("Deployment failed. Detailed log can be traced above.", err))
.compose(compositeFuture -> Future.succeededFuture(null));
diff --git a/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java b/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java
index 913f109..74cf49c 100644
--- a/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java
+++ b/agent/src/main/java/moe/yuuta/dn42peering/agent/provision/BGPProvisioner.java
@@ -121,25 +121,34 @@ public class BGPProvisioner implements IProvisioner<BGPConfig> {
@Nonnull
@Override
public Future<List<Change>> calculateChanges(@Nonnull Node node, @Nonnull List<BGPConfig> allDesired) {
- final List<Future<List<Change>>> addOrModifyChanges =
- allDesired.stream().map(this::calculateSingleChange).collect(Collectors.toList());
+ // All of these calculations can be done in parallel but we must wait all of them to finish.
+ final Future<List<Change>> addOrModifyChanges =
+ CompositeFuture.join(allDesired.stream()
+ .map(this::calculateSingleChange)
+ .collect(Collectors.toList()))
+ .compose(compositeFuture -> {
+ final List<Change> changes = new ArrayList<>();
+ for(int i = 0; i < compositeFuture.size(); i ++)
+ changes.addAll(compositeFuture.resultAt(i));
+ return Future.succeededFuture(changes);
+ });
final Future<List<Change>> deleteChanges =
calculateDeleteChanges(allDesired);
final Future<List<Change>> reloadChange =
Future.succeededFuture(Collections.singletonList(
new CommandChange(new String[] { "birdc", "configure" })));
- final List<Future> futures = new ArrayList<>(addOrModifyChanges.size() + 2);
- futures.addAll(addOrModifyChanges);
- futures.add(deleteChanges);
- futures.add(reloadChange);
- return CompositeFuture.all(futures)
- .compose(compositeFuture -> {
- final List<Change> changes = new ArrayList<>(futures.size());
- for(int i = 0; i < futures.size(); i ++) {
- changes.addAll(compositeFuture.resultAt(i));
- }
- return Future.succeededFuture(changes);
- });
+ // The three major steps above must be done in sequence.
+ return addOrModifyChanges.compose(changes -> {
+ return deleteChanges.compose(deleteChangeList -> {
+ changes.addAll(deleteChangeList);
+ return Future.succeededFuture(changes);
+ });
+ }).compose(changes -> {
+ return reloadChange.compose(reloadChangeList -> {
+ changes.addAll(reloadChangeList);
+ return Future.succeededFuture(changes);
+ });
+ });
}
}