Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
openSUSE:Leap:15.5:Update
orthos-client
orthos2-1.2.31+git.5a38e82.obscpio
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File orthos2-1.2.31+git.5a38e82.obscpio of Package orthos-client
# --- archive member header, kept verbatim from the listing -------------------
# 07070100000000000081ED00000000000000000000000161D832210000B64B000000000000000000000000000000000000002300000000orthos2-1.2.31+git.5a38e82/orthos2
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Author Jan Löser <jloeser@suse.de>
# Published under the GNU Public Licence 2
"""Orthos command line interface.

Interactive shell that fetches the available commands from the Orthos API,
sends user input to the server and renders the typed API responses
(tables, info sheets, messages, input forms) on the terminal.
"""

import argparse
import atexit
import fcntl
import getpass
import json
import logging
import os
import pwd
import re
import readline
import struct
import subprocess
import sys
import termios
from datetime import date, datetime

import pytz

if sys.version_info.major == 3:
    import urllib.request as urllib_request
    from configparser import RawConfigParser
    from urllib.parse import urlencode
    PYTHON_VERSION = 3
else:
    print("Python version not supported!")
    sys.exit(1)

VERSION = '2.0.0'

DEFAULT_PROTOCOL = 'https'
DEFAULT_SERVERNAME = 'localhost'
DEFAULT_PORT = '443'
API_URL_FORMAT = '{0}://{1}:{2}/api'

USER_CONFIGFILE = '~/.config/orthosrc'
SYSTEM_CONFIGFILE = '/etc/orthosrc'
USER_HISTORYFILE = '~/.orthos_history'

PROMPT = '(orthos {0}:{1})'
PROMPT_LOADING = 'Please wait...'
LEFT_MARGIN = 30
COLUMN_PADDING = 2

DEBUG = False
UTF8 = True
TIME_ZONE = 'Europe/Berlin'
PLAIN_OUTPUT = False
IFS = os.environ.get('OIFS', '')
ALIASES = {}

DATETIME_FORMAT = '%Y-%m-%d %H:%M'
DATETIME_INPUT_FORMATS = [
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%SZ'
]

# Module-wide client instance; created in main() and used by most classes.
orthos = None


class Type:
    """Response types announced in the ``type`` field of an API header."""

    INFO = 'INFO'
    TABLE = 'TABLE'
    SELECT = 'SELECT'
    MESSAGE = 'MESSAGE'
    AUTHREQUIRED = 'AUTHREQUIRED'
    INPUT = 'INPUT'


class InputType:
    """Field types understood by :class:`Input` forms."""

    INTEGER = 'INTEGER'
    STRING = 'STRING'
    BOOLEAN = 'BOOLEAN'
    DATE = 'DATE'
    SELECTION = 'SELECTION'


class HTTPStatus:
    """HTTP status codes used by the client."""

    OK = 200
    BADREQUEST = 400
    NOTFOUND = 404


def _parse_api_datetime(value):
    """Return ``value`` parsed with the first matching API format, else None."""
    for input_format in DATETIME_INPUT_FORMATS:
        try:
            return datetime.strptime(value, input_format)
        except ValueError:
            continue
    return None


def format_value(value):
    """Format values human readable.

    Examples:

        Python type | Value        | UTF8       | !UTF8      | Plain output ('-p')
        ------------|--------------|------------|------------|--------------------
        str         | ''           | '-'        | '-'        | ''
        bool        | True         | '✓'        | 'yes'      | 'true'
        bool        | False        | 'x'        | 'no'       | 'false'
        (NoneType)  | None         | '-'        | '-'        | 'none'

    Strings matching one of ``DATETIME_INPUT_FORMATS`` are converted from UTC
    to the configured time zone and rendered with ``DATETIME_FORMAT``; the
    maximum date is shown as ``'infinite'``. Any other value is returned
    unchanged.
    """
    if value is None:
        return 'none' if PLAIN_OUTPUT else '-'

    if isinstance(value, str):
        if PLAIN_OUTPUT:
            return value

        # Expected formats:
        #   a) `2017-08-09T11:22:33.123456Z` (with microseconds)
        #   b) `2017-08-09T11:22:33Z`
        #      `9999-12-31T00:00:00Z` (=infinite)
        datetime_value = _parse_api_datetime(value)
        if datetime_value:
            if datetime_value.date() == date.max:
                return 'infinite'
            try:
                # make tz aware (UTC), return as local time
                datetime_value = datetime_value.replace(tzinfo=pytz.UTC)
                target_timezone = pytz.timezone(orthos.config.get_timezone())
                return datetime_value.astimezone(target_timezone).strftime(DATETIME_FORMAT)
            except OverflowError:
                # Fall through and show the raw string instead.
                pass

        if not value:
            return '-'

    if isinstance(value, bool):
        if PLAIN_OUTPUT:
            return str(value).lower()
        if UTF8:
            return '✓' if value else 'x'
        return 'yes' if value else 'no'

    return value


def get_prompt(user):
    """Return the shell prompt for the given user name."""
    return PROMPT.format(VERSION, user)


