#
# mocha-shell - the 'latte' python REPL shell for mocha
# Copyright (C) 2024 Michael Becker
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from mocha.core import InstanceKey, InstanceReference
|
|
from mocha.oms import Oms
|
|
from mocha.definitions import KnownClassGuids, KnownInstanceGuids, KnownAttributeGuids, KnownRelationshipGuids
|
|
|
|
from framework import REPLApplication, Guid, format_cwd, parse_cwd
|
|
|
|
import atexit
|
|
import colorama
|
|
import os
|
|
import readline
|
|
import sys
|
|
|
|
from colorama import Fore, Style
|
|
from getpass import getpass
|
|
|
|
# Per-user configuration directory for the shell.  makedirs (rather than
# mkdir) also creates a missing ~/.config parent and tolerates the directory
# already existing.
configpath = os.path.join(os.path.expanduser("~"), ".config", "latte")
os.makedirs(configpath, exist_ok=True)

# Command history is persisted between sessions in this file.
histfile = os.path.join(configpath, "history")

# default history len is -1 (infinite), which may grow unruly.  The cap is set
# unconditionally so that it also applies on the very first run, when there is
# no history file to read yet.
readline.set_history_length(1000)

try:
    readline.read_history_file(histfile)
except FileNotFoundError:
    # First run: nothing to load, the file is created at exit.
    pass

# Save the session's history when the interpreter shuts down.
atexit.register(readline.write_history_file, histfile)

# Enable ANSI colour handling (needed for the Fore/Style codes used below).
colorama.init()
|
|
|
|
def clear():
    """Write the ESC-c control sequence to stdout without a trailing newline."""
    sys.stdout.write("\033c")
