aboutsummaryrefslogtreecommitdiff
path: root/central/src/main/java/moe/yuuta/dn42peering/provision/ProvisionRemoteServiceImpl.java
blob: 3e432bd6ce7611a358568a4619c99f75412f484d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package moe.yuuta.dn42peering.provision;

import io.grpc.ManagedChannel;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.grpc.VertxChannelBuilder;
import moe.yuuta.dn42peering.agent.proto.NodeConfig;
import moe.yuuta.dn42peering.agent.proto.VertxAgentGrpc;
import moe.yuuta.dn42peering.jaba.Pair;
import moe.yuuta.dn42peering.node.INodeService;
import moe.yuuta.dn42peering.node.Node;
import moe.yuuta.dn42peering.peer.IPeerService;
import moe.yuuta.dn42peering.peer.Peer;

import javax.annotation.Nonnull;
import java.util.List;

class ProvisionRemoteServiceImpl implements IProvisionRemoteService {
    private final Vertx vertx;
    private final INodeService nodeService;
    private final IPeerService peerService;

    ProvisionRemoteServiceImpl(@Nonnull Vertx vertx) {
        this.vertx = vertx;
        this.nodeService = INodeService.createProxy(vertx);
        this.peerService = IPeerService.createProxy(vertx);
    }

    private @Nonnull ManagedChannel toChannel(@Nonnull Node node) {
        return VertxChannelBuilder.forAddress(vertx, node.getInternalIp(), node.getInternalPort())
                .usePlaintext()
                .build();
    }

    @Nonnull
    @Override
    public IProvisionRemoteService deploy(long nodeId,
                                          @Nonnull Handler<AsyncResult<Void>> handler) {
        vertx.sharedData().getLockWithTimeout("deploy_" + nodeId, 30 * 1000)
                .<Void>compose(lock -> {
                    return Future.<moe.yuuta.dn42peering.node.Node>future(f -> nodeService.getNode((int)nodeId, f))
                            .compose(node -> {
                                if (node == null) {
                                    return Future.failedFuture("Invalid node");
                                } else {
                                    return Future.succeededFuture(node);
                                }
                            })
                            .compose(node -> {
                                final NodeConfig.Builder builder = NodeConfig.newBuilder();
                                builder.setNode(node.toRPCNode().build());
                                return Future.succeededFuture(new Pair<>(node, builder));
                            })
                            .compose(pair -> {
                                final Node node = pair.a;
                                final NodeConfig.Builder builder = pair.b;
                                return Future.<List<Peer>>future(f -> peerService.listUnderNode(node.getId(), f))
                                        .compose(peers -> {
                                            peers.forEach(peer -> {
                                                builder.addBgps(peer.toBGPConfig());
                                                switch (peer.getType()) {
                                                    case WIREGUARD:
                                                        builder.addWgs(peer.toWireGuardConfig());
                                                        break;
                                                    default:
                                                        throw new IllegalArgumentException("Bug: Unsupported VPN type");
                                                }
                                            });
                                            return Future.succeededFuture(pair);
                                        });
                            })
                            .compose(pair -> {
                                final ManagedChannel channel = toChannel(pair.a);
                                final VertxAgentGrpc.AgentVertxStub stub = VertxAgentGrpc.newVertxStub(channel);
                                return stub.deploy(pair.b.build())
                                        .<Void>compose(reply -> Future.succeededFuture(null))
                                        .onComplete(res -> channel.shutdown());
                            })
                            .compose(_v -> {
                                lock.release();
                                return Future.succeededFuture();
                            });
                })
                .onComplete(handler);
        return this;
    }
}