blob: 1c77534e2bbd5c59451e344e3841969e841c9764 [file] [log] [blame]
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04001package chunk
2
giolekvac5126d92020-03-21 16:39:56 +04003import (
4 "context"
5 "io"
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04006
giolekvac5126d92020-03-21 16:39:56 +04007 "google.golang.org/grpc"
giolekva7be17df2020-03-21 13:57:02 +04008
giolekvac5126d92020-03-21 16:39:56 +04009 "pcloud/api"
10)
giolekva7be17df2020-03-21 13:57:02 +040011
12type PrimaryReplicaChangeListener interface {
13 ChunkId() string
14 Address() <-chan string
15}
16
17type NonChangingPrimaryReplicaChangeListener struct {
18 chunkId string
19 address string
20}
21
22func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
23 return l.chunkId
24}
25
26func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
27 ch := make(chan string, 1)
28 ch <- l.address
29 return ch
30}
31
32func replicate(ctx context.Context, dst, src Chunk) {
33 inp := src.ReadSeeker()
34 inp.Seek(int64(src.SizeBytes()), io.SeekStart)
35 out := dst.Writer()
36 for {
37 select {
38 default:
39 p := make([]byte, 100)
40 n, _ := inp.Read(p)
41 if n > 0 {
42 out.Write(p[:n])
43 }
44 case <-ctx.Done():
45 return
46 }
47 }
48}
49
50func dialConn(address string) (*grpc.ClientConn, error) {
51 var opts []grpc.DialOption
52 opts = append(opts, grpc.WithInsecure())
53 opts = append(opts, grpc.WithBlock())
54 return grpc.Dial(address, opts...)
55
56}
57
58func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
59 var cancel context.CancelFunc = nil
60 for {
61 select {
62 case <-ctx.Done():
63 return
64 case address := <-l.Address():
65 if cancel != nil {
66 cancel()
67 }
68 conn, err := dialConn(address)
69 if err == nil {
70 continue
71 }
72 client := api.NewChunkStorageClient(conn)
73 src := RemoteChunk{l.ChunkId(), client}
74 replicatorCtx, cancelFn := context.WithCancel(context.Background())
75 cancel = cancelFn
76 go replicate(replicatorCtx, dst, &src)
77 }
78 }
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040079}