mocha/python/mocha-shell.py

504 lines
14 KiB
Python

#
# mocha-shell - the 'latte' python REPL shell for mocha
# Copyright (C) 2024 Michael Becker
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mocha.core import InstanceKey, InstanceReference
from mocha.oms import Oms
from mocha.definitions import KnownClassGuids, KnownInstanceGuids, KnownAttributeGuids, KnownRelationshipGuids
from framework import REPLApplication, Guid, format_cwd, parse_cwd
import atexit
import colorama
import os
import readline
import sys
from colorama import Fore, Style
from getpass import getpass
configpath = os.path.join(os.path.expanduser("~"), ".config", "latte")
if not os.path.exists(configpath):
os.mkdir(configpath)
histfile = os.path.join(configpath, "history")
try:
readline.read_history_file(histfile)
# default history len is -1 (infinite), which may grow unruly
readline.set_history_length(1000)
except FileNotFoundError:
pass
atexit.register(readline.write_history_file, histfile)
colorama.init()
class MochaShell (REPLApplication):
def print_instance_list(self, instances : list):
print("")
print("instance key global identifier comment ")
print("--------------- -------------------------------------- -------------------------")
for inst in instances:
print(str(inst.get_instance_key()).ljust(16) + str(inst.get_global_identifier()) + " " + self.oms.get_instance_text(inst))
print("")
def print_instance_list_with_attribute_values(self, instances : list, oms : Oms, refobj : InstanceReference):
print("")
print("instance key global identifier value")
print("--------------- -------------------------------------- ---------")
for inst in instances:
print(str(inst.get_instance_key()).ljust(16) + str(inst.get_global_identifier()) + " " + str(oms.get_attribute_value(refobj, inst)))
print("")
def print_instance_list_with_relationship_values(self, instances : list, oms : Oms, refobj : InstanceReference):
print("")
print("instance key global identifier value")
print("--------------- -------------------------------------- ---------")
for inst in instances:
print(str(inst.get_instance_key()).ljust(16) + str(inst.get_global_identifier()) + " ", end = "")
rels = oms.get_related_instances(refobj, inst)
for rel in rels:
if rels.index(rel) > 0:
print (" ", end = "")
print(str(rel.get_instance_key()))
print("")
def print_error(self, text : str):
print (f"{Fore.RED}error: {Style.RESET_ALL}" + text)
def print_error_not_open(self):
self.print_error ("no database specified, use `create` or `open` first")
def __init__(self):
self.current_user = None
self.current_file = None
from mocha.oms.db.sqlite import SQLiteDatabaseOms
self.oms = SQLiteDatabaseOms()
def open_file(self, oms : Oms, filename : str):
if not os.path.exists(filename):
print ("file not found")
return False
oms.open(filename)
print("opening '" + filename + "'")
un = input("user name: ")
pw = getpass("password: ")
user = oms.get_user_by_username(un)
if user is not None:
hash = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordHash))
salt = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordSalt))
passsalt = str(pw + salt).encode("utf-8")
import hashlib
passhash = hashlib.sha512(passsalt).hexdigest()
if (hash == passhash):
self.current_user = un
self.current_file = filename
login = oms.create_instance_of(oms.get_instance_by_global_identifier(KnownClassGuids.UserLogin))
oms.assign_relationship(login, oms.get_instance_by_global_identifier(KnownRelationshipGuids.User_Login__has__User), user)
# oms.assign_relationship(user, oms.get_instance_by_global_identifier(KnownRelationshipGuids.User__for__User_Login), login)
token = Guid.create()
oms.set_attribute_value(login, oms.get_instance_by_global_identifier(KnownAttributeGuids.Token), str(token))
return True
print ("invalid user name or password")
self.current_file = None
oms.close()
return False
def strip(self, iid_or_gid : str) -> str:
if iid_or_gid[0] == "[" and iid_or_gid[-1] == "]":
return iid_or_gid[1:-1]
return iid_or_gid
def parse(self, iid_or_gid : str) -> InstanceReference:
if len(iid_or_gid) < 3:
return None
if iid_or_gid[0] == "[" and iid_or_gid[-1] == "]":
ik = InstanceKey.parse(self.strip(iid_or_gid))
return self.oms.get_instance_by_key(ik)
elif iid_or_gid[0] == "{" and iid_or_gid[-1] == "}":
ik = Guid.parse(iid_or_gid)
return self.oms.get_instance_by_global_identifier(ik)
def create_file(self, oms : Oms, filename : str):
self.oms.open(filename)
self.oms.init()
self.oms.close()
def before_start_internal(self):
print (f"latte v0.3 - {Fore.GREEN}Local Application Testing and Troubleshooting Environment{Style.RESET_ALL}")
if len(sys.argv) > 1:
if os.path.exists(sys.argv[1]):
self.current_file = sys.argv[1]
else:
print(f"{Fore.RED}error:{Style.RESET_ALL} file not found '" + sys.argv[1] + "'")
if input("would you like to create it? (Y/N) [N] > ").lower() == "y":
self.create_file(self.oms, sys.argv[1])
self.current_file = sys.argv[1]
if self.current_file is None:
self.print_error_not_open()
else:
self.open_file(self.oms, self.current_file)
print ("")
def get_prompt(self):
prompt = f"{Fore.GREEN}{Style.BRIGHT}" + os.getlogin() + f"@localhost{Style.RESET_ALL}:{Fore.BLUE}{Style.BRIGHT}" + format_cwd(os.getcwd()) + f"{Style.RESET_ALL}\n↳ latte "
if self.current_file is not None:
if self.current_user is not None:
prompt += f"{Fore.GREEN}" + self.current_user + f"{Style.RESET_ALL}" + "@"
prompt += f"{Fore.CYAN}" + self.current_file + f"{Style.RESET_ALL}" + " "
prompt += "> "
return prompt
def interactive_create_attribute(self, attr_class : InstanceReference):
att_Name = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.Name)
name = input("name: ")
if name == "":
print("please specify a name")
return None
attr = self.oms.create_instance_of(attr_class)
self.oms.set_attribute_value(attr, att_Name, name)
return attr
def process_input(self, value, quotes):
if len(value) > 0:
if value[0] == "cd":
os.chdir(parse_cwd(value[1]))
elif value[0] == "ls":
os.system("ls --color=always")
elif value[0] == "open":
if len(value) == 2:
if value[1] == "database":
if len(value) > 2:
self.open_file(self.oms, value[2])
return
else:
print("usage: open database|tenant 'filename'")
elif value[1] == "tenant":
print("error: not implemented")
return
else:
# default to "open database ..."
self.open_file(self.oms, value[1])
return
else:
print("usage: open database|tenant 'filename'")
elif value[0] == "create":
if len (value) > 1:
self.create_file(self.oms, value[1])
self.open_file(self.oms, value[1])
else:
print("usage: create filename.mql")
# print ("error: no database open, use `open filename.mql` first")
elif value[0] == "instance":
if len(value) > 1:
if value[1] == "create":
if len(value) == 3:
iid = self.parse(value[2])
inst = self.oms.create_instance_of(iid)
print(inst)
else:
print("usage: instance create class_iid [index]")
elif value[1] == "get":
if len(value) == 3:
iid = self.parse(value[2])
print(iid)
elif value[1] == "list":
if not self.oms.is_open():
self.print_error_not_open()
return
ik_of = None
if len(value) > 2:
ik_of = self.parse(value[2])
instances = self.oms.get_instances(ik_of)
self.print_instance_list(instances)
else:
print("usage: instance create|get|list")
elif value[0] == "attribute":
if len(value) > 1:
if value[1] == "create":
if len(value) > 2:
if value[2] == "text" or value[2] == "richtext":
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.TextAttribute)
if value[2] == "richtext":
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.RichTextAttribute)
attr = self.interactive_create_attribute(attrClass)
if attr is None:
return
att_MaximumLength = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.MaximumLength)
maxlen = input("maximum length (leave empty for infinite): ")
if maxlen != "":
self.oms.set_attribute_value(attr, att_MaximumLength, int(maxlen))
elif value[2] == "boolean":
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.BooleanAttribute)
attr = self.interactive_create_attribute(attrClass)
if attr is None:
return
elif value[2] == "numeric":
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.NumericAttribute)
attr = self.interactive_create_attribute(attrClass)
if attr is None:
return
att_MinimumValue = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.MinimumValue)
att_MaximumValue = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.MaximumValue)
if att_MinimumValue is None:
print("warning: numeric attribute `Minimum Value` is not defined")
else:
minimum_value = input("minimum value (empty for infinite): ")
if minimum_value != "":
self.oms.set_attribute_value(attr, att_MinimumValue, int(minimum_value))
if att_MaximumValue is None:
print("warning: numeric attribute `Maximum Value` is not defined")
else:
maximum_value = input("maximum value (empty for infinite): ")
if maximum_value != "":
self.oms.set_attribute_value(attr, att_MaximumValue, int(maximum_value))
else:
print("usage: attribute create text|boolean|numeric|date|xml|richtext")
else:
print("usage: attribute create text|boolean|numeric|date|xml|richtext")
if value[1] == "list":
if len(value) == 3:
inst_id = self.parse(value[2])
if inst_id is None:
print("invalid inst id " + value[2] + " (inst not found)")
return
attrs = self.oms.get_attributes(inst_id)
self.print_instance_list_with_attribute_values(attrs, self.oms, inst_id)
else:
print ("usage: attribute list instance_id")
elif value[1] == "get":
if len(value) == 4:
if not self.oms.is_open():
self.print_error_not_open()
return
inst_id = self.parse(value[2])
att_id = self.parse(value[3])
if inst_id is None:
print("invalid inst id " + self.strip(value[2]) + " (inst not found)")
return
if att_id is None:
print("invalid inst id " + self.strip(value[3]) + " (inst not found)")
return
value = self.oms.get_attribute_value(inst_id, att_id)
print(value)
else:
print("usage: attribute get inst_iid|inst_gid att_iid|att_gid")
elif value[1] == "set":
if len(value) == 5:
if not self.oms.is_open():
self.print_error_not_open()
return
inst_id = self.parse(value[2])
att_id = self.parse(value[3])
value = value[4]
if inst_id is None:
print("invalid inst id " + self.strip(value[2]) + " (inst not found)")
return
if att_id is None:
print("invalid inst id " + self.strip(value[3]) + " (inst not found)")
return
result = self.oms.set_attribute_value(inst_id, att_id, value)
if result.success:
print("ok")
else:
print ("error: " + result.message)
else:
print("usage: attribute set inst_iid|inst_gid att_iid|att_gid value")
else:
print("usage: attribute list|get|set")
elif value[0] == "relationship":
if value[1] == "list":
if len(value) == 3:
inst_id = self.parse(value[2])
if inst_id is None:
print("invalid inst id " + value[2] + " (inst not found)")
return
rels = self.oms.get_relationships(inst_id)
self.print_instance_list_with_relationship_values(rels, self.oms, inst_id)
else:
print ("usage: relationship list instance_id")
elif value[0] == "user":
if value[1] == "list":
ik_User = self.oms.get_instance_by_global_identifier(KnownClassGuids.User)
ik_UserName = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.UserName)
users = self.oms.get_instances(ik_User)
if users is None:
self.print_error_not_open()
return
print ("user name ")
print ("---------------------")
for user in users:
user_name = self.oms.get_attribute_value(user, ik_UserName)
print(user_name)
return
elif value[1] == "set-password":
if len(value) > 2:
my_username = value[2]
my_user = self.oms.get_user_by_username(my_username)
if my_user is None:
print("passwd: user '" + my_username + "' does not exist")
return
print("Changing password for " + my_username + ".")
pw = getpass("New password: ")
pw2 = getpass("Confirm new password: ")
if pw == pw2:
self.oms.set_user_password(my_user, pw)
print("passwd: password updated successfully")
else:
print("Sorry, passwords do not match.")
return
elif value[0] == "close":
self.oms.close()
else:
print("invalid cmd '" + value[0] + "'")
if __name__ == '__main__':
prog = MochaShell()
prog.start()