class Command:
    """A command offered by the server (or added locally by the client).

    Constructed as ``Command(name, attributes)`` where ``attributes`` is a
    dict that may contain ``help``, ``docstring``, ``tabcompletion``, ``url``,
    ``method`` (default ``'GET'``) and ``arguments`` (a list of argument
    lists).
    """

    def __init__(self, *args):
        self._command = args[0]
        self._help = args[1].get('help', None)
        self._docstring = args[1].get('docstring', None)
        self._tabcompletion = args[1].get('tabcompletion', None)
        self._url = args[1].get('url', None)
        self._method = args[1].get('method', 'GET')
        # Kept as a list so as_input() works even before the first send().
        self._recent_arguments = []
        self._argument_lists = args[1].get('arguments', None)

    def __repr__(self):
        return '<Command: {0}>'.format(str(self._command))

    def __str__(self):
        return self._command.lower()

    def __lt__(self, other):
        """Order commands by their lower-cased name."""
        if isinstance(other, (Command, str)):
            return str(self) < str(other).lower()
        return NotImplemented

    def __eq__(self, other):
        """Compare case-insensitively against another command or a string."""
        if isinstance(other, (Command, str)):
            return str(self) == str(other).lower()
        return NotImplemented

    def get_tabcompletion(self):
        return self._tabcompletion

    def get_help(self):
        return self._help if self._help else '-'

    def get_docstring(self):
        return self._docstring if self._docstring else 'No help available!'

    def get_url(self):
        return self._url

    def get_method(self):
        return self._method

    def get_matching_argument_list(self, length):
        """Return matching argument list.

        Return the respective argument list according to the number of given
        arguments, None otherwise. A trailing asterisk indicates a raw argument
        transfer.
        """
        if self._argument_lists is not None:
            for argument_list in self._argument_lists:
                if len(argument_list) == 1 and argument_list[0][-1:] == '*':
                    return argument_list
                elif len(argument_list) == length:
                    return argument_list
        return None

    def get_argument_lists(self):
        return self._argument_lists

    def get_name(self):
        return self._command

    def send(self, orthos, raw_arguments=''):
        """Send a request according to the command's HTTP method (set by the server).

        For re-executing a command with its arguments (e.g. after authorization
        was requested), all arguments get saved in the command object here.

        Example:
            'command foo bar baz'   ->  http://.../command/foo?data=bar+baz
            'command foo'           ->  http://.../command/foo
            'command'               ->  http://.../command/

        For POST requests, the following arguments get forwarded as entered into
        the API's post method.

        Returns the ``(header, data)`` tuple of ``orthos.API.request`` or None
        for an HTTP method other than GET/POST.
        """
        url = API_URL + self._url

        self._recent_arguments = raw_arguments.strip().split()
        matching_argument_list = self.get_matching_argument_list(len(self._recent_arguments))

        if matching_argument_list and matching_argument_list[0][-1:] == '*':
            # Raw transfer: the whole argument string goes into one parameter.
            arguments = {matching_argument_list[0][:-1]: raw_arguments}
        else:
            # `or []` keeps this safe when no argument list matches.
            arguments = dict(zip(matching_argument_list or [], self._recent_arguments))

        if self._method == 'POST':
            return orthos.API.request('POST', url, data=arguments)

        if self._method == 'GET':
            query = urlencode(arguments)
            if query:
                url += '?{}'.format(query)
            return orthos.API.request('GET', url)

        return None

    def as_input(self):
        """Return the command with its arguments as a list."""
        return [str(self)] + self._recent_arguments


class APIResponse:
    """Base class of all API responses.

    ``APIResponse(header, data)`` inspects ``header['type']`` and re-classes
    the instance to the matching subclass defined below; subclasses that
    define their own argument-less ``__init__`` are initialised right away.
    """

    def __init__(self, header, data):
        self._header = header
        self._data = data
        self._type = self._header['type']

        logging.debug(self._header)
        logging.debug(self._data)

        if self._type == Type.SELECT:
            self.__class__ = Select
        elif self._type == Type.INFO:
            self.__class__ = Info
            self.__init__()
        elif self._type == Type.TABLE:
            self.__class__ = Table
            self.__init__()
        elif self._type == Type.MESSAGE:
            self.__class__ = Message
        elif self._type == Type.AUTHREQUIRED:
            self.__class__ = AuthRequired
        elif self._type == Type.INPUT:
            self.__class__ = Input
            self.__init__()

    @property
    def text(self):
        return self.output()