|
|
|
|
class MochaShell(REPLApplication):
    """Interactive 'latte' shell for inspecting and editing a mocha database.

    The shell owns a single SQLite-backed Oms instance and remembers which
    file is open and which user authenticated against it.  Lines typed at the
    prompt are dispatched by `process_input` to the `open`, `create`,
    `instance`, `attribute`, `relationship` and `close` commands (plus the
    `cd` / `ls` conveniences).
    """

    # Width of the "instance key" column (15 dashes plus one separating space).
    _KEY_WIDTH = 16
    # Width of the "global identifier" column in the table header/rule.
    _GUID_WIDTH = 38

    # ------------------------------------------------------------------
    # output helpers
    # ------------------------------------------------------------------

    def _print_table_header(self, with_value: bool = False):
        """Print the blank line, column titles and rule that start a table."""
        header = "instance key".ljust(self._KEY_WIDTH) + "global identifier".ljust(self._GUID_WIDTH)
        rule = "-" * (self._KEY_WIDTH - 1) + " " + "-" * self._GUID_WIDTH
        if with_value:
            header += " value"
            rule += " " + "-" * 9

        print("")
        print(header)
        print(rule)

    def _row_prefix(self, inst) -> str:
        """Return the 'key + global identifier' part shared by all table rows."""
        return str(inst.get_instance_key()).ljust(self._KEY_WIDTH) + str(inst.get_global_identifier())

    def print_instance_list(self, instances: list):
        """Print one row (instance key, global identifier) per instance."""
        self._print_table_header()
        for inst in instances:
            print(self._row_prefix(inst))
        print("")

    def print_instance_list_with_attribute_values(self, instances: list, oms: Oms, refobj: InstanceReference):
        """Print each attribute in *instances* with the value it has on *refobj*.

        The value column is whatever `oms.get_attribute_value(refobj, inst)`
        returns, converted with `str`.
        """
        self._print_table_header(with_value=True)
        for inst in instances:
            print(self._row_prefix(inst) + " " + str(oms.get_attribute_value(refobj, inst)))
        print("")

    def print_instance_list_with_relationship_values(self, instances: list, oms: Oms, refobj: InstanceReference):
        """Print each relationship in *instances* with its targets on *refobj*.

        The first related instance shares the line with the relationship
        itself; further ones go on continuation lines aligned under the value
        column.  A relationship without related instances still ends its line,
        so the following row does not run into it.
        """
        self._print_table_header(with_value=True)
        for inst in instances:
            prefix = self._row_prefix(inst) + " "
            rels = oms.get_related_instances(refobj, inst)

            if not rels:
                print(prefix)
                continue

            # enumerate instead of rels.index(): linear, and correct even when
            # the same instance appears more than once.
            for position, rel in enumerate(rels):
                lead = prefix if position == 0 else " " * len(prefix)
                print(lead + str(rel.get_instance_key()))

        print("")

    def print_error(self, text: str):
        """Print *text* prefixed with a red 'error: ' tag."""
        print(f"{Fore.RED}error: {Style.RESET_ALL}" + text)

    def print_error_not_open(self):
        """Tell the user that a command needs an open database."""
        self.print_error("no database specified, use `create` or `open` first")

    def _require_open(self) -> bool:
        """Return True if the database is open; otherwise report and return False."""
        if self.oms.is_open():
            return True

        self.print_error_not_open()
        return False

    # ------------------------------------------------------------------
    # construction / database handling
    # ------------------------------------------------------------------

    def __init__(self):
        """Create the shell with no file or user selected and a SQLite Oms."""
        # NOTE(review): REPLApplication.__init__ is not invoked here, exactly
        # as before - confirm the base class needs no initialisation.
        self.current_user = None
        self.current_file = None

        from mocha.oms.db.sqlite import SQLiteDatabaseOms
        self.oms = SQLiteDatabaseOms()

    def _password_matches(self, oms: Oms, user, password: str) -> bool:
        """Check *password* against the salted SHA-512 hash stored for *user*.

        Returns False (instead of raising) when the user has no stored hash or
        salt.  The digests are compared with `hmac.compare_digest`.
        """
        import hashlib
        import hmac

        stored_hash = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordHash))
        salt = oms.get_attribute_value(user, oms.get_instance_by_global_identifier(KnownAttributeGuids.PasswordSalt))
        if stored_hash is None or salt is None:
            return False

        computed = hashlib.sha512((password + str(salt)).encode("utf-8")).hexdigest()
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(str(stored_hash).encode("utf-8"), computed.encode("utf-8"))

    def open_file(self, oms: Oms, filename: str):
        """Open *filename* in *oms* and log a user in interactively.

        Prompts for user name and password.  On success records the current
        user and file, creates a UserLogin instance related to the user with a
        freshly generated token, and returns True.  If the file does not exist
        or the credentials are rejected, returns False; in the latter case the
        database is closed again and the current file/user are cleared.
        """
        if not os.path.exists(filename):
            print("file not found")
            return False

        oms.open(filename)

        print("opening '" + filename + "'")
        un = input("user name: ")
        pw = getpass("password: ")

        user = oms.get_user_by_username(un)
        if user is not None and self._password_matches(oms, user, pw):
            self.current_user = un
            self.current_file = filename

            login = oms.create_instance_of(oms.get_instance_by_global_identifier(KnownClassGuids.UserLogin))
            oms.assign_relationship(login, oms.get_instance_by_global_identifier(KnownRelationshipGuids.User_Login__has__User), user)

            token = Guid.create()
            oms.set_attribute_value(login, oms.get_instance_by_global_identifier(KnownAttributeGuids.Token), str(token))
            return True

        print("invalid user name or password")
        # Do not leave a stale identity behind after a failed login.
        self.current_user = None
        self.current_file = None

        oms.close()
        return False

    def strip(self, iid_or_gid: str) -> str:
        """Return *iid_or_gid* without surrounding square brackets, if present.

        Strings that are not wrapped in `[` ... `]` (including the empty
        string) are returned unchanged.
        """
        if iid_or_gid.startswith("[") and iid_or_gid.endswith("]"):
            return iid_or_gid[1:-1]

        return iid_or_gid

    def parse(self, iid_or_gid: str) -> InstanceReference:
        """Resolve a textual instance reference to an instance.

        `[...]` is parsed as an instance key and `{...}` as a global
        identifier; the lookup result of the Oms is returned.  Returns None
        for strings shorter than three characters or in neither form.
        """
        if len(iid_or_gid) < 3:
            return None

        if iid_or_gid[0] == "[" and iid_or_gid[-1] == "]":
            ik = InstanceKey.parse(self.strip(iid_or_gid))
            return self.oms.get_instance_by_key(ik)

        if iid_or_gid[0] == "{" and iid_or_gid[-1] == "}":
            gid = Guid.parse(iid_or_gid)
            return self.oms.get_instance_by_global_identifier(gid)

        return None

    def create_file(self, oms: Oms, filename: str):
        """Create and initialise a database at *filename* using *oms*.

        The database is opened, initialised and closed again; use `open_file`
        afterwards to work with it.
        """
        # Use the Oms that was passed in rather than silently ignoring it.
        oms.open(filename)
        oms.init()
        oms.close()

    # ------------------------------------------------------------------
    # REPL hooks
    # ------------------------------------------------------------------

    def before_start_internal(self):
        """Print the banner and open the file named on the command line.

        If the file given as first argument does not exist the user is
        offered to create it.  Without a usable file a hint is printed.
        """
        print(f"latte v0.3 - {Fore.GREEN}Local Application Testing and Troubleshooting Environment{Style.RESET_ALL}")

        if len(sys.argv) > 1:
            if os.path.exists(sys.argv[1]):
                self.current_file = sys.argv[1]
            else:
                print(f"{Fore.RED}error:{Style.RESET_ALL} file not found '" + sys.argv[1] + "'")
                if input("would you like to create it? (Y/N) [N] > ").lower() == "y":
                    self.create_file(self.oms, sys.argv[1])
                    self.current_file = sys.argv[1]

        if self.current_file is None:
            self.print_error_not_open()
        else:
            self.open_file(self.oms, self.current_file)

        print("")

    def get_prompt(self):
        """Build the two-line coloured prompt.

        The first line shows `login@localhost:cwd`; the second shows
        `latte`, followed by `user@file` when a file is open.
        """
        try:
            login = os.getlogin()
        except OSError:
            # os.getlogin() fails without a controlling terminal; fall back to
            # the environment instead of crashing the prompt.
            login = os.environ.get("USER") or os.environ.get("USERNAME") or "unknown"

        prompt = f"{Fore.GREEN}{Style.BRIGHT}" + login + f"@localhost{Style.RESET_ALL}:{Fore.BLUE}{Style.BRIGHT}" + format_cwd(os.getcwd()) + f"{Style.RESET_ALL}\n↳ latte "
        if self.current_file is not None:
            if self.current_user is not None:
                prompt += f"{Fore.GREEN}" + self.current_user + f"{Style.RESET_ALL}" + "@"
            prompt += f"{Fore.CYAN}" + self.current_file + f"{Style.RESET_ALL}" + " "

        prompt += "> "
        return prompt

    def process_input(self, value):
        """Execute one command line entered at the prompt.

        The line is split on whitespace; the first word selects the command.
        Blank lines are ignored, unknown commands are reported.
        """
        # split() without an argument collapses runs of whitespace, so
        # "open  file" no longer produces an empty parameter.
        parms = value.split()
        if not parms:
            return

        command = parms[0]

        if command == "cd":
            if len(parms) < 2:
                print("usage: cd directory")
                return
            try:
                os.chdir(parse_cwd(parms[1]))
            except OSError as err:
                self.print_error(str(err))

        elif command == "ls":
            os.system("ls --color=always")

        elif command == "open":
            self._cmd_open(parms)

        elif command == "create":
            if len(parms) < 2:
                print("usage: create 'filename'")
                return
            self.create_file(self.oms, parms[1])
            self.open_file(self.oms, parms[1])

        elif command == "instance":
            self._cmd_instance(parms)

        elif command == "attribute":
            self._cmd_attribute(parms)

        elif command == "relationship":
            self._cmd_relationship(parms)

        elif command == "close":
            self.oms.close()
            # Keep the prompt in sync with the closed database.
            self.current_file = None
            self.current_user = None

        else:
            print("invalid cmd '" + command + "'")

    # ------------------------------------------------------------------
    # command implementations
    # ------------------------------------------------------------------

    def _resolve(self, token: str, what: str):
        """Parse *token*; print an 'invalid <what> id' message if not found."""
        ref = self.parse(token)
        if ref is None:
            print("invalid " + what + " id " + self.strip(token) + " (" + what + " not found)")
        return ref

    def _cmd_open(self, parms: list):
        """Handle `open [database|tenant] filename`."""
        usage = "usage: open database|tenant 'filename'"
        if len(parms) < 2:
            print(usage)
            return

        if parms[1] == "database":
            # Requires the file name as a third word.
            if len(parms) > 2:
                self.open_file(self.oms, parms[2])
            else:
                print(usage)

        elif parms[1] == "tenant":
            print("error: not implemented")

        elif len(parms) == 2:
            # default to "open database ..."
            self.open_file(self.oms, parms[1])

        else:
            print(usage)

    def _cmd_instance(self, parms: list):
        """Handle `instance create|get|list`."""
        sub = parms[1] if len(parms) > 1 else None

        if sub == "create":
            # NOTE(review): the usage text advertises an optional index that
            # is not accepted here - confirm whether it is still planned.
            if len(parms) != 3:
                print("usage: instance create class_iid [index]")
                return
            if not self._require_open():
                return

            class_ref = self._resolve(parms[2], "inst")
            if class_ref is None:
                return
            print(self.oms.create_instance_of(class_ref))

        elif sub == "get":
            if len(parms) != 3:
                print("usage: instance get inst_iid|inst_gid")
                return
            if not self._require_open():
                return
            print(self.parse(parms[2]))

        elif sub == "list":
            if not self._require_open():
                return
            self.print_instance_list(self.oms.get_instances())

        else:
            print("usage: instance create|get|list")

    def _cmd_attribute(self, parms: list):
        """Handle `attribute list|get|set`."""
        sub = parms[1] if len(parms) > 1 else None

        if sub == "list":
            if len(parms) != 3:
                print("usage: attribute list instance_id")
                return
            if not self._require_open():
                return

            inst_id = self._resolve(parms[2], "inst")
            if inst_id is None:
                return
            attrs = self.oms.get_attributes(inst_id)
            self.print_instance_list_with_attribute_values(attrs, self.oms, inst_id)

        elif sub == "get":
            if len(parms) != 4:
                print("usage: attribute get inst_iid|inst_gid att_iid|att_gid")
                return
            if not self._require_open():
                return

            inst_id = self._resolve(parms[2], "inst")
            if inst_id is None:
                return
            att_id = self._resolve(parms[3], "att")
            if att_id is None:
                return
            print(self.oms.get_attribute_value(inst_id, att_id))

        elif sub == "set":
            if len(parms) != 5:
                print("usage: attribute set inst_iid|inst_gid att_iid|att_gid value")
                return
            if not self._require_open():
                return

            inst_id = self._resolve(parms[2], "inst")
            if inst_id is None:
                return
            att_id = self._resolve(parms[3], "att")
            if att_id is None:
                return
            self.oms.set_attribute_value(inst_id, att_id, parms[4])
            print("ok")

        else:
            print("usage: attribute list|get|set")

    def _cmd_relationship(self, parms: list):
        """Handle `relationship list instance_id`."""
        if len(parms) != 3 or parms[1] != "list":
            print("usage: relationship list instance_id")
            return
        if not self._require_open():
            return

        inst_id = self._resolve(parms[2], "inst")
        if inst_id is None:
            return
        rels = self.oms.get_relationships(inst_id)
        self.print_instance_list_with_relationship_values(rels, self.oms, inst_id)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the shell and hand control to its start().
    MochaShell().start()