blob: d8990a7f6b40250e6e166e4d361865cbbfa9967c [file] [log] [blame]
giolekva892a4e22020-04-27 16:46:22 +04001package chunk
2
3import (
4 "context"
5 "io"
6
7 "google.golang.org/grpc"
8
9 "github.com/giolekva/pcloud/pfs/api"
10)
11
// ReplicaAssignmentChangeListener is implemented by types that can report,
// over a channel, the primary replica to use for a chunk.
//
// Primary takes the chunk id and the currently known primary and returns a
// receive-only channel of strings. The functions in this file that consume
// such a channel treat every received value as the network address of the
// primary replica to dial.
// NOTE(review): presumably a new value is delivered whenever the assignment
// changes; confirm against the implementations.
type ReplicaAssignmentChangeListener interface {
	Primary(chunkId, currentPrimary string) <-chan string
}
15
// PrimaryReplicaChangeListener pairs a chunk id with a channel of addresses.
//
// ChunkId returns the id of the chunk the listener is bound to and Address
// returns a receive-only channel of strings.
// NOTE(review): presumably Address yields the address of the chunk's primary
// replica each time it changes; nothing in this file uses the interface, so
// confirm against its implementations.
type PrimaryReplicaChangeListener interface {
	ChunkId() string
	Address() <-chan string
}
20
// NonChangingReplicaAssignment is a replica assignment whose primary never
// changes: it reports the address it was given and nothing else.
type NonChangingReplicaAssignment struct{}

// Primary returns a channel that yields address exactly once. The channel is
// buffered with capacity one, so the value is available without a concurrent
// receiver. It is never closed, so a second receive blocks forever. chunkId
// is ignored.
func (l *NonChangingReplicaAssignment) Primary(chunkId, address string) <-chan string {
	primary := make(chan string, 1)
	primary <- address
	return primary
}
29
30func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
31 dstInfo, err := dst.Stats()
32 if err != nil {
33 panic(err)
34 }
35 inp := src.ReaderAt()
36 replicated := dstInfo.Committed
37 out := dst.WriterAt()
38 for {
39 select {
40 default:
41 p := make([]byte, 100)
42 n, err := inp.ReadAt(p, int64(replicated))
43 if n > 0 {
44 m, _ := out.WriteAt(p[:n], int64(replicated))
45 replicated += m
46 }
47 if err == io.EOF {
48 done <- 1
49 return
50 }
51
52 case <-ctx.Done():
53 return
54 }
55 }
56}
57
58func ReplicateFromPrimary(ctx context.Context, chunkId string, dst Chunk, primaryAddressCh <-chan string) {
59 var done chan int
60 var cancel context.CancelFunc = nil
61 for {
62 select {
63 case <-done:
64 return
65 case <-ctx.Done():
66 return
67 case address := <-primaryAddressCh:
68 if cancel != nil {
69 cancel()
70 }
71 var opts []grpc.DialOption
72 opts = append(opts, grpc.WithInsecure())
73 opts = append(opts, grpc.WithBlock())
74 conn, err := grpc.Dial(address, opts...)
75 if err == nil {
76 continue
77 }
78 client := api.NewChunkStorageClient(conn)
79 src := RemoteChunk{chunkId, client}
80 replicatorCtx, cancelFn := context.WithCancel(context.Background())
81 cancel = cancelFn
82 done = make(chan int, 1)
83 go replicate(replicatorCtx, dst, &src, done)
84 }
85 }
86}
87
88func WriteToPrimary(ctx context.Context, chunkId string, src Chunk, primaryAddressCh <-chan string) {
89 var done chan int
90 var cancel context.CancelFunc = nil
91 for {
92 select {
93 case <-done:
94 return
95 case <-ctx.Done():
96 return
97 case address := <-primaryAddressCh:
98 if cancel != nil {
99 cancel()
100 }
101 var opts []grpc.DialOption
102 opts = append(opts, grpc.WithInsecure())
103 opts = append(opts, grpc.WithBlock())
104 conn, err := grpc.Dial(address, opts...)
105 if err != nil {
106 continue
107 }
108 client := api.NewChunkStorageClient(conn)
109 dst := RemoteChunk{chunkId, client}
110 replicatorCtx, cancelFn := context.WithCancel(context.Background())
111 cancel = cancelFn
112 done = make(chan int, 1)
113 go replicate(replicatorCtx, &dst, src, done)
114 }
115 }
116}