chunk replication api
diff --git a/chunk/chunk.go b/chunk/chunk.go
index 8d55fe0..36efcce 100644
--- a/chunk/chunk.go
+++ b/chunk/chunk.go
@@ -1,6 +1,5 @@
package chunk
-import "bytes"
import "io"
type Chunk interface {
@@ -9,27 +8,6 @@
Writer() io.Writer
}
-type InMemoryChunk struct {
- payload *[]byte
-}
-
-func NewEmptyInMemoryChunk(sizeBytes int) Chunk {
- payload := make([]byte, sizeBytes)
- return &InMemoryChunk{payload: &payload}
-}
-
-func NewInMemoryChunk(p *[]byte) Chunk {
- return &InMemoryChunk{payload: p}
-}
-
-func (c *InMemoryChunk) SizeBytes() int {
- return len(*c.payload)
-}
-
-func (c *InMemoryChunk) ReadSeeker() io.ReadSeeker {
- return bytes.NewReader(*c.payload)
-}
-
-func (c *InMemoryChunk) Writer() io.Writer {
- return bytes.NewBuffer(*c.payload)
+// ChunkFactory is the abstraction used to obtain new Chunk values.
+type ChunkFactory interface {
+ // New returns a Chunk.
+ New() Chunk
}
diff --git a/chunk/in_memory.go b/chunk/in_memory.go
new file mode 100644
index 0000000..e9c7f5c
--- /dev/null
+++ b/chunk/in_memory.go
@@ -0,0 +1,27 @@
+package chunk
+
+import "bytes"
+import "io"
+
+// InMemoryChunk is a Chunk implementation that keeps its payload in a
+// bytes.Buffer. The zero value is an empty chunk. None of its methods take
+// a lock.
+type InMemoryChunk struct {
+	payload bytes.Buffer
+}
+
+// SizeBytes reports how many bytes the buffer currently holds.
+func (c *InMemoryChunk) SizeBytes() int {
+	return c.payload.Len()
+}
+
+// ReadSeeker returns a new reader positioned at the start of the bytes that
+// are buffered at the time of the call. Each call yields an independent
+// reader with its own offset.
+func (c *InMemoryChunk) ReadSeeker() io.ReadSeeker {
+	buffered := c.payload.Bytes()
+	return bytes.NewReader(buffered)
+}
+
+// Writer returns the chunk's own buffer, so every write appends to the
+// payload.
+func (c *InMemoryChunk) Writer() io.Writer {
+	buf := &c.payload
+	return buf
+}
+
+// InMemoryChunkFactory is a stateless ChunkFactory that produces
+// InMemoryChunk values.
+type InMemoryChunkFactory struct{}
+
+// New returns a pointer to a zero-valued, and therefore empty, InMemoryChunk.
+func (InMemoryChunkFactory) New() Chunk {
+	return new(InMemoryChunk)
+}
diff --git a/chunk/in_memory_test.go b/chunk/in_memory_test.go
new file mode 100644
index 0000000..04b8a09
--- /dev/null
+++ b/chunk/in_memory_test.go
@@ -0,0 +1,25 @@
+package chunk
+
+import "bytes"
+import "testing"
+
+// TestConcurrentReads checks that each ReadSeeker obtained from a chunk is
+// independent: a second reader starts at offset zero no matter how far the
+// first one has advanced.
+func TestConcurrentReads(t *testing.T) {
+	c := InMemoryChunkFactory{}.New()
+	// Without the payload nothing below is meaningful, so stop this test
+	// through the testing API rather than panicking the whole test binary.
+	if _, err := c.Writer().Write([]byte("abcd")); err != nil {
+		t.Fatal(err)
+	}
+	d1 := make([]byte, 2)
+	d2 := make([]byte, 3)
+	if _, err := c.ReadSeeker().Read(d1); err != nil {
+		t.Error(err)
+	}
+	if !bytes.Equal(d1, []byte("ab")) {
+		t.Errorf("Expected: %s\nActual: %s", "ab", d1)
+	}
+	// A fresh reader must start from the beginning again.
+	if _, err := c.ReadSeeker().Read(d2); err != nil {
+		t.Error(err)
+	}
+	if !bytes.Equal(d2, []byte("abc")) {
+		t.Errorf("Expected: %s\nActual: %s", "abc", d2)
+	}
+}
diff --git a/chunk/primary.go b/chunk/primary.go
new file mode 100644
index 0000000..7120eaa
--- /dev/null
+++ b/chunk/primary.go
@@ -0,0 +1,5 @@
+package chunk
+
+// NewPrimaryReplicaChunk is currently a stub: both arguments are ignored and
+// a nil Chunk is always returned.
+func NewPrimaryReplicaChunk(chunkId, primaryChunkServer string) Chunk {
+	var none Chunk // the zero value of an interface type is nil
+	return none
+}
diff --git a/chunk/remote.go b/chunk/remote.go
new file mode 100644
index 0000000..fb0ce48
--- /dev/null
+++ b/chunk/remote.go
@@ -0,0 +1,58 @@
+package chunk
+
+import "context"
+import "errors"
+import "io"
+
+import "pcloud/api"
+
+// RemoteChunk is a Chunk whose bytes are read from another chunk server
+// through an api.ChunkStorageClient.
+type RemoteChunk struct {
+	// chunkId is sent with every read request.
+	chunkId string
+	// client issues the ReadChunk calls.
+	client api.ChunkStorageClient
+}
+
+// SizeBytes always reports 0; the remote side is not asked for its size.
+func (r *RemoteChunk) SizeBytes() int {
+	const size = 0
+	return size
+}
+
+// ReadSeeker returns a new reader over the remote chunk that starts at
+// offset 0 and shares this chunk's id and client.
+func (r *RemoteChunk) ReadSeeker() io.ReadSeeker {
+	rs := new(remoteChunkReadSeeker)
+	rs.chunkId = r.chunkId
+	rs.client = r.client
+	return rs
+}
+
+// Writer always returns nil: writing to a remote chunk is not implemented,
+// and calling Write on the result panics.
+func (r *RemoteChunk) Writer() io.Writer {
+	var w io.Writer
+	return w
+}
+
+// remoteChunkReadSeeker reads a chunk from a remote chunk server, tracking
+// the position of the next read in offset.
+type remoteChunkReadSeeker struct {
+ // chunkId is the id placed into every ReadChunkRequest.
+ chunkId string
+ // client is the client the ReadChunk calls are issued on.
+ client api.ChunkStorageClient
+ // offset is the byte position of the next Read; Seek sets it and Read
+ // advances it by the number of bytes copied.
+ offset int64
+}
+
+// Seek sets the offset used by the next Read. Only io.SeekStart is
+// supported; any other whence yields an error and leaves the offset
+// untouched. A negative offset is rejected as well, as the io.Seeker
+// contract requires, so it can never reach the int32 wire field used by
+// Read. On success the new absolute offset is returned.
+func (c *remoteChunkReadSeeker) Seek(offset int64, whence int) (int64, error) {
+	if whence != io.SeekStart {
+		return 0, errors.New("Seek: RemoteChunk only supports SeekStart whence")
+	}
+	if offset < 0 {
+		return 0, errors.New("Seek: negative position")
+	}
+	c.offset = offset
+	return offset, nil
+}
+
+// Read fetches up to len(p) bytes of the remote chunk starting at the
+// current offset, copies them into p and advances the offset by the number
+// of bytes copied.
+//
+// It returns io.EOF when the server has no bytes at the current offset, so
+// callers never see a zero-byte read with a nil error. An error is also
+// returned when the RPC fails, or when the offset cannot be represented in
+// the int32 request field; in both cases the offset is left unchanged.
+func (c *remoteChunkReadSeeker) Read(p []byte) (n int, err error) {
+	// The request carries offset and length as int32, so both are bounded
+	// here instead of being silently wrapped by the conversion.
+	const maxInt32 = 1<<31 - 1
+	if len(p) == 0 {
+		return 0, nil
+	}
+	if c.offset > maxInt32 {
+		return 0, errors.New("Read: offset does not fit into int32")
+	}
+	want := len(p)
+	if want > maxInt32 {
+		want = maxInt32 // a short read is allowed by io.Reader
+	}
+	req := api.ReadChunkRequest{
+		ChunkId:  c.chunkId,
+		Offset:   int32(c.offset), // TODO(lekva): must be int64
+		NumBytes: int32(want)}
+	resp, err := c.client.ReadChunk(context.Background(), &req)
+	if err != nil {
+		return 0, err
+	}
+	n = copy(p, resp.Data)
+	if n == 0 {
+		return 0, io.EOF
+	}
+	c.offset += int64(n)
+	return n, nil
+}
+
+// PrimaryReplicaChunk holds only a chunk id. No methods are defined on it in
+// the visible part of this change.
+type PrimaryReplicaChunk struct {
+ chunkId string
+}
diff --git a/chunk/replicator.go b/chunk/replicator.go
index f5543a0..5609266 100644
--- a/chunk/replicator.go
+++ b/chunk/replicator.go
@@ -1,10 +1,77 @@
package chunk
+import "context"
import "io"
+import "time"
-func Replicate(from, to Chunk) (n int, err error) {
- src := from.ReadSeeker()
- src.Seek(int64(to.SizeBytes()), io.SeekStart)
- m, err := io.Copy(to.Writer(), src)
- return int(m), err
+import "google.golang.org/grpc"
+
+import "pcloud/api"
+
+// PrimaryReplicaChangeListener identifies a chunk and delivers addresses of
+// its primary replica over a channel.
+type PrimaryReplicaChangeListener interface {
+ // ChunkId returns the id of the chunk.
+ ChunkId() string
+ // Address returns a receive-only channel of addresses; replicateFromPrimary
+ // dials every address it receives from it.
+ // NOTE(review): presumably a new value is sent whenever the primary
+ // replica changes - confirm against the intended producers.
+ Address() <-chan string
+}
+
+// NonChangingPrimaryReplicaChangeListener is a PrimaryReplicaChangeListener
+// for a chunk whose primary replica address is fixed.
+type NonChangingPrimaryReplicaChangeListener struct {
+	chunkId string
+	address string
+}
+
+// ChunkId returns the chunk id the listener was created with.
+func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
+	id := l.chunkId
+	return id
+}
+
+// Address returns a new channel of capacity one that already holds the fixed
+// address. Every call creates another such channel.
+func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
+	out := make(chan string, 1)
+	out <- l.address
+	return out
+}
+
+// replicate copies bytes from src into dst until ctx is cancelled.
+//
+// Reading resumes at the number of bytes dst already holds, so a chunk that
+// was partially replicated is continued rather than duplicated. When a read
+// yields no bytes (nothing new yet, or a failed read) the loop waits for a
+// short interval before polling again instead of spinning. It stops for
+// good if the initial seek or a write to dst fails, because continuing
+// would leave a gap or a duplicate in dst.
+func replicate(ctx context.Context, dst, src Chunk) {
+	const (
+		bufSize      = 100
+		idleInterval = 100 * time.Millisecond
+	)
+	inp := src.ReadSeeker()
+	if _, err := inp.Seek(int64(dst.SizeBytes()), io.SeekStart); err != nil {
+		return
+	}
+	out := dst.Writer()
+	p := make([]byte, bufSize) // reused across iterations
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+		// The read error is deliberately not inspected: both "no new data"
+		// and a failed read are retried after the idle interval.
+		n, _ := inp.Read(p)
+		if n > 0 {
+			if _, err := out.Write(p[:n]); err != nil {
+				return
+			}
+			continue
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(idleInterval):
+		}
+	}
+}
+
+// dialConn opens a gRPC client connection to address without transport
+// security. Because of grpc.WithBlock the call does not return until the
+// connection is established.
+func dialConn(address string) (*grpc.ClientConn, error) {
+	return grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
+}
+
+// replicateFromPrimary keeps dst replicated from whichever primary replica
+// address l announces, until ctx is cancelled.
+//
+// The address channel is obtained once. For every address received, the
+// previous replicator (if any) is stopped and waited for, its connection is
+// closed, and a new replicator is started against the new address. If
+// dialing fails, replication stays stopped until the next address arrives.
+// On return the active replicator is stopped and its connection closed.
+//
+// Note that dialConn blocks until the connection is up and does not observe
+// ctx, so cancellation is not noticed while a dial is in progress.
+func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+	var (
+		cancel context.CancelFunc
+		conn   *grpc.ClientConn
+		done   chan struct{}
+	)
+	// stop shuts the current replicator down and waits for it, so that two
+	// replicators never write to dst at the same time.
+	stop := func() {
+		if cancel == nil {
+			return
+		}
+		cancel()
+		// Closing the connection makes an in-flight ReadChunk return; the
+		// error from Close carries no useful information at this point.
+		conn.Close()
+		<-done
+		cancel, conn, done = nil, nil, nil
+	}
+	defer stop()
+	addresses := l.Address()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case address, ok := <-addresses:
+			if !ok {
+				// A closed channel would deliver zero values forever; a nil
+				// channel blocks, leaving only ctx.Done to wake the loop.
+				addresses = nil
+				continue
+			}
+			stop()
+			newConn, err := dialConn(address)
+			if err != nil {
+				continue
+			}
+			src := RemoteChunk{l.ChunkId(), api.NewChunkStorageClient(newConn)}
+			replicatorCtx, cancelFn := context.WithCancel(ctx)
+			finished := make(chan struct{})
+			cancel, conn, done = cancelFn, newConn, finished
+			go func() {
+				defer close(finished)
+				replicate(replicatorCtx, dst, &src)
+			}()
+		}
+	}
}
diff --git a/chunk/server.go b/chunk/server.go
index 315ec00..2ec19b4 100644
--- a/chunk/server.go
+++ b/chunk/server.go
@@ -8,7 +8,9 @@
import "pcloud/api"
+// ChunkServer keeps chunks keyed by chunk id and tracks the replicators
+// started for them.
+// NOTE(review): NewChunkServer is not fully visible in this change - confirm
+// it sets factory, otherwise s.factory.New() panics on a nil interface.
type ChunkServer struct {
- chunks sync.Map
+ // factory creates the Chunk values stored by StoreChunk and ReplicateChunk.
+ factory ChunkFactory
+ // chunks maps a chunk id (string) to its Chunk.
+ chunks sync.Map
+ // replicatorCancel maps a chunk id to the context.CancelFunc that stops
+ // the replicator started for it by ReplicateChunk.
+ replicatorCancel sync.Map
}
func NewChunkServer() *ChunkServer {
@@ -17,13 +19,13 @@
+// ListChunks returns the ids of all chunks currently present in s.chunks.
+// The request is not inspected and the returned error is always nil.
func (s *ChunkServer) ListChunks(
ctx context.Context,
- req *api.ListChunksRequest) (*api.ListChunksResponse, error) {
- resp := api.ListChunksResponse{}
+ req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
+ resp = &api.ListChunksResponse{}
s.chunks.Range(func(k, v interface{}) bool {
resp.ChunkId = append(resp.ChunkId, k.(string))
return true
})
- return &resp, nil
+ return
}
func (s *ChunkServer) ReadChunk(
@@ -53,9 +55,36 @@
+// StoreChunk creates a new chunk through the server's factory, writes
+// req.Data into it and registers it under req.ChunkId.
+//
+// The chunk is registered only after its payload was written successfully,
+// so a failed request never leaves a partially written chunk visible to
+// other calls. On failure the write error is returned with a nil response.
func (s *ChunkServer) StoreChunk(
ctx context.Context,
- req *api.StoreChunkRequest) (*api.StoreChunkResponse, error) {
- data := req.Data
- chunk := NewInMemoryChunk(&data)
+ req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
+	chunk := s.factory.New()
+	if _, err = chunk.Writer().Write(req.Data); err != nil {
+		return nil, err
+	}
s.chunks.Store(req.ChunkId, chunk)
- return &api.StoreChunkResponse{}, nil
+	return &api.StoreChunkResponse{}, nil
+}
+
+// ReplicateChunk registers a new, empty chunk under req.ChunkId and starts a
+// background replicator that fills it from req.PrimaryChunkServer.
+//
+// If a replicator was already registered for this chunk id, it is cancelled
+// first; otherwise its cancel function would be overwritten and its
+// goroutine could never be stopped. The replicator runs on a context
+// detached from the request context, because it has to outlive this call;
+// RemoveChunk stops it through the stored cancel function.
+func (s *ChunkServer) ReplicateChunk(
+	ctx context.Context,
+	req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) {
+	if prev, ok := s.replicatorCancel.Load(req.ChunkId); ok {
+		prev.(context.CancelFunc)()
+	}
+	chunk := s.factory.New()
+	s.chunks.Store(req.ChunkId, chunk)
+	replCtx, cancel := context.WithCancel(context.Background())
+	s.replicatorCancel.Store(req.ChunkId, cancel)
+	go replicateFromPrimary(replCtx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer})
+	return &api.ReplicateChunkResponse{}, nil
+}
+
+// RemoveChunk stops the replicator registered for req.ChunkId, if there is
+// one, and removes the chunk from the server. It succeeds whether or not
+// the chunk existed.
+func (s *ChunkServer) RemoveChunk(
+	ctx context.Context,
+	req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
+	id := req.ChunkId
+	stored, found := s.replicatorCancel.Load(id)
+	if found {
+		stop := stored.(context.CancelFunc)
+		stop()
+		s.replicatorCancel.Delete(id)
+	}
+	s.chunks.Delete(id)
+	resp = &api.RemoveChunkResponse{}
+	return resp, nil
}
diff --git a/chunk/server_test.go b/chunk/server_test.go
index 834de89..1a808d3 100644
--- a/chunk/server_test.go
+++ b/chunk/server_test.go
@@ -7,7 +7,7 @@
import "pcloud/api"
func TestStoreChunk(t *testing.T) {
- s := NewChunkServer()
+ s := ChunkServer{factory: &InMemoryChunkFactory{}}
_, err := s.StoreChunk(context.Background(), &api.StoreChunkRequest{
ChunkId: "foo",
Data: []byte("hello world")})
@@ -17,7 +17,7 @@
}
func TestStoreAndReadChunk(t *testing.T) {
- s := NewChunkServer()
+ s := ChunkServer{factory: &InMemoryChunkFactory{}}
_, err := s.StoreChunk(context.Background(), &api.StoreChunkRequest{
ChunkId: "foo",
Data: []byte("hello world")})
@@ -35,7 +35,7 @@
}
func TestReadWithOffsets(t *testing.T) {
- s := NewChunkServer()
+ s := ChunkServer{factory: &InMemoryChunkFactory{}}
_, err := s.StoreChunk(context.Background(), &api.StoreChunkRequest{
ChunkId: "foo",
Data: []byte("hello world")})