blob: 885ad558f1ab3e436bfa5a72096423a40e7dda29 [file] [log] [blame]
giolekva892a4e22020-04-27 16:46:22 +04001package controller
2
3import (
4 "context"
5 "log"
6 "math/rand"
7
8 "github.com/google/uuid"
9 "google.golang.org/grpc"
10
11 "github.com/giolekva/pcloud/pfs/api"
12)
13
14type chunkServers struct {
15 address string
16}
17
18type BlobStatus int
19
20const (
21 NEW BlobStatus = iota
22)
23
24type ChunkStatus int
25
26const (
27 ASSIGNED ChunkStatus = iota
28 STORED
29)
30
31type chunkReplica struct {
32 chunkServer string
33 status ChunkStatus
34}
35
36type chunk struct {
37 id string
38 replica []chunkReplica
39}
40
41type blob struct {
42 id string
43 status BlobStatus
44 chunks []chunk
45}
46
47type MasterServer struct {
48 chunkServers []*chunkServer
49 blobs []*blob
50}
51
52func NewMasterServer() *MasterServer {
53 return &MasterServer{}
54}
55
56func (s *MasterServer) AddChunkServer(
57 ctx context.Context,
58 req *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
59 s.chunkServers = append(s.chunkServers, &chunkServer{
60 address: req.Address,
61 status: Healthy})
62 log.Printf("Registered Chunk server: %s", req.Address)
63 return &api.AddChunkServerResponse{}, nil
64}
65
66func (s *MasterServer) CreateBlob(
67 ctx context.Context,
68 req *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
69 if int(req.NumReplicas) > len(s.chunkServers) {
70 return nil, nil
71 }
72 resp := api.CreateBlobResponse{
73 BlobId: uuid.New().String(),
74 Chunk: []*api.ChunkStorageMetadata{
75 {ChunkId: uuid.New().String()},
76 }}
77 assigned := 0
78 chunkId := resp.Chunk[0].ChunkId
79 for i := range rand.Perm(len(s.chunkServers)) {
80 if assigned == int(req.NumReplicas) {
81 break
82 }
83 address := s.chunkServers[i].address
84 log.Printf(address)
85 var opts []grpc.DialOption
86 opts = append(opts, grpc.WithInsecure())
87 opts = append(opts, grpc.WithBlock())
88 conn, err := grpc.Dial(address, opts...)
89 if err != nil {
90 continue
91 }
92 defer conn.Close()
93 client := api.NewChunkStorageClient(conn)
94 createChunkReq := api.CreateChunkRequest{
95 ChunkId: chunkId,
96 Size: req.SizeBytes}
97 if assigned == 0 {
98 createChunkReq.Role = api.ReplicaRole_PRIMARY
99 } else {
100 createChunkReq.Role = api.ReplicaRole_SECONDARY
101 createChunkReq.PrimaryAddress = resp.Chunk[0].Server[0]
102 }
103 _, err = client.CreateChunk(ctx, &createChunkReq)
104 if err == nil {
105 resp.Chunk[0].Server = append(resp.Chunk[0].Server, address)
106 assigned++
107 }
108 }
109 return &resp, nil
110}
111
112func (s *MasterServer) GetBlobMetadata(
113 ctx context.Context,
114 req *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
115 return nil, nil
116}