class Input(APIResponse):
    """Interactive form: prompts for every field and posts the result."""

    def __init__(self):
        self._order = self._header['order']
        self._target_url = self._header['target']

    def _clean_value(self, field, input_value, required, items=None):
        """Validate and cast input values.

        Value needs to be the required type and needs to be set if value is
        required at all. For selections, the ``value`` of the item at the
        entered position is returned.

        Raises ValueError if value is corrupted.
        """
        input_type = field['type'].upper()
        value = None

        if required and not input_value:
            raise ValueError("Value is required!")

        if input_type == InputType.INTEGER:
            if input_value.strip() != '':
                try:
                    value = int(input_value)
                except ValueError:
                    raise ValueError("'{0}' is not an integer!".format(str(input_value)))

                # Range checks only make sense for an actual number.
                max_value = field.get('max', None)
                min_value = field.get('min', None)

                if (max_value is not None) and (value > max_value):
                    raise ValueError("Value must be equal less than {0}.".format(max_value))

                if (min_value is not None) and (value < min_value):
                    raise ValueError("Value must be equal greater than {0}.".format(min_value))

        elif input_type == InputType.SELECTION:
            try:
                value = int(input_value)
            except ValueError:
                raise ValueError("'{0}' is not a number!".format(str(input_value)))

            if not (0 <= value < len(items)):
                raise ValueError("Selection out of range!")

            # Each item is a single-key dict: {name: {'label': ..., 'value': ...}}.
            item_values = [
                item[list(item.keys())[0]].get('value', None) for item in items
            ]
            value = item_values[value]

        elif input_type == InputType.STRING:
            value = input_value

        elif input_type == InputType.DATE:
            value = input_value

        elif input_type == InputType.BOOLEAN:
            if input_value.lower() in {'1', 'y', 'yes', 'true'}:
                value = True
            elif input_value.lower() in {'0', 'n', 'no', 'false'}:
                value = False
            else:
                raise ValueError(
                    "Value {0} is not a boolean; use 'y' or 'n'!".format(str(input_value))
                )

        return value

    def process(self):
        """Execute API result.

        Prompts for each field in ``order`` (re-prompting on invalid input),
        posts the collected data to the target URL and processes the answer.
        Returns None; aborts silently on Ctrl-C / EOF.
        """
        data = {}

        i = 0
        while i < len(self._order):
            field_name = self._order[i]
            field = self._data[field_name]
            field_type = field['type']

            # print selection
            if field_type == InputType.SELECTION:
                for j, item in enumerate(field['items']):
                    item_name = list(item.keys())[0]
                    item_attributes = item[item_name]
                    print('{0:>3}) {1:<30}'.format(j, item_attributes['label']))

            prompt = '{0}>'.format(field['prompt'])
            required = field['required']
            hidden = field.get('hidden', None)

            logging.debug(field)

            if hidden:
                # Hidden fields are submitted with their initial value.
                input_ = field.get('initial', None)
            else:
                try:
                    input_ = orthos.LineReader.readline(
                        prompt=prompt,
                        default=field.get('initial', None),
                        history=False
                    )
                    input_ = self._clean_value(
                        field,
                        input_,
                        required,
                        items=field.get('items', None)
                    )
                except (KeyboardInterrupt, EOFError):
                    print("\nAborted by user!")
                    return None
                except ValueError as e:
                    # Ask for the same field again.
                    print("ERROR: {0}".format(str(e)))
                    continue

            data[field_name] = input_
            i = i + 1

        response = orthos.API.request('POST', API_URL + self._target_url, {'form': data})

        if response[0]:
            apiresponse = APIResponse(*response)
            apiresponse.process()


class AuthRequired(APIResponse):
    """The server demands authentication before executing the command."""

    def process(self):
        """Execute API result.

        Authenticates if no token is set yet and returns the command to be
        re-executed; returns None when a token is already present.
        """
        if not orthos.is_authenticated():
            orthos.authenticate()
            return orthos.recent_command
        return None


class Select(APIResponse):
    """Numbered list of entries under a title."""

    def output(self):
        """Generate output."""
        result = '{0}\n\n'.format(self._header['title'])
        for i, entry in enumerate(self._data):
            result += '{0:>3}) {1:<30}\n'.format(i, entry['value'])
        return result

    def process(self):
        """Execute API result."""
        orthos.Terminal.show(self.output())


class Message(APIResponse):
    """Plain message, optionally prefixed with its type (e.g. INFO)."""

    def output(self):
        """Generate output."""
        if not self._data.get('type', None):
            return '{0}'.format(self._data['message'])
        return '{0}: {1}'.format(self._data['type'], self._data['message'])

    def process(self):
        """Execute API result; INFO messages are suppressed in quiet mode."""
        # .get(): a message without a type must not crash here either.
        if orthos.config.is_quiet() and self._data.get('type') == 'INFO':
            return
        orthos.Terminal.show(self.output())


class Table(APIResponse):
    """Tabular result; column order and captions come from ``theader``."""

    def __init__(self):
        self._theader = self._header['theader']
        # Every theader entry is a single-key dict: {field: caption}.
        self._captions = [list(item.values())[0] for item in self._theader]
        self._fields = [list(item.keys())[0] for item in self._theader]
        self._widths = [len(caption) for caption in self._captions]

        for row in self._data:
            for i, field in enumerate(self._fields):
                self._widths[i] = max(self._widths[i], len(str(row[field])))

    def _get_line(self):
        """Draw a horizontal line with a correlating width."""
        if not PLAIN_OUTPUT:
            return '-' * (sum(self._widths) + len(self._widths) * COLUMN_PADDING) + '\n'
        return ''

    def _get_theader(self):
        """Draw the table header."""
        if not PLAIN_OUTPUT:
            theader = ''
            for i, caption in enumerate(self._captions):
                theader += ' {0:<{1}}'.format(caption, self._widths[i] + COLUMN_PADDING - 1)
            return theader + '\n'
        return ''

    def output(self):
        """Generate output."""
        result = self._get_line()
        result += self._get_theader()
        result += self._get_line()

        for item in self._data:
            row = ''
            for i, field in enumerate(self._fields):
                value = format_value(item[field])
                if PLAIN_OUTPUT:
                    row += '{0}{1}'.format(value, IFS)
                else:
                    row += ' {0:<{1}}'.format(value, self._widths[i] + COLUMN_PADDING - 1)
            result += row + '\n'

        result += self._get_line()
        return result.rstrip('\n')

    def process(self):
        """Execute API result."""
        orthos.Terminal.show(self.output())


