| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 1 | package chunk |
| 2 | |
| giolekva | c5126d9 | 2020-03-21 16:39:56 +0400 | [diff] [blame^] | 3 | import ( |
| 4 | "context" |
| 5 | "io" |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 6 | |
| giolekva | c5126d9 | 2020-03-21 16:39:56 +0400 | [diff] [blame^] | 7 | "google.golang.org/grpc" |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 8 | |
| giolekva | c5126d9 | 2020-03-21 16:39:56 +0400 | [diff] [blame^] | 9 | "pcloud/api" |
| 10 | ) |
| giolekva | 7be17df | 2020-03-21 13:57:02 +0400 | [diff] [blame] | 11 | |
// PrimaryReplicaChangeListener exposes the identifier of a chunk together
// with a stream of addresses for that chunk's primary replica.
type PrimaryReplicaChangeListener interface {
	// Address returns a receive-only channel that delivers primary replica
	// addresses.
	Address() <-chan string
	// ChunkId returns the identifier of the chunk this listener is about.
	ChunkId() string
}
| 16 | |
// NonChangingPrimaryReplicaChangeListener pairs a chunk identifier with a
// single, fixed address.
type NonChangingPrimaryReplicaChangeListener struct {
	chunkId, address string
}
| 21 | |
| 22 | func (l NonChangingPrimaryReplicaChangeListener) ChunkId() string { |
| 23 | return l.chunkId |
| 24 | } |
| 25 | |
| 26 | func (l NonChangingPrimaryReplicaChangeListener) Address() <-chan string { |
| 27 | ch := make(chan string, 1) |
| 28 | ch <- l.address |
| 29 | return ch |
| 30 | } |
| 31 | |
| 32 | func replicate(ctx context.Context, dst, src Chunk) { |
| 33 | inp := src.ReadSeeker() |
| 34 | inp.Seek(int64(src.SizeBytes()), io.SeekStart) |
| 35 | out := dst.Writer() |
| 36 | for { |
| 37 | select { |
| 38 | default: |
| 39 | p := make([]byte, 100) |
| 40 | n, _ := inp.Read(p) |
| 41 | if n > 0 { |
| 42 | out.Write(p[:n]) |
| 43 | } |
| 44 | case <-ctx.Done(): |
| 45 | return |
| 46 | } |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | func dialConn(address string) (*grpc.ClientConn, error) { |
| 51 | var opts []grpc.DialOption |
| 52 | opts = append(opts, grpc.WithInsecure()) |
| 53 | opts = append(opts, grpc.WithBlock()) |
| 54 | return grpc.Dial(address, opts...) |
| 55 | |
| 56 | } |
| 57 | |
| 58 | func replicateFromPrimary(ctx context.Context, dst Chunk, l PrimaryReplicaChangeListener) { |
| 59 | var cancel context.CancelFunc = nil |
| 60 | for { |
| 61 | select { |
| 62 | case <-ctx.Done(): |
| 63 | return |
| 64 | case address := <-l.Address(): |
| 65 | if cancel != nil { |
| 66 | cancel() |
| 67 | } |
| 68 | conn, err := dialConn(address) |
| 69 | if err == nil { |
| 70 | continue |
| 71 | } |
| 72 | client := api.NewChunkStorageClient(conn) |
| 73 | src := RemoteChunk{l.ChunkId(), client} |
| 74 | replicatorCtx, cancelFn := context.WithCancel(context.Background()) |
| 75 | cancel = cancelFn |
| 76 | go replicate(replicatorCtx, dst, &src) |
| 77 | } |
| 78 | } |
| Giorgi Lekveishvili | 45b4d52 | 2020-03-19 21:11:18 +0400 | [diff] [blame] | 79 | } |