diff options
author | Sean E. Russell <ser@ser1.net> | 2021-03-02 03:16:40 -0600 |
---|---|---|
committer | Sean E. Russell <ser@ser1.net> | 2021-03-02 03:17:53 -0600 |
commit | a44ced4bba471325bba98cabce5709d52c90a60c (patch) | |
tree | b628af5fc503ab90ba7d3f1c8ab47811dc624fbf /devices | |
parent | 94d5c2e33da1af733644932003d619a7ba537207 (diff) |
Bring extensions under the umbrella.
Bring extensions under the umbrella.
Diffstat (limited to 'devices')
-rw-r--r-- | devices/nvidia.go | 184 | ||||
-rw-r--r-- | devices/remote.go | 271 |
2 files changed, 455 insertions, 0 deletions
diff --git a/devices/nvidia.go b/devices/nvidia.go new file mode 100644 index 0000000..0e50dba --- /dev/null +++ b/devices/nvidia.go @@ -0,0 +1,184 @@ +package devices + +import ( + "bytes" + "encoding/csv" + "errors" + "fmt" + "os/exec" + "strconv" + "sync" + "time" + + "github.com/xxxserxxx/opflag" +) + +// Set up variables and register this plug-in with the main code. +// The functions Register*(f) tell gotop which of these plugin functions to +// call to update data; the RegisterStartup() function sets the function +// that gotop will call when everything else has been done and the plugin +// should start collecting data. +// +// In this plugin, one call to the nvidia program returns *all* the data +// we're looking for, but gotop will call each update function during each +// cycle. This means that the nvidia program would be called 3 (or more) +// times per update, which isn't very efficient. Therefore, we make this +// code more complex to run a job in the background that runs the nvidia +// tool periodically and puts the results into hashes; the update functions +// then just sync data from those hashes into the return data. +func init() { + opflag.BoolVarP(&nvidia, "nvidia", "", false, "Enable NVidia GPU support") + RegisterStartup(startNVidia) +} + +// updateNvidiaTemp copies data from the local _temps cache into the passed-in +// return-value map. It is called once per cycle by gotop. +func updateNvidiaTemp(temps map[string]int) map[string]error { + nvidiaLock.Lock() + defer nvidiaLock.Unlock() + for k, v := range _temps { + temps[k] = v + } + return _errors +} + +// updateNvidiaMem copies data from the local _mems cache into the passed-in +// return-value map. It is called once per cycle by gotop. +func updateNvidiaMem(mems map[string]MemoryInfo) map[string]error { + nvidiaLock.Lock() + defer nvidiaLock.Unlock() + for k, v := range _mems { + mems[k] = v + } + return _errors +} + +// updateNvidiaUsage copies data from the local _cpus cache into the passed-in +// return-value map. It is called once per cycle by gotop. +func updateNvidiaUsage(cpus map[string]int, _ bool) map[string]error { + nvidiaLock.Lock() + defer nvidiaLock.Unlock() + for k, v := range _cpus { + cpus[k] = v + } + return _errors +} + +// startNVidia is called once by gotop, and forks a thread to call the nvidia +// tool periodically and update the cached cpu, memory, and temperature +// values that are used by the update*() functions to return data to gotop. +// +// The vars argument contains command-line arguments to allow the plugin +// to change runtime options; the only option currently supported is the +// `nvidia-refresh` arg, which is expected to be a time.Duration value and +// sets how frequently the nvidia tool is called to refresh the date. +func startNVidia(vars map[string]string) error { + if !nvidia { + return nil + } + _, err := exec.Command("nvidia-smi", "-L").Output() + if err != nil { + return errors.New(fmt.Sprintf("NVidia GPU error: %s", err)) + } + _errors = make(map[string]error) + _temps = make(map[string]int) + _mems = make(map[string]MemoryInfo) + _cpus = make(map[string]int) + _errors = make(map[string]error) + RegisterTemp(updateNvidiaTemp) + RegisterMem(updateNvidiaMem) + RegisterCPU(updateNvidiaUsage) + + nvidiaLock = sync.Mutex{} + // Get the refresh period from the passed-in command-line/config + // file options + refresh := time.Second + if v, ok := vars["nvidia-refresh"]; ok { + if refresh, err = time.ParseDuration(v); err != nil { + return err + } + } + // update once to populate the device names, for the widgets. + update() + // Fork off a long-running job to call the nvidia tool periodically, + // parse out the values, and put them in the cache. + go func() { + timer := time.Tick(refresh) + for range timer { + update() + } + }() + return nil +} + +// Caches for the output from the nvidia tool; the update() functions pull +// from these and return the values to gotop when requested. +var ( + _temps map[string]int + _mems map[string]MemoryInfo + _cpus map[string]int + // A cache of errors generated by the background job running the nvidia tool; + // these errors are returned to gotop when it calls the update() functions. + _errors map[string]error +) + +var nvidiaLock sync.Mutex + +// update calls the nvidia tool, parses the output, and caches the results +// in the various _* maps. The metric data parsed is: name, index, +// temperature.gpu, utilization.gpu, utilization.memory, memory.total, +// memory.free, memory.used +// +// If this function encounters an error calling `nvidia-smi`, it caches the +// error and returns immediately. We expect exec errors only when the tool +// isn't available, or when it fails for some reason; no exec error cases +// are recoverable. This does **not** stop the cache job; that will continue +// to run and continue to call update(). +func update() { + bs, err := exec.Command( + "nvidia-smi", + "--query-gpu=name,index,temperature.gpu,utilization.gpu,memory.total,memory.used", + "--format=csv,noheader,nounits").Output() + if err != nil { + _errors["nvidia"] = err + //bs = []byte("GeForce GTX 1080 Ti, 0, 31, 9, 11175, 206") + return + } + csvReader := csv.NewReader(bytes.NewReader(bs)) + csvReader.TrimLeadingSpace = true + records, err := csvReader.ReadAll() + if err != nil { + _errors["nvidia"] = err + return + } + + // Ensure we're not trying to modify the caches while they're being read by the update() functions. + nvidiaLock.Lock() + defer nvidiaLock.Unlock() + // Errors during parsing are recorded, but do not stop parsing. + for _, row := range records { + // The name of the devices is the nvidia-smi "<name>.<index>" + name := row[0] + "." + row[1] + if _temps[name], err = strconv.Atoi(row[2]); err != nil { + _errors[name] = err + } + if _cpus[name], err = strconv.Atoi(row[3]); err != nil { + _errors[name] = err + } + t, err := strconv.Atoi(row[4]) + if err != nil { + _errors[name] = err + } + u, err := strconv.Atoi(row[5]) + if err != nil { + _errors[name] = err + } + _mems[name] = MemoryInfo{ + Total: 1048576 * uint64(t), + Used: 1048576 * uint64(u), + UsedPercent: (float64(u) / float64(t)) * 100.0, + } + } +} + +var nvidia bool diff --git a/devices/remote.go b/devices/remote.go new file mode 100644 index 0000000..a86f3ee --- /dev/null +++ b/devices/remote.go @@ -0,0 +1,271 @@ +package devices + +import ( + "bufio" + "log" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/xxxserxxx/opflag" +) + +var name string +var remote_url string +var sleep time.Duration +var remoteLock sync.Mutex + +// FIXME Widgets don't align values +// TODO remote network & disk aren't reported +// TODO network resiliency; I believe it currently crashes gotop when the network goes down +// TODO Replace custom decoder with https://github.com/prometheus/common/blob/master/expfmt/decode.go +// TODO MQTT / Stomp / MsgPack +func init() { + opflag.StringVarP(&name, "remote-name", "", "", "Remote: name of remote gotop") + opflag.StringVarP(&remote_url, "remote-url", "", "", "Remote: URL of remote gotop") + opflag.DurationVarP(&sleep, "remote-refresh", "", 0, "Remote: Frequency to refresh data, in seconds") + + RegisterStartup(startup) +} + +type Remote struct { + url string + refresh time.Duration +} + +func startup(vars map[string]string) error { + // Don't set anything up if there's nothing to do + if name == "" || remote_url == "" { + return nil + } + _cpuData = make(map[string]int) + _tempData = make(map[string]int) + _netData = make(map[string]float64) + _diskData = make(map[string]float64) + _memData = make(map[string]MemoryInfo) + + remoteLock = sync.Mutex{} + remotes := parseConfig(vars) + if remote_url != "" { + r := Remote{ + url: remote_url, + refresh: 2 * time.Second, + } + if name == "" { + name = "Remote" + } + if sleep != 0 { + r.refresh = sleep + } + remotes[name] = r + } + if len(remotes) == 0 { + log.Println("Remote: no remote URL provided; disabling extension") + return nil + } + RegisterTemp(updateTemp) + RegisterMem(updateMem) + RegisterCPU(updateUsage) + + // We need to know what we're dealing with, so the following code does two + // things, one of them sneakily. It forks off background processes + // to periodically pull data from remote sources and cache the results for + // when the UI wants it. When it's run the first time, it sets up a WaitGroup + // so that it can hold off returning until it's received data from the remote + // so that the rest of the program knows how many cores, disks, etc. it needs + // to set up UI elements for. After the first run, each process discards the + // the wait group. + w := &sync.WaitGroup{} + for n, r := range remotes { + n = n + "-" + r.url = r.url + var u *url.URL + w.Add(1) + go func(name string, remote Remote, wg *sync.WaitGroup) { + for { + res, err := http.Get(remote.url) + if err == nil { + u, err = url.Parse(remote.url) + if err == nil { + if res.StatusCode == http.StatusOK { + bi := bufio.NewScanner(res.Body) + process(name, bi) + } else { + u.User = nil + log.Printf("unsuccessful connection to %s: http status %s", u.String(), res.Status) + } + } else { + log.Print("error processing remote URL") + } + } else { + } + res.Body.Close() + if wg != nil { + wg.Done() + wg = nil + } + time.Sleep(remote.refresh) + } + }(n, r, w) + } + w.Wait() + return nil +} + +var ( + _cpuData map[string]int + _tempData map[string]int + _netData map[string]float64 + _diskData map[string]float64 + _memData map[string]MemoryInfo +) + +func process(host string, data *bufio.Scanner) { + remoteLock.Lock() + for data.Scan() { + line := data.Text() + if line[0] == '#' { + continue + } + if line[0:6] != _gotop { + continue + } + sub := line[6:] + switch { + case strings.HasPrefix(sub, _cpu): // int gotop_cpu_CPU0 + procInt(host, line, sub[4:], _cpuData) + case strings.HasPrefix(sub, _temp): // int gotop_temp_acpitz + procInt(host, line, sub[5:], _tempData) + case strings.HasPrefix(sub, _net): // int gotop_net_recv + parts := strings.Split(sub[5:], " ") + if len(parts) < 2 { + log.Printf(`bad data; not enough columns in "%s"`, line) + continue + } + val, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + log.Print(err) + continue + } + _netData[host+parts[0]] = val + case strings.HasPrefix(sub, _disk): // float % gotop_disk_:dev:mmcblk0p1 + parts := strings.Split(sub[5:], " ") + if len(parts) < 2 { + log.Printf(`bad data; not enough columns in "%s"`, line) + continue + } + val, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + log.Print(err) + continue + } + _diskData[host+parts[0]] = val + case strings.HasPrefix(sub, _mem): // float % gotop_memory_Main + parts := strings.Split(sub[7:], " ") + if len(parts) < 2 { + log.Printf(`bad data; not enough columns in "%s"`, line) + continue + } + val, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + log.Print(err) + continue + } + _memData[host+parts[0]] = MemoryInfo{ + Total: 100, + Used: uint64(100.0 / val), + UsedPercent: val, + } + default: + // NOP! This is a metric we don't care about. + } + } + remoteLock.Unlock() +} + +func procInt(host, line, sub string, data map[string]int) { + parts := strings.Split(sub, " ") + if len(parts) < 2 { + log.Printf(`bad data; not enough columns in "%s"`, line) + return + } + val, err := strconv.Atoi(parts[1]) + if err != nil { + log.Print(err) + return + } + data[host+parts[0]] = val +} + +func updateTemp(temps map[string]int) map[string]error { + remoteLock.Lock() + for name, val := range _tempData { + temps[name] = val + } + remoteLock.Unlock() + return nil +} + +// FIXME The units are wrong: getting bytes, assuming they're % +func updateMem(mems map[string]MemoryInfo) map[string]error { + remoteLock.Lock() + for name, val := range _memData { + mems[name] = val + } + remoteLock.Unlock() + return nil +} + +func updateUsage(cpus map[string]int, _ bool) map[string]error { + remoteLock.Lock() + for name, val := range _cpuData { + cpus[name] = val + } + remoteLock.Unlock() + return nil +} + +func parseConfig(vars map[string]string) map[string]Remote { + rv := make(map[string]Remote) + for key, value := range vars { + if strings.HasPrefix(key, "remote-") { + parts := strings.Split(key, "-") + if len(parts) == 2 { + log.Printf("malformed Remote extension configuration '%s'; must be 'remote-NAME-url' or 'remote-NAME-refresh'", key) + continue + } + name := parts[1] + remote, ok := rv[name] + if !ok { + remote = Remote{} + } + if parts[2] == "url" { + remote.url = value + } else if parts[2] == "refresh" { + sleep, err := strconv.Atoi(value) + if err != nil { + log.Printf("illegal Remote extension value for %s: '%s'. Must be a duration in seconds, e.g. '2'", key, value) + continue + } + remote.refresh = time.Duration(sleep) * time.Second + } else { + log.Printf("bad configuration option for Remote extension: '%s'; must be 'remote-NAME-url' or 'remote-NAME-refresh'", key) + continue + } + rv[name] = remote + } + } + return rv +} + +const ( + _gotop = "gotop_" + _cpu = "cpu_" + _temp = "temp_" + _net = "net_" + _disk = "disk_" + _mem = "memory_" +) |