fix: show allocated resources in TaskStatus by nikonov1101 · Pull Request #1368 · sonm-io/core · GitHub

fix: show allocated resources in TaskStatus #1368


Merged · 2 commits · Aug 31, 2018
37 changes: 31 additions & 6 deletions insonmnia/resource/pool.go
@@ -14,12 +14,14 @@ import (
 )
 
 type Scheduler struct {
-    OS            *hardware.Hardware
-    mu            sync.Mutex
-    pool          *pool
+    OS   *hardware.Hardware
+    mu   sync.Mutex
+    pool *pool
+    // taskToAskPlan maps task ID to ask plan ID
     taskToAskPlan map[string]string
-    askPlanPools  map[string]*pool
-    log           *zap.SugaredLogger
+    // askPlanPools maps ask plan's ID to allocated resource pool
+    askPlanPools map[string]*pool
+    log          *zap.SugaredLogger
 }
 
 func NewScheduler(ctx context.Context, hardware *hardware.Hardware) *Scheduler {
@@ -163,6 +165,28 @@ func (m *Scheduler) ReleaseTask(taskID string) error {
     return nil
 }
 
+func (m *Scheduler) ResourceByTask(taskID string) (*sonm.AskPlanResources, error) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    askID, ok := m.taskToAskPlan[taskID]
+    if !ok {
+        return nil, fmt.Errorf("failed to get ask plan id for task %s: no such ask plan", taskID)
+    }
+
+    pool, ok := m.askPlanPools[askID]
+    if !ok {
+        return nil, fmt.Errorf("failed to get ask plan pool by id %s: no such pool", askID)
+    }
+
+    res, ok := pool.used[taskID]
+    if !ok {
+        return nil, fmt.Errorf("failed to get resources for task %s: no such task", taskID)
+    }
+
+    return res, nil
+}
+
 func (m *Scheduler) OnDealFinish(taskID string) error {
     _, ok := m.taskToAskPlan[taskID]
 
@@ -176,7 +200,8 @@ func (m *Scheduler) OnDealFinish(taskID string) error {
 }
 
 type pool struct {
-    all  *sonm.AskPlanResources
+    all *sonm.AskPlanResources
+    // used maps resource ID (usually task id) to allocated resources
     used map[string]*sonm.AskPlanResources
 }
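Taken together, the new ResourceByTask resolves a task's allocation through two maps under the scheduler lock: taskToAskPlan yields the ask plan ID, askPlanPools yields that plan's pool, and the pool's used map yields the resources. A minimal caller sketch; the describeTask helper is hypothetical, while ResourceByTask and its error cases come from the diff above:

// describeTask is a hypothetical helper illustrating the lookup chain.
func describeTask(s *resource.Scheduler, taskID string) error {
    // Fails with a descriptive error if any of the three lookups misses:
    // unknown task, unknown ask plan, or a pool no longer tracking the task.
    res, err := s.ResourceByTask(taskID)
    if err != nil {
        return fmt.Errorf("task %s has no recorded allocation: %v", taskID, err)
    }
    log.Printf("task %s allocation: %v", taskID, res)
    return nil
}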
2 changes: 1 addition & 1 deletion insonmnia/worker/overseer.go
@@ -115,7 +115,7 @@ type ContainerInfo struct {
     Cgroup       string
     CgroupParent string
     NetworkIDs   []string
-    DealID       string
+    DealID       *pb.BigInt
     TaskId       string
     Tag          *pb.TaskTag
     AskID        string
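DealID now carries the protobuf big-integer wrapper instead of its pre-rendered decimal string. A short sketch of what that means at call sites, assuming Unwrap returns a *math/big.Int (its Unwrap().String() usage elsewhere in this PR suggests so); NewBigIntFromInt and the sonm alias match the test file below:

a := sonm.NewBigIntFromInt(1234)
b := sonm.NewBigIntFromInt(1234)

fmt.Println(a == b)                          // false: *pb.BigInt compares by pointer
fmt.Println(a.Unwrap().Cmp(b.Unwrap()) == 0) // true: value equality via big.Int
fmt.Println(a.Unwrap().String())             // "1234", the old string form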
2 changes: 2 additions & 0 deletions insonmnia/worker/overseer_test.go
@@ -280,6 +280,7 @@ func TestMarshalContainerInfo(t *testing.T) {
 
     c := ContainerInfo{
         PublicKey: pkey,
+        DealID:    sonm.NewBigIntFromInt(1234),
     }
 
     data, err := json.Marshal(c)
@@ -290,4 +291,5 @@ func TestMarshalContainerInfo(t *testing.T) {
     require.NoError(t, err)
 
     assert.True(t, ssh.KeysEqual(c.PublicKey, n.PublicKey))
+    assert.Equal(t, n.DealID, sonm.NewBigIntFromInt(1234))
 }
17 changes: 11 additions & 6 deletions insonmnia/worker/server.go
@@ -387,12 +387,11 @@ func (m *Worker) listenDeals(dealsCh <-chan *pb.Deal) {
 }
 
 func (m *Worker) cancelDealTasks(deal *pb.Deal) error {
-    dealID := deal.GetId().Unwrap().String()
     var toDelete []*ContainerInfo
 
     m.mu.Lock()
     for key, container := range m.containers {
-        if container.DealID == dealID {
+        if container.DealID == deal.GetId() {
             toDelete = append(toDelete, container)
             delete(m.containers, key)
         }
@@ -731,7 +730,7 @@ func (m *Worker) StartTask(ctx context.Context, request *pb.StartTaskRequest) (*
     containerInfo.PublicKey = publicKey
     containerInfo.StartAt = time.Now()
     containerInfo.ImageName = ref.String()
-    containerInfo.DealID = dealID.Unwrap().String()
+    containerInfo.DealID = dealID
     containerInfo.Tag = request.GetSpec().GetTag()
     containerInfo.TaskId = taskID
     containerInfo.AskID = ask.ID
@@ -893,6 +892,7 @@ func (m *Worker) TaskStatus(ctx context.Context, req *pb.ID) (*pb.TaskStatusRepl
     }
 
     var metric ContainerMetrics
+    var resources *pb.AskPlanResources
     // If a container has been stopped, ovs.Info has no metrics for such container
     if info.status == pb.TaskStatusReply_RUNNING {
         metrics, err := m.ovs.Info(ctx)
@@ -902,13 +902,18 @@ func (m *Worker) TaskStatus(ctx context.Context, req *pb.ID) (*pb.TaskStatusRepl
 
         metric, ok = metrics[info.ID]
         if !ok {
-            return nil, status.Errorf(codes.NotFound, "Cannot get metrics for container %s", req.GetId())
+            return nil, status.Errorf(codes.NotFound, "cannot get metrics for container %s", req.GetId())
         }
+
+        resources, err = m.resources.ResourceByTask(req.GetId())
+        if err != nil {
+            return nil, status.Errorf(codes.NotFound, "cannot get resources for container %s", req.GetId())
+        }
     }
 
     reply := info.IntoProto(m.ctx)
     reply.Usage = metric.Marshal()
-    // todo: fill `reply.AllocatedResources` field.
+    reply.AllocatedResources = resources
 
     return reply, nil
 }
@@ -1446,7 +1451,7 @@ func (m *Worker) getDealInfo(dealID *pb.BigInt) (*pb.DealInfoReply, error) {
 
     for id, c := range m.containers {
         // task is ours
-        if c.DealID == dealID.Unwrap().String() {
+        if c.DealID == dealID {
             task := c.IntoProto(m.ctx)
 
             // task is running or preparing to start
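With the todo resolved, a status query now returns the allocation alongside usage metrics. A hedged client-side sketch; the worker stub variable and context are assumptions, while the field getters follow from the reply fields set in the diff above:

reply, err := worker.TaskStatus(ctx, &pb.ID{Id: taskID})
if err != nil {
    log.Fatalf("TaskStatus failed: %v", err)
}
// AllocatedResources is filled only for RUNNING tasks; for stopped tasks
// the server leaves it nil, mirroring the metrics behavior.
fmt.Printf("usage: %v, allocated: %v\n", reply.GetUsage(), reply.GetAllocatedResources())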