import hashlib
import json
import logging
import math
import os
import random
import re
import resource
import socket
import string
import subprocess
from datetime import datetime, timezone
from decimal import getcontext
from pathlib import Path
from django.contrib.auth.models import User
from troggle.core.models.troggle import Expedition
from troggle.core.models.troggle import Person
getcontext().prec = 2 # use 2 significant figures for decimal calculations
import settings
"""This file declares TROG a globally visible object for caches.
TROG is a dictionary holding globally visible indexes and cache functions.
It is a Global Object, see https://python-patterns.guide/python/module-globals/
troggle.utils.TROG
chaosmonkey(n) - used by survex import to regenerate some .3d files
various git add/commit functions that need refactoring together
NOTE that TROG is not serialized! Two users can update it and conflict !!
This needs to be in a multi-user database with transactions. However it may be
useful when doing a data import with databaseReset.py as that has a single
thread.
TIMEZONES
Read this now: https://nerderati.com/a-python-epoch-timestamp-timezone-trap/
"""
TROG = {"pagecache": {"expedition": {}}, "caves": {"gcavelookup": {}, "gcavecount": {}}}
alphabet = []  # cached list of lowercase letters, filled lazily by alphabet_suffix()
COOKIE_MAX_AGE = 2*365*24*60*60 # seconds
COOKIE_SHORT_TIMEOUT = 60*60 # seconds
PUBLIC_LAPTOP_COOKIE_NAME = "public_laptop"
PUBLIC_LAPTOP_COOKIE_TEXT = "this is a public laptop"
throw = 35.0  # metres: cutoff distance used by find_nearest_point() below
EXPOSERVER = "expo" # hostname of the server at expo.survex.com
DEV_OK = """On branch master
Your branch is ahead of 'origin/master' by 1 commit.
(use "git push" to publish your local commits)
nothing to commit, working tree clean
"""
class DatabaseResetOngoing(Exception):
"""Exception class for errors while the server is reimporting everything"""
    def __init__(self, message):
        if message:
            self.message = message
        else:
            self.message = "Expo Database re-import ongoing on server.\nPlease wait 7 minutes.\nIf still not working, contact a nerd."
def __str__(self):
return f"DatabaseResetOngoing: {self.message}"
# This is module-level executable. This is a Bad Thing. Especially when it touches the file system.
try:
logging.basicConfig(level=logging.DEBUG, filename=settings.LOGFILE, filemode="w")
except Exception:
    # Opening the logfile for writing may currently fail (e.g. permissions),
    # so decide it doesn't matter for now
    pass
def get_cookie_max_age(request=None):
"""This is where we detect whether the machine the user is using is a shared-use device or a personal device.
If it is shared-use, then we set a much shorter cookie timout period.
"""
if shared_use_machine(request):
return COOKIE_SHORT_TIMEOUT
else:
return COOKIE_MAX_AGE
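# Illustrative use when setting the editor_id cookie in a view (the response
# object is hypothetical; set_cookie is the standard Django API):
#   response.set_cookie("editor_id", editor, max_age=get_cookie_max_age(request))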
def sanitize_name(name):
"""Filenames sould not contain these characters as then the system barf when it tries to use them in URLs
"""
return name.replace("#","-").replace("?","=").replace("&","+").replace(":","^")
def get_process_memory():
    """Returns the peak resident set size of this process in MB
    (ru_maxrss is reported in KB on Linux)"""
    usage = resource.getrusage(resource.RUSAGE_SELF)
    return usage[2] / 1024.0
def chaosmonkey(n):
"""returns True once every n calls - randomly"""
if random.randrange(0, n) != 0:
return False
# print("CHAOS strikes !", file=sys.stderr)
return True
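# Illustrative use (the helper name is hypothetical; the real call sites are in
# the survex import code, regenerating some .3d files):
#   if chaosmonkey(200):  # roughly 1 call in 200 triggers
#       regenerate_3d_file(block)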
def unique_slug(text, n):
    """This gives an almost-unique id based on the text, n hex digits long.
    2 hex digits would seem adequate, but we might get a collision.
    Deterministic: a fresh hash per call, so the same text always gives the same slug.
    """
    return hashlib.sha256(text.encode('utf-8')).hexdigest()[0:n]
def random_slug(text, n):
    """This gives an almost-unique id based on the text, n hex digits long.
    2 hex digits would seem adequate, but we might get a collision.
    Random: a random two-letter suffix is appended before hashing, so repeated
    calls with the same text give different slugs.
    """
    text += random.choice(string.ascii_lowercase) + random.choice(string.ascii_lowercase)
    return hashlib.sha256(text.encode('utf-8')).hexdigest()[0:n]
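# Illustrative behaviour (digests shown schematically, not actual values):
#   unique_slug("2018-ms-01", 8)  =>  the same 8 hex digits on every call
#   random_slug("2018-ms-01", 8)  =>  different 8 hex digits on each call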
def alphabet_suffix(n):
    """Returns the nth lowercase letter (1-based), e.g. 1 => "a", 2 => "b".
    Out-of-range n gets a random two-letter suffix prefixed with "_X_".
    This is called repeatedly during the initial parsing import, hence the cached list.
    """
    global alphabet
    if not alphabet:
        alphabet = list(string.ascii_lowercase)
    if 0 < n <= len(alphabet):
        suffix = alphabet[n - 1]
    else:
        suffix = "_X_" + random.choice(string.ascii_lowercase) + random.choice(string.ascii_lowercase)
    return suffix
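# Examples (the in-range cases are deterministic):
#   alphabet_suffix(1)   =>  "a"
#   alphabet_suffix(26)  =>  "z"
#   alphabet_suffix(27)  =>  "_X_" plus two random letters, e.g. "_X_qf"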
def wrap_text(text):
"""
Splits a long string into multiple lines, ensuring that each line is no more than
70 characters long. Newline characters are inserted immediately before spaces to
prevent breaking words.
Parameters:
text (str): The input string that needs to be wrapped.
Returns:
str: The input string formatted with newline characters such that each line
does not exceed 70 characters in length.
Functionality:
1. The function first splits the input string into individual words.
2. It iterates through the words and adds them to the current line, ensuring the
line's length does not exceed 70 characters.
3. If adding a word would cause the line to exceed the limit, the current line is
added to a list of lines, and a new line is started with the word.
4. The process repeats until all words are processed, and the final line is added.
5. The list of lines is then joined with newline characters and returned as a
single formatted string.
    This function was written by CoPilot, BUT interactions with pre-existing
    newline characters in the text make it much less simple than it appears.
    """
    return text  # abort all wrap processing pending proper redesign
words = text.split(' ')
lines = []
current_line = ""
for word in words:
# Check if adding the word exceeds 70 characters
if len(current_line) + len(word) + 1 <= 70:
if current_line:
current_line += ' '
current_line += word
else:
lines.append(current_line.strip(' '))
current_line = word
# Add the last line
if current_line:
lines.append(current_line.strip(' '))
# Join the lines with newline characters
return '\n'.join(lines)
def make_new_expo(year):
    """Creates the Expedition object for a new year and resets the
    'expo' user's current_year"""
    coUniqueAttribs = {"year": year}
    otherAttribs = {"name": f"CUCC expo {year}", "logbookfile": "logbook.html"}
    Expedition.objects.create(**otherAttribs, **coUniqueAttribs)
    u = User.objects.get(username='expo')
    u.current_year = year
    u.save()
def make_new_expo_dir(year):
    """Creates the website directory for a new year, with minimal stub pages
    each linking to the standard set of pages."""
    pages = ["index", "logbook", "mission", "travel"]
    # a simple HTML bullet list linking the standard pages
    t = "<ul>\n"
    for ff in pages:
        t += f"<li><a href='{ff}.html'>{ff}</a></li>\n"
    t += "</ul>\n"
    year_dir = Path(settings.EXPOWEB, 'years', year)
    if not year_dir.is_dir():
        year_dir.mkdir(parents=True, exist_ok=True)
    for ff in pages:
        content = f"<html><head><title>{year} {ff}</title></head><body><h1>{ff}</h1>\nAdd content here.\n{t}</body></html>"
        p = Path(year_dir, ff + ".html")
        if not p.is_file():  # do not overwrite an existing page
            write_and_commit([(p, content, "utf8")], f"Auto new year {ff} file creation", "Auto New Year")
def current_expo():
"""Returns the current expo year, but also checks if the most recent expo year is the same
as this year. If it is not, then it creates an empty Expedition and fixes some files and
folders. If we are more than one year out of date, it creates all intervening Expo objects
and folders. You will need to tidy this up manually.
"""
    expos = Expedition.objects.all().order_by('-year')
    if expos:
        year = str(datetime.now(timezone.utc).year)
        last_expo = expos[0].year
        if int(last_expo) < int(year):  # the coming year, after Dec. 31st
            for y in range(int(last_expo) + 1, int(year) + 1):
                # print(f"--!{year}---")
                make_new_expo(str(y))
                make_new_expo_dir(str(y))
        # print(f"---{year}---")
        return year
    else:
        return str(settings.EPOCH.year)  # 1970: an empty database has no expeditions
def parse_aliases(aliasfile):
    """Reads a text file containing pairs of strings:
        (alias, target)
    where the alias is an old name for a cave and the target is the current,
    valid name for the cave, e.g.
        ("2015-mf-06", "1623-288"),
    Returns a tuple: a list of (alias, target) pairs, and a list of messages
    about lines which failed to parse.
    """
    report = []  # Messages about failed lines
    aliases = []
    filepath = Path(settings.EXPOWEB) / "cave_data" / aliasfile
    if not filepath.is_file():
        message = f' ** {filepath} is not a file.'
        print(message)
        report.append(message)
        return [(None, None)], report
    try:
        with open(filepath, "r") as f:
            for line in f:
                l, sep, tail = line.partition('#')  # strip comments
                l, sep, tail = l.partition(';')
                l = l.strip().strip(',')  # remove terminal comma if present
                l = l.strip().strip('()')
                l = l.replace("\"", "")
                l = l.replace("\'", "")
                l = l.replace(" ", "")  # removes all spaces
                l = " ".join(l.split(','))  # subtle: splits on comma, joins with one space
                if len(l) == 0:
                    # print(f"no parseable content: {line}")
                    continue
                key, sep, target = l.partition(' ')
                if len(key) == 0 or len(target) == 0:
                    message = f' ** Parsing failure for {line}'
                    print(message)
                    report.append(message)
                    continue
                # print(f"{key} => {target}")
                aliases.append((key, target))
    except OSError:
        message = f' ** Cannot open {filepath} for text file reading even though it is a file.'
        print(message)
        report.append(message)
        return [(None, None)], report
    return aliases, report
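# Illustrative call (the filename is hypothetical; alias files live in
# EXPOWEB/cave_data/). A sketch of how callers might use the pairs:
#   aliases, report = parse_aliases("1623-aliases.csv")
#   for alias, target in aliases:
#       TROG["caves"]["gcavelookup"][alias] = target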
def get_editor(request):
    """Returns a git author string for the current user: from their Person
    record if they are an identified user, otherwise from the editor_id cookie."""
    current_user = request.user
    if is_identified_user(current_user):
        return get_git_string(current_user)
    else:
        return get_cookie(request)
def is_identified_user(user):
if user.is_anonymous:
return False
if user.username in ["expo", "expoadmin"]:
return False
return True
def is_admin_user(user):
if user.is_anonymous:
return False
if user.username in ["expoadmin"]:
return True
if user.is_superuser: # set in parsers/users.py i.e. Wookey, Philip S.
return True
return False
def get_git_string(user):
if not is_identified_user(user):
return None
else:
people = Person.objects.filter(user=user)
if len(people) != 1:
# someone like "fluffy-bunny" not associated with a Person
return None
person = people[0]
return f"{person.fullname} <{user.email}>"
def shared_use_machine(request):
"""Looks for a cookie which only exists on shared use machines
"""
print(f" - shared use cookie check {request}")
if not request: # temporary while rolling out implementation to all calling functions
return False
if not (cookie_txt := request.COOKIES.get(PUBLIC_LAPTOP_COOKIE_NAME, "")):
return False
elif cookie_txt == PUBLIC_LAPTOP_COOKIE_TEXT:
print(f" - shared use cookie exists, and has expected value: '{cookie_txt}'")
return True
else:
print(f" - shared use cookie exists, but has wrong value: '{cookie_txt}' not '{PUBLIC_LAPTOP_COOKIE_TEXT}'")
return True
def get_cookie(request):
"""The initial idea of having a default turned out to be a bad idea as people just ignore the field.
If the default value is blank, then the form validation code makes the user type something in.
So having a blank is best if the cookie is to be displayed as the first value seen on a form.
But if the cookie is to be stored, then "Unset" may be better.
"""
# NO_COOKIE_DEFAULT = 'Unset Cookie '
# print(f"-- Getting cookie...")
editor_id = request.COOKIES.get('editor_id', "")
editor = git_string(editor_id) # belt and braces, should have been validity checked on saving already
# print(f"-- Cookie to be used: {editor=}")
return editor
def git_string(author_string):
    """Rewrites the supplied editor string into a git-compliant author string.
    Uses a regular expression for a git-compatible author string, written mostly by Copilot.
    Valid example: "John Doe <john.doe@example.com>"
    """
    # note: '-' goes last in each character class so it is a literal, not a range
    author_regex = re.compile(r'^[a-zA-Z][\w\s_.-]* <[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}>$')
if author_string.startswith("Unset"):
# clean out laziness in users' PCs
return ""
if author_regex.match(author_string):
# print(f"++ Is valid git-compatible author string: '{author_string}'")
return author_string
else:
editor = author_string.replace("@","_at_")
editor = re.sub('[^0-9a-zA-Z_]+', '_', editor)
if editor.startswith("_"):
editor = "X" + editor
editor += f" <{editor}@potatohut.expo>"
print(f"++ Not git-compatible author string '{author_string}', replacing as '{editor}'")
return editor
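# Worked examples (computed from the rules above):
#   git_string("Unset Cookie")    =>  ""
#   git_string("John Doe <john.doe@example.com>")  =>  unchanged, already compliant
#   git_string("fred@potatohut")  =>  "fred_at_potatohut <fred_at_potatohut@potatohut.expo>"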
def git_add(filename, cwd, commands=None):
    """Adds a file to the list of staged files ready for a later git commit"""
    if commands is None:
        commands = []  # fresh list per call: a mutable default argument would leak between calls
    git = settings.GIT
    # what is the purpose of this 'git diff' ? To prevent merge conflicts happening I guess,
    # so we do not have to reverse a 'git add'
    # print(f"git diff {filename} in {cwd}")
    cmd_diff = [git, "diff", filename]
    commands.append(cmd_diff)
    cp_diff = subprocess.run(cmd_diff, cwd=cwd, capture_output=True, text=True)
    if cp_diff.returncode != 0:
        msgdata = f"Ask a nerd to fix this DIFF problem in git_add().\n--{cp_diff.stderr}\n--{cp_diff.stdout}\n--return code:{str(cp_diff.returncode)}"
        raise WriteAndCommitError(
            f"CANNOT git DIFF on server for this file {filename}.\n\n" + msgdata
        )
    print(f"git add {filename} in {cwd}")
    cmd_add = [git, "add", filename]
    commands.append(cmd_add)
    cp_add = subprocess.run(cmd_add, cwd=cwd, capture_output=True, text=True)
    if cp_add.returncode != 0:
        msgdata = f"Ask a nerd to fix this ADD problem in git_add().\n--{cp_add.stderr}\n--{cp_add.stdout}\n--return code:{str(cp_add.returncode)}"
        raise WriteAndCommitError(
            f"CANNOT git ADD on server for this file {filename}.\n\n" + msgdata
        )
    return commands
def git_commit(cwd, message, editor, commands=None):
    """Commits whatever has been staged by git in this directory 'cwd'"""
    if commands is None:
        commands = []  # avoid the shared mutable default argument
    git = settings.GIT
    print(f"git commit in {cwd}")
if socket.gethostname() != EXPOSERVER:
message += f" - on dev machine '{socket.gethostname()}'"
elif settings.DEVSERVER:
message += " - on a dev machine using 'runserver'"
print(f"..{message=}\n..{editor=}")
cmd_commit = [git, "commit", "-m", message, "--author", f"{editor}"]
commands.append(cmd_commit)
cp_commit = subprocess.run(cmd_commit, cwd=cwd, capture_output=True, text=True)
    # 'git commit' exits with code 1 when there is nothing to commit; stdout then ends
    # "nothing to commit, working tree clean", possibly preceded by a note that the local
    # repo is ahead of origin (as on a development machine needing a push).
    # Several ways of testing if the commit failed:
# if cp_commit.stdout.split("\n")[-2] != "nothing to commit, working tree clean":
# if cp_commit.returncode == 1 and cp_commit.stdout == DEV_OK: # only good for 1 commit ahead of origin/repo
if cp_commit.returncode != 0 and not cp_commit.stdout.strip().endswith(
"nothing to commit, working tree clean"
):
msgdata = f'--Ask a nerd to fix this problem in git_commit().\n--{cp_commit.stderr}\n--"{cp_commit.stdout}"\n--return code:{str(cp_commit.returncode)}'
print(msgdata)
raise WriteAndCommitError(
f"Error code with git on server in this directory: {cwd}. Edits saved, added to git, but NOT committed.\n\n"
+ msgdata
)
return commands
def add_commit(fname, message, editor):
"""Used to commit a survex file edited and saved in view/survex.py,
also contents.json for an edited wallet,
also encrypted.json for an edited user registration
"""
cwd = fname.parent
filename = fname.name
commands = []
    if editor:
        editor = git_string(editor)
    else:
        # 'cannot happen' as form validation makes this an obligatory field;
        # fallback author, using the same placeholder domain as git_string()
        editor = "Anathema Device <anathema.device@potatohut.expo>"
try:
# print(f"add_commit: {editor=} {filename=} {cwd=}")
commands = git_add(filename, cwd, commands)
commands = git_commit(cwd, message, editor, commands)
except subprocess.SubprocessError:
msg = f"CANNOT git ADD or COMMIT on server for this file {filename}.\nSubprocess error: {commands}\nEdits probably not saved.\nAsk a nerd to fix this."
print(msg)
raise WriteAndCommitError(msg)
def write_binary_file(filepath, content):
print(f"write_binary_file: {filepath}")
write_files([(filepath, content, "")]) # null encoding does "wb"
def ensure_dir_exists(filepath):
"""Takes a filepath for a file and all the parent directiories,
makes any directories necessary to make the filepath valid
"""
if filepath.is_dir():
raise OSError(
f"CANNOT write this file {filepath} as this is an existing DIRECTORY."
)
try:
filepath.parent.mkdir(parents=True, exist_ok=True)
# os.makedirs(os.path.dirname(filepath), exist_ok = True)
except PermissionError as e:
raise PermissionError(
f"CANNOT make the directory.\nPERMISSIONS incorrectly set on server for this file {filepath}. Ask a nerd to fix this: {e}"
)
except Exception as e:
raise OSError(
f"CANNOT make the directory for {filepath}. Ask a nerd to fix this: {e}"
)
def write_files(files):
for filepath, content, encoding in files:
filename = filepath.name
ensure_dir_exists(filepath)
if encoding:
mode = "w"
kwargs = {"encoding": encoding}
else:
mode = "wb"
kwargs = {}
try:
with open(filepath, mode, **kwargs) as f:
# print(f"WRITING {filepath} ")
f.write(content)
except PermissionError as e:
raise PermissionError(
f"CANNOT save this file.\nPERMISSIONS incorrectly set on server for this file {filepath}. Ask a nerd to fix this: {e}"
)
except Exception as e:
raise OSError(
f"CANNOT write this file {filepath}. Ask a nerd to fix this: {e}"
)
return
def write_and_commit(files, message, editor):
"""For each file in files, it writes the content to the filepath
and adds and commits the file to git.
filepath, content, encoding = file
If this fails, a WriteAndCommitError is raised.
message - the "-m" comment field for the git commit
editor - the "--author" field for the git commit
"""
    # GIT see also core/views/uploads.py dwgupload()
    # GIT see also core/views/expo.py editexpopage()
    commands = []
    editor = git_string(editor)
write_files(files)
try:
for filepath, content, encoding in files:
cwd = filepath.parent
filename = filepath.name
commands = git_add(filename, cwd, commands)
commands = git_commit(cwd, message, editor, commands)
except subprocess.SubprocessError:
raise WriteAndCommitError(
f"CANNOT git on server for this file {filename}. Subprocess error. Edits not saved.\nAsk a nerd to fix this."
)
return True
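# Illustrative call (paths, content and author are hypothetical):
#   write_and_commit(
#       [(Path(settings.EXPOWEB, "years", "2024", "index.html"), "<html>...</html>", "utf8")],
#       "Update 2024 index page",
#       "John Doe <john.doe@example.com>",
#   )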
class WriteAndCommitError(Exception):
"""Exception class for errors writing files and comitting them to git"""
def __init__(self, message):
self.message = message
def __str__(self):
return f"WriteAndCommitError: {self.message}"
"""The following is a Bard converted version of Radosts's MIT copyrighted Javascript on 2023-10-27
with hand-editing.
>>> height_from_utm(326123.456, 4896789.012)
1234.56 (reference: 100m away)
"""
def height_from_utm(easting, northing):
    """Returns the height at the given UTM coordinates, and the distance to the reference point used.
    This does NO interpolation. It just returns the height of the nearest point in the height data list."""
    res = find_nearest_point(rawheights, (easting, northing))
    winner, distance = res
    # return f"{winner} (reference: {distance:.0f}m away)"
    return winner, distance
def find_nearest_point(points, target_point):
    """Returns the altitude of the nearest point to a target point from a list of
    (x, y, z) points, and the distance to it.
    TODO FIND OUT
    1. is this SRTM data ? TICK. Yes.
    2. what is the zero altitude datum? Geoid or ellipsoid ? Do we need to subtract 47m ??
    3. remove all these numbers from the .py file as it is confusing the code length calcs
    In our dataset, the survey stations are all within 30m of an SRTM reference point,
    so we can safely ignore points more than 'throw' metres away in either the x or y direction.
    This is not true for the 1624 and 1627 areas though.
    TO DO: store this list twice, once sorted by x and once sorted by y;
    do a bounding-box search for a set of nearby points before doing the exhaustive
    Pythagorean distance search."""
    nearest_distance_squared = float("inf")
    nearest_point_alt = None
    x_target, y_target = target_point
    max_ds = throw * throw  # 1225 = 35 x 35
    for point in points:
        x, y, z = point
        if z < 0:
            distance_squared = 1000000.0  # ie ignore it: negative z means no height data
        elif abs(x - x_target) > throw or abs(y - y_target) > throw:
            distance_squared = max_ds  # too far away on one axis: skip the true distance calculation
        else:
            distance_squared = (x - x_target) ** 2 + (y - y_target) ** 2
        if distance_squared < nearest_distance_squared:
            nearest_distance_squared = distance_squared
            nearest_point_alt = z
    nearest_distance = math.sqrt(nearest_distance_squared)
    return nearest_point_alt, nearest_distance
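# Worked example (synthetic points in the (x, y, z) format assumed above):
#   find_nearest_point([(100.0, 200.0, 1500.0), (500.0, 500.0, 1600.0)], (103.0, 204.0))
#   =>  (1500.0, 5.0)    # sqrt(3*3 + 4*4) = 5.0 m to the first point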
# This data was extracted by Radost.
# The original source of heights (and positions) is the file
# loser/surface/terrain/SRTM-7x9-M31-482-279-offset.svx
"""
; Survex terrain data created by TerrainTool version 1.12a from Shuttle Radar Topography Mission data
; SRTM DEM data is public-domain.
; Used coordinate system AustrianM31 with ellipsoid "Bessel 1841" and datum "MGI Datum (Austria)"
; Grid centred at XXX 488000 283750
; Lat/Long of centre = 47.691318 13.838777 ( 13 E 50' 19.6" 47 N 41' 28.7") relative to WGS84/GRS80 datum
; Grid convergence at centre = -0.374 degrees ( 0 W 22' 27.9")
; Point scale factor at centre = 1.0000177
; Output offset by adding (-450000.00,-200000.00, 0.00) to calculated results;
; TerrainTool (c) 2008 - 2012 Mike McCombe
"""
# Load the height data from the JSON file. Note: this is also module-level
# executable code, so it runs (and reads the file) at import time.
hpath = settings.TROGGLE_PATH / "core" / "heights.json"
with open(hpath, 'r') as file:
rawheights = json.load(file)
# rawheights is now a list of (x, y, z) triples, used by height_from_utm() above