#!/usr/bin/python

import os
import sys
import getopt
import socket
import subprocess
import yaml

# Location of the YAML data file, relative to the invoking user's home.
data_path_base = ".config/host-control.yaml"


class host_control():
    """Command-line helper running maintenance actions on configured hosts.

    Hosts, their units, hooks and config items are read from a YAML data
    file (see data_load). Most actions are executed on the target host
    through ssh; config files are copied with cp or scp depending on
    whether the target host is the local machine.
    """

    host = None
    verbose = False
    shell = False
    usage = (
        "Usage: host-control [options] [action]\n"
        "\n"
        "Options:\n"
        " -h [host]: specify host\n"
        " -v: tell more details\n"
        " -s: produce minimal shell output\n"
        "\n"
        "Actions:\n"
        " system-info show kernel info\n"
        " system-shell access system shell\n"
        " system-ping ping host system\n"
        " system-update update system\n"
        " system-restart restart system\n"
        " system-initramfs-update update system initramfs\n"
        " system-daemon-status show systemd status\n"
        " system-daemon-log show systemd log\n"
        " system-daemon-reload reload systemd units\n"
        " system-docker-status show docker status\n"
        " config-list list host config items\n"
        " config-update [config] update host config\n"
        " unit-list list host units\n"
        " unit-status [unit] show unit status\n"
        " unit-start [unit] start unit\n"
        " unit-stop [unit] stop unit\n"
        " unit-restart [unit] restart unit\n"
        " unit-log [unit] show unit log\n"
        " unit-update [unit] update unit\n"
        " unit-docker-shell [unit] access unit docker shell"
    )

    # trace

    def print_message(self, message):
        """Print an informational message unless shell output is requested."""
        if not self.shell:
            print(message)

    def print_command(self, command):
        """Print the command line about to be run when verbose is set."""
        if self.verbose:
            print("Running: " + " ".join(command))

    # run

    def _run(self, command):
        """Trace and run a command list, returning its exit status."""
        self.print_command(command)
        return subprocess.call(command)

    def _run_remote(self, host, arguments):
        """Trace and run arguments on host through ssh."""
        address = self.ssh_address(host)
        return self._run(self.ssh_command(address, arguments))

    def _host_command(self, host, command_base):
        """Return command_base as is for the local host, else wrapped in ssh."""
        if self.host_address(host) == self.hostname:
            return command_base

        return self.ssh_command(self.ssh_address(host), command_base)

    # ssh

    def ssh_address(self, host):
        """Return the ssh destination for host, as [user@]address."""
        address = self.host_address(host)

        if "user" in host:
            return host["user"] + "@" + address

        return address

    def ssh_command(self, address, arguments):
        """Return the ssh command list running arguments on address."""
        return ["ssh", address] + arguments

    # host

    def host_address(self, host):
        """Return the host address entry, falling back to the host name."""
        if "address" in host:
            return host["address"]

        return host["name"]

    def host_lookup(self, name):
        """Return the host entry called name, or None when there is none."""
        # An empty "hosts:" key loads as None: treat it as no hosts.
        for h in self.hosts or []:
            if "name" in h and h["name"] == name:
                return h

        return None

    # hook

    def hook_lookup(self, host, name):
        """Return the host hook whose target is name, or None.

        A message is printed when the host has no hooks or no matching one.
        """
        if "hooks" not in host:
            self.print_message("No hooks in host: " + self.host)
            return None

        for h in host["hooks"]:
            if "target" in h and h["target"] == name:
                return h

        self.print_message("No hook for " + name + " in host: " + self.host)
        return None

    def hook_command(self, hook):
        """Return the hook command as an argument list, or None if unset."""
        if "command" not in hook:
            return None

        return [hook["command"]]

    # config

    def config_path(self):
        """Return the base path holding config source files."""
        return self.config["path"]

    def config_lookup(self, host, name):
        """Return the host config item called name, or None.

        Items lacking a name or a files entry are ignored. A message is
        printed when the host has no config or no matching item.
        """
        if "config" not in host:
            self.print_message("No config in host: " + self.host)
            return None

        for c in host["config"]:
            if "name" not in c or "files" not in c:
                continue

            if c["name"] == name:
                return c

        self.print_message("No config for " + name + " in host: " + self.host)
        return None

    def config_copy_command(self, host, source, sink):
        """Return the command copying config file source to sink on host.

        source is relative to the config path. cp is used when the host
        address is the local hostname, scp otherwise.
        """
        source_path = os.path.join(self.config_path(), source)

        if self.host_address(host) == self.hostname:
            self.print_message("Copy " + source + " to " + sink)
            return ["cp", source_path, sink]

        sink_address = self.ssh_address(host) + ":" + sink
        self.print_message("Copy " + source + " to " + sink_address)
        return ["scp", source_path, sink_address]

    def config_owner_command(self, host, sink, owner):
        """Return the command setting the owner of sink on host."""
        self.print_message("Ownership of " + sink + " to " + owner)
        return self._host_command(host, ["chown", owner, sink])

    def config_mode_command(self, host, sink, mode):
        """Return the command setting the mode of sink on host."""
        self.print_message("Mode of " + sink + " to " + mode)
        return self._host_command(host, ["chmod", mode, sink])

    def config_update(self, host, name):
        """Copy every file of config item name to host.

        Each files entry has the form "source:sink". The item owner and
        mode, when present, are applied to each sink after the copy.

        Returns 0 on success, 1 when the config item is unknown or when
        an entry is malformed. The exit status of the copy commands is
        not checked.
        """
        config = self.config_lookup(host, name)
        if config is None:
            return 1

        self.print_message("Updating host " + self.host + " config: " + name)

        status = 0

        for f in config["files"]:
            parts = f.split(":")
            if len(parts) < 2:
                # Without a sink there is nothing to copy to: report and
                # carry on with the remaining entries.
                self.print_message("Invalid config file entry: " + f)
                status = 1
                continue

            source = parts[0]
            sink = parts[1]

            self._run(self.config_copy_command(host, source, sink))

            if "owner" in config:
                self._run(self.config_owner_command(host, sink,
                                                    config["owner"]))

            if "mode" in config:
                # The mode may be loaded from YAML as an integer.
                self._run(self.config_mode_command(host, sink,
                                                   str(config["mode"])))

        return status

    def config_list(self, host):
        """Print the names of the host config items.

        Returns 0 on success, 1 when the host has no config.
        """
        if "config" not in host:
            self.print_message("No config in host: " + self.host)
            return 1

        self.print_message("Host " + self.host + " config:")

        for c in host["config"]:
            if "name" not in c or "files" not in c:
                continue

            if self.shell:
                print(c["name"])
            else:
                print("- " + c["name"])

        return 0

    # unit

    def unit_lookup(self, host, name):
        """Return the host unit called name, or None when there is none.

        A message is printed when the host has no units at all.
        """
        if "units" not in host:
            self.print_message("No units in host: " + self.host)
            # Callers test for None: any other value would be mistaken
            # for a unit entry.
            return None

        for u in host["units"]:
            if "name" in u and u["name"] == name:
                return u

        return None

    def _unit_find(self, host, name):
        """Look up unit name in host, printing a message when missing."""
        unit = self.unit_lookup(host, name)
        if unit is None:
            self.print_message("No " + name + " unit in host: " + self.host)

        return unit

    def unit_docker_container(self, unit):
        """Return the unit docker container name, defaulting to its name."""
        if "docker-container" in unit:
            return unit["docker-container"]

        return unit["name"]

    def unit_systemctl(self, host, name, option):
        """Run systemctl option on the systemd unit of unit name.

        Returns 0 once the command was run, 1 when the unit is unknown
        or has no systemd unit. The systemctl exit status is not
        propagated.
        """
        unit = self._unit_find(host, name)
        if unit is None:
            return 1

        if "systemd-unit" not in unit:
            self.print_message("No systemd unit for " + name +
                               " in host: " + self.host)
            return 1

        self._run_remote(host, ["SYSTEMD_COLORS=1", "systemctl", option,
                                unit["systemd-unit"]])
        return 0

    def unit_status(self, host, name):
        """Show the systemd status of unit name."""
        return self.unit_systemctl(host, name, "status")

    def unit_start(self, host, name):
        """Start unit name."""
        self.print_message("Starting unit: " + name)
        return self.unit_systemctl(host, name, "start")

    def unit_stop(self, host, name):
        """Stop unit name."""
        self.print_message("Stopping unit: " + name)
        return self.unit_systemctl(host, name, "stop")

    def unit_restart(self, host, name):
        """Restart unit name."""
        self.print_message("Restarting unit: " + name)
        return self.unit_systemctl(host, name, "restart")

    def unit_log(self, host, name):
        """Follow the log of unit name.

        The unit log-path file is followed with tail when set, otherwise
        the journal of its systemd unit is followed.

        Returns 0 once the command was run, 1 when the unit is unknown
        or has no systemd unit.
        """
        unit = self._unit_find(host, name)
        if unit is None:
            return 1

        if "systemd-unit" not in unit:
            self.print_message("No systemd unit for " + name +
                               " in host: " + self.host)
            return 1

        if "log-path" in unit:
            arguments = ["tail", "-n", "50", "-f", unit["log-path"]]
        else:
            arguments = ["SYSTEMD_COLORS=1", "journalctl", "-n", "50", "-fu",
                         unit["systemd-unit"]]

        self._run_remote(host, arguments)
        return 0

    def unit_update(self, host, name):
        """Update the config of unit name, then restart it.

        The unit config entry is either one config item name or a list
        of them. Systemd units are reloaded before the restart when any
        of the names starts with "systemd".

        Returns 0 on completion, 1 when the unit is unknown or has no
        config.
        """
        unit = self._unit_find(host, name)
        if unit is None:
            return 1

        if "config" not in unit:
            self.print_message("No config for " + name + " unit in host: " +
                               self.host)
            return 1

        self.print_message("Updating unit: " + name)

        configs = unit["config"]
        if not isinstance(configs, list):
            configs = [configs]

        daemon_reload = False

        for c in configs:
            self.config_update(host, c)

            if c.startswith("systemd"):
                daemon_reload = True

        if daemon_reload:
            self.system_daemon_reload(host)

        self.unit_restart(host, name)

        return 0

    def unit_docker_shell(self, host, name):
        """Open an interactive sh in the docker container of unit name.

        Returns 0 once the command was run, 1 when the unit is unknown.
        """
        unit = self._unit_find(host, name)
        if unit is None:
            return 1

        container = self.unit_docker_container(unit)

        self._run_remote(host, ["-t", "docker", "exec", "-ti", container,
                                "sh"])
        return 0

    def unit_list(self, host):
        """Print the names of the host units.

        Returns 0 on success, 1 when the host has no units.
        """
        if "units" not in host:
            self.print_message("No units in host: " + self.host)
            return 1

        self.print_message("Host " + self.host + " units:")

        for u in host["units"]:
            if "name" not in u:
                continue

            if self.shell:
                print(u["name"])
            else:
                print("- " + u["name"])

        return 0

    # system

    def system_info(self, host):
        """Show kernel information of host."""
        self._run_remote(host, ["uname", "-a"])
        return 0

    def system_shell(self, host):
        """Open a shell on host."""
        self._run_remote(host, [])
        return 0

    def system_ping(self, host):
        """Ping the address of host from the local machine."""
        self._run(["ping", self.host_address(host)])
        return 0

    def system_update(self, host):
        """Update the system packages of host."""
        # TODO: Move update command in config.
        self._run_remote(host, ["apt", "update", "&&", "apt", "upgrade"])
        return 0

    def system_restart(self, host):
        """Reboot host."""
        self._run_remote(host, ["reboot"])
        return 0

    def system_initramfs_update(self, host):
        """Run the host initramfs-update hook.

        Returns 0 once the hook was run, 1 when the host has no such
        hook or when the hook has no command.
        """
        hook = self.hook_lookup(host, "initramfs-update")
        if hook is None:
            return 1

        arguments = self.hook_command(hook)
        if arguments is None:
            return 1

        self._run_remote(host, arguments)
        return 0

    def system_daemon_status(self, host):
        """Show the systemd status of host."""
        self._run_remote(host, ["SYSTEMD_COLORS=1", "systemctl", "status"])
        return 0

    def system_daemon_log(self, host):
        """Follow the journal of host for the current boot."""
        self._run_remote(host, ["SYSTEMD_COLORS=1", "journalctl", "-n", "50",
                                "-fb"])
        return 0

    def system_daemon_reload(self, host):
        """Reload the systemd units of host."""
        self._run_remote(host, ["SYSTEMD_COLORS=1", "systemctl",
                                "daemon-reload"])
        return 0

    def system_docker_status(self, host):
        """List the docker containers running on host."""
        self._run_remote(host, ["docker", "ps"])
        return 0

    # main

    def data_load(self):
        """Load the YAML data file and record the local hostname.

        The file is looked up in the home of SUDO_USER when that variable
        is set, so that running through sudo still reads the invoking
        user's file, and in the home of USER otherwise.

        Raises KeyError when neither variable is set or when the file
        lacks the config or hosts key, and OSError when the file cannot
        be opened.
        """
        user = os.environ.get("SUDO_USER")
        if user is None:
            user = os.environ["USER"]

        data_path_user = os.path.expanduser("~" + user)
        data_path = os.path.join(data_path_user, data_path_base)

        # The context manager closes the file even if parsing fails.
        with open(data_path, "r") as s:
            y = yaml.load(s, Loader=yaml.SafeLoader)

        self.hostname = socket.gethostname()
        self.config = y["config"]
        self.hosts = y["hosts"]

    def main(self):
        """Parse the command line and run the requested action.

        Returns the process exit status: 0 on success, 1 on usage error,
        unknown host, unknown action or failed action.
        """
        self.data_load()

        try:
            options, arguments = getopt.getopt(sys.argv[1:], "h:vs")
        except getopt.GetoptError:
            self.print_message(self.usage)
            return 1

        for key, value in options:
            if key == "-h":
                self.host = value
            elif key == "-v":
                self.verbose = True
            elif key == "-s":
                self.shell = True

        if len(arguments) < 1:
            self.print_message(self.usage)
            return 1

        # Default to the local machine when no host is given.
        if self.host is None:
            self.host = self.hostname

        host = self.host_lookup(self.host)
        if host is None:
            self.print_message("Unknown host: " + self.host)
            return 1

        self.print_message("Target host: " + self.host)

        action = arguments[0]

        # Actions only taking the host.
        host_actions = {
            "system-info": self.system_info,
            "system-shell": self.system_shell,
            "system-ping": self.system_ping,
            "system-update": self.system_update,
            "system-restart": self.system_restart,
            "system-initramfs-update": self.system_initramfs_update,
            "system-daemon-status": self.system_daemon_status,
            "system-daemon-log": self.system_daemon_log,
            "system-daemon-reload": self.system_daemon_reload,
            "system-docker-status": self.system_docker_status,
            "config-list": self.config_list,
            "unit-list": self.unit_list,
        }

        # Actions taking the host and a config or unit name.
        item_actions = {
            "config-update": self.config_update,
            "unit-status": self.unit_status,
            "unit-start": self.unit_start,
            "unit-stop": self.unit_stop,
            "unit-restart": self.unit_restart,
            "unit-log": self.unit_log,
            "unit-update": self.unit_update,
            "unit-docker-shell": self.unit_docker_shell,
        }

        handler = host_actions.get(action)
        if handler is not None:
            return handler(host)

        # Anything else needs a second argument; an unknown action given
        # alone gets the usage text rather than an unknown action error.
        if len(arguments) < 2:
            self.print_message(self.usage)
            return 1

        handler = item_actions.get(action)
        if handler is None:
            self.print_message("Unknown action: " + action)
            return 1

        return handler(host, arguments[1])


if __name__ == "__main__":
    sys.exit(host_control().main())