introduce replica assignment change listener
diff --git a/chunk/replicator.go b/chunk/replicator.go
index 5fcabc7..f7f19ea 100644
--- a/chunk/replicator.go
+++ b/chunk/replicator.go
@@ -9,28 +9,22 @@
"github.com/giolekva/pcloud/api"
)
+type ReplicaAssignmentChangeListener interface {
+ Primary(chunkId, currentPrimary string) <-chan string
+}
+
type PrimaryReplicaChangeListener interface {
ChunkId() string
Address() <-chan string
}
-type NonChangingPrimaryReplicaChangeListener struct {
- chunkId string
- ch chan string
+type NonChangingReplicaAssignment struct {
}
-func NewNonChangingPrimaryReplicaChangeListener(chunkId, address string) PrimaryReplicaChangeListener {
+func (l *NonChangingReplicaAssignment) Primary(chunkId, address string) <-chan string {
ch := make(chan string, 1)
ch <- address
- return &NonChangingPrimaryReplicaChangeListener{chunkId, ch}
-}
-
-func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
- return l.chunkId
-}
-
-func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
- return l.ch
+ return ch
}
func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
@@ -61,7 +55,7 @@
}
}
-func ReplicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+func ReplicateFromPrimary(ctx context.Context, chunkId string, dst Chunk, primaryAddressCh <-chan string) {
var done chan int
var cancel context.CancelFunc = nil
for {
@@ -70,7 +64,7 @@
return
case <-ctx.Done():
return
- case address := <-l.Address():
+ case address := <-primaryAddressCh:
if cancel != nil {
cancel()
}
@@ -82,7 +76,7 @@
continue
}
client := api.NewChunkStorageClient(conn)
- src := RemoteChunk{l.ChunkId(), client}
+ src := RemoteChunk{chunkId, client}
replicatorCtx, cancelFn := context.WithCancel(context.Background())
cancel = cancelFn
done = make(chan int, 1)
@@ -91,7 +85,7 @@
}
}
-func WriteToPrimary(ctx context.Context, src Chunk, l PrimaryReplicaChangeListener) {
+func WriteToPrimary(ctx context.Context, chunkId string, src Chunk, primaryAddressCh <-chan string) {
var done chan int
var cancel context.CancelFunc = nil
for {
@@ -100,7 +94,7 @@
return
case <-ctx.Done():
return
- case address := <-l.Address():
+ case address := <-primaryAddressCh:
if cancel != nil {
cancel()
}
@@ -112,7 +106,7 @@
continue
}
client := api.NewChunkStorageClient(conn)
- dst := RemoteChunk{l.ChunkId(), client}
+ dst := RemoteChunk{chunkId, client}
replicatorCtx, cancelFn := context.WithCancel(context.Background())
cancel = cancelFn
done = make(chan int, 1)