201 lines
4.4 KiB
Go
201 lines
4.4 KiB
Go
package upgrade
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/charmbracelet/bubbles/spinner"
|
|
tea "github.com/charmbracelet/bubbletea"
|
|
"github.com/charmbracelet/lipgloss"
|
|
"github.com/pkg/errors"
|
|
"github.com/siderolabs/talos/pkg/machinery/api/machine"
|
|
"github.com/siderolabs/talos/pkg/machinery/client"
|
|
)
|
|
|
|
var upgraderBoxStyle = lipgloss.NewStyle().
|
|
Border(lipgloss.RoundedBorder()).
|
|
Padding(1,1).
|
|
Width(60).
|
|
BorderTop(true).
|
|
BorderBottom(true).
|
|
BorderLeft(true).
|
|
BorderRight(true)
|
|
|
|
type Upgrader struct {
|
|
core *CoreState
|
|
node *Node
|
|
|
|
spinner spinner.Model
|
|
|
|
done bool
|
|
failed bool
|
|
|
|
Message string
|
|
Status string
|
|
}
|
|
|
|
func NewUpgrader(core *CoreState, node *Node) (*Upgrader, error) {
|
|
u := &Upgrader{
|
|
core: core,
|
|
node: node,
|
|
spinner: spinner.New(
|
|
spinner.WithSpinner(spinner.Dot),
|
|
),
|
|
}
|
|
|
|
ctx := client.WithNode(u.core.ctx, u.node.node.Metadata.ID())
|
|
|
|
resp, err := u.node.node.Client.MachineClient.Upgrade(ctx, &machine.UpgradeRequest{
|
|
Image: u.core.SelectedImage,
|
|
Preserve: false,
|
|
Stage: true,
|
|
Force: false,
|
|
RebootMode: machine.UpgradeRequest_RebootMode(machine.RebootRequest_DEFAULT),
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to upgrade node")
|
|
}
|
|
|
|
var actorID string
|
|
|
|
for _, m := range resp.GetMessages() {
|
|
if actorID = m.GetActorId(); actorID != "" {
|
|
break
|
|
}
|
|
}
|
|
|
|
if actorID == "" {
|
|
return nil, errors.New("failed to collect actor ID; cannot watch upgrade progress")
|
|
}
|
|
|
|
go u.EventWatcher(ctx, actorID)
|
|
|
|
u.Message = "launched upgrade"
|
|
u.Status = "init"
|
|
|
|
return u, nil
|
|
}
|
|
|
|
func (u *Upgrader) EventWatcher(ctx context.Context, actorID string) {
|
|
for ctx.Err() == nil {
|
|
if err := u.EventWatchInstance(ctx, actorID); err != nil {
|
|
u.Status = err.Error()
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
continue
|
|
}
|
|
|
|
return
|
|
}
|
|
}
|
|
|
|
func (u *Upgrader) EventWatchInstance(ctx context.Context, actorID string) error {
|
|
eventsClient, err := u.node.node.Client.MachineClient.Events(ctx, &machine.EventsRequest{
|
|
WithActorId: actorID,
|
|
})
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to get event watcher")
|
|
}
|
|
|
|
for ctx.Err() == nil {
|
|
rawEv, err := eventsClient.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
return errors.New("stream closed")
|
|
}
|
|
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to receive event")
|
|
}
|
|
|
|
if rawEv.GetMetadata().GetStatus().GetMessage() == "" {
|
|
continue
|
|
}
|
|
|
|
ev, err := client.UnmarshalEvent(rawEv)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to decode received event %+v", rawEv)
|
|
}
|
|
|
|
switch msg := ev.Payload.(type) {
|
|
case *machine.PhaseEvent:
|
|
u.Message = fmt.Sprintf("phase: %s action: %v", msg.GetPhase(), msg.GetAction())
|
|
u.Status = "running"
|
|
case *machine.TaskEvent:
|
|
u.Message = fmt.Sprintf("task: %s action: %v", msg.GetTask(), msg.GetAction())
|
|
u.Status = "running"
|
|
case *machine.SequenceEvent:
|
|
var errMessage string
|
|
|
|
if msg.GetError().GetMessage() != "" {
|
|
errMessage = fmt.Sprintf("error: [code: %v message: %v]",
|
|
msg.GetError().GetCode(),
|
|
msg.GetError().GetMessage(),
|
|
)
|
|
}
|
|
|
|
u.Message = fmt.Sprintf("sequence: %s action: %v%v",
|
|
msg.GetSequence(), msg.GetAction(), errMessage)
|
|
u.Status = "running"
|
|
|
|
if errMessage != "" {
|
|
return errors.Errorf("sequence error: %s", errMessage)
|
|
}
|
|
case *machine.MachineStatusEvent:
|
|
if msg.GetStage() == machine.MachineStatusEvent_RUNNING {
|
|
u.Message = "upgraded"
|
|
u.Status = "finished"
|
|
|
|
return nil
|
|
}
|
|
u.Message = fmt.Sprintf("stage: %v ready: %v unmetCond: %v",
|
|
msg.GetStage(), msg.GetStatus().GetReady(), msg.GetStatus().GetUnmetConditions())
|
|
u.Status = "running"
|
|
case *machine.ServiceStateEvent:
|
|
u.Message = fmt.Sprintf("service: %v message: %v healthy: %v",
|
|
msg.GetService(), msg.GetMessage(), msg.GetHealth().GetHealthy())
|
|
u.Status = "running"
|
|
default:
|
|
u.Status = "unhandled event"
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *Upgrader) Init() tea.Cmd {
|
|
return u.spinner.Tick
|
|
}
|
|
|
|
func (u *Upgrader) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
|
switch msg := msg.(type) {
|
|
case tea.KeyMsg:
|
|
return u, tea.Quit
|
|
case spinner.TickMsg:
|
|
if u.failed {
|
|
return nil, tea.Quit
|
|
}
|
|
|
|
if u.done {
|
|
return NewSelectNode(u.core), nil
|
|
}
|
|
|
|
var cmd tea.Cmd
|
|
|
|
u.spinner, cmd = u.spinner.Update(msg)
|
|
|
|
return u, cmd
|
|
default:
|
|
return u, nil
|
|
}
|
|
}
|
|
|
|
func (u *Upgrader) View() string {
|
|
return upgraderBoxStyle.Render(fmt.Sprintf("%s %s: %s", u.spinner.View(), u.Status, u.Message))
|
|
}
|
|
|
|
func (u *Upgrader) Tick() (cmd tea.Cmd) {
|
|
return u.spinner.Tick
|
|
}
|