#
# mocha-shell - the 'latte' python REPL shell for mocha
# Copyright (C) 2024 Michael Becker
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from mocha.core import InstanceKey, InstanceReference
from mocha.oms import Oms
from mocha.definitions import KnownClassGuids, KnownInstanceGuids, KnownAttributeGuids, KnownRelationshipGuids

from framework import REPLApplication, Guid, format_cwd, parse_cwd

import atexit
import colorama
import hashlib
import hmac
import os
import readline
import sys

from colorama import Fore, Style
from getpass import getpass, getuser
from typing import Optional

configpath = os.path.join(os.path.expanduser("~"), ".config", "latte")
# makedirs (unlike mkdir) also creates a missing ~/.config and does not fail
# when the directory already exists.
os.makedirs(configpath, exist_ok=True)

histfile = os.path.join(configpath, "history")

try:
    readline.read_history_file(histfile)
except FileNotFoundError:
    # first run: there is no history to load yet
    pass

# default history len is -1 (infinite), which may grow unruly; set it even
# when no history file existed so the first session is capped as well
readline.set_history_length(1000)

atexit.register(readline.write_history_file, histfile)

colorama.init()


class MochaShell (REPLApplication):
    """The 'latte' shell: a REPL that issues commands against a mocha Oms.

    The instance tracks the database file that is currently open
    (``current_file``), the user name that logged in to it (``current_user``)
    and the ``Oms`` object every command is executed against (``oms``).
    """

    # Width of the instance-key column (matches the ``ljust(16)`` in the rows).
    _KEY_COLUMN_WIDTH = 16
    # Width of the identifier column including its trailing separator
    # (matches the 38-dash rule plus one space).
    _GUID_COLUMN_WIDTH = 39

    def __init__(self):
        self.current_user = None
        self.current_file = None

        from mocha.oms.db.sqlite import SQLiteDatabaseOms
        self.oms = SQLiteDatabaseOms()

    # ------------------------------------------------------------------
    # output helpers
    # ------------------------------------------------------------------

    def _print_table_header(self, last_title: str, rule: str):
        """Print the blank line, column titles and dashed rule of a table.

        The titles are padded with the same widths the rows use so that they
        line up with the dashed ``rule`` printed underneath.
        """
        print("")
        print("instance key".ljust(self._KEY_COLUMN_WIDTH)
              + "global identifier".ljust(self._GUID_COLUMN_WIDTH)
              + last_title)
        print(rule)

    def _row_prefix(self, inst) -> str:
        """Return the key and identifier columns for ``inst``."""
        return (str(inst.get_instance_key()).ljust(self._KEY_COLUMN_WIDTH)
                + str(inst.get_global_identifier()) + " ")

    def print_instance_list(self, instances: list):
        """Print one row per instance: key, global identifier, instance text."""
        self._print_table_header(
            "comment",
            "--------------- -------------------------------------- -------------------------")
        for inst in instances:
            # str() keeps the row printable if no text is available
            print(self._row_prefix(inst) + str(self.oms.get_instance_text(inst)))
        print("")

    def print_instance_list_with_attribute_values(self, instances: list, oms: Oms, refobj: InstanceReference):
        """Print each attribute in ``instances`` with its value on ``refobj``."""
        self._print_table_header(
            "value",
            "--------------- -------------------------------------- ---------")
        for inst in instances:
            print(self._row_prefix(inst) + str(oms.get_attribute_value(refobj, inst)))
        print("")

    def print_instance_list_with_relationship_values(self, instances: list, oms: Oms, refobj: InstanceReference):
        """Print each relationship in ``instances`` with its targets on ``refobj``.

        The first related instance shares the line with the relationship; any
        further ones are printed on their own lines under the value column.
        """
        self._print_table_header(
            "value",
            "--------------- -------------------------------------- ---------")
        for inst in instances:
            prefix = self._row_prefix(inst)
            rels = oms.get_related_instances(refobj, inst)
            if not rels:
                # terminate the row; otherwise the next relationship would be
                # appended to this line
                print(prefix)
                continue

            continuation = " " * len(prefix)
            for position, rel in enumerate(rels):
                lead = prefix if position == 0 else continuation
                print(lead + str(rel.get_instance_key()))
        print("")

    def print_error(self, text: str):
        """Print ``text`` behind a red ``error:`` label."""
        print (f"{Fore.RED}error: {Style.RESET_ALL}" + text)

    def print_error_not_open(self):
        """Report that a command needs an open database."""
        self.print_error ("no database specified, use `create` or `open` first")

    # ------------------------------------------------------------------
    # database files and login
    # ------------------------------------------------------------------

    def _password_matches(self, oms: Oms, user, password: str) -> bool:
        """Check ``password`` against the hash and salt stored on ``user``.

        The stored hash is compared with the SHA-512 hex digest of
        ``password + salt``. A user without a stored hash or salt never
        matches.
        """
        stored_hash = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordHash))
        salt = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordSalt))
        if stored_hash is None or salt is None:
            return False

        computed = hashlib.sha512((password + str(salt)).encode("utf-8")).hexdigest()
        # constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments
        return hmac.compare_digest(computed.encode("utf-8"), str(stored_hash).encode("utf-8"))

    def open_file(self, oms: Oms, filename: str):
        """Open ``filename`` in ``oms`` and log a user in interactively.

        Prompts for a user name and password. On success the current
        user/file are recorded, a user-login instance carrying a fresh token
        is created, and True is returned. If the file does not exist, or the
        credentials are rejected (in which case ``oms`` is closed again),
        False is returned.
        """
        if not os.path.exists(filename):
            print ("file not found")
            return False

        oms.open(filename)
        print("opening '" + filename + "'")

        un = input("user name: ")
        pw = getpass("password: ")

        user = oms.get_user_by_username(un)
        if user is not None and self._password_matches(oms, user, pw):
            self.current_user = un
            self.current_file = filename

            login = oms.create_instance_of(oms.get_instance_by_global_identifier(KnownClassGuids.UserLogin))
            oms.assign_relationship(login, oms.get_instance_by_global_identifier(KnownRelationshipGuids.User_Login__has__User), user)
            # oms.assign_relationship(user, oms.get_instance_by_global_identifier(KnownRelationshipGuids.User__for__User_Login), login)

            token = Guid.create()
            oms.set_attribute_value(login, oms.get_instance_by_global_identifier(KnownAttributeGuids.Token), str(token))
            return True

        print ("invalid user name or password")
        # nothing is open any more, so do not keep a user from an earlier login
        self.current_user = None
        self.current_file = None
        oms.close()
        return False

    def create_file(self, oms: Oms, filename: str):
        """Create ``filename`` by opening it in ``oms``, initializing and closing it."""
        oms.open(filename)
        oms.init()
        oms.close()

    # ------------------------------------------------------------------
    # instance id parsing
    # ------------------------------------------------------------------

    def strip(self, iid_or_gid: str) -> str:
        """Return ``iid_or_gid`` without enclosing square brackets, if any."""
        if iid_or_gid.startswith("[") and iid_or_gid.endswith("]"):
            return iid_or_gid[1:-1]
        return iid_or_gid

    def parse(self, iid_or_gid: str) -> Optional[InstanceReference]:
        """Look up an instance from ``[instance key]`` or ``{guid}`` notation.

        Returns None when the text is shorter than three characters or uses
        neither notation; otherwise returns whatever the Oms lookup returns.
        """
        if len(iid_or_gid) < 3:
            return None

        if iid_or_gid.startswith("[") and iid_or_gid.endswith("]"):
            ik = InstanceKey.parse(self.strip(iid_or_gid))
            return self.oms.get_instance_by_key(ik)

        if iid_or_gid.startswith("{") and iid_or_gid.endswith("}"):
            gid = Guid.parse(iid_or_gid)
            return self.oms.get_instance_by_global_identifier(gid)

        return None

    def _resolve(self, token: str, shown: str) -> Optional[InstanceReference]:
        """Parse ``token``; print an error naming ``shown`` if nothing is found."""
        ref = self.parse(token)
        if ref is None:
            print("invalid inst id " + shown + " (inst not found)")
        return ref

    def _require_open(self) -> bool:
        """Return True if a database is open, otherwise print an error."""
        if self.oms.is_open():
            return True
        self.print_error_not_open()
        return False

    # ------------------------------------------------------------------
    # REPL hooks
    # ------------------------------------------------------------------

    def before_start_internal(self):
        """Print the banner and open the database named on the command line.

        If the named file does not exist the user is offered to create it.
        """
        print (f"latte v0.3 - {Fore.GREEN}Local Application Testing and Troubleshooting Environment{Style.RESET_ALL}")

        if len(sys.argv) > 1:
            if os.path.exists(sys.argv[1]):
                self.current_file = sys.argv[1]
            else:
                print(f"{Fore.RED}error:{Style.RESET_ALL} file not found '" + sys.argv[1] + "'")
                if input("would you like to create it? (Y/N) [N] > ").lower() == "y":
                    self.create_file(self.oms, sys.argv[1])
                    self.current_file = sys.argv[1]

        if self.current_file is None:
            self.print_error_not_open()
        else:
            self.open_file(self.oms, self.current_file)

        print ("")

    def get_prompt(self):
        """Build the two-line prompt: login@localhost:cwd, then the latte line."""
        try:
            login = os.getlogin()
        except OSError:
            # getlogin() needs a controlling terminal; fall back to the
            # environment / password database
            login = getuser()

        prompt = f"{Fore.GREEN}{Style.BRIGHT}" + login + f"@localhost{Style.RESET_ALL}:{Fore.BLUE}{Style.BRIGHT}" + format_cwd(os.getcwd()) + f"{Style.RESET_ALL}\n↳ latte "
        if self.current_file is not None:
            if self.current_user is not None:
                prompt += f"{Fore.GREEN}" + self.current_user + f"{Style.RESET_ALL}" + "@"
            prompt += f"{Fore.CYAN}" + self.current_file + f"{Style.RESET_ALL}" + " "
        prompt += "> "
        return prompt

    def interactive_create_attribute(self, attr_class: InstanceReference):
        """Ask for a name and create an instance of ``attr_class`` carrying it.

        Returns the new instance, or None if no name was entered.
        """
        att_Name = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.Name)

        name = input("name: ")
        if name == "":
            print("please specify a name")
            return None

        attr = self.oms.create_instance_of(attr_class)
        self.oms.set_attribute_value(attr, att_Name, name)
        return attr

    def _prompt_optional_int(self, prompt: str) -> Optional[int]:
        """Ask for a whole number until one is given; empty input yields None."""
        while True:
            text = input(prompt)
            if text == "":
                return None
            try:
                return int(text)
            except ValueError:
                self.print_error("'" + text + "' is not a whole number")

    def process_input(self, value, quotes):
        """Execute one command line.

        ``value`` is the list of words entered; ``value[0]`` selects the
        command. ``quotes`` is accepted for the REPL interface and is not
        used here.
        """
        if len(value) == 0:
            return

        handlers = {
            "cd": self._cmd_cd,
            "ls": self._cmd_ls,
            "open": self._cmd_open,
            "create": self._cmd_create,
            "instance": self._cmd_instance,
            "attribute": self._cmd_attribute,
            "relationship": self._cmd_relationship,
            "user": self._cmd_user,
            "close": self._cmd_close,
        }
        handler = handlers.get(value[0])
        if handler is None:
            print("invalid cmd '" + value[0] + "'")
            return
        handler(value)

    # ------------------------------------------------------------------
    # command handlers (``args`` is the full word list, command included)
    # ------------------------------------------------------------------

    def _cmd_cd(self, args):
        """cd <directory>: change the working directory."""
        if len(args) < 2:
            print("usage: cd directory")
            return
        try:
            os.chdir(parse_cwd(args[1]))
        except OSError as err:
            # a bad path must not take the shell down
            self.print_error(str(err))

    def _cmd_ls(self, args):
        """ls: list the working directory."""
        os.system("ls --color=always")

    def _cmd_open(self, args):
        """open [database] <filename> | open tenant ..."""
        if len(args) >= 2 and args[1] == "tenant":
            print("error: not implemented")
        elif len(args) == 3 and args[1] == "database":
            self.open_file(self.oms, args[2])
        elif len(args) == 2 and args[1] != "database":
            # default to "open database ..."
            self.open_file(self.oms, args[1])
        else:
            print("usage: open database|tenant 'filename'")

    def _cmd_create(self, args):
        """create <filename>: create a database file and open it."""
        if len(args) > 1:
            self.create_file(self.oms, args[1])
            self.open_file(self.oms, args[1])
        else:
            print("usage: create filename.mql")

    def _cmd_instance(self, args):
        """instance create|get|list ..."""
        if len(args) < 2:
            print("usage: instance create|get|list")
            return

        sub = args[1]
        if sub == "create":
            if len(args) != 3:
                print("usage: instance create class_iid [index]")
                return
            if not self._require_open():
                return
            class_ref = self._resolve(args[2], self.strip(args[2]))
            if class_ref is None:
                return
            print(self.oms.create_instance_of(class_ref))

        elif sub == "get":
            if len(args) != 3:
                print("usage: instance get inst_iid|inst_gid")
                return
            if not self._require_open():
                return
            print(self.parse(args[2]))

        elif sub == "list":
            if not self._require_open():
                return
            ik_of = None
            if len(args) > 2:
                ik_of = self.parse(args[2])
            self.print_instance_list(self.oms.get_instances(ik_of))

        else:
            print("usage: instance create|get|list")

    def _cmd_attribute(self, args):
        """attribute create|list|get|set ..."""
        if len(args) < 2:
            print("usage: attribute create|list|get|set")
            return

        sub = args[1]
        if sub == "create":
            self._attribute_create(args)

        elif sub == "list":
            if len(args) != 3:
                print ("usage: attribute list instance_id")
                return
            if not self._require_open():
                return
            inst_id = self._resolve(args[2], args[2])
            if inst_id is None:
                return
            attrs = self.oms.get_attributes(inst_id)
            self.print_instance_list_with_attribute_values(attrs, self.oms, inst_id)

        elif sub == "get":
            if len(args) != 4:
                print("usage: attribute get inst_iid|inst_gid att_iid|att_gid")
                return
            if not self._require_open():
                return
            inst_id = self._resolve(args[2], self.strip(args[2]))
            if inst_id is None:
                return
            att_id = self._resolve(args[3], self.strip(args[3]))
            if att_id is None:
                return
            print(self.oms.get_attribute_value(inst_id, att_id))

        elif sub == "set":
            if len(args) != 5:
                print("usage: attribute set inst_iid|inst_gid att_iid|att_gid value")
                return
            if not self._require_open():
                return
            inst_id = self._resolve(args[2], self.strip(args[2]))
            if inst_id is None:
                return
            att_id = self._resolve(args[3], self.strip(args[3]))
            if att_id is None:
                return
            # kept in its own name so the word list stays intact for the
            # error messages above
            new_value = args[4]
            result = self.oms.set_attribute_value(inst_id, att_id, new_value)
            if result.success:
                print("ok")
            else:
                print ("error: " + result.message)

        else:
            print("usage: attribute create|list|get|set")

    def _attribute_create(self, args):
        """attribute create text|richtext|boolean|numeric: create interactively."""
        kind = args[2] if len(args) > 2 else None
        if kind not in ("text", "richtext", "boolean", "numeric"):
            print("usage: attribute create text|boolean|numeric|date|xml|richtext")
            return
        if not self._require_open():
            return

        lookup = self.oms.get_instance_by_global_identifier

        if kind in ("text", "richtext"):
            class_guid = KnownClassGuids.RichTextAttribute if kind == "richtext" else KnownClassGuids.TextAttribute
            attr = self.interactive_create_attribute(lookup(class_guid))
            if attr is None:
                return

            att_MaximumLength = lookup(KnownAttributeGuids.MaximumLength)
            maxlen = self._prompt_optional_int("maximum length (leave empty for infinite): ")
            if maxlen is not None:
                self.oms.set_attribute_value(attr, att_MaximumLength, maxlen)

        elif kind == "boolean":
            self.interactive_create_attribute(lookup(KnownClassGuids.BooleanAttribute))

        else:  # numeric
            attr = self.interactive_create_attribute(lookup(KnownClassGuids.NumericAttribute))
            if attr is None:
                return

            att_MinimumValue = lookup(KnownAttributeGuids.MinimumValue)
            att_MaximumValue = lookup(KnownAttributeGuids.MaximumValue)

            if att_MinimumValue is None:
                print("warning: numeric attribute `Minimum Value` is not defined")
            else:
                minimum_value = self._prompt_optional_int("minimum value (empty for infinite): ")
                if minimum_value is not None:
                    self.oms.set_attribute_value(attr, att_MinimumValue, minimum_value)

            if att_MaximumValue is None:
                print("warning: numeric attribute `Maximum Value` is not defined")
            else:
                maximum_value = self._prompt_optional_int("maximum value (empty for infinite): ")
                if maximum_value is not None:
                    self.oms.set_attribute_value(attr, att_MaximumValue, maximum_value)

    def _cmd_relationship(self, args):
        """relationship list <instance_id>"""
        if len(args) != 3 or args[1] != "list":
            print ("usage: relationship list instance_id")
            return
        if not self._require_open():
            return

        inst_id = self._resolve(args[2], args[2])
        if inst_id is None:
            return
        rels = self.oms.get_relationships(inst_id)
        self.print_instance_list_with_relationship_values(rels, self.oms, inst_id)

    def _cmd_user(self, args):
        """user list | user set-password <username>"""
        sub = args[1] if len(args) > 1 else None

        if sub == "list":
            ik_User = self.oms.get_instance_by_global_identifier(KnownClassGuids.User)
            ik_UserName = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.UserName)
            users = self.oms.get_instances(ik_User)
            if users is None:
                self.print_error_not_open()
                return

            print ("user name ")
            print ("---------------------")
            for user in users:
                print(self.oms.get_attribute_value(user, ik_UserName))

        elif sub == "set-password":
            if len(args) < 3:
                print("usage: user set-password username")
                return

            my_username = args[2]
            my_user = self.oms.get_user_by_username(my_username)
            if my_user is None:
                print("passwd: user '" + my_username + "' does not exist")
                return

            print("Changing password for " + my_username + ".")
            pw = getpass("New password: ")
            pw2 = getpass("Confirm new password: ")
            if pw == pw2:
                self.oms.set_user_password(my_user, pw)
                print("passwd: password updated successfully")
            else:
                print("Sorry, passwords do not match.")

        else:
            print("usage: user list|set-password")

    def _cmd_close(self, args):
        """close: close the database and clear it from the prompt."""
        self.oms.close()
        self.current_user = None
        self.current_file = None


if __name__ == '__main__':
    prog = MochaShell()
    prog.start()