blob: 2829471f6f97f11bcb231b69160b5fd3dc9652dd [file] [log] [blame]
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04001package chunk
2
giolekvac5126d92020-03-21 16:39:56 +04003import (
4 "bytes"
5 "context"
6 "io"
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04007
giolekvac5126d92020-03-21 16:39:56 +04008 "sync"
9
10 "pcloud/api"
11)
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040012
13type ChunkServer struct {
giolekva7be17df2020-03-21 13:57:02 +040014 factory ChunkFactory
15 chunks sync.Map
16 replicatorCancel sync.Map
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040017}
18
19func NewChunkServer() *ChunkServer {
20 return &ChunkServer{}
21}
22
23func (s *ChunkServer) ListChunks(
24 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +040025 req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
26 resp = &api.ListChunksResponse{}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040027 s.chunks.Range(func(k, v interface{}) bool {
28 resp.ChunkId = append(resp.ChunkId, k.(string))
29 return true
30 })
giolekva7be17df2020-03-21 13:57:02 +040031 return
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040032}
33
34func (s *ChunkServer) ReadChunk(
35 ctx context.Context,
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040036 req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
37 if value, ok := s.chunks.Load(req.ChunkId); ok {
38 chunk := value.(Chunk)
39 src := chunk.ReadSeeker()
40 if req.Offset != 0 {
41 _, err = src.Seek(int64(req.Offset), io.SeekStart)
42 if err != nil {
43 return
44 }
45 }
46 var dst bytes.Buffer
47 if req.NumBytes != 0 {
48 _, err = io.CopyN(&dst, src, int64(req.NumBytes))
49 } else {
50 _, err = io.Copy(&dst, src)
51 }
52 if err == nil {
53 resp = &api.ReadChunkResponse{Data: dst.Bytes()}
54 }
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040055 }
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040056 return
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040057}
58
59func (s *ChunkServer) StoreChunk(
60 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +040061 req *api.StoreChunkRequest) (resp *api.StoreChunkResponse, err error) {
62 chunk := s.factory.New()
63 _, err = chunk.Writer().Write(req.Data)
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040064 s.chunks.Store(req.ChunkId, chunk)
giolekva7be17df2020-03-21 13:57:02 +040065 if err == nil {
66 resp = &api.StoreChunkResponse{}
67 }
68 return
69}
70
71func (s *ChunkServer) ReplicateChunk(
72 ctx context.Context,
73 req *api.ReplicateChunkRequest) (resp *api.ReplicateChunkResponse, err error) {
74 chunk := s.factory.New()
75 s.chunks.Store(req.ChunkId, chunk)
76 ctx, cancel := context.WithCancel(context.Background())
77 s.replicatorCancel.Store(req.ChunkId, cancel)
78 go replicateFromPrimary(ctx, chunk, NonChangingPrimaryReplicaChangeListener{req.ChunkId, req.PrimaryChunkServer})
79 resp = &api.ReplicateChunkResponse{}
80 return
81
82}
83
84func (s *ChunkServer) RemoveChunk(
85 ctx context.Context,
86 req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
87 if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
88 cancel.(context.CancelFunc)()
89 s.replicatorCancel.Delete(req.ChunkId)
90 }
91 s.chunks.Delete(req.ChunkId)
92 return &api.RemoveChunkResponse{}, nil
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040093}