blob: d8990a7f6b40250e6e166e4d361865cbbfa9967c [file] [log] [blame]
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04001package chunk
2
giolekvac5126d92020-03-21 16:39:56 +04003import (
4 "context"
5 "io"
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +04006
giolekvac5126d92020-03-21 16:39:56 +04007 "google.golang.org/grpc"
giolekva7be17df2020-03-21 13:57:02 +04008
giolekvab47be772020-04-11 15:23:37 +04009 "github.com/giolekva/pcloud/pfs/api"
giolekvac5126d92020-03-21 16:39:56 +040010)
giolekva7be17df2020-03-21 13:57:02 +040011
// ReplicaAssignmentChangeListener is a source of primary replica addresses
// for a chunk.
//
// Primary takes a chunk ID together with the primary currently known to the
// caller and returns a receive-only channel of strings.
//
// NOTE(review): presumably each value received is the address of the chunk's
// primary replica, delivered whenever the assignment changes; confirm against
// the concrete implementations.
type ReplicaAssignmentChangeListener interface {
	Primary(chunkId, currentPrimary string) <-chan string
}
15
// PrimaryReplicaChangeListener pairs a chunk ID with a channel of addresses.
//
// ChunkId returns a string identifier and Address returns a receive-only
// channel of strings.
//
// NOTE(review): presumably Address yields the primary replica address of the
// chunk named by ChunkId every time it changes; nothing in this file
// implements or consumes the interface, so confirm with its users.
type PrimaryReplicaChangeListener interface {
	ChunkId() string
	Address() <-chan string
}
20
// NonChangingReplicaAssignment is a stateless assignment source whose Primary
// method simply echoes back the address it is given.
type NonChangingReplicaAssignment struct{}

// Primary returns a channel that already holds address as its single value.
// The chunkId argument is ignored. The channel is buffered with capacity one
// and is never closed, so a second receive blocks.
func (*NonChangingReplicaAssignment) Primary(chunkId, address string) <-chan string {
	addresses := make(chan string, 1)
	addresses <- address
	return addresses
}
29
giolekva1f6577a2020-03-25 12:53:06 +040030func replicate(ctx context.Context, dst, src Chunk, done chan<- int) {
31 dstInfo, err := dst.Stats()
32 if err != nil {
33 panic(err)
34 }
35 inp := src.ReaderAt()
36 replicated := dstInfo.Committed
37 out := dst.WriterAt()
giolekva7be17df2020-03-21 13:57:02 +040038 for {
39 select {
40 default:
41 p := make([]byte, 100)
giolekva1f6577a2020-03-25 12:53:06 +040042 n, err := inp.ReadAt(p, int64(replicated))
giolekva7be17df2020-03-21 13:57:02 +040043 if n > 0 {
giolekva1f6577a2020-03-25 12:53:06 +040044 m, _ := out.WriteAt(p[:n], int64(replicated))
45 replicated += m
giolekva7be17df2020-03-21 13:57:02 +040046 }
giolekva1f6577a2020-03-25 12:53:06 +040047 if err == io.EOF {
48 done <- 1
49 return
50 }
51
giolekva7be17df2020-03-21 13:57:02 +040052 case <-ctx.Done():
53 return
54 }
55 }
56}
57
giolekva2babef22020-03-25 23:27:29 +040058func ReplicateFromPrimary(ctx context.Context, chunkId string, dst Chunk, primaryAddressCh <-chan string) {
giolekva1f6577a2020-03-25 12:53:06 +040059 var done chan int
giolekva7be17df2020-03-21 13:57:02 +040060 var cancel context.CancelFunc = nil
61 for {
62 select {
giolekva1f6577a2020-03-25 12:53:06 +040063 case <-done:
64 return
giolekva7be17df2020-03-21 13:57:02 +040065 case <-ctx.Done():
66 return
giolekva2babef22020-03-25 23:27:29 +040067 case address := <-primaryAddressCh:
giolekva7be17df2020-03-21 13:57:02 +040068 if cancel != nil {
69 cancel()
70 }
giolekva1f6577a2020-03-25 12:53:06 +040071 var opts []grpc.DialOption
72 opts = append(opts, grpc.WithInsecure())
73 opts = append(opts, grpc.WithBlock())
74 conn, err := grpc.Dial(address, opts...)
giolekva7be17df2020-03-21 13:57:02 +040075 if err == nil {
76 continue
77 }
78 client := api.NewChunkStorageClient(conn)
giolekva2babef22020-03-25 23:27:29 +040079 src := RemoteChunk{chunkId, client}
giolekva7be17df2020-03-21 13:57:02 +040080 replicatorCtx, cancelFn := context.WithCancel(context.Background())
81 cancel = cancelFn
giolekva1f6577a2020-03-25 12:53:06 +040082 done = make(chan int, 1)
83 go replicate(replicatorCtx, dst, &src, done)
84 }
85 }
86}
87
giolekva2babef22020-03-25 23:27:29 +040088func WriteToPrimary(ctx context.Context, chunkId string, src Chunk, primaryAddressCh <-chan string) {
giolekva1f6577a2020-03-25 12:53:06 +040089 var done chan int
90 var cancel context.CancelFunc = nil
91 for {
92 select {
93 case <-done:
94 return
95 case <-ctx.Done():
96 return
giolekva2babef22020-03-25 23:27:29 +040097 case address := <-primaryAddressCh:
giolekva1f6577a2020-03-25 12:53:06 +040098 if cancel != nil {
99 cancel()
100 }
101 var opts []grpc.DialOption
102 opts = append(opts, grpc.WithInsecure())
103 opts = append(opts, grpc.WithBlock())
104 conn, err := grpc.Dial(address, opts...)
105 if err != nil {
106 continue
107 }
108 client := api.NewChunkStorageClient(conn)
giolekva2babef22020-03-25 23:27:29 +0400109 dst := RemoteChunk{chunkId, client}
giolekva1f6577a2020-03-25 12:53:06 +0400110 replicatorCtx, cancelFn := context.WithCancel(context.Background())
111 cancel = cancelFn
112 done = make(chan int, 1)
113 go replicate(replicatorCtx, &dst, src, done)
giolekva7be17df2020-03-21 13:57:02 +0400114 }
115 }
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +0400116}