chunk replication api
diff --git a/chunk/server.go b/chunk/server.go
index 315ec00..2ec19b4 100644
--- a/chunk/server.go
+++ b/chunk/server.go
@@ -8,7 +8,9 @@
import "pcloud/api"
type ChunkServer struct {
- chunks sync.Map
+ factory ChunkFactory
+ chunks sync.Map
+ replicatorCancel sync.Map
}
func NewChunkServer() *ChunkServer {
@@ -17,13 +19,13 @@
func (s *ChunkServer) ListChunks(
ctx context.Context,
- req *api.ListChunksRequest) (*api.ListChunksResponse, error) {
- resp := api.ListChunksResponse{}
+ req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
+ resp = &api.ListChunksResponse{}
s.chunks.Range(func(k, v interface{}) bool {
resp.ChunkId = append(resp.ChunkId, k.(string))
return true
})
- return &resp, nil
+ return
}
func (s *ChunkServer) ReadChunk(
@@ -53,9 +55,36 @@
func (s *ChunkServer) StoreChunk(
ctx context.Context,
- req *api.StoreChunkRequest) (*api.StoreChunkResponse, error) {
- data := req.Data
- chunk := NewInMemoryChunk(&data)
+ req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
+ chunk := s.factory.New()
+ _, err = chunk.Writer().Write(req.Data)
s.chunks.Store(req.ChunkId, chunk)
- return &api.StoreChunkResponse{}, nil
+ if err == nil {
+ resp = &api.StoreChunkResponse{}
+ }
+ return
+}
+
+func (s *ChunkServer) ReplicateChunk(
+ ctx context.Context,
+ req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) {
+ chunk := s.factory.New()
+ s.chunks.Store(req.ChunkId, chunk)
+ ctx, cancel := context.WithCancel(context.Background())
+ s.replicatorCancel.Store(req.ChunkId, cancel)
+ go replicateFromPrimary(ctx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer})
+ resp = &api.ReplicateChunkResponse{}
+ return
+
+}
+
+func (s *ChunkServer) RemoveChunk(
+ ctx context.Context,
+ req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
+ if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
+ cancel.(context.CancelFunc)()
+ s.replicatorCancel.Delete(req.ChunkId)
+ }
+ s.chunks.Delete(req.ChunkId)
+ return &api.RemoveChunkResponse{}, nil
}