Basic file uploader implementation. Does not wait for replication to finish.
diff --git a/master/chunk.go b/master/chunk.go
new file mode 100644
index 0000000..00798c9
--- /dev/null
+++ b/master/chunk.go
@@ -0,0 +1,18 @@
+package master
+
+import (
+ "pcloud/api"
+)
+
+type chunkServerStatus int
+
+const (
+ Healthy chunkServerStatus = iota
+ UNREACHABLE
+)
+
+type chunkServer struct {
+ address string
+ status chunkServerStatus
+ chunks map[string]api.ChunkStatus
+}
diff --git a/master/server.go b/master/server.go
index 70c6d29..b6e76fa 100644
--- a/master/server.go
+++ b/master/server.go
@@ -6,6 +6,7 @@
"math/rand"
"github.com/google/uuid"
+ "google.golang.org/grpc"
"pcloud/api"
)
@@ -44,7 +45,7 @@
}
type MasterServer struct {
- chunkServers []string
+ chunkServers []*chunkServer
blobs []*blob
}
@@ -54,16 +55,18 @@
func (s *MasterServer) AddChunkServer(
ctx context.Context,
- request *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
- s.chunkServers = append(s.chunkServers, request.Address)
- log.Printf("Registered Chunk server: %s", request.Address)
+ req *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
+ s.chunkServers = append(s.chunkServers, &chunkServer{
+ address: req.Address,
+ status: Healthy})
+ log.Printf("Registered Chunk server: %s", req.Address)
return &api.AddChunkServerResponse{}, nil
}
func (s *MasterServer) CreateBlob(
ctx context.Context,
- request *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
- if int(request.NumReplicas) > len(s.chunkServers) {
+ req *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
+ if int(req.NumReplicas) > len(s.chunkServers) {
return nil, nil
}
resp := api.CreateBlobResponse{
@@ -71,17 +74,43 @@
Chunk: []*api.ChunkStorageMetadata{
{ChunkId: uuid.New().String()},
}}
- ids := rand.Perm(len(s.chunkServers))
- for i := 0; i < int(request.NumReplicas); i++ {
- resp.Chunk[0].Server = append(
- resp.Chunk[0].Server,
- s.chunkServers[ids[i]])
+ assigned := 0
+ chunkId := resp.Chunk[0].ChunkId
+ for i := range rand.Perm(len(s.chunkServers)) {
+ if assigned == int(req.NumReplicas) {
+ break
+ }
+ address := s.chunkServers[i].address
+ log.Printf(address)
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(address, opts...)
+ if err != nil {
+ continue
+ }
+ defer conn.Close()
+ client := api.NewChunkStorageClient(conn)
+ createChunkReq := api.CreateChunkRequest{
+ ChunkId: chunkId,
+ Size: req.SizeBytes}
+ if assigned == 0 {
+ createChunkReq.Role = api.ReplicaRole_PRIMARY
+ } else {
+ createChunkReq.Role = api.ReplicaRole_SECONDARY
+ createChunkReq.PrimaryAddress = resp.Chunk[0].Server[0]
+ }
+ _, err = client.CreateChunk(ctx, &createChunkReq)
+ if err == nil {
+ resp.Chunk[0].Server = append(resp.Chunk[0].Server, address)
+ assigned++
+ }
}
return &resp, nil
}
func (s *MasterServer) GetBlobMetadata(
ctx context.Context,
- request *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
+ req *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
return nil, nil
}