class Info(APIResponse):
    """Key/value sheet, optionally with titled sub-sections of lists."""

    def __init__(self):
        self._order = self._header['order']

    def format_line(self, item, data=None):
        """Format one entry.

        For a dict ``data``, ``item`` is a key and a single ``label: value``
        line is returned. For a list ``data``, ``item`` is a list of keys and
        every element is rendered as a numbered group of lines.
        """
        result = ''

        if isinstance(data, dict):
            value = format_value(data[item]['value'])
            if isinstance(value, str):
                value = value.replace('\n', ' ')
            result = "{0:<{1}}: {2}\n".format(data[item]['label'], LEFT_MARGIN, value)

        elif isinstance(data, list):
            for i, element in enumerate(data):
                for j, subitem in enumerate(item):
                    value = format_value(element[subitem]['value'])
                    if j > 0:
                        result += " " * 3 + "{0:<{1}}: {2}\n".format(
                            element[subitem]['label'],
                            LEFT_MARGIN - 3,
                            value
                        )
                    else:
                        # Only the first line of a group carries the index.
                        result += "{0:<3}{1:<{2}}: {3}\n".format(
                            i,
                            element[subitem]['label'],
                            LEFT_MARGIN - 3,
                            value
                        )
                # NOTE(review): blank line assumed to separate the groups;
                # the original nesting was lost in the archive listing.
                result += '\n'

        return result

    def output(self, order=None, data=None):
        """Generate output; ``None`` entries in ``order`` draw a separator."""
        if order is None:
            order = self._order
        if data is None:
            data = self._data

        width = Terminal().width
        result = '-' * width + '\n'

        for item in order:
            if item is None:
                result += '-' * width + '\n'
            elif isinstance(item, list):
                # [section key, [sub keys]] -> centered title plus group lines
                result += '{0:-^{1}}\n'.format(' ' + data[item[0]]['label'] + ' ', width)
                result += self.format_line(item[1], data[item[0]]['value'])
            else:
                if item not in data:
                    continue
                result += self.format_line(item, data)

        return result

    def process(self):
        """Execute API result."""
        orthos.Terminal.show(self.output())


class Terminal:
    """Access properties of the terminal."""

    @property
    def width(self):
        """Number of columns of stdout's terminal (80 if unavailable)."""
        s = struct.pack('HHHH', 0, 0, 0, 0)
        try:
            x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
        except IOError:
            return 80
        return struct.unpack('HHHH', x)[1]

    @property
    def height(self):
        """Number of rows of stdout's terminal (25 if unavailable)."""
        s = struct.pack('HHHH', 0, 0, 0, 0)
        try:
            x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
        except IOError:
            return 25
        return struct.unpack('HHHH', x)[0]

    def pager(self, text):
        """Pipe ``text`` through ``less -S``; print it if less cannot start."""
        try:
            # Argument list instead of a shell string: no shell involved.
            pager = subprocess.Popen(['less', '-S'], stdin=subprocess.PIPE)
        except OSError:
            print(text)
            return

        try:
            pager.stdin.write(bytes(text, 'utf-8'))
            pager.stdin.close()
        except BrokenPipeError:
            # The user quit the pager before everything was written.
            pass
        pager.wait()

    def need_pager(self, lines, columns):
        """Decide by the configured policy whether the output needs a pager."""
        pager_policy = orthos.config.get_use_pager()

        if pager_policy == 'always':
            return True
        elif pager_policy == 'never':
            return False
        elif pager_policy == 'vertical':
            return self.height < lines
        elif pager_policy == 'horizontal':
            return self.width < columns
        else:
            return self.height < lines or self.width < columns

    def show(self, text, lines=None, columns=None):
        """Print ``text``, through the pager if it does not fit the terminal."""
        if not lines or not columns:
            text_lines = text.splitlines()
            lines = len(text_lines)
            columns = max((len(line) for line in text_lines), default=0)

        if self.need_pager(lines, columns):
            self.pager(text)
        else:
            print(text)


# from https://pymotw.com/2/readline/
class TabCompleter:
    """readline completer: first word from the option keys, later words from
    the list stored under the first word."""

    def __init__(self, options):
        self._current_options = []
        self._options = options

    def complete(self, text, state):
        """Return the completion."""
        if state == 0:
            line = readline.get_line_buffer()
            begin = readline.get_begidx()
            end = readline.get_endidx()
            being_completed = line[begin:end]
            words = line.split()

            if words:
                try:
                    if begin == 0:
                        options = self._options.keys()
                    else:
                        options = self._options[words[0]]

                    # Commands without completions store None; key views are
                    # not indexable - normalise both to a list.
                    options = list(options or [])

                    if being_completed:
                        self._current_options = [
                            word for word in options if word.startswith(being_completed)
                        ]
                    else:
                        self._current_options = options
                except (KeyError, IndexError):
                    self._current_options = []
            else:
                self._current_options = sorted(self._options.keys())

        try:
            return self._current_options[state]
        except IndexError:
            return None


class OrthosLineReader:
    """readline based line input with history file and tab completion."""

    def __init__(self, orthos, history=None):
        self.orthos = orthos
        self.prompt = ''
        self.completion = True
        self.default = None

        if history:
            try:
                readline.read_history_file(history)
            except IOError:
                # A missing history file is fine; it is written at exit.
                pass
            atexit.register(readline.write_history_file, history)

        readline.set_completer_delims(' ')
        readline.set_completer(orthos.Tabcompleter.complete)
        readline.parse_and_bind('tab: complete')

    def disable_completion(self):
        self.completion = False

    def enable_completion(self):
        self.completion = True

    def hook(self):
        """Pre-input hook: put the default value into the edit buffer."""
        # insert_text() only accepts strings; defaults may be numbers.
        readline.insert_text(str(self.default))
        readline.redisplay()

    def set_prompt(self, prompt):
        self.prompt = prompt

    def readline(self, prompt=None, default=None, history=True, complete=True):
        """Read one line from the user.

        ``default`` is pre-filled into the input line (or shown in brackets
        when readline has no pre-input hook). With ``history=False`` the
        entered line is removed from the readline history again.
        """
        old_completion = self.completion
        if not complete:
            self.disable_completion()

        self.default = default
        prompt_suffix = ''
        if default:
            try:
                readline.set_pre_input_hook(self.hook)
            except AttributeError:
                prompt_suffix = ' [' + str(default) + ']'

        if not prompt:
            prompt = self.prompt
        prompt += prompt_suffix
        if prompt:
            prompt += ' '

        try:
            result = input(prompt)
            if not history and result:
                if sys.stdin.isatty():
                    readline.remove_history_item(readline.get_current_history_length() - 1)
        finally:
            if default:
                try:
                    readline.set_pre_input_hook(None)
                except AttributeError:
                    pass
            if not complete and old_completion:
                self.enable_completion()

        return result


