504 lines
14 KiB
Python
504 lines
14 KiB
Python
#
|
|
# mocha-shell - the 'latte' python REPL shell for mocha
|
|
# Copyright (C) 2024 Michael Becker
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from mocha.core import InstanceKey, InstanceReference
|
|
from mocha.oms import Oms
|
|
from mocha.definitions import KnownClassGuids, KnownInstanceGuids, KnownAttributeGuids, KnownRelationshipGuids
|
|
|
|
from framework import REPLApplication, Guid, format_cwd, parse_cwd
|
|
|
|
import atexit
|
|
import colorama
|
|
import os
|
|
import readline
|
|
import sys
|
|
|
|
from colorama import Fore, Style
|
|
from getpass import getpass
|
|
|
|
configpath = os.path.join(os.path.expanduser("~"), ".config", "latte")
|
|
if not os.path.exists(configpath):
|
|
os.mkdir(configpath)
|
|
|
|
histfile = os.path.join(configpath, "history")
|
|
|
|
try:
|
|
readline.read_history_file(histfile)
|
|
# default history len is -1 (infinite), which may grow unruly
|
|
readline.set_history_length(1000)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
atexit.register(readline.write_history_file, histfile)
|
|
|
|
colorama.init()
|
|
|
|
class MochaShell (REPLApplication):
|
|
|
|
def print_instance_list(self, instances : list):
|
|
print("")
|
|
print("instance key global identifier ")
|
|
print("--------------- --------------------------------------")
|
|
for inst in instances:
|
|
print(str(inst.get_instance_key()).ljust(16) + str(inst.get_global_identifier()))
|
|
print("")
|
|
|
|
def print_instance_list_with_attribute_values(self, instances : list, oms : Oms, refobj : InstanceReference):
|
|
print("")
|
|
print("instance key global identifier value")
|
|
print("--------------- -------------------------------------- ---------")
|
|
for inst in instances:
|
|
print(str(inst.get_instance_key()).ljust(16) + str(inst.get_global_identifier()) + " " + str(oms.get_attribute_value(refobj, inst)))
|
|
print("")
|
|
|
|
def print_instance_list_with_relationship_values(self, instances : list, oms : Oms, refobj : InstanceReference):
|
|
print("")
|
|
print("instance key global identifier value")
|
|
print("--------------- -------------------------------------- ---------")
|
|
for inst in instances:
|
|
print(str(inst.get_instance_key()).ljust(16) + str(inst.get_global_identifier()) + " ", end = "")
|
|
|
|
rels = oms.get_related_instances(refobj, inst)
|
|
for rel in rels:
|
|
if rels.index(rel) > 0:
|
|
print (" ", end = "")
|
|
|
|
print(str(rel.get_instance_key()))
|
|
|
|
print("")
|
|
|
|
def print_error(self, text : str):
|
|
print (f"{Fore.RED}error: {Style.RESET_ALL}" + text)
|
|
|
|
def print_error_not_open(self):
|
|
self.print_error ("no database specified, use `create` or `open` first")
|
|
|
|
|
|
def __init__(self):
|
|
self.current_user = None
|
|
self.current_file = None
|
|
|
|
from mocha.oms.db.sqlite import SQLiteDatabaseOms
|
|
self.oms = SQLiteDatabaseOms()
|
|
|
|
def open_file(self, oms : Oms, filename : str):
|
|
|
|
if not os.path.exists(filename):
|
|
print ("file not found")
|
|
return False
|
|
|
|
oms.open(filename)
|
|
|
|
print("opening '" + filename + "'")
|
|
un = input("user name: ")
|
|
pw = getpass("password: ")
|
|
|
|
user = oms.get_user_by_username(un)
|
|
if user is not None:
|
|
hash = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordHash))
|
|
salt = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordSalt))
|
|
|
|
passsalt = str(pw + salt).encode("utf-8")
|
|
import hashlib
|
|
passhash = hashlib.sha512(passsalt).hexdigest()
|
|
|
|
if (hash == passhash):
|
|
self.current_user = un
|
|
self.current_file = filename
|
|
|
|
login = oms.create_instance_of(oms.get_instance_by_global_identifier(KnownClassGuids.UserLogin))
|
|
oms.assign_relationship(login, oms.get_instance_by_global_identifier(KnownRelationshipGuids.User_Login__has__User), user)
|
|
# oms.assign_relationship(user, oms.get_instance_by_global_identifier(KnownRelationshipGuids.User__for__User_Login), login)
|
|
|
|
token = Guid.create()
|
|
oms.set_attribute_value(login, oms.get_instance_by_global_identifier(KnownAttributeGuids.Token), str(token))
|
|
return True
|
|
|
|
print ("invalid user name or password")
|
|
self.current_file = None
|
|
|
|
oms.close()
|
|
return False
|
|
|
|
def strip(self, iid_or_gid : str) -> str:
|
|
if iid_or_gid[0] == "[" and iid_or_gid[-1] == "]":
|
|
return iid_or_gid[1:-1]
|
|
|
|
return iid_or_gid
|
|
|
|
def parse(self, iid_or_gid : str) -> InstanceReference:
|
|
|
|
if len(iid_or_gid) < 3:
|
|
return None
|
|
|
|
if iid_or_gid[0] == "[" and iid_or_gid[-1] == "]":
|
|
ik = InstanceKey.parse(self.strip(iid_or_gid))
|
|
return self.oms.get_instance_by_key(ik)
|
|
elif iid_or_gid[0] == "{" and iid_or_gid[-1] == "}":
|
|
ik = Guid.parse(iid_or_gid)
|
|
return self.oms.get_instance_by_global_identifier(ik)
|
|
|
|
def create_file(self, oms : Oms, filename : str):
|
|
self.oms.open(filename)
|
|
self.oms.init()
|
|
self.oms.close()
|
|
|
|
def before_start_internal(self):
|
|
print (f"latte v0.3 - {Fore.GREEN}Local Application Testing and Troubleshooting Environment{Style.RESET_ALL}")
|
|
|
|
if len(sys.argv) > 1:
|
|
if os.path.exists(sys.argv[1]):
|
|
self.current_file = sys.argv[1]
|
|
else:
|
|
print(f"{Fore.RED}error:{Style.RESET_ALL} file not found '" + sys.argv[1] + "'")
|
|
if input("would you like to create it? (Y/N) [N] > ").lower() == "y":
|
|
self.create_file(self.oms, sys.argv[1])
|
|
self.current_file = sys.argv[1]
|
|
|
|
if self.current_file is None:
|
|
self.print_error_not_open()
|
|
else:
|
|
self.open_file(self.oms, self.current_file)
|
|
|
|
print ("")
|
|
|
|
def get_prompt(self):
|
|
|
|
prompt = f"{Fore.GREEN}{Style.BRIGHT}" + os.getlogin() + f"@localhost{Style.RESET_ALL}:{Fore.BLUE}{Style.BRIGHT}" + format_cwd(os.getcwd()) + f"{Style.RESET_ALL}\n↳ latte "
|
|
if self.current_file is not None:
|
|
if self.current_user is not None:
|
|
prompt += f"{Fore.GREEN}" + self.current_user + f"{Style.RESET_ALL}" + "@"
|
|
prompt += f"{Fore.CYAN}" + self.current_file + f"{Style.RESET_ALL}" + " "
|
|
|
|
prompt += "> "
|
|
return prompt
|
|
|
|
def interactive_create_attribute(self, attr_class : InstanceReference):
|
|
|
|
att_Name = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.Name)
|
|
|
|
name = input("name: ")
|
|
if name == "":
|
|
print("please specify a name")
|
|
return None
|
|
|
|
attr = self.oms.create_instance_of(attr_class)
|
|
self.oms.set_attribute_value(attr, att_Name, name)
|
|
return attr
|
|
|
|
def process_input(self, value, quotes):
|
|
|
|
if len(value) > 0:
|
|
|
|
if value[0] == "cd":
|
|
os.chdir(parse_cwd(value[1]))
|
|
|
|
elif value[0] == "ls":
|
|
os.system("ls --color=always")
|
|
|
|
elif value[0] == "open":
|
|
|
|
if len(value) == 2:
|
|
|
|
if value[1] == "database":
|
|
|
|
if len(value) > 2:
|
|
|
|
self.open_file(self.oms, value[2])
|
|
return
|
|
|
|
else:
|
|
print("usage: open database|tenant 'filename'")
|
|
|
|
elif value[1] == "tenant":
|
|
|
|
print("error: not implemented")
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
# default to "open database ..."
|
|
self.open_file(self.oms, value[1])
|
|
return
|
|
|
|
|
|
else:
|
|
print("usage: open database|tenant 'filename'")
|
|
|
|
elif value[0] == "create":
|
|
|
|
if len (value) > 1:
|
|
|
|
self.create_file(self.oms, value[1])
|
|
|
|
self.open_file(self.oms, value[1])
|
|
|
|
else:
|
|
|
|
print("usage: create filename.mql")
|
|
# print ("error: no database open, use `open filename.mql` first")
|
|
|
|
elif value[0] == "instance":
|
|
|
|
if len(value) > 1:
|
|
|
|
if value[1] == "create":
|
|
|
|
if len(value) == 3:
|
|
|
|
iid = self.parse(value[2])
|
|
inst = self.oms.create_instance_of(iid)
|
|
print(inst)
|
|
|
|
else:
|
|
print("usage: instance create class_iid [index]")
|
|
|
|
elif value[1] == "get":
|
|
|
|
if len(value) == 3:
|
|
iid = self.parse(value[2])
|
|
print(iid)
|
|
|
|
elif value[1] == "list":
|
|
|
|
if not self.oms.is_open():
|
|
self.print_error_not_open()
|
|
return
|
|
|
|
ik_of = None
|
|
if len(value) > 2:
|
|
ik_of = self.parse(value[2])
|
|
|
|
instances = self.oms.get_instances(ik_of)
|
|
self.print_instance_list(instances)
|
|
|
|
else:
|
|
|
|
print("usage: instance create|get|list")
|
|
|
|
elif value[0] == "attribute":
|
|
|
|
if len(value) > 1:
|
|
|
|
if value[1] == "create":
|
|
|
|
if len(value) > 2:
|
|
|
|
if value[2] == "text" or value[2] == "richtext":
|
|
|
|
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.TextAttribute)
|
|
if value[2] == "richtext":
|
|
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.RichTextAttribute)
|
|
|
|
attr = self.interactive_create_attribute(attrClass)
|
|
|
|
if attr is None:
|
|
return
|
|
|
|
att_MaximumLength = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.MaximumLength)
|
|
|
|
maxlen = input("maximum length (leave empty for infinite): ")
|
|
if maxlen != "":
|
|
self.oms.set_attribute_value(attr, att_MaximumLength, int(maxlen))
|
|
|
|
elif value[2] == "boolean":
|
|
|
|
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.BooleanAttribute)
|
|
attr = self.interactive_create_attribute(attrClass)
|
|
|
|
if attr is None:
|
|
return
|
|
|
|
elif value[2] == "numeric":
|
|
|
|
attrClass = self.oms.get_instance_by_global_identifier(KnownClassGuids.NumericAttribute)
|
|
attr = self.interactive_create_attribute(attrClass)
|
|
|
|
if attr is None:
|
|
return
|
|
|
|
att_MinimumValue = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.MinimumValue)
|
|
att_MaximumValue = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.MaximumValue)
|
|
|
|
if att_MinimumValue is None:
|
|
print("warning: numeric attribute `Minimum Value` is not defined")
|
|
else:
|
|
minimum_value = input("minimum value (empty for infinite): ")
|
|
if minimum_value != "":
|
|
self.oms.set_attribute_value(attr, att_MinimumValue, int(minimum_value))
|
|
|
|
if att_MaximumValue is None:
|
|
print("warning: numeric attribute `Maximum Value` is not defined")
|
|
else:
|
|
maximum_value = input("maximum value (empty for infinite): ")
|
|
if maximum_value != "":
|
|
self.oms.set_attribute_value(attr, att_MaximumValue, int(maximum_value))
|
|
|
|
else:
|
|
|
|
print("usage: attribute create text|boolean|numeric|date|xml|richtext")
|
|
|
|
else:
|
|
|
|
print("usage: attribute create text|boolean|numeric|date|xml|richtext")
|
|
|
|
if value[1] == "list":
|
|
|
|
if len(value) == 3:
|
|
|
|
inst_id = self.parse(value[2])
|
|
if inst_id is None:
|
|
print("invalid inst id " + value[2] + " (inst not found)")
|
|
return
|
|
|
|
attrs = self.oms.get_attributes(inst_id)
|
|
self.print_instance_list_with_attribute_values(attrs, self.oms, inst_id)
|
|
|
|
else:
|
|
|
|
print ("usage: attribute list instance_id")
|
|
|
|
elif value[1] == "get":
|
|
|
|
if len(value) == 4:
|
|
|
|
if not self.oms.is_open():
|
|
self.print_error_not_open()
|
|
return
|
|
|
|
inst_id = self.parse(value[2])
|
|
att_id = self.parse(value[3])
|
|
|
|
if inst_id is None:
|
|
print("invalid inst id " + self.strip(value[2]) + " (inst not found)")
|
|
return
|
|
if att_id is None:
|
|
print("invalid inst id " + self.strip(value[3]) + " (inst not found)")
|
|
return
|
|
|
|
value = self.oms.get_attribute_value(inst_id, att_id)
|
|
print(value)
|
|
|
|
else:
|
|
|
|
print("usage: attribute get inst_iid|inst_gid att_iid|att_gid")
|
|
|
|
elif value[1] == "set":
|
|
|
|
if len(value) == 5:
|
|
|
|
if not self.oms.is_open():
|
|
self.print_error_not_open()
|
|
return
|
|
|
|
inst_id = self.parse(value[2])
|
|
att_id = self.parse(value[3])
|
|
value = value[4]
|
|
|
|
if inst_id is None:
|
|
print("invalid inst id " + self.strip(value[2]) + " (inst not found)")
|
|
return
|
|
if att_id is None:
|
|
print("invalid inst id " + self.strip(value[3]) + " (inst not found)")
|
|
return
|
|
|
|
result = self.oms.set_attribute_value(inst_id, att_id, value)
|
|
if result.success:
|
|
print("ok")
|
|
else:
|
|
print ("error: " + result.message)
|
|
|
|
else:
|
|
|
|
print("usage: attribute set inst_iid|inst_gid att_iid|att_gid value")
|
|
else:
|
|
|
|
print("usage: attribute list|get|set")
|
|
|
|
elif value[0] == "relationship":
|
|
|
|
if value[1] == "list":
|
|
|
|
if len(value) == 3:
|
|
|
|
inst_id = self.parse(value[2])
|
|
if inst_id is None:
|
|
print("invalid inst id " + value[2] + " (inst not found)")
|
|
return
|
|
|
|
rels = self.oms.get_relationships(inst_id)
|
|
self.print_instance_list_with_relationship_values(rels, self.oms, inst_id)
|
|
|
|
else:
|
|
|
|
print ("usage: relationship list instance_id")
|
|
|
|
elif value[0] == "user":
|
|
|
|
if value[1] == "list":
|
|
|
|
ik_User = self.oms.get_instance_by_global_identifier(KnownClassGuids.User)
|
|
ik_UserName = self.oms.get_instance_by_global_identifier(KnownAttributeGuids.UserName)
|
|
|
|
users = self.oms.get_instances(ik_User)
|
|
if users is None:
|
|
self.print_error_not_open()
|
|
return
|
|
|
|
print ("user name ")
|
|
print ("---------------------")
|
|
for user in users:
|
|
user_name = self.oms.get_attribute_value(user, ik_UserName)
|
|
print(user_name)
|
|
|
|
return
|
|
|
|
elif value[1] == "set-password":
|
|
|
|
if len(value) > 2:
|
|
|
|
my_username = value[2]
|
|
my_user = self.oms.get_user_by_username(my_username)
|
|
|
|
if my_user is None:
|
|
print("passwd: user '" + my_username + "' does not exist")
|
|
return
|
|
|
|
print("Changing password for " + my_username + ".")
|
|
pw = getpass("New password: ")
|
|
pw2 = getpass("Confirm new password: ")
|
|
if pw == pw2:
|
|
|
|
self.oms.set_user_password(my_user, pw)
|
|
print("passwd: password updated successfully")
|
|
|
|
else:
|
|
print("Sorry, passwords do not match.")
|
|
return
|
|
|
|
elif value[0] == "close":
|
|
self.oms.close()
|
|
|
|
else:
|
|
|
|
print("invalid cmd '" + value[0] + "'")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
prog = MochaShell()
|
|
prog.start() |