| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 1 | package chunk |
| 2 | |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame^] | 3 | import "context" |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 4 | import "io" |
| 5 | |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame^] | 6 | import "google.golang.org/grpc" |
| 7 | |
| 8 | import "pcloud/api" |
| 9 | |
| 10 | type PrimaryReplicaChangeListener interface { |
| 11 | ChunkId() string |
| 12 | Address() <-chan string |
| 13 | } |
| 14 | |
| 15 | type NonChangingPrimaryReplicaChangeListener struct { |
| 16 | chunkId string |
| 17 | address string |
| 18 | } |
| 19 | |
| 20 | func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string { |
| 21 | return l.chunkId |
| 22 | } |
| 23 | |
| 24 | func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string { |
| 25 | ch := make(chan string, 1) |
| 26 | ch <- l.address |
| 27 | return ch |
| 28 | } |
| 29 | |
| 30 | func replicate(ctx context.Context, dst, src Chunk) { |
| 31 | inp := src.ReadSeeker() |
| 32 | inp.Seek(int64(src.SizeBytes()), io.SeekStart) |
| 33 | out := dst.Writer() |
| 34 | for { |
| 35 | select { |
| 36 | default: |
| 37 | p := make([]byte, 100) |
| 38 | n, _ := inp.Read(p) |
| 39 | if n > 0 { |
| 40 | out.Write(p[:n]) |
| 41 | } |
| 42 | case <-ctx.Done(): |
| 43 | return |
| 44 | } |
| 45 | } |
| 46 | } |
| 47 | |
| 48 | func dialConn(address string) (*grpc.ClientConn, error) { |
| 49 | var opts []grpc.DialOption |
| 50 | opts = append(opts, grpc.WithInsecure()) |
| 51 | opts = append(opts, grpc.WithBlock()) |
| 52 | return grpc.Dial(address, opts...) |
| 53 | |
| 54 | } |
| 55 | |
| 56 | func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) { |
| 57 | var cancel context.CancelFunc = nil |
| 58 | for { |
| 59 | select { |
| 60 | case <-ctx.Done(): |
| 61 | return |
| 62 | case address := <-l.Address(): |
| 63 | if cancel != nil { |
| 64 | cancel() |
| 65 | } |
| 66 | conn, err := dialConn(address) |
| 67 | if err == nil { |
| 68 | continue |
| 69 | } |
| 70 | client := api.NewChunkStorageClient(conn) |
| 71 | src := RemoteChunk{l.ChunkId(), client} |
| 72 | replicatorCtx, cancelFn := context.WithCancel(context.Background()) |
| 73 | cancel = cancelFn |
| 74 | go replicate(replicatorCtx, dst, &src) |
| 75 | } |
| 76 | } |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 77 | } |