chunk replication api
diff --git a/chunk/replicator.go b/chunk/replicator.go
index f5543a0..5609266 100644
--- a/chunk/replicator.go
+++ b/chunk/replicator.go
@@ -1,10 +1,77 @@
package chunk
+import "context"
import "io"
-func Replicate(from, to Chunk) (n int, err error) {
- src := from.ReadSeeker()
- src.Seek(int64(to.SizeBytes()), io.SeekStart)
- m, err := io.Copy(to.Writer(), src)
- return int(m), err
+import "google.golang.org/grpc"
+
+import "pcloud/api"
+
+type PrimaryReplicaChangeListener interface {
+ ChunkId() string
+ Address() <-chan string
+}
+
+type NonChangingPrimaryReplicaChangeListener struct {
+ chunkId string
+ address string
+}
+
+func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
+ return l.chunkId
+}
+
+func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
+ ch := make(chan string, 1)
+ ch <- l.address
+ return ch
+}
+
+func replicate(ctx context.Context, dst, src Chunk) {
+ inp := src.ReadSeeker()
+ inp.Seek(int64(src.SizeBytes()), io.SeekStart)
+ out := dst.Writer()
+ for {
+ select {
+ default:
+ p := make([]byte, 100)
+ n, _ := inp.Read(p)
+ if n > 0 {
+ out.Write(p[:n])
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func dialConn(address string) (*grpc.ClientConn, error) {
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ return grpc.Dial(address, opts...)
+
+}
+
+func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+ var cancel context.CancelFunc = nil
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case address := <-l.Address():
+ if cancel != nil {
+ cancel()
+ }
+ conn, err := dialConn(address)
+ if err == nil {
+ continue
+ }
+ client := api.NewChunkStorageClient(conn)
+ src := RemoteChunk{l.ChunkId(), client}
+ replicatorCtx, cancelFn := context.WithCancel(context.Background())
+ cancel = cancelFn
+ go replicate(replicatorCtx, dst, &src)
+ }
+ }
}