chunk replication api
diff --git a/chunk/chunk.go b/chunk/chunk.go
index 8d55fe0..36efcce 100644
--- a/chunk/chunk.go
+++ b/chunk/chunk.go
@@ -1,6 +1,5 @@
 package chunk
 
-import "bytes"
 import "io"
 
 type Chunk interface {
@@ -9,27 +8,6 @@
 	Writer() io.Writer
 }
 
-type InMemoryChunk struct {
-	payload *[]byte
-}
-
-func NewEmptyInMemoryChunk(sizeBytes int) Chunk {
-	payload := make([]byte, sizeBytes)
-	return &InMemoryChunk{payload: &payload}
-}
-
-func NewInMemoryChunk(p *[]byte) Chunk {
-	return &InMemoryChunk{payload: p}
-}
-
-func (c *InMemoryChunk) SizeBytes() int {
-	return len(*c.payload)
-}
-
-func (c *InMemoryChunk) ReadSeeker() io.ReadSeeker {
-	return bytes.NewReader(*c.payload)
-}
-
-func (c *InMemoryChunk) Writer() io.Writer {
-	return bytes.NewBuffer(*c.payload)
+type ChunkFactory interface {
+	New() Chunk
 }
diff --git a/chunk/in_memory.go b/chunk/in_memory.go
new file mode 100644
index 0000000..e9c7f5c
--- /dev/null
+++ b/chunk/in_memory.go
@@ -0,0 +1,27 @@
+package chunk
+
+import "bytes"
+import "io"
+
+type InMemoryChunk struct {
+	payload bytes.Buffer
+}
+
+func (c *InMemoryChunk) SizeBytes() int {
+	return len(c.payload.Bytes())
+}
+
+func (c *InMemoryChunk) ReadSeeker() io.ReadSeeker {
+	return bytes.NewReader(c.payload.Bytes())
+}
+
+func (c *InMemoryChunk) Writer() io.Writer {
+	return &c.payload
+}
+
+type InMemoryChunkFactory struct {
+}
+
+func (f InMemoryChunkFactory) New() Chunk {
+	return &InMemoryChunk{}
+}
diff --git a/chunk/in_memory_test.go b/chunk/in_memory_test.go
new file mode 100644
index 0000000..04b8a09
--- /dev/null
+++ b/chunk/in_memory_test.go
@@ -0,0 +1,25 @@
+package chunk
+
+import "bytes"
+import "testing"
+
+func TestConcurrentReads(t *testing.T) {
+	c := InMemoryChunkFactory{}.New()
+	if _, err := c.Writer().Write([]byte("abcd")); err != nil {
+		panic(err)
+	}
+	d1 := make([]byte, 2)
+	d2 := make([]byte, 3)
+	if _, err := c.ReadSeeker().Read(d1); err != nil {
+		t.Error(err)
+	}
+	if bytes.Compare(d1, []byte("ab")) != 0 {
+		t.Errorf("Expected: %s\nActual: %s", "ab", d1)
+	}
+	if _, err := c.ReadSeeker().Read(d2); err != nil {
+		t.Error(err)
+	}
+	if bytes.Compare(d2, []byte("abc")) != 0 {
+		t.Errorf("Expected: %s\nActual: %s", "abc", d2)
+	}
+}
diff --git a/chunk/primary.go b/chunk/primary.go
new file mode 100644
index 0000000..7120eaa
--- /dev/null
+++ b/chunk/primary.go
@@ -0,0 +1,5 @@
+package chunk
+
+func NewPrimaryReplicaChunk(chunkId, primaryChunkServer string) Chunk {
+	return nil
+}
diff --git a/chunk/remote.go b/chunk/remote.go
new file mode 100644
index 0000000..fb0ce48
--- /dev/null
+++ b/chunk/remote.go
@@ -0,0 +1,58 @@
+package chunk
+
+import "context"
+import "errors"
+import "io"
+
+import "pcloud/api"
+
+type RemoteChunk struct {
+	chunkId string
+	client  api.ChunkStorageClient
+}
+
+func (r *RemoteChunk) SizeBytes() int {
+	return 0
+}
+
+func (r *RemoteChunk) ReadSeeker() io.ReadSeeker {
+	return &remoteChunkReadSeeker{
+		chunkId: r.chunkId,
+		client:  r.client}
+}
+
+func (r *RemoteChunk) Writer() io.Writer {
+	return nil
+}
+
+type remoteChunkReadSeeker struct {
+	chunkId string
+	client  api.ChunkStorageClient
+	offset  int64
+}
+
+func (c *remoteChunkReadSeeker) Seek(offset int64, whence int) (int64, error) {
+	if whence != io.SeekStart {
+		return 0, errors.New("Seek: RemoteChunk only supports SeekStart whence")
+	}
+	c.offset = offset
+	return offset, nil
+}
+
+func (c *remoteChunkReadSeeker) Read(p []byte) (n int, err error) {
+	req := api.ReadChunkRequest{
+		ChunkId:  c.chunkId,
+		Offset:   int32(c.offset), // TODO(lekva): must be int64
+		NumBytes: int32(len(p))}
+	resp, err := c.client.ReadChunk(context.Background(), &req)
+	if err != nil {
+		return
+	}
+	n = copy(p, resp.Data)
+	c.offset += int64(n)
+	return
+}
+
+type PrimaryReplicaChunk struct {
+	chunkId string
+}
diff --git a/chunk/replicator.go b/chunk/replicator.go
index f5543a0..5609266 100644
--- a/chunk/replicator.go
+++ b/chunk/replicator.go
@@ -1,10 +1,77 @@
 package chunk
 
+import "context"
 import "io"
 
-func Replicate(from, to Chunk) (n int, err error) {
-	src := from.ReadSeeker()
-	src.Seek(int64(to.SizeBytes()), io.SeekStart)
-	m, err := io.Copy(to.Writer(), src)
-	return int(m), err
+import "google.golang.org/grpc"
+
+import "pcloud/api"
+
+type PrimaryReplicaChangeListener interface {
+	ChunkId() string
+	Address() <-chan string
+}
+
+type NonChangingPrimaryReplicaChangeListener struct {
+	chunkId string
+	address string
+}
+
+func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
+	return l.chunkId
+}
+
+func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
+	ch := make(chan string, 1)
+	ch <- l.address
+	return ch
+}
+
+func replicate(ctx context.Context, dst, src Chunk) {
+	inp := src.ReadSeeker()
+	inp.Seek(int64(src.SizeBytes()), io.SeekStart)
+	out := dst.Writer()
+	for {
+		select {
+		default:
+			p := make([]byte, 100)
+			n, _ := inp.Read(p)
+			if n > 0 {
+				out.Write(p[:n])
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func dialConn(address string) (*grpc.ClientConn, error) {
+	var opts []grpc.DialOption
+	opts = append(opts, grpc.WithInsecure())
+	opts = append(opts, grpc.WithBlock())
+	return grpc.Dial(address, opts...)
+
+}
+
+func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+	var cancel context.CancelFunc = nil
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case address := <-l.Address():
+			if cancel != nil {
+				cancel()
+			}
+			conn, err := dialConn(address)
+			if err == nil {
+				continue
+			}
+			client := api.NewChunkStorageClient(conn)
+			src := RemoteChunk{l.ChunkId(), client}
+			replicatorCtx, cancelFn := context.WithCancel(context.Background())
+			cancel = cancelFn
+			go replicate(replicatorCtx, dst, &src)
+		}
+	}
 }
diff --git a/chunk/server.go b/chunk/server.go
index 315ec00..2ec19b4 100644
--- a/chunk/server.go
+++ b/chunk/server.go
@@ -8,7 +8,9 @@
 import "pcloud/api"
 
 type ChunkServer struct {
-	chunks sync.Map
+	factory          ChunkFactory
+	chunks           sync.Map
+	replicatorCancel sync.Map
 }
 
 func NewChunkServer() *ChunkServer {
@@ -17,13 +19,13 @@
 
 func (s *ChunkServer) ListChunks(
 	ctx context.Context,
-	req *api.ListChunksRequest) (*api.ListChunksResponse, error) {
-	resp := api.ListChunksResponse{}
+	req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
+	resp = &api.ListChunksResponse{}
 	s.chunks.Range(func(k, v interface{}) bool {
 		resp.ChunkId = append(resp.ChunkId, k.(string))
 		return true
 	})
-	return &resp, nil
+	return
 }
 
 func (s *ChunkServer) ReadChunk(
@@ -53,9 +55,36 @@
 
 func (s *ChunkServer) StoreChunk(
 	ctx context.Context,
-	req *api.StoreChunkRequest) (*api.StoreChunkResponse, error) {
-	data := req.Data
-	chunk := NewInMemoryChunk(&data)
+	req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
+	chunk := s.factory.New()
+	_, err = chunk.Writer().Write(req.Data)
 	s.chunks.Store(req.ChunkId, chunk)
-	return &api.StoreChunkResponse{}, nil
+	if err == nil {
+		resp = &api.StoreChunkResponse{}
+	}
+	return
+}
+
+func (s *ChunkServer) ReplicateChunk(
+	ctx context.Context,
+	req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) {
+	chunk := s.factory.New()
+	s.chunks.Store(req.ChunkId, chunk)
+	ctx, cancel := context.WithCancel(context.Background())
+	s.replicatorCancel.Store(req.ChunkId, cancel)
+	go replicateFromPrimary(ctx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer})
+	resp = &api.ReplicateChunkResponse{}
+	return
+
+}
+
+func (s *ChunkServer) RemoveChunk(
+	ctx context.Context,
+	req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
+	if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
+		cancel.(context.CancelFunc)()
+		s.replicatorCancel.Delete(req.ChunkId)
+	}
+	s.chunks.Delete(req.ChunkId)
+	return &api.RemoveChunkResponse{}, nil
 }
diff --git a/chunk/server_test.go b/chunk/server_test.go
index 834de89..1a808d3 100644
--- a/chunk/server_test.go
+++ b/chunk/server_test.go
@@ -7,7 +7,7 @@
 import "pcloud/api"
 
 func TestStoreChunk(t *testing.T) {
-	s := NewChunkServer()
+	s := ChunkServer{factory: &InMemoryChunkFactory{}}
 	_, err := s.StoreChunk(context.Background(), &api.StoreChunkRequest{
 		ChunkId: "foo",
 		Data:    []byte("hello world")})
@@ -17,7 +17,7 @@
 }
 
 func TestStoreAndReadChunk(t *testing.T) {
-	s := NewChunkServer()
+	s := ChunkServer{factory: &InMemoryChunkFactory{}}
 	_, err := s.StoreChunk(context.Background(), &api.StoreChunkRequest{
 		ChunkId: "foo",
 		Data:    []byte("hello world")})
@@ -35,7 +35,7 @@
 }
 
 func TestReadWithOffsets(t *testing.T) {
-	s := NewChunkServer()
+	s := ChunkServer{factory: &InMemoryChunkFactory{}}
 	_, err := s.StoreChunk(context.Background(), &api.StoreChunkRequest{
 		ChunkId: "foo",
 		Data:    []byte("hello world")})