Move chunk/master data structures into libs
diff --git a/.gitignore b/.gitignore
index c61a5e8..5c58399 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
+core
*.pb.go
diff --git a/Dockerfile b/Dockerfile
index 698a47b..40668eb 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,8 +15,6 @@
ENV GOBIN=$GOPATH/bin
ENV PATH=$GOBIN:$GOROOT/bin:$PATH
-RUN go version
-
RUN go get -u google.golang.org/grpc
WORKDIR /src/protoc
@@ -27,5 +25,6 @@
RUN go get -u github.com/golang/protobuf/protoc-gen-go
RUN go get -u google.golang.org/protobuf/encoding/prototext
+RUN go get -u github.com/google/uuid
WORKDIR /src/go/src/pcloud
diff --git a/pfs.proto b/api/api.proto
similarity index 96%
rename from pfs.proto
rename to api/api.proto
index 4c338f6..ecd9dde 100644
--- a/pfs.proto
+++ b/api/api.proto
@@ -1,6 +1,8 @@
syntax = "proto3";
-package pcloud;
+package pcloud.api;
+
+option go_package = "api";
message Chunk {
string chunk_id = 1;
diff --git a/chunk.go b/chunk.go
new file mode 100644
index 0000000..371db9e
--- /dev/null
+++ b/chunk.go
@@ -0,0 +1,51 @@
+package main
+
+import "context"
+import "flag"
+import "fmt"
+import "log"
+import "net"
+import "time"
+
+import "google.golang.org/grpc"
+
+import "pcloud/api"
+import "pcloud/chunk"
+
+var masterAddress = flag.String("master", "localhost:123", "Metadata storage address.")
+var selfAddress = flag.String("self", "", "Metadata storage address.")
+
+func main() {
+ flag.Parse()
+ log.Print("Chunk server starting")
+
+ // Create Master server client.
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(*masterAddress, opts...)
+ if err != nil {
+ log.Fatalf("Failed to dial %s: %v", *masterAddress, err)
+ }
+ defer conn.Close()
+ client := api.NewMetadataStorageClient(conn)
+
+ // Register current Chunk server with Master.
+ ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+ _, err = client.AddChunkServer(
+ ctx,
+ &api.AddChunkServerRequest{Address: *selfAddress})
+ if err != nil {
+ log.Fatalf("failed to register chunk server: %v", err)
+ }
+ log.Print("Registered myself")
+
+ // Start RPC server
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 234))
+ if err != nil {
+ log.Fatalf("failed to listen: %v", err)
+ }
+ server := grpc.NewServer()
+ api.RegisterChunkStorageServer(server, chunk.NewChunkServer())
+ server.Serve(lis)
+}
diff --git a/chunk/chunk.go b/chunk/chunk.go
deleted file mode 100644
index 1dde057..0000000
--- a/chunk/chunk.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package main
-
-import "context"
-import "flag"
-import "fmt"
-import "log"
-import "net"
-import "time"
-
-import "google.golang.org/grpc"
-
-import pc "pcloud"
-
-var masterAddress string
-var selfAddress string
-
-func init() {
- flag.StringVar(&masterAddress, "master", "localhost:123", "Metadata storage address.")
- flag.StringVar(&selfAddress, "self", "", "Metadata storage address.")
-}
-
-type chunkStorage struct {
-}
-
-func (s *chunkStorage) ListChunks(
- ctx context.Context,
- request *pc.ListChunksRequest) (*pc.ListChunksResponse, error) {
- return nil, nil
-}
-
-func (s *chunkStorage) ReadChunk(
- ctx context.Context,
- request *pc.ReadChunkRequest) (*pc.ReadChunkResponse, error) {
- return nil, nil
-}
-
-func (s *chunkStorage) StoreChunk(
- ctx context.Context,
- request *pc.StoreChunkRequest) (*pc.StoreChunkResponse, error) {
- return nil, nil
-}
-
-func main() {
- flag.Parse()
- log.Print("Chunk server starting")
-
- // Create Master server client.
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- conn, err := grpc.Dial(masterAddress, opts...)
- if err != nil {
- log.Fatalf("Failed to dial %s: %v", masterAddress, err)
- }
- defer conn.Close()
- client := pc.NewMetadataStorageClient(conn)
-
- // Register current Chunk server with Master.
- ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
- _, err = client.AddChunkServer(
- ctx,
- &pc.AddChunkServerRequest{Address: selfAddress})
- if err != nil {
- log.Fatalf("failed to register chunk server: %v", err)
- }
- log.Print("Registered myself")
-
- // Start RPC server
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 234))
- if err != nil {
- log.Fatalf("failed to listen: %v", err)
- }
- server := grpc.NewServer()
- pc.RegisterChunkStorageServer(server, &chunkStorage{})
- server.Serve(lis)
-}
diff --git a/chunk/server.go b/chunk/server.go
new file mode 100644
index 0000000..c35d761
--- /dev/null
+++ b/chunk/server.go
@@ -0,0 +1,41 @@
+package chunk
+
+import "context"
+import "sync"
+
+import "pcloud/api"
+
+type ChunkServer struct {
+ chunks sync.Map
+}
+
+func NewChunkServer() *ChunkServer {
+ return &ChunkServer{}
+}
+
+func (s *ChunkServer) ListChunks(
+ ctx context.Context,
+ request *api.ListChunksRequest) (*api.ListChunksResponse, error) {
+ resp := api.ListChunksResponse{}
+ s.chunks.Range(func(k, v interface{}) bool {
+ resp.ChunkId = append(resp.ChunkId, k.(string))
+ return true
+ })
+ return &resp, nil
+}
+
+func (s *ChunkServer) ReadChunk(
+ ctx context.Context,
+ request *api.ReadChunkRequest) (*api.ReadChunkResponse, error) {
+ if data, ok := s.chunks.Load(request.ChunkId); ok {
+ return &api.ReadChunkResponse{Data: data.([]byte)}, nil
+ }
+ return nil, nil
+}
+
+func (s *ChunkServer) StoreChunk(
+ ctx context.Context,
+ request *api.StoreChunkRequest) (*api.StoreChunkResponse, error) {
+ s.chunks.Store(request.ChunkId, request.Data)
+ return &api.StoreChunkResponse{}, nil
+}
diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml
index 449013e..983f892 100644
--- a/k8s/deployment.yaml
+++ b/k8s/deployment.yaml
@@ -36,7 +36,7 @@
- name: code
mountPath: /src/go/src/pcloud
command: ["/bin/sh"]
- args: ["-c", "protoc pfs.proto --go_out=plugins=grpc:. && cd master && go install master.go && master --port=123"]
+ args: ["-c", "protoc api/api.proto --go_out=plugins=grpc:. && go install master.go && master --port=123"]
volumes:
- name: code
hostPath:
@@ -71,7 +71,7 @@
- name: code
mountPath: /src/go/src/pcloud
command: ["/bin/sh"]
- args: ["-c", "protoc pfs.proto --go_out=plugins=grpc:. && cd chunk && go install chunk.go && chunk --master=pcloud-master-service:111 --self=$(SELF_IP):234"]
+ args: ["-c", "protoc api/api.proto --go_out=plugins=grpc:. && go install chunk.go && chunk --master=pcloud-master-service:111 --self=$(SELF_IP):234"]
volumes:
- name: code
hostPath:
diff --git a/master.go b/master.go
new file mode 100644
index 0000000..f973e85
--- /dev/null
+++ b/master.go
@@ -0,0 +1,27 @@
+package main
+
+import "flag"
+import "fmt"
+import "log"
+import "net"
+
+import "google.golang.org/grpc"
+
+import "pcloud/api"
+import "pcloud/master"
+
+var port = flag.Int("port", 123, "Port to listen on.")
+
+func main() {
+ flag.Parse()
+ log.Print("Master server starting")
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
+ if err != nil {
+ log.Fatalf("Failed to listen on port %d: %v", *port, err)
+ }
+ log.Printf("Listening on port: %d", *port)
+ server := grpc.NewServer()
+ api.RegisterMetadataStorageServer(server, master.NewMasterServer())
+ log.Print("Master serving")
+ server.Serve(lis)
+}
diff --git a/master/core b/master/core
deleted file mode 100644
index e69de29..0000000
--- a/master/core
+++ /dev/null
diff --git a/master/master.go b/master/master.go
deleted file mode 100644
index 73ac143..0000000
--- a/master/master.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package main
-
-import "context"
-import "flag"
-import "fmt"
-import "log"
-import "net"
-
-import "google.golang.org/grpc"
-
-import pc "pcloud"
-
-var port int
-
-func init() {
- flag.IntVar(&port, "port", 123, "Port to listen on.")
-}
-
-type chunkServer struct {
- address string
-}
-
-type metadataStorage struct {
- chunkServer []string
-}
-
-func (s *metadataStorage) AddChunkServer(
- ctx context.Context,
- request *pc.AddChunkServerRequest) (*pc.AddChunkServerResponse, error) {
- s.chunkServer = append(s.chunkServer, request.GetAddress())
- log.Printf("Registered Chunk server: %s", request.GetAddress())
- return &pc.AddChunkServerResponse{}, nil
-}
-
-func (s *metadataStorage) CreateBlob(
- ctx context.Context,
- request *pc.CreateBlobRequest) (*pc.CreateBlobResponse, error) {
- return nil, nil
-}
-
-func (s *metadataStorage) GetBlobMetadata(
- ctx context.Context,
- request *pc.GetBlobMetadataRequest) (*pc.GetBlobMetadataResponse, error) {
- return nil, nil
-}
-
-func main() {
- flag.Parse()
- log.Print("Master server starting")
-
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
- if err != nil {
- log.Fatalf("Failed to listen on port %d: %v", port, err)
- }
- log.Printf("Listening on port: %d", port)
- server := grpc.NewServer()
- pc.RegisterMetadataStorageServer(server, &metadataStorage{})
- log.Print("Master serving")
- server.Serve(lis)
-}
diff --git a/master/server.go b/master/server.go
new file mode 100644
index 0000000..2eb2898
--- /dev/null
+++ b/master/server.go
@@ -0,0 +1,85 @@
+package master
+
+import "context"
+import "log"
+import "math/rand"
+
+import "github.com/google/uuid"
+
+import "pcloud/api"
+
+type chunkServers struct {
+ address string
+}
+
+type BlobStatus int
+
+const (
+ NEW BlobStatus = iota
+)
+
+type ChunkStatus int
+
+const (
+ ASSIGNED ChunkStatus = iota
+ STORED
+)
+
+type chunkReplica struct {
+ chunkServer string
+ status ChunkStatus
+}
+
+type chunk struct {
+ id string
+ replica []chunkReplica
+}
+
+type blob struct {
+ id string
+ status BlobStatus
+ chunks []chunk
+}
+
+type MasterServer struct {
+ chunkServers []string
+ blobs []*blob
+}
+
+func NewMasterServer() *MasterServer {
+ return &MasterServer{}
+}
+
+func (s *MasterServer) AddChunkServer(
+ ctx context.Context,
+ request *api.AddChunkServerRequest) (*api.AddChunkServerResponse, error) {
+ s.chunkServers = append(s.chunkServers, request.Address)
+ log.Printf("Registered Chunk server: %s", request.Address)
+ return &api.AddChunkServerResponse{}, nil
+}
+
+func (s *MasterServer) CreateBlob(
+ ctx context.Context,
+ request *api.CreateBlobRequest) (*api.CreateBlobResponse, error) {
+ if int(request.NumReplicas) > len(s.chunkServers) {
+ return nil, nil
+ }
+ resp := api.CreateBlobResponse{
+ BlobId: uuid.New().String(),
+ Chunk: []*api.ChunkStorageMetadata{
+ {ChunkId: uuid.New().String()},
+ }}
+ ids := rand.Perm(len(s.chunkServers))
+ for i := 0; i < int(request.NumReplicas); i++ {
+ resp.Chunk[0].Server = append(
+ resp.Chunk[0].Server,
+ s.chunkServers[ids[i]])
+ }
+ return &resp, nil
+}
+
+func (s *MasterServer) GetBlobMetadata(
+ ctx context.Context,
+ request *api.GetBlobMetadataRequest) (*api.GetBlobMetadataResponse, error) {
+ return nil, nil
+}