Basic file uploader implemetation. Does not wait for replication to finish.
diff --git a/chunk/replicator.go b/chunk/replicator.go
index 1c77534..fe49862 100644
--- a/chunk/replicator.go
+++ b/chunk/replicator.go
@@ -16,7 +16,13 @@
type NonChangingPrimaryReplicaChangeListener struct {
chunkId string
- address string
+ ch chan string
+}
+
+func NewNonChangingPrimaryReplicaChangeListener(chunkId, address string) PrimaryReplicaChangeListener {
+ ch := make(chan string, 1)
+ ch <- address
+ return &NonChangingPrimaryReplicaChangeListener{chunkId, ch}
}
func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
@@ -24,48 +30,54 @@
}
func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
- ch := make(chan string, 1)
- ch <- l.address
- return ch
+ return l.ch
}
-func replicate(ctx context.Context, dst, src Chunk) {
- inp := src.ReadSeeker()
- inp.Seek(int64(src.SizeBytes()), io.SeekStart)
- out := dst.Writer()
+func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
+ dstInfo, err := dst.Stats()
+ if err != nil {
+ panic(err)
+ }
+ inp := src.ReaderAt()
+ replicated := dstInfo.Committed
+ out := dst.WriterAt()
for {
select {
default:
p := make([]byte, 100)
- n, _ := inp.Read(p)
+ n, err := inp.ReadAt(p, int64(replicated))
if n > 0 {
- out.Write(p[:n])
+ m, _ := out.WriteAt(p[:n], int64(replicated))
+ replicated += m
}
+ if err == io.EOF {
+ done <- 1
+ return
+ }
+
case <-ctx.Done():
return
}
}
}
-func dialConn(address string) (*grpc.ClientConn, error) {
- var opts []grpc.DialOption
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithBlock())
- return grpc.Dial(address, opts...)
-
-}
-
-func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+func ReplicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+ var done chan int
var cancel context.CancelFunc = nil
for {
select {
+ case <-done:
+ return
case <-ctx.Done():
return
case address := <-l.Address():
if cancel != nil {
cancel()
}
- conn, err := dialConn(address)
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(address, opts...)
if err == nil {
continue
}
@@ -73,7 +85,38 @@
src := RemoteChunk{l.ChunkId(), client}
replicatorCtx, cancelFn := context.WithCancel(context.Background())
cancel = cancelFn
- go replicate(replicatorCtx, dst, &src)
+ done = make(chan int, 1)
+ go replicate(replicatorCtx, dst, &src, done)
+ }
+ }
+}
+
+func WriteToPrimary(ctx context.Context, src Chunk, l PrimaryReplicaChangeListener) {
+ var done chan int
+ var cancel context.CancelFunc = nil
+ for {
+ select {
+ case <-done:
+ return
+ case <-ctx.Done():
+ return
+ case address := <-l.Address():
+ if cancel != nil {
+ cancel()
+ }
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithBlock())
+ conn, err := grpc.Dial(address, opts...)
+ if err != nil {
+ continue
+ }
+ client := api.NewChunkStorageClient(conn)
+ dst := RemoteChunk{l.ChunkId(), client}
+ replicatorCtx, cancelFn := context.WithCancel(context.Background())
+ cancel = cancelFn
+ done = make(chan int, 1)
+ go replicate(replicatorCtx, &dst, src, done)
}
}
}