blob: 5fcabc744ef8c873433f8f9f858fcb1275c84d69 [file] [log] [blame]
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04001package chunk
2
giolekvac5126d92020-03-21 16:39:56 +04003import (
4 "context"
5 "io"
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04006
giolekvac5126d92020-03-21 16:39:56 +04007 "google.golang.org/grpc"
giolekva7be17df2020-03-21 13:57:02 +04008
giolekvad2a029d2020-03-25 23:06:08 +04009 "github.com/giolekva/pcloud/api"
giolekvac5126d92020-03-21 16:39:56 +040010)
giolekva7be17df2020-03-21 13:57:02 +040011
// PrimaryReplicaChangeListener describes a source of primary-replica
// addresses for a single chunk.
//
// Consumers in this file receive addresses from Address() and dial each one,
// using ChunkId() to name the remote chunk they talk to.
type PrimaryReplicaChangeListener interface {
	// ChunkId returns the identifier of the chunk being tracked.
	ChunkId() string
	// Address returns a receive-only channel on which addresses are delivered.
	Address() <-chan string
}
16
// NonChangingPrimaryReplicaChangeListener holds a chunk id together with a
// channel of addresses. Its name suggests it is meant for a primary whose
// address never changes.
type NonChangingPrimaryReplicaChangeListener struct {
	// chunkId is the identifier of the chunk.
	chunkId string
	// ch is the channel handed out to consumers of the listener.
	ch chan string
}
21
22func NewNonChangingPrimaryReplicaChangeListener(chunkId, address string) PrimaryReplicaChangeListener {
23 ch := make(chan string, 1)
24 ch <- address
25 return &NonChangingPrimaryReplicaChangeListener{chunkId, ch}
giolekva7be17df2020-03-21 13:57:02 +040026}
27
28func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string {
29 return l.chunkId
30}
31
32func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string {
giolekva1f6577a2020-03-25 12:53:06 +040033 return l.ch
giolekva7be17df2020-03-21 13:57:02 +040034}
35
giolekva1f6577a2020-03-25 12:53:06 +040036func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
37 dstInfo, err := dst.Stats()
38 if err != nil {
39 panic(err)
40 }
41 inp := src.ReaderAt()
42 replicated := dstInfo.Committed
43 out := dst.WriterAt()
giolekva7be17df2020-03-21 13:57:02 +040044 for {
45 select {
46 default:
47 p := make([]byte, 100)
giolekva1f6577a2020-03-25 12:53:06 +040048 n, err := inp.ReadAt(p, int64(replicated))
giolekva7be17df2020-03-21 13:57:02 +040049 if n > 0 {
giolekva1f6577a2020-03-25 12:53:06 +040050 m, _ := out.WriteAt(p[:n], int64(replicated))
51 replicated += m
giolekva7be17df2020-03-21 13:57:02 +040052 }
giolekva1f6577a2020-03-25 12:53:06 +040053 if err == io.EOF {
54 done <- 1
55 return
56 }
57
giolekva7be17df2020-03-21 13:57:02 +040058 case <-ctx.Done():
59 return
60 }
61 }
62}
63
giolekva1f6577a2020-03-25 12:53:06 +040064func ReplicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) {
65 var done chan int
giolekva7be17df2020-03-21 13:57:02 +040066 var cancel context.CancelFunc = nil
67 for {
68 select {
giolekva1f6577a2020-03-25 12:53:06 +040069 case <-done:
70 return
giolekva7be17df2020-03-21 13:57:02 +040071 case <-ctx.Done():
72 return
73 case address := <-l.Address():
74 if cancel != nil {
75 cancel()
76 }
giolekva1f6577a2020-03-25 12:53:06 +040077 var opts []grpc.DialOption
78 opts = append(opts, grpc.WithInsecure())
79 opts = append(opts, grpc.WithBlock())
80 conn, err := grpc.Dial(address, opts...)
giolekva7be17df2020-03-21 13:57:02 +040081 if err == nil {
82 continue
83 }
84 client := api.NewChunkStorageClient(conn)
85 src := RemoteChunk{l.ChunkId(), client}
86 replicatorCtx, cancelFn := context.WithCancel(context.Background())
87 cancel = cancelFn
giolekva1f6577a2020-03-25 12:53:06 +040088 done = make(chan int, 1)
89 go replicate(replicatorCtx, dst, &src, done)
90 }
91 }
92}
93
94func WriteToPrimary(ctx context.Context, src Chunk, l PrimaryReplicaChangeListener) {
95 var done chan int
96 var cancel context.CancelFunc = nil
97 for {
98 select {
99 case <-done:
100 return
101 case <-ctx.Done():
102 return
103 case address := <-l.Address():
104 if cancel != nil {
105 cancel()
106 }
107 var opts []grpc.DialOption
108 opts = append(opts, grpc.WithInsecure())
109 opts = append(opts, grpc.WithBlock())
110 conn, err := grpc.Dial(address, opts...)
111 if err != nil {
112 continue
113 }
114 client := api.NewChunkStorageClient(conn)
115 dst := RemoteChunk{l.ChunkId(), client}
116 replicatorCtx, cancelFn := context.WithCancel(context.Background())
117 cancel = cancelFn
118 done = make(chan int, 1)
119 go replicate(replicatorCtx, &dst, src, done)
giolekva7be17df2020-03-21 13:57:02 +0400120 }
121 }
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +0400122}