import hashlib
import json
import logging
import math
import os
import random
import re
import resource
import socket
import string
import subprocess
from datetime import datetime, timezone
from decimal import getcontext
from pathlib import Path

from django.contrib.auth.models import User

from troggle.core.models.troggle import Expedition, Person

getcontext().prec = 2  # use 2 significant figures for decimal calculations

import settings

"""This file declares TROG, a globally visible object for caches.

TROG is a dictionary holding globally visible indexes and cache functions.
It is a Global Object, see https://python-patterns.guide/python/module-globals/
troggle.utils.TROG

chaosmonkey(n) - used by survex import to regenerate some .3d files

various git add/commit functions that need refactoring together

NOTE that TROG is not serialized! Two users can update it and conflict!!
This needs to be in a multi-user database with transactions. However it may
be useful when doing a data import with databaseReset.py as that has a
single thread.

TIMEZONES
Read this now: https://nerderati.com/a-python-epoch-timestamp-timezone-trap/
"""

TROG = {"pagecache": {"expedition": {}}, "caves": {"gcavelookup": {}, "gcavecount": {}}}
alphabet = []
sha = hashlib.new('sha256')

COOKIE_MAX_AGE = 2*365*24*60*60  # seconds
COOKIE_SHORT_TIMEOUT = 60*60  # seconds
PUBLIC_LAPTOP_COOKIE_NAME = "public_laptop"
PUBLIC_LAPTOP_COOKIE_TEXT = "this is a public laptop"

throw = 35.0  # x/y search window (m) used by find_nearest_point() below

EXPOSERVER = "expo"  # hostname of the server at expo.survex.com

DEV_OK = """On branch master
Your branch is ahead of 'origin/master' by 1 commit.
  (use "git push" to publish your local commits)

nothing to commit, working tree clean
"""

class DatabaseResetOngoing(Exception):
    """Exception class for errors while the server is reimporting everything"""

    def __init__(self, message):
        if message:
            self.message = message
        else:
            self.message = "Expo Database re-import ongoing on server.\nPlease wait 7 minutes.\nIf still not working, contact a nerd."

    def __str__(self):
        return f"DatabaseResetOngoing: {self.message}"

# This is module-level executable code. This is a Bad Thing, especially when it touches the file system.
try:
    logging.basicConfig(level=logging.DEBUG, filename=settings.LOGFILE, filemode="w")
except:
    # Opening the file for writing is going to fail currently, so decide it doesn't matter for now
    pass

def get_cookie_max_age(request):
    """This is where we detect whether the machine the user is using is a shared-use
    device or a personal device. If it is shared-use, then we set a much shorter
    cookie timeout period.
    """
    if shared_use_machine(request):
        return COOKIE_SHORT_TIMEOUT
    else:
        return COOKIE_MAX_AGE

def sanitize_name(name):
    """Filenames should not contain these characters, as the system barfs
    when it tries to use them in URLs
    """
    return name.replace("#","-").replace("?","=").replace("&","+").replace(":","^")

def get_process_memory():
    usage = resource.getrusage(resource.RUSAGE_SELF)
    return usage[2] / 1024.0  # ru_maxrss is in KiB on Linux, so this is MiB

def chaosmonkey(n):
    """returns True once every n calls - randomly"""
    if random.randrange(0, n) != 0:
        return False
    # print("CHAOS strikes !", file=sys.stderr)
    return True

def unique_slug(text, n):
    """This gives an almost-unique id based on the text. 2 hex digits would
    seem adequate, but we might get a collision.
    Deterministic
    """
    sha.update(text.encode('utf-8'))
    return sha.hexdigest()[0:n]
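# Illustrative use of unique_slug() (the output value is hypothetical):
#   unique_slug("2018-pf-01", 2)   # -> e.g. '3f'
# Note that the module-level `sha` object accumulates state across calls, so the
# digest depends on every string hashed so far in this process, not on `text`
# alone; it is only deterministic for a fixed sequence of calls.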
def random_slug(text, n):
    """This gives an almost-unique id based on the text. 2 hex digits would
    seem adequate, but we might get a collision.
    Random
    """
    text = text + alphabet_suffix(3)
    sha.update(text.encode('utf-8'))
    return sha.hexdigest()[0:n]

def alphabet_suffix(n):
    """This is called repeatedly during the initial parsing import,
    hence the cached list
    """
    global alphabet
    if not alphabet:
        alphabet = list(string.ascii_lowercase)

    if n < len(alphabet) and n > 0:
        suffix = alphabet[n-1]
    else:
        suffix = "_X_" + random.choice(string.ascii_lowercase) + random.choice(string.ascii_lowercase)
    return suffix

def wrap_text(text):
    """
    Splits a long string into multiple lines, ensuring that each line is no more
    than 70 characters long. Newline characters are inserted immediately before
    spaces to prevent breaking words.

    Parameters:
    text (str): The input string that needs to be wrapped.

    Returns:
    str: The input string formatted with newline characters such that each line
    does not exceed 70 characters in length.

    Functionality:
    1. The function first splits the input string into individual words.
    2. It iterates through the words and adds them to the current line, ensuring
       the line's length does not exceed 70 characters.
    3. If adding a word would cause the line to exceed the limit, the current
       line is added to a list of lines, and a new line is started with the word.
    4. The process repeats until all words are processed, and the final line is added.
    5. The list of lines is then joined with newline characters and returned as
       a single formatted string.

    This function was written by CoPilot. BUT interactions with existing \\n
    characters make this much less simple than it appears.
    """
    return text  # abort all wrap processing pending proper redesign
    words = text.split(' ')
    lines = []
    current_line = ""

    for word in words:
        # Check if adding the word exceeds 70 characters
        if len(current_line) + len(word) + 1 <= 70:
            if current_line:
                current_line += ' '
            current_line += word
        else:
            lines.append(current_line.strip(' '))
            current_line = word

    # Add the last line
    if current_line:
        lines.append(current_line.strip(' '))

    # Join the lines with newline characters
    return '\n'.join(lines)

def make_new_expo(year):
    coUniqueAttribs = {"year": year}
    otherAttribs = {"name": f"CUCC expo {year}", "logbookfile": "logbook.html"}
    e = Expedition.objects.create(**otherAttribs, **coUniqueAttribs)

    u = User.objects.get(username='expo')
    u.current_year = year
    u.save()
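# Minimal usage sketch (assumes the shared 'expo' User exists in the database):
#   make_new_expo("2026")
# creates Expedition(year="2026", name="CUCC expo 2026", logbookfile="logbook.html")
# and sets the 'expo' user's current_year to "2026".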
" for ff in pages: t += f"{ff}
\n" year_dir = Path(settings.EXPOWEB, 'years', year) if not year_dir.is_dir(): year_dir.mkdir(parents=True, exist_ok=True) for ff in pages: content = f"{year} {ff}

{ff}

def current_expo():
    """Returns the current expo year, but also checks if the most recent expo
    year is the same as this year. If it is not, then it creates an empty
    Expedition and fixes some files and folders.
    If we are more than one year out of date, it creates all the intervening
    Expedition objects and folders. You will need to tidy this up manually.
    """
    expos = Expedition.objects.all().order_by('-year')
    if expos:
        year = str(datetime.now(timezone.utc).year)
        last_expo = expos[0].year
        if int(last_expo) < int(year):  # coming year, after Dec. 31st
            for y in range(int(last_expo)+1, int(year)+1):
                # print(f"--!{year}---")
                make_new_expo(str(y))
                make_new_expo_dir(str(y))
        # print(f"---{year}---")
        return year
    else:
        return str(settings.EPOCH.year)  # this is 1970

def parse_aliases(aliasfile):
    """Reads a long text string containing pairs of strings:
        (alias, target)
    where the alias is an old name for a cave and the target is the current,
    valid name for the cave, e.g. ("2015-mf-06", "1623-288").
    Returns a list of tuple pairs and a list of report messages.
    May fail if there is more than one space after the comma separating the strings.
    """
    report = []  # Messages about failed lines
    aliases = []
    filepath = Path(settings.EXPOWEB) / "cave_data" / aliasfile
    if not filepath.is_file():
        message = f' ** {filepath} is not a file.'
        print(message)
        report.append(message)
        return [(None, None)], report
    try:
        with open(filepath, "r") as f:
            for line in f:
                l, sep, tail = line.partition('#')
                l, sep, tail = l.partition(';')
                l = l.strip().strip(',')  # remove terminal comma if present
                l = l.strip().strip('()')
                l = l.replace("\"","")
                l = l.replace("\'","")
                l = l.replace(" ","")  # removes all spaces
                l = " ".join(l.split(','))  # subtle: splits on comma, joins with one space
                if len(l) == 0:
                    # print(f"no parseable content: {line}")
                    continue
                key, sep, target = l.partition(' ')
                if len(key) == 0 or len(target) == 0:
                    message = f' ** Parsing failure for {line}'
                    print(message)
                    report.append(message)
                    continue
                # print(f"{key} => {target}")
                aliases.append((key, target))
    except:
        message = f' ** Cannot open {filepath} for text file reading even though it is a file.'
        print(message)
        return [(None, None)], [message]

    return aliases, report

def get_editor(request):
    current_user = request.user
    if is_identified_user(current_user):
        return get_git_string(current_user)
    else:
        return get_cookie(request)

def is_identified_user(user):
    if user.is_anonymous:
        return False
    if user.username in ["expo", "expoadmin"]:
        return False
    return True

def is_admin_user(user):
    if user.is_anonymous:
        return False
    if user.username in ["expoadmin"]:
        return True
    if user.is_superuser:  # set in parsers/users.py, i.e. Wookey, Philip S.
        return True
    return False
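# Illustrative behaviour of the three user tests above (user "fred" is hypothetical):
#   AnonymousUser              -> is_identified_user: False, is_admin_user: False
#   username "expo"            -> is_identified_user: False  (shared account)
#   username "expoadmin"       -> is_identified_user: False, is_admin_user: True
#   username "fred", superuser -> is_identified_user: True,  is_admin_user: True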
def get_git_string(user):
    if not is_identified_user(user):
        return None
    else:
        people = Person.objects.filter(user=user)
        if len(people) != 1:
            # someone like "fluffy-bunny" not associated with a Person
            return None
        person = people[0]
        return f"{person.fullname} <{user.email}>"

def shared_use_machine(request):
    """Looks for a cookie which only exists on shared-use machines"""
    if not request:  # temporary, while rolling out the implementation to all calling functions
        return False
    if not (cookie_txt := request.COOKIES.get(PUBLIC_LAPTOP_COOKIE_NAME, "")):
        return False
    elif cookie_txt == PUBLIC_LAPTOP_COOKIE_TEXT:
        print(f" - shared use cookie exists, and has expected value: '{cookie_txt}'")
        return True
    else:
        print(f" - shared use cookie exists, but has wrong value: '{cookie_txt}' not '{PUBLIC_LAPTOP_COOKIE_TEXT}'")
        return True  # any value of this cookie at all marks the machine as shared-use

def get_cookie(request):
    """The initial idea of having a default turned out to be a bad idea as people
    just ignore the field. If the default value is blank, then the form validation
    code makes the user type something in.
    So having a blank is best if the cookie is to be displayed as the first value
    seen on a form. But if the cookie is to be stored, then "Unset" may be better.
    """
    # NO_COOKIE_DEFAULT = 'Unset Cookie '
    # print(f"-- Getting cookie...")
    editor_id = request.COOKIES.get('editor_id', "")
    editor = git_string(editor_id)  # belt and braces: should have been validity-checked on saving already
    # print(f"-- Cookie to be used: {editor=}")
    return editor

def git_string(author_string):
    """Rewrites the supplied editor string into a git-compliant author string.
    Uses a regular expression for a git-compatible author string,
    written mostly by Copilot.
    Valid example: "John Doe <john@example.com>"
    """
    author_regex = re.compile(r'^[a-zA-Z][\w\s.\-]* <[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}>$')

    if author_string.startswith("Unset"):  # clean out laziness in users' PCs
        return ""

    if author_regex.match(author_string):
        # print(f"++ Is valid git-compatible author string: '{author_string}'")
        return author_string
    else:
        editor = author_string.replace("@","_at_")
        editor = re.sub('[^0-9a-zA-Z_]+', '_', editor)
        if editor.startswith("_"):
            editor = "X" + editor
        editor += f" <{editor}@potatohut.expo>"
        print(f"++ Not git-compatible author string '{author_string}', replacing as '{editor}'")
        return editor
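# Example rewrites by git_string() above (outputs follow from its rules):
#   "John Doe <john@example.com>"  ->  returned unchanged (matches author_regex)
#   "Unset Cookie "                ->  ""  (startswith "Unset")
#   "anathema device"              ->  "anathema_device <anathema_device@potatohut.expo>"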
def git_add(filename, cwd, commands=None):
    """Add a file to the list of Staged files ready for a later git commit"""
    if commands is None:  # avoid a mutable default argument, which persists across calls
        commands = []
    git = settings.GIT
    # What is the purpose of this 'git diff'?
    # To prevent merge conflicts happening, I guess, so we do not have to reverse a 'git add'.
    # print(f"git diff {filename} in {cwd}")
    cmd_diff = [git, "diff", filename]
    commands.append(cmd_diff)
    cp_diff = subprocess.run(cmd_diff, cwd=cwd, capture_output=True, text=True)
    if cp_diff.returncode != 0:
        msgdata = f"Ask a nerd to fix this DIFF problem in git_add().\n--{cp_diff.stderr}\n--{cp_diff.stdout}\n--return code:{str(cp_diff.returncode)}"
        raise WriteAndCommitError(
            f"CANNOT git DIFF on server for this file {filename}.\n\n" + msgdata
        )

    print(f"git add {filename} in {cwd}")
    cmd_add = [git, "add", filename]
    commands.append(cmd_add)
    cp_add = subprocess.run(cmd_add, cwd=cwd, capture_output=True, text=True)
    if cp_add.returncode != 0:
        msgdata = f"Ask a nerd to fix this ADD problem in git_add().\n--{cp_add.stderr}\n--{cp_add.stdout}\n--return code:{str(cp_add.returncode)}"
        raise WriteAndCommitError(
            f"CANNOT git ADD on server for this file {filename}.\n\n" + msgdata
        )
    return commands

def git_commit(cwd, message, editor, commands=None):
    """Commits whatever has been Staged by git in this directory 'cwd'"""
    if commands is None:  # avoid a mutable default argument, which persists across calls
        commands = []
    git = settings.GIT
    print(f"git commit in {cwd}")
    if socket.gethostname() != EXPOSERVER:
        message += f" - on dev machine '{socket.gethostname()}'"
    elif settings.DEVSERVER:
        message += " - on a dev machine using 'runserver'"
    print(f"..{message=}\n..{editor=}")

    cmd_commit = [git, "commit", "-m", message, "--author", f"{editor}"]
    commands.append(cmd_commit)
    cp_commit = subprocess.run(cmd_commit, cwd=cwd, capture_output=True, text=True)
    # This produces return code = 1 when it "commits OK" but the local repo still
    # needs to be pushed to origin/repo, which will be the case when running a
    # test troggle system on a development machine.
    # Several ways of testing if the commit failed:
    # if cp_commit.stdout.split("\n")[-2] != "nothing to commit, working tree clean":
    # if cp_commit.returncode == 1 and cp_commit.stdout == DEV_OK:  # only good for 1 commit ahead of origin/repo
    if cp_commit.returncode != 0 and not cp_commit.stdout.strip().endswith(
        "nothing to commit, working tree clean"
    ):
        msgdata = f'--Ask a nerd to fix this problem in git_commit().\n--{cp_commit.stderr}\n--"{cp_commit.stdout}"\n--return code:{str(cp_commit.returncode)}'
        print(msgdata)
        raise WriteAndCommitError(
            f"Error code with git on server in this directory: {cwd}. Edits saved, added to git, but NOT committed.\n\n" + msgdata
        )
    return commands
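# Sketch of the subprocess sequence the two functions above run for one file
# (the filename and author shown are hypothetical):
#   git diff contents.json       # git_add(): sanity check before staging
#   git add contents.json        # git_add(): stage the file
#   git commit -m "edit wallet" --author "John Doe <john@example.com>"   # git_commit()
# The accumulated `commands` list appears to be used only for error reporting,
# e.g. it is interpolated into the exception message in add_commit() below.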
def add_commit(fname, message, editor):
    """Used to commit a survex file edited and saved in view/survex.py,
    also contents.json for an edited wallet,
    also encrypted.json for an edited user registration
    """
    cwd = fname.parent
    filename = fname.name
    commands = []

    if editor:
        editor = git_string(editor)
    else:
        # 'cannot happen' as form verification makes this an obligatory field
        editor = "Anathema Device <a.device@potatohut.expo>"  # placeholder author

    try:
        # print(f"add_commit: {editor=} {filename=} {cwd=}")
        commands = git_add(filename, cwd, commands)
        commands = git_commit(cwd, message, editor, commands)
    except subprocess.SubprocessError:
        msg = f"CANNOT git ADD or COMMIT on server for this file {filename}.\nSubprocess error: {commands}\nEdits probably not saved.\nAsk a nerd to fix this."
        print(msg)
        raise WriteAndCommitError(msg)

def write_binary_file(filepath, content):
    print(f"write_binary_file: {filepath}")
    write_files([(filepath, content, "")])  # null encoding does "wb"

def ensure_dir_exists(filepath):
    """Takes a filepath for a file and makes any parent directories
    necessary to make the filepath valid
    """
    if filepath.is_dir():
        raise OSError(
            f"CANNOT write this file {filepath} as this is an existing DIRECTORY."
        )
    try:
        filepath.parent.mkdir(parents=True, exist_ok=True)
        # os.makedirs(os.path.dirname(filepath), exist_ok = True)
    except PermissionError as e:
        raise PermissionError(
            f"CANNOT make the directory.\nPERMISSIONS incorrectly set on server for this file {filepath}. Ask a nerd to fix this: {e}"
        )
    except Exception as e:
        raise OSError(
            f"CANNOT make the directory for {filepath}. Ask a nerd to fix this: {e}"
        )

def write_files(files):
    for filepath, content, encoding in files:
        filename = filepath.name
        ensure_dir_exists(filepath)
        if encoding:
            mode = "w"
            kwargs = {"encoding": encoding}
        else:
            mode = "wb"
            kwargs = {}
        try:
            with open(filepath, mode, **kwargs) as f:
                # print(f"WRITING {filepath} ")
                f.write(content)
        except PermissionError as e:
            raise PermissionError(
                f"CANNOT save this file.\nPERMISSIONS incorrectly set on server for this file {filepath}. Ask a nerd to fix this: {e}"
            )
        except Exception as e:
            raise OSError(
                f"CANNOT write this file {filepath}. Ask a nerd to fix this: {e}"
            )
    return

def write_and_commit(files, message, editor):
    """For each file in files, writes the content to the filepath and adds
    and commits the file to git:
        filepath, content, encoding = file
    If this fails, a WriteAndCommitError is raised.
    message - the "-m" comment field for the git commit
    editor  - the "--author" field for the git commit
    """
    # GIT see also core/views/uploads.py dwgupload()
    # GIT see also core/views/expo.py editexpopage()
    git = settings.GIT
    commands = []
    editor = git_string(editor)

    write_files(files)
    try:
        for filepath, content, encoding in files:
            cwd = filepath.parent
            filename = filepath.name
            commands = git_add(filename, cwd, commands)
            commands = git_commit(cwd, message, editor, commands)
    except subprocess.SubprocessError:
        raise WriteAndCommitError(
            f"CANNOT git on server for this file {filename}. Subprocess error. Edits not saved.\nAsk a nerd to fix this."
        )
    return True

class WriteAndCommitError(Exception):
    """Exception class for errors writing files and committing them to git"""

    def __init__(self, message):
        self.message = message

    def __str__(self):
        return f"WriteAndCommitError: {self.message}"
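# Minimal usage sketch of write_and_commit() (path, content and author are hypothetical):
#   write_and_commit(
#       [(Path(settings.EXPOWEB, "years", "2026", "index.html"), "<html>...</html>", "utf8")],
#       "Update 2026 index page",
#       "John Doe <john@example.com>",
#   )
# Binary content is written with a null encoding string, as in write_binary_file().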
"""The following is a Bard-converted version of Radost's MIT-copyrighted
Javascript on 2023-10-27, with hand-editing.

>>> height_from_utm(326123.456, 4896789.012)
1234.56 (reference: 100m away)
"""

def height_from_utm(easting, northing):
    """Returns the height from UTM coordinates. This does NO interpolation.
    It just returns the height of the nearest point in the height data list."""
    global rawheights
    res = find_nearest_point(rawheights, (easting, northing))
    winner, distance = res
    # return f"{winner} (reference: {(distance).toFixed(0)}m away)"
    return winner, distance

def find_nearest_point(points, target_point):
    """Returns the nearest point to a target point from a list of points.

    TODO FIND OUT
    1. is this SRTM data ? TICK. Yes.
    2. what is the zero altitude datum? Geoid or ellipsoid? Do we need to subtract 47m ??
    3. remove all these numbers from the .py file as it is confusing the code length calcs

    In our dataset, the survey stations are all within 30m of an SRTM reference
    point, so we can safely ignore points more than 100m away in either the x or
    y direction. This is not true for the 1624 and 1627 areas though.

    TO DO: store this list twice, once sorted by x and once sorted by y, and do
    a bounding-box search for a set of nearby points before doing the exhaustive
    Pythagorean distance search."""
    nearest_distance_squared = float("inf")
    nearest_point_alt = None

    x_target, y_target = target_point
    max_ds = throw * throw  # 1225 = 35 x 35

    for point in points:
        x, y, z = point
        if z < 0:
            distance_squared = 1000000.0  # i.e. ignore it
        elif abs(x - x_target) > throw or abs(y - y_target) > throw:
            distance_squared = max_ds  # too far away in x or y to be a candidate
        else:
            distance_squared = math.pow(x - x_target, 2) + math.pow(y - y_target, 2)
        if distance_squared < nearest_distance_squared:
            nearest_distance_squared = distance_squared
            nearest_point_alt = z

    nearest_distance = math.sqrt(nearest_distance_squared)
    return nearest_point_alt, nearest_distance

# This data was extracted by Radost.
# The original source of the heights (and positions) is the file
# loser/surface/terrain/SRTM-7x9-M31-482-279-offset.svx
"""
; Survex terrain data created by TerrainTool version 1.12a from Shuttle Radar Topography Mission data
; SRTM DEM data is public-domain.
; Used coordinate system AustrianM31 with ellipsoid "Bessel 1841" and datum "MGI Datum (Austria)"
; Grid centred at XXX 488000 283750
; Lat/Long of centre = 47.691318 13.838777 ( 13 E 50' 19.6" 47 N 41' 28.7") relative to WGS84/GRS80 datum
; Grid convergence at centre = -0.374 degrees ( 0 W 22' 27.9")
; Point scale factor at centre = 1.0000177
; Output offset by adding (-450000.00,-200000.00, 0.00) to calculated results;
; TerrainTool (c) 2008 - 2012 Mike McCombe
"""

# Load the height data from the JSON file; rawheights is used by height_from_utm() above
hpath = settings.TROGGLE_PATH / "core" / "heights.json"
with open(hpath, 'r') as file:
    rawheights = json.load(file)
# print(rawheights)
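# Usage sketch for the height lookup (the coordinates are hypothetical values
# in the offset grid described above):
#   alt, dist = height_from_utm(36123.4, 83456.7)
#   print(f"{alt}m (nearest SRTM reference point is {dist:.0f}m away)")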