introduce replica assignment change listener
diff --git a/chunk/replicator.go b/chunk/replicator.go
index 5fcabc7..f7f19ea 100644
--- a/chunk/replicator.go
+++ b/chunk/replicator.go
@@ -9,28 +9,22 @@
"github.com/giolekva/pcloud/api"
)
+type ReplicaAssignmentChangeListener interface {
+ Primary(chunkId, currentPrimary string) <-chan string
+}
+
type PrimaryReplicaChangeListener interface {
ChunkId() string
Address() <-chan string
}
-type NonChangingPrimaryReplicaChangeListener struct {
- chunkId string
- ch chan string
+type NonChangingReplicaAssignment struct {
}
-func NewNonChangingPrimaryReplicaChangeListener(chunkId, address string) PrimaryReplicaChangeListener {
+func (l *NonChangingReplicaAssignment) Primary(chunkId, address string) <-chan string {
ch := make(chan string, 1)
ch <- address
- return &NonChangingPrimaryReplicaChangeListener{chunkId, ch}
-}
-
-func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
- return l.chunkId
-}
-
-func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
- return l.ch
+ return ch
}
func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
@@ -61,7 +55,7 @@
}
}
-func ReplicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
+func ReplicateFromPrimary(ctx context.Context, chunkId string, dst Chunk, primaryAddressCh <-chan string) {
var done chan int
var cancel context.CancelFunc = nil
for {
@@ -70,7 +64,7 @@
return
case <-ctx.Done():
return
- case address := <-l.Address():
+ case address := <-primaryAddressCh:
if cancel != nil {
cancel()
}
@@ -82,7 +76,7 @@
continue
}
client := api.NewChunkStorageClient(conn)
- src := RemoteChunk{l.ChunkId(), client}
+ src := RemoteChunk{chunkId, client}
replicatorCtx, cancelFn := context.WithCancel(context.Background())
cancel = cancelFn
done = make(chan int, 1)
@@ -91,7 +85,7 @@
}
}
-func WriteToPrimary(ctx context.Context, src Chunk, l PrimaryReplicaChangeListener) {
+func WriteToPrimary(ctx context.Context, chunkId string, src Chunk, primaryAddressCh <-chan string) {
var done chan int
var cancel context.CancelFunc = nil
for {
@@ -100,7 +94,7 @@
return
case <-ctx.Done():
return
- case address := <-l.Address():
+ case address := <-primaryAddressCh:
if cancel != nil {
cancel()
}
@@ -112,7 +106,7 @@
continue
}
client := api.NewChunkStorageClient(conn)
- dst := RemoteChunk{l.ChunkId(), client}
+ dst := RemoteChunk{chunkId, client}
replicatorCtx, cancelFn := context.WithCancel(context.Background())
cancel = cancelFn
done = make(chan int, 1)
diff --git a/chunk/server.go b/chunk/server.go
index c262509..7f07227 100644
--- a/chunk/server.go
+++ b/chunk/server.go
@@ -11,13 +11,17 @@
)
type ChunkServer struct {
- factory ChunkFactory
- chunks sync.Map
- replicatorCancel sync.Map
+ factory ChunkFactory
+ assignmentChangeLis ReplicaAssignmentChangeListener
+ chunks sync.Map
+ replicatorCancel sync.Map
}
-func NewChunkServer(factory ChunkFactory) *ChunkServer {
- return &ChunkServer{factory: factory}
+func NewChunkServer(factory ChunkFactory,
+ assignmentChangeLis ReplicaAssignmentChangeListener) *ChunkServer {
+ return &ChunkServer{
+ factory: factory,
+ assignmentChangeLis: assignmentChangeLis}
}
func (s *ChunkServer) ListChunks(
@@ -40,10 +44,9 @@
case api.ReplicaRole_SECONDARY:
ctx, cancel := context.WithCancel(context.Background())
s.replicatorCancel.Store(req.ChunkId, cancel)
- primaryListener := NewNonChangingPrimaryReplicaChangeListener(
- req.ChunkId,
- req.PrimaryAddress)
- go ReplicateFromPrimary(ctx, chunk, primaryListener)
+ primaryAddressCh := s.assignmentChangeLis.Primary(
+ req.ChunkId, req.PrimaryAddress)
+ go ReplicateFromPrimary(ctx, req.ChunkId, chunk, primaryAddressCh)
case api.ReplicaRole_PRIMARY:
{
}
diff --git a/chunk.go b/chunk_server.go
similarity index 89%
rename from chunk.go
rename to chunk_server.go
index d8e9f21..8238a73 100644
--- a/chunk.go
+++ b/chunk_server.go
@@ -48,6 +48,8 @@
log.Fatalf("failed to listen: %v", err)
}
server := grpc.NewServer()
- api.RegisterChunkStorageServer(server, chunk.NewChunkServer())
+ api.RegisterChunkStorageServer(server, chunk.NewChunkServer(
+ &chunk.InMemoryChunkFactory{},
+ &chunk.NonChangingReplicaAssignment{}))
server.Serve(lis)
}
diff --git a/client/client.go b/client/client.go
index 99c30fa..974c3e8 100644
--- a/client/client.go
+++ b/client/client.go
@@ -31,11 +31,13 @@
if len(resp.Chunk) != 1 {
panic(resp)
}
- primaryListener := chunk.NewNonChangingPrimaryReplicaChangeListener(
+ lis := &chunk.NonChangingReplicaAssignment{}
+ primaryAddressCh := lis.Primary(
resp.Chunk[0].ChunkId,
resp.Chunk[0].Server[0])
chunk.WriteToPrimary(
context.Background(),
+ resp.Chunk[0].ChunkId,
chunk.NewReadOnlyFileChunk(f, 0, int(info.Size())),
- primaryListener)
+ primaryAddressCh)
}
diff --git a/testing/in_memory_env.go b/testing/in_memory_env.go
index ff6485f..e2aace3 100644
--- a/testing/in_memory_env.go
+++ b/testing/in_memory_env.go
@@ -50,7 +50,9 @@
return nil, err
}
server := grpc.NewServer()
- api.RegisterChunkStorageServer(server, chunk.NewChunkServer(&chunk.InMemoryChunkFactory{}))
+ api.RegisterChunkStorageServer(server, chunk.NewChunkServer(
+ &chunk.InMemoryChunkFactory{},
+ &chunk.NonChangingReplicaAssignment{}))
go server.Serve(lis)
env.c[i] = server
}