package chunk
2
import (
	"context"
	"io"
	"log"
	"time"

	"google.golang.org/grpc"

	"pcloud/api"
)
9
// PrimaryReplicaChangeListener supplies the information needed to follow the
// primary replica of one chunk: the chunk's identifier and a stream of
// addresses to replicate from.
type PrimaryReplicaChangeListener interface {
	// ChunkId returns the identifier of the chunk being tracked.
	ChunkId() string
	// Address returns a receive-only channel on which primary replica
	// addresses are delivered.
	Address() <-chan string
}
14
// NonChangingPrimaryReplicaChangeListener carries a fixed chunk identifier and
// a fixed address, and exposes them through the ChunkId and Address methods.
type NonChangingPrimaryReplicaChangeListener struct {
	chunkId, address string
}

// ChunkId returns the chunk identifier stored in the listener.
func (n NonChangingPrimaryReplicaChangeListener) ChunkId() string {
	return n.chunkId
}

// Address returns a channel that already holds the stored address.
//
// Every call allocates a new channel with capacity one; the channel is never
// closed, so a second receive on the same channel blocks.
func (n NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
	addresses := make(chan string, 1)
	addresses <- n.address
	return addresses
}
29
30func replicate(ctx context.Context, dst, src Chunk) {
31 inp := src.ReadSeeker()
32 inp.Seek(int64(src.SizeBytes()), io.SeekStart)
33 out := dst.Writer()
34 for {
35 select {
36 default:
37 p := make([]byte, 100)
38 n, _ := inp.Read(p)
39 if n > 0 {
40 out.Write(p[:n])
41 }
42 case <-ctx.Done():
43 return
44 }
45 }
46}
47
48func dialConn(address string) (*grpc.ClientConn, error) {
49 var opts []grpc.DialOption
50 opts = append(opts, grpc.WithInsecure())
51 opts = append(opts, grpc.WithBlock())
52 return grpc.Dial(address, opts...)
53
54}
55
56func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
57 var cancel context.CancelFunc = nil
58 for {
59 select {
60 case <-ctx.Done():
61 return
62 case address := <-l.Address():
63 if cancel != nil {
64 cancel()
65 }
66 conn, err := dialConn(address)
67 if err == nil {
68 continue
69 }
70 client := api.NewChunkStorageClient(conn)
71 src := RemoteChunk{l.ChunkId(), client}
72 replicatorCtx, cancelFn := context.WithCancel(context.Background())
73 cancel = cancelFn
74 go replicate(replicatorCtx, dst, &src)
75 }
76 }
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040077}