blob: 7f07227cb0d03fc7e8c7091eec914800db2ba455 [file] [log] [blame]
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04001package chunk
2
giolekvac5126d92020-03-21 16:39:56 +04003import (
giolekvac5126d92020-03-21 16:39:56 +04004 "context"
giolekva1f6577a2020-03-25 12:53:06 +04005 "fmt"
6 "log"
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +04007
giolekvac5126d92020-03-21 16:39:56 +04008 "sync"
9
giolekvad2a029d2020-03-25 23:06:08 +040010 "github.com/giolekva/pcloud/api"
giolekvac5126d92020-03-21 16:39:56 +040011)
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040012
13type ChunkServer struct {
giolekva2babef22020-03-25 23:27:29 +040014 factory ChunkFactory
15 assignmentChangeLis ReplicaAssignmentChangeListener
16 chunks sync.Map
17 replicatorCancel sync.Map
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040018}
19
giolekva2babef22020-03-25 23:27:29 +040020func NewChunkServer(factory ChunkFactory,
21 assignmentChangeLis ReplicaAssignmentChangeListener) *ChunkServer {
22 return &ChunkServer{
23 factory: factory,
24 assignmentChangeLis: assignmentChangeLis}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040025}
26
27func (s *ChunkServer) ListChunks(
28 ctx context.Context,
giolekva7be17df2020-03-21 13:57:02 +040029 req *api.ListChunksRequest) (resp *api.ListChunksResponse, err error) {
30 resp = &api.ListChunksResponse{}
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040031 s.chunks.Range(func(k, v interface{}) bool {
32 resp.ChunkId = append(resp.ChunkId, k.(string))
33 return true
34 })
giolekva7be17df2020-03-21 13:57:02 +040035 return
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040036}
37
giolekva1f6577a2020-03-25 12:53:06 +040038func (s *ChunkServer) CreateChunk(
39 ctx context.Context,
40 req *api.CreateChunkRequest) (resp *api.CreateChunkResponse, err error) {
41 chunk := s.factory.New(int(req.Size))
42 s.chunks.Store(req.ChunkId, chunk)
43 switch req.Role {
44 case api.ReplicaRole_SECONDARY:
45 ctx, cancel := context.WithCancel(context.Background())
46 s.replicatorCancel.Store(req.ChunkId, cancel)
giolekva2babef22020-03-25 23:27:29 +040047 primaryAddressCh := s.assignmentChangeLis.Primary(
48 req.ChunkId, req.PrimaryAddress)
49 go ReplicateFromPrimary(ctx, req.ChunkId, chunk, primaryAddressCh)
giolekva1f6577a2020-03-25 12:53:06 +040050 case api.ReplicaRole_PRIMARY:
51 {
52 }
53 }
54 resp = &api.CreateChunkResponse{}
55 log.Printf("Created chunk: %s\n", req.ChunkId)
56 return
57
58}
59
60func (s *ChunkServer) GetChunkStatus(
61 ctx context.Context,
62 req *api.GetChunkStatusRequest) (resp *api.GetChunkStatusResponse, err error) {
63 if chunk, ok := s.chunks.Load(req.ChunkId); ok {
64 c := chunk.(Chunk)
65 var info ChunkInfo
66 info, err = c.Stats()
67 if err != nil {
68 return
69 }
70 resp = &api.GetChunkStatusResponse{
71 Status: info.Status,
72 TotalBytes: int32(info.Size),
73 CommittedBytes: int32(info.Committed)}
74 return
75 }
76 return nil, fmt.Errorf("Could not fund chunk: %s", req.ChunkId)
77}
78
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040079func (s *ChunkServer) ReadChunk(
80 ctx context.Context,
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040081 req *api.ReadChunkRequest) (resp *api.ReadChunkResponse, err error) {
82 if value, ok := s.chunks.Load(req.ChunkId); ok {
83 chunk := value.(Chunk)
giolekva1f6577a2020-03-25 12:53:06 +040084 b := make([]byte, req.NumBytes)
85 var n int
86 n, err = chunk.ReaderAt().ReadAt(b, int64(req.Offset))
87 if n == 0 {
88 return
Giorgi Lekveishvili45b4d522020-03-19 21:11:18 +040089 }
giolekva1f6577a2020-03-25 12:53:06 +040090 return &api.ReadChunkResponse{Data: b[:n]}, nil
91
92 } else {
93 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +040094 }
giolekva1f6577a2020-03-25 12:53:06 +040095}
96
97func (s *ChunkServer) WriteChunk(
98 ctx context.Context,
99 req *api.WriteChunkRequest) (resp *api.WriteChunkResponse, err error) {
100 if value, ok := s.chunks.Load(req.ChunkId); ok {
101 chunk := value.(Chunk)
102 var n int
103 n, err = chunk.WriterAt().WriteAt(req.Data, int64(req.Offset))
104 if n == 0 {
105 return
106 }
107 return &api.WriteChunkResponse{BytesWritten: int32(n)}, nil
108
109 } else {
110 return nil, fmt.Errorf("Chunk not found: %s", req.ChunkId)
111 }
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +0400112}
113
giolekva7be17df2020-03-21 13:57:02 +0400114func (s *ChunkServer) RemoveChunk(
115 ctx context.Context,
116 req *api.RemoveChunkRequest) (resp *api.RemoveChunkResponse, err error) {
117 if cancel, ok := s.replicatorCancel.Load(req.ChunkId); ok {
118 cancel.(context.CancelFunc)()
119 s.replicatorCancel.Delete(req.ChunkId)
120 }
121 s.chunks.Delete(req.ChunkId)
122 return &api.RemoveChunkResponse{}, nil
Giorgi Lekveishvilib8f089f2020-03-18 23:28:12 +0400123}