class Config:
    """Client configuration.

    Values set explicitly (command line) win over the ``[global]`` section of
    the config files, which win over the built-in defaults. The system config
    file is read first, then the user's.
    """

    def __init__(self):
        self.__cp = RawConfigParser()
        self.__cp.read(SYSTEM_CONFIGFILE)
        self.__cp.read(os.path.expanduser(USER_CONFIGFILE))

        self.__aliases = {}
        self.__protocol = None
        self.__port = None
        self.__server = None
        self.__user = None
        self.__auth_user = None
        self.__password = None
        self.__use_pager = None
        self.__timezone = None
        self.__token = None
        # Non-interactive use (piped stdin) is quiet by default.
        self.__quiet = not sys.stdin.isatty()

        self.__aliases.update(ALIASES)

        if self.__cp.has_section('alias'):
            for key in self.__cp.options('alias'):
                self.__aliases[key] = self.__cp.get('alias', key)

    def _global_option(self, name, default=None):
        """Return option ``name`` of the ``[global]`` section or ``default``."""
        if self.__cp.has_option('global', name):
            return self.__cp.get('global', name)
        return default

    def set_quiet(self, quiet):
        self.__quiet = quiet

    def is_quiet(self):
        return self.__quiet

    def set_password(self, password):
        self.__password = password

    def get_password(self):
        if self.__password is not None:
            return self.__password
        return self._global_option('password')

    def set_server(self, server):
        self.__server = server

    def get_server(self):
        if self.__server:
            return self.__server
        return self._global_option('server', DEFAULT_SERVERNAME)

    def set_port(self, port):
        self.__port = port

    def get_port(self):
        if self.__port:
            return self.__port
        if self.__cp.has_option('global', 'port'):
            return self.__cp.getint('global', 'port')
        return DEFAULT_PORT

    def set_protocol(self, protocol):
        self.__protocol = protocol

    def get_protocol(self):
        if self.__protocol:
            return self.__protocol
        return self._global_option('protocol', DEFAULT_PROTOCOL)

    def get_history(self):
        """Return the expanded path of the history file."""
        return os.path.expanduser(self._global_option('history', USER_HISTORYFILE))

    def set_user(self, user):
        self.__user = user

    def get_user(self):
        """Return the user name; falls back to the login name of this process."""
        if self.__user is not None:
            return self.__user
        if self.__cp.has_option('global', 'username'):
            return self.__cp.get('global', 'username')
        return pwd.getpwuid(os.getuid())[0]

    def set_auth_user(self, user):
        self.__auth_user = user

    def get_auth_user(self):
        if not self.__auth_user:
            return "Anonymous"
        return self.__auth_user

    def set_use_pager(self, use_pager):
        """Set the pager policy; raises ValueError for unknown policies."""
        values = [
            'always',
            'never',
            'horizontal',
            'vertical',
            'both'
        ]
        if use_pager not in values:
            raise ValueError("use_pager must be in {0}".format(', '.join(values)))
        self.__use_pager = use_pager

    def get_use_pager(self):
        if self.__use_pager is not None:
            return self.__use_pager
        if not sys.stdin.isatty():
            return 'never'
        return self._global_option('use_pager', 'both')

    def get_aliases(self):
        return self.__aliases

    def set_alias(self, name, *values):
        """Define alias ``name`` and persist all aliases to the user config."""
        self.__aliases[name] = ' '.join(values)

        if not self.__cp.has_section('alias'):
            self.__cp.add_section('alias')

        for alias, definition in self.__aliases.items():
            self.__cp.set('alias', alias, definition)

        self.write_config()

    def format_aliases(self, alias=None):
        """Return one alias (or all, one per line) as ``name => definition``."""
        aliases = self.get_aliases()

        if alias:
            if alias in aliases:
                return '{} => {}'.format(alias, aliases[alias])
            return "ERROR: Unknown alias '{}'!".format(alias)

        return '\n'.join(
            '{} => {}'.format(name, definition) for name, definition in aliases.items()
        )

    def set_timezone(self, timezone):
        if timezone in pytz.all_timezones:
            self.__timezone = timezone
        else:
            logging.warning("Unknown timezone: {}".format(timezone))

    def get_timezone(self):
        if self.__timezone:
            return self.__timezone

        timezone = self._global_option('timezone')
        if timezone is None:
            return TIME_ZONE
        if timezone in pytz.all_timezones:
            return timezone

        logging.warning("Unknown timezone: {}".format(timezone))
        return TIME_ZONE

    def set_token(self, token):
        self.__token = token

    def get_token(self):
        if self.__token:
            return self.__token
        return self._global_option('token')

    def write_config(self):
        """Write the current configuration to the user's config file."""
        path = os.path.expanduser(USER_CONFIGFILE)
        directory = os.path.dirname(path)
        if directory:
            # The config directory may not exist yet on a fresh account.
            os.makedirs(directory, exist_ok=True)
        with open(path, 'w') as f:
            self.__cp.write(f)

    def read_config(self, f: str):
        """Additionally read config file ``f``."""
        self.__cp.read(f)


class Orthos:
    """The client: holds configuration, API access and the command loop."""

    # Splits on spaces that are followed by balanced quotes only, i.e. spaces
    # inside a quoted string do not separate tokens.
    _TOKEN_SPLIT = re.compile(r''' (?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')

    class API:
        """HTTP access to the Orthos API (JSON over GET/POST)."""

        def request_token(self, username, password):
            """Request token from API for user authentication."""
            url = API_URL + '/login'
            data = {
                'username': username,
                'password': password
            }
            header, data = self.request('POST', url, data, login=True)
            orthos.config.set_token(data.get('token', None))

        def _auth_headers(self, headers=None):
            """Return ``headers`` extended by the token header, if a token is set."""
            headers = {} if headers is None else headers
            token = self.get_token()
            if token:
                headers['Authorization'] = 'Token ' + token
            return headers

        def get(self, url):
            """Send GET request to given URL; exits the program on failure."""
            try:
                request = urllib_request.Request(url, headers=self._auth_headers())
                return urllib_request.urlopen(request)
            except Exception as e:
                logging.debug(e)
                print("ERROR: {0} ({1})".format(str(e), API_URL))
                sys.exit(1)

        def post(self, url, data=None, login=False):
            """Send POST request to given URL; exits the program on failure."""
            json_data = json.dumps(data)
            # Login payloads carry the password: never log or print them.
            printable_data = '<credentials withheld>' if login else json_data

            logging.debug(printable_data)

            headers = self._auth_headers({'Content-Type': 'application/json'})

            try:
                request = urllib_request.Request(
                    url,
                    json_data.encode('utf-8'),
                    headers=headers
                )
                return urllib_request.urlopen(request)
            except Exception as e:
                logging.debug(e)
                print("\nERROR: Internal server Error, "
                      "please double check submitted data and report:")
                print("Bad Data:\n{0}\nURL:{1}\n".format(printable_data, API_URL))
                sys.exit(1)

        def request(self, method, url, data=None, login=False):
            """Send a request and return the decoded ``(header, data)`` tuple.

            For login requests the header is None and data is the whole
            decoded answer. Returns ``(None, {})`` if there is no response.
            """
            logging.debug(url)

            quiet = orthos.config.is_quiet()
            if not quiet:
                print(PROMPT_LOADING, end='\r')

            if method == 'GET':
                response = self.get(url)
            else:
                response = self.post(url, data, login)

            if not response:
                return (None, {})

            result = json.loads(response.read().decode('utf-8'))

            if login:
                header = None
                data = result
            else:
                header = result.get('header', None)
                data = result.get('data', [])

            if not quiet:
                # Wipe the "Please wait..." hint.
                print(' ' * len(PROMPT_LOADING), end='\r')

            return (header, data)

        def get_token(self):
            return orthos.config.get_token()

    def __init__(self):
        """Initialize orthos object."""
        self.API = self.API()
        self.config = Config()
        self.Terminal = Terminal()

    def init(self):
        """Collect available commands from server and print welcome message."""
        global API_URL

        if orthos.config.get_protocol() != "https":
            logging.warning("No secure ssl connection, try -H https://<host> or -P 443")

        API_URL = API_URL_FORMAT.format(
            orthos.config.get_protocol(),
            orthos.config.get_server(),
            orthos.config.get_port()
        )

        header, data = self.API.request('GET', API_URL)

        orthos.config.set_auth_user(data.get('user'))

        self._commands = [
            Command(command, values) for command, values in data['commands'].items()
        ]

        self._add_client_command(
            'alias',
            help='Define own aliases.',
            docstring="""Define or display aliases.

The command can be called without any arguments, then it displays all available
aliases. If it's called with one argument, then it displays the definition of a
specific alias. If it is called with two or more arguments, then you can define
new aliases.

To execute an alias, type the alias name with a leading '@'.

Usage:
    ALIAS [alias] [*args]

Arguments:
    alias - Alias name.
    *args - Valid command string.

Example:
    ALIAS
    ALIAS allmachines query name, ipv4 where name =~ foobar
    ALIAS allmachines
    @allmachines
"""
        )
        self._add_client_command(
            'auth',
            help='Request authorisation manually.'
        )
        self._add_client_command(
            'exit',
            help='Exit program.'
        )
        self._add_client_command(
            'config',
            help='Show connection/shell configurations.'
        )
        self._add_client_command(
            'help',
            [str(command) for command in self._commands],
            help='Provides help.'
        )

        self.username = self.config.get_user()
        if not self.username:
            print("Empty username - Use -U/--user or check %s or %s"
                  % (USER_CONFIGFILE, SYSTEM_CONFIGFILE))
            sys.exit(1)

        self.recent_command = None

        commands = {str(command): command.get_tabcompletion() for command in self._commands}
        for alias in orthos.config.get_aliases():
            commands['@' + alias] = []

        self.Tabcompleter = TabCompleter(commands)

        message = data.get('message', None)
        if message and not orthos.config.is_quiet():
            self.Terminal.show(message)

    def _add_client_command(self, command, tabcompletion=None, help=None, docstring=None):
        """Add client command for tab completion."""
        attributes = {'tabcompletion': [] if tabcompletion is None else tabcompletion}
        if help:
            attributes['help'] = help
        if docstring:
            attributes['docstring'] = docstring
        self._commands.append(Command(command, attributes))

    def _apply_redirection(self, tokens):
        """Strip a redirection (``|``, ``|>``, ``|>>``) and configure the terminal.

        Returns the tokens in front of the operator, or an empty list if the
        redirection cannot be set up.
        """
        for operator in ('|', '|>', '|>>'):
            if operator not in tokens:
                continue

            idx = tokens.index(operator)
            target = tokens[idx + 1:]
            setter = getattr(
                self.Terminal,
                'set_output_filter' if operator == '|' else 'set_output_file',
                None
            )

            # NOTE(review): Terminal in this file defines neither
            # set_output_filter nor set_output_file, so redirection is
            # reported as unavailable instead of raising AttributeError.
            if setter is None or not target:
                print("ERROR: Output redirection '{0}' is not available!".format(operator))
                return []

            if operator == '|':
                setter(target)
            else:
                setter(target[0], operator == '|>>')
            return tokens[:idx]

        return tokens

    def get_next_user_command(self, prompt=None):
        """Read the next command line and return it as a list of tokens.

        Returns ``['QUIT']`` on EOF and an empty list when the input was
        interrupted with Ctrl-C. Spaces inside quoted tokens are encoded as
        ``%20``.
        """
        line = ''
        try:
            while not line:
                line = self.LineReader.readline(prompt=prompt)
        except KeyboardInterrupt:
            pass
        except EOFError:
            if not orthos.config.is_quiet():
                print()
            return ['QUIT']

        result = [token.replace(' ', '%20') for token in self._TOKEN_SPLIT.split(line)]

        # handle command line redirection
        result = self._apply_redirection(result)

        return [token for token in result if token != '']

    def get_password(self):
        """Prompt for password and return it."""
        password = None
        try:
            password = getpass.getpass("Orthos password for {0}: ".format(self.username))
        except (KeyboardInterrupt, EOFError):
            pass
        return password

    def authenticate(self):
        """Authenticate the user.

        Uses the configured password if there is one, otherwise prompts for
        it. On success the prompt shows the authenticated user.
        """
        password = orthos.config.get_password()

        if password is not None:
            self.API.request_token(self.username, password)

            if not self.API.get_token():
                print("ERROR: Authorization error!")
                # A configured password will not get better: do not retry.
                self.recent_command = None
                return None
        else:
            password = self.get_password()

            if not password:
                print("\nAborted by user!")
                self.recent_command = None
                return None

            self.API.request_token(self.username, password)

            if not self.API.get_token():
                print("ERROR: Authorization error!")
                return None

        self.config.set_auth_user(self.username)
        self.LineReader.set_prompt(get_prompt(self.config.get_auth_user()))
        return None

    def is_authenticated(self):
        """Check if the user is already authenticated (token set)."""
        return bool(self.API.get_token())

    def print_help(self, arguments=None):
        """Print help for one command, or the list of all commands."""
        arguments = (arguments or '').strip()

        if arguments:
            argument = arguments.split()[0]
            if argument in self._commands:
                command = self._commands[self._commands.index(argument)]
                print(command.get_docstring())
            else:
                print("ERROR: Unknown command!")
        else:
            print("Commands are:")
            print()
            for command in self._commands:
                print("\t{0:<20} {1}".format(str(command).upper(), command.get_help()))
            print()

    def print_config(self):
        """Print the connection settings in use."""
        print("")
        print("User:\t\t%s" % self.config.get_auth_user())
        print("Server:\t\t%s" % self.config.get_server())
        print("Port:\t\t%s" % self.config.get_port())
        print("Protocol:\t%s" % self.config.get_protocol())
        print("Timezone:\t%s" % self.config.get_timezone())
        print("")

    def run(self):
        """Run the command loop until QUIT/EXIT or end of input."""
        self.LineReader = OrthosLineReader(
            self,
            history=self.config.get_history()
        )

        if sys.stdin.isatty():
            self.LineReader.set_prompt(get_prompt(self.config.get_auth_user()))

        while True:
            try:
                if self.recent_command:
                    # Re-run a pending command (after authentication) or an
                    # expanded alias instead of asking the user.
                    if isinstance(self.recent_command, list):
                        input_ = self.recent_command
                    else:
                        input_ = self.recent_command.as_input()
                    self.recent_command = None
                else:
                    input_ = self.get_next_user_command()

                if any('$USERNAME' in token for token in input_):
                    username = self.config.get_user()
                    input_ = [token.replace('$USERNAME', username) for token in input_]

                # An interrupted prompt yields no tokens at all.
                raw_command = input_[0].lstrip('@') if input_ else ''
                command = raw_command.upper()
                arguments = input_[1:]
                raw_arguments = ' '.join(arguments)
                arguments_length = len(arguments)

                if command == 'AUTH':
                    self.authenticate()
                    continue

                elif command == 'HELP':
                    self.print_help(raw_arguments)
                    continue

                elif command in {'QUIT', 'EXIT'}:
                    return

                elif command in {'CONFIG'}:
                    self.print_config()
                    continue

                elif command == 'ALIAS':
                    if arguments_length == 0:
                        self.Terminal.show(orthos.config.format_aliases())
                    elif arguments_length == 1:
                        self.Terminal.show(orthos.config.format_aliases(arguments[0]))
                    else:
                        orthos.config.set_alias(arguments[0], *arguments[1:])
                    continue

                elif command in self._commands:
                    command = self._commands[self._commands.index(command)]

                    if command.get_matching_argument_list(arguments_length) is None:
                        raise AttributeError(command.get_name())

                    response = command.send(self, raw_arguments)

                    # (None, {}) means "no answer"; there is nothing to render.
                    if response and response[0]:
                        apiresponse = APIResponse(*response)
                        self.recent_command = command
                        result = apiresponse.process()

                        # Only keep the command if it has to be re-executed.
                        if not isinstance(result, Command):
                            self.recent_command = None
                    continue

                elif raw_command in orthos.config.get_aliases():
                    self.recent_command = orthos.config.get_aliases()[raw_command].split(' ')
                    self.recent_command += arguments
                    continue

                elif command == '':
                    print()

                else:
                    print("Invalid command.")

            except AttributeError as e:
                print("ERROR: Invalid number of arguments. "
                      "Type 'help {}' for more information.".format(e))
            except KeyboardInterrupt:
                pass


def main():
    """Main function: parse the command line, configure and run the client."""
    global orthos
    global PLAIN_OUTPUT
    global IFS

    orthos = Orthos()

    parser = argparse.ArgumentParser(description='Orthos command line interface.')
    parser.add_argument(
        '-H', '--host',
        dest='servername',
        metavar='HOST',
        help='use the hostname specified on the command line instead of the one in the config file'
    )
    parser.add_argument(
        '-c', '--config',
        dest='config_file',
        help='Additionally read this file as config file after reading %s and %s'
             % (SYSTEM_CONFIGFILE, USER_CONFIGFILE)
    )
    parser.add_argument(
        '-P', '--port',
        dest='port',
        metavar='PORT',
        type=int,
        help='use the port specified on the command line instead of the one in the config file'
    )
    parser.add_argument(
        '-U', '--user',
        dest='user',
        metavar='USER',
        help='use the username specified'
    )
    parser.add_argument(
        '--password',
        dest='password',
        metavar='PASSWORD',
        help='use this password for automatic authentication (e.g. for scripting)'
    )
    parser.add_argument(
        '--token',
        dest='token',
        metavar='TOKEN',
        help='use this token for automatic authentication (e.g. for scripting); '
             '-U/--password options will be ignored'
    )
    parser.add_argument(
        '-D', '--debug',
        dest='debug',
        action='store_const',
        const=True,
        default=DEBUG,
        help='write debugging output'
    )
    parser.add_argument(
        '-L', '--logfile',
        dest='logfile',
        default=False,
        metavar='FILE',
        help='use that together with -D to log the debug output in a file rather than the console'
    )
    parser.add_argument(
        '--no-pager',
        dest='no_pager',
        const=True,
        default=False,
        action='store_const',
        help='do not use pager when showing results'
    )
    parser.add_argument(
        '-p', '--plain-output',
        dest='plain_output',
        const=True,
        default=PLAIN_OUTPUT,
        action='store_const',
        help='print plain output (e.g. for scripting)'
    )
    parser.add_argument(
        '-F', '--ifs',
        dest='ifs',
        default=IFS,
        metavar='IFS',
        help='set internal field separator (only useful in combination with -p; default is $OIFS)'
    )
    parser.add_argument(
        '-q', '--quiet',
        dest='quiet',
        const=True,
        action='store_const',
        help='makes command line client quiet'
    )
    parser.add_argument(
        '-v', '--version',
        dest='version',
        action='store_const',
        const=True,
        default=False,
        help='print version output'
    )
    parser.add_argument(
        '--timezone',
        dest='timezone',
        metavar='TZ',
        help='set the local time zone (default is "Europe/Berlin")'
    )

    options = parser.parse_args()

    if options.config_file:
        orthos.config.read_config(options.config_file)

    if options.debug:
        log_settings = {
            'level': logging.DEBUG,
            'format': '%(asctime)s %(levelname)s %(message)s'
        }
        if options.logfile:
            log_settings['filename'] = options.logfile
        logging.basicConfig(**log_settings)
        logging.info('Debugging enabled')

    scheme_given = False
    if options.servername:
        servername = options.servername.split('://')
        if len(servername) == 2:
            scheme_given = True
            scheme = servername[0].lower()
            orthos.config.set_protocol(scheme)
            orthos.config.set_server(servername[1].lower())
            if scheme == 'https':
                orthos.config.set_port(443)
            elif scheme == 'http':
                orthos.config.set_port(80)
        elif len(servername) == 1:
            orthos.config.set_server(servername[0].lower())

    if options.port:
        orthos.config.set_port(options.port)
        # Guess the protocol from the port only if -H did not name one, so
        # that e.g. `-H https://host -P 8443` stays on https.
        if not scheme_given:
            if options.port == 443:
                orthos.config.set_protocol("https")
            else:
                orthos.config.set_protocol("http")

    if options.user:
        orthos.config.set_user(options.user)

    if options.password:
        orthos.config.set_password(options.password)

    if options.token:
        orthos.config.set_token(options.token)

    if options.no_pager:
        orthos.config.set_use_pager('never')

    if options.plain_output:
        PLAIN_OUTPUT = True

    if options.ifs:
        IFS = options.ifs

    if options.quiet:
        orthos.config.set_quiet(True)

    if options.version:
        print(VERSION)
        sys.exit(0)

    if options.timezone:
        orthos.config.set_timezone(options.timezone)

    orthos.init()

    try:
        orthos.run()
    except Exception as e:
        # Top-level boundary: report the problem, keep details for -D.
        logging.debug(e, exc_info=True)
        print(e)
    finally:
        if orthos.config.is_quiet():
            print()
        else:
            print("Good bye, have a lot of fun...")


if __name__ == '__main__':
    main()

# --- second archive member (config template) and archive trailer, verbatim ---
# 07070100000001000081A400000000000000000000000161D8322100000044000000000000000000000000000000000000002400000000orthos2-1.2.31+git.5a38e82/orthosrc
# [global]
# username =
# server =
# port = 443
# protocol = https
# token =
# 07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!93 blocks
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor