diff --git a/src/sanescansrv/elapsed.py b/src/sanescansrv/elapsed.py new file mode 100644 index 0000000..8ab066e --- /dev/null +++ b/src/sanescansrv/elapsed.py @@ -0,0 +1,138 @@ +"""Elapsed Time.""" + +# Programmed by CoolCat467 + +from __future__ import annotations + +# Elapsed Time +# Copyright (C) 2022-2024 CoolCat467 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Iterable + +__title__ = "Elapsed Time" +__author__ = "CoolCat467" +__license__ = "GNU General Public License Version 3" + + +def split_time(seconds: int) -> list[int]: + """Split time into units of time.""" + seconds = int(seconds) + + # values = (1, 60, 60, 24, 7, 365/12/7, 12, 10, 10, 10, 1000, 10, 10, 5) + # mults = {0:values[0]} + # for i in range(len(values)): + # mults[i+1] = round(mults[i] * values[i]) + # divs = list(reversed(mults.values()))[:-1] + divs = ( + 15768000000000000, + 3153600000000000, + 315360000000000, + 31536000000000, + 31536000000, + 3153600000, + 315360000, + 31536000, + 2628000, + 604800, + 86400, + 3600, + 60, + 1, + ) + ret = [] + for num in divs: + divisions, seconds = divmod(seconds, num) + ret.append(divisions) + return ret + + +def combine_end(data: Iterable[str], final: str = "and") -> str: + """Join values of text, and have final with the last one properly.""" + data = list(map(str, data)) + if len(data) >= 2: + data[-1] = f"{final} {data[-1]}" + if len(data) > 2: + return ", ".join(data) + return " ".join(data) + + +def get_elapsed(seconds: int) -> str: + """Return elapsed time as a string.""" + times = ( + "eons", + "eras", + "epochs", + "ages", + "millennia", + "centuries", + "decades", + "years", + "months", + "weeks", + "days", + "hours", + "minutes", + "seconds", + ) + single = [i[:-1] for i in times] + single[4] = "millennium" + single[5] = "century" + + negative_flag = seconds < 0 + if negative_flag: + seconds = -seconds + + split = split_time(seconds) + zip_index = [(i, v) for i, v in enumerate(split) if v] + + data = [] + for index, value in zip_index: + title = single[index] if value < 2 else times[index] + data.append(f"{value} {title}") + + if negative_flag: + data[0] = "Negative " + data[0] + return combine_end(data) + + +def split_end(data: str, final: str = "and") -> list[str]: + """Split a combine_end joined string.""" + values = data.split(", ") + values.extend(values.pop().split(final, 1)) + return [v.strip() for v in values if v] + + +def get_time_of_day(hour: int, season: int = 0) -> str: + """Figure out and return what time of day it is. + + If season is -1, it is winter and afternoon is 12 PM to 4 PM + If season is 0, season is unknown and afternoon is 12 PM to 6 PM + If season is 1, it is summer and afternoon is 12 PM to 8 PM + """ + season_offset = season << 1 # quick multiply by 2 + + if hour > 4 and hour < 12: + return "Morning" + if hour > 11 and hour < (19 + season_offset): + # "Afternoon is usually from 12 PM to 6 PM, + # but during winter it may be from 12 PM to 4 PM + # and during summer it may be from 12 PM to 8 PM." + return "Afternoon" + if hour > (18 + season_offset) and hour < 22: + return "Evening" + return "Night" diff --git a/src/sanescansrv/generate_pages.py b/src/sanescansrv/generate_pages.py index 55156c2..fe9c4b9 100644 --- a/src/sanescansrv/generate_pages.py +++ b/src/sanescansrv/generate_pages.py @@ -414,8 +414,9 @@ def generate_scan_status_get() -> str: percent_complete = htmlgen.wrap_tag("strong", f"{percent}%", block=False) estimate_strong = htmlgen.wrap_tag("strong", estimated_wait, block=False) - estimate_plural = htmlgen.jinja_number_plural("estimated_wait", "second") - estimate = f"{estimate_strong} {estimate_plural}" + ##estimate_plural = htmlgen.jinja_number_plural("estimated_wait", "second") + ##estimate = f"{estimate_strong} {estimate_plural}" + estimate = estimate_strong refresh_link = htmlgen.create_link("/scan-status", "this link") diff --git a/src/sanescansrv/htmlgen.py b/src/sanescansrv/htmlgen.py index 24aa63f..5ad87ad 100644 --- a/src/sanescansrv/htmlgen.py +++ b/src/sanescansrv/htmlgen.py @@ -209,6 +209,7 @@ def input_field( "id": field_id, "name": field_name, } + pre_label = field_type in {"number"} if args["type"] == "text": # Browser defaults to text del args["type"] @@ -224,6 +225,8 @@ def input_field( lines.append(tag("input", **args)) if field_title is not None: lines.append(wrap_tag("label", field_title, False, for_=field_id)) + if pre_label: + return "\n".join(reversed(lines)) return "\n".join(lines) diff --git a/src/sanescansrv/server.py b/src/sanescansrv/server.py index a01a30d..1c45c4f 100644 --- a/src/sanescansrv/server.py +++ b/src/sanescansrv/server.py @@ -20,7 +20,7 @@ __title__ = "Sane Scanner Webserver" __author__ = "CoolCat467" -__version__ = "3.0.0" +__version__ = "3.1.0" __license__ = "GPLv3" @@ -50,7 +50,7 @@ from quart_trio import QuartTrio from werkzeug.exceptions import HTTPException -from sanescansrv import htmlgen, logger +from sanescansrv import elapsed, htmlgen, logger from sanescansrv.logger import log if sys.version_info < (3, 11): @@ -228,26 +228,19 @@ def get_devices() -> dict[str, str]: return devices -class OptionType(IntEnum): - """Option Types for Device Settings.""" - - RADIO = auto() - CHECK = auto() - RANGE_DROPDOWN = auto() - - @dataclass class DeviceSetting: """Setting for device.""" name: str title: str - options: list[str] + options: list[str | int] | tuple[int | float, int | float, int | float] default: str unit: str desc: str + option_type: str set: str | None = None - option_type: OptionType = OptionType.RADIO + usable: bool = True def as_argument(self) -> str: """Return setting as argument.""" @@ -275,45 +268,30 @@ def get_device_settings(device_addr: str) -> list[DeviceSetting]: # print(f"\n{result = }") if not result[1]: continue + option = sane.Option(result, device) + + usable = True if not option.is_settable(): # print("> Not settable") - continue + usable = False + + if not option.is_active(): + usable = False + + # Disable button control items for now (greyed out) + if usable and "button" in option.name: + usable = False - constraints: list[str] = [] + constraints: list[str | int] | tuple[int | float, int | float, int | float] = [] + if option.constraint is not None: + constraints = option.constraint + if isinstance(option.constraint, tuple) and len(option.constraint) != 3: + usable = False type_ = sane.TYPE_STR[option.type].removeprefix("TYPE_") # print(f'{type_ = }') - option_type = OptionType.RADIO - - if type_ not in {"INT", "STRING", "BOOL"}: - # print(f"type {type_!r} is invalid") - continue - if type_ == "BOOL": - constraints = ["1", "0"] - elif isinstance(option.constraint, tuple): - if isinstance(option.constraint[0], float): - # print("> Float constraint") - continue - if option.constraint[2] == 0: - # print("> Zero constraint") - continue - range_ = range(*option.constraint) - if len(range_) <= 5: - constraints = [str(i) for i in range_] - else: - constraints = [str(x) for x in option.constraint] - option_type = OptionType.RANGE_DROPDOWN - # TODO: Make range constraint work - continue - elif option.constraint is None: - # print("> None constraint") - continue - else: - constraints = [str(x) for x in option.constraint] - # if len(constraints) < 2: - # print("> Less than two constraints") - # continue + option_type = type_ default = "None" with contextlib.suppress(AttributeError, ValueError): @@ -331,6 +309,7 @@ def get_device_settings(device_addr: str) -> list[DeviceSetting]: unit=unit, desc=option.desc, option_type=option_type, + usable=usable, ), ) @@ -356,18 +335,34 @@ def preform_scan( filepath = Path(app.static_folder) / filename ints = {"TYPE_BOOL", "TYPE_INT"} + float_ = "TYPE_FIXED" with sane.open(device_name) as device: for setting in APP_STORAGE["device_settings"][device_name]: name = setting.name.replace("-", "_") if setting.set is None: continue - value: str | int = setting.set - if sane.TYPE_STR[device[name].type] in ints: + value: str | int | float = setting.set + if sane.TYPE_STR[device[name].type] == float_: + assert isinstance(value, str), f"{value = } {type(value) = }" + try: + value = float(value) + except ValueError: + continue + elif sane.TYPE_STR[device[name].type] in ints: assert isinstance(value, str), f"{value = } {type(value) = }" if value.isdigit(): value = int(value) - setattr(device, name, value) + try: + setattr(device, name, value) + except (AttributeError, TypeError) as exc: + print(f"\n{name} = {value}") + # traceback.print_exception changed in 3.10 + if sys.version_info < (3, 10): + tb = sys.exc_info()[2] + traceback.print_exception(etype=None, value=exc, tb=tb) + else: + traceback.print_exception(exc) with device.scan(progress) as image: # bounds = image.getbbox() image.save(filepath, out_type) @@ -388,6 +383,7 @@ class ScanStatus(IntEnum): STARTED = auto() IN_PROGRESS = auto() DONE = auto() + ERROR = auto() def fake_preform_scan( @@ -410,7 +406,7 @@ async def preform_scan_async( device_name: str, out_type: str, task_status: trio.TaskStatus[Any] = trio.TASK_STATUS_IGNORED, -) -> str: +) -> str | None: """Scan using device and return path.""" if out_type not in {"pnm", "tiff", "png", "jpeg"}: raise ValueError("Output type must be pnm, tiff, png, or jpeg") @@ -433,13 +429,27 @@ def progress(current: int, total: int) -> None: APP_STORAGE["scan_status"] = (ScanStatus.STARTED,) task_status.started() last_time = time.perf_counter_ns() - filename = await trio.to_thread.run_sync( - preform_scan, # fake_preform_scan, - device_name, - out_type, - progress, - thread_name="preform_scan_async", - ) + try: + filename = await trio.to_thread.run_sync( + preform_scan, # fake_preform_scan, + device_name, + out_type, + progress, + thread_name="preform_scan_async", + ) + except SaneError as exc: + # traceback.print_exception changed in 3.10 + if sys.version_info < (3, 10): + tb = sys.exc_info()[2] + traceback.print_exception(etype=None, value=exc, tb=tb) + else: + traceback.print_exception(exc) + + APP_STORAGE["scan_status"] = ( + ScanStatus.ERROR, + exc, + ) + return None ##except SaneError as ex: ## if "Invalid argument" in ex.args: APP_STORAGE["scan_status"] = ( @@ -464,6 +474,15 @@ async def scan_status_get() -> AsyncIterator[str] | WerkzeugResponse: status, *data = raw_status + if status == ScanStatus.ERROR: + exception = data[0] + name = pretty_exception_name(exception) + return await send_error( + "Scan Error", + "The following error occurred attempting to process the scan " + f"request: {name!r} (See server console for more details).", + ) + if status == ScanStatus.DONE: filename = data[0] return app.redirect(f"/{filename}") @@ -471,7 +490,7 @@ async def scan_status_get() -> AsyncIterator[str] | WerkzeugResponse: progress: ScanProgress | None = None time_deltas_ns: list[int] | None = None delay = 5 - estimated_wait: int = 9999 + estimated_wait: int = 120 if status == ScanStatus.IN_PROGRESS: progress, time_deltas_ns = data @@ -492,7 +511,7 @@ async def scan_status_get() -> AsyncIterator[str] | WerkzeugResponse: "scan-status_get.html.jinja", just_started=status == ScanStatus.STARTED, progress=progress, - estimated_wait=estimated_wait, + estimated_wait=elapsed.get_elapsed(estimated_wait), refreshes_after=delay, ) @@ -537,7 +556,7 @@ async def root_post() -> WerkzeugResponse | AsyncIterator[str]: if raw_status is not None: status, *_data = raw_status - if status != ScanStatus.DONE: + if status not in {ScanStatus.ERROR, ScanStatus.DONE}: return await send_error( "Scan Already Currently Running", "There is a scan request already running. Please wait for the previous scan to complete.", @@ -600,34 +619,79 @@ async def scanners_get() -> AsyncIterator[str]: def get_setting_radio(setting: DeviceSetting) -> str: """Return setting radio section.""" + box_title = f"{setting.title} - {setting.desc}" + default = setting.default if setting.set is None else setting.set options: Mapping[str, str | dict[str, str]] = {} - if setting.option_type == OptionType.RADIO: - options = {x.title(): x for x in setting.options} - if set(options.keys()) == {"1", "0"}: - options = {"True": "1", "False": "0"} - if len(options) == 1 or "button" in setting.name: - # TODO: Table one? - for title, value in tuple(options.items()): - assert isinstance(value, str) + + if setting.option_type == "BOOL": + options = {"True": "1", "False": "0"} + elif setting.option_type == "STRING": + options = {f"{x}".title(): f"{x}" for x in setting.options} + elif setting.option_type in {"INT", "FIXED"}: + if isinstance(setting.options, list): + options = {x: x for x in (f"{x}" for x in setting.options)} + elif isinstance(setting.options, tuple): + attributes: dict[str, str] = {"type": "number", "value": f"{default}"} + extra = "" + if len(setting.options) != 3: + response_html = htmlgen.wrap_tag( # type: ignore[unreachable] + "p", + "Numerical range constraints are invalid, please report!", + block=False, + ) + return htmlgen.contain_in_box(response_html, box_title) + min_, max_, step = setting.options + attributes.update( + { + "min": f"{min_}", + "max": f"{max_}", + }, + ) + extra = f", Min {min_}, Max {max_}" + if step != 0: + attributes["step"] = f"{step}" + if step != 1: + extra += f", Step {step}" + elif setting.option_type == "FIXED": + attributes["step"] = "any" + extra += ", w/ decimal support" + options = {f"Value ({setting.unit}{extra})": attributes} + else: + response_html = htmlgen.wrap_tag( + "p", + f"No options exist for {setting.option_type!r} option types at this time.", + block=False, + ) + return htmlgen.contain_in_box(response_html, box_title) + ##else: + ## formatted = pprint.pformat(setting) + ## formatted = formatted.replace(" ", " ") + ## response_html = htmlgen.wrap_tag( + ## "textarea", + ## formatted, + ## readonly="", + ## rows=len(formatted.splitlines()), + ## cols=80, + ## ) + ## return htmlgen.contain_in_box(response_html, f"{setting.title} - {setting.desc}") + + if not setting.usable: + for title, value in tuple(options.items()): + if isinstance(value, str): options[title] = { "value": value, "disabled": "disabled", } - elif setting.option_type == OptionType.RANGE_DROPDOWN: - range_control = list(setting.options) - while len(range_control) != 3: - range_control.append("1") - min_, max_, step = range_control - options = {"Number": {"type": "number", "min": min_, "max": max_, "step": step, "value": default}} - else: - raise NotImplementedError(f"{setting.option_type = }") + else: + assert isinstance(options[title], dict) + options[title].update({"disabled": "disabled"}) # type: ignore[union-attr] return htmlgen.select_box( submit_name=setting.name, options=options, default=default, - box_title=f"{setting.title} - {setting.desc}", + box_title=box_title, ) @@ -670,8 +734,23 @@ async def settings_post() -> WerkzeugResponse: if setting_name not in valid_settings: continue idx = valid_settings[setting_name] - if new_value not in scanner_settings[idx].options: + if not scanner_settings[idx].usable: + continue + options = scanner_settings[idx].options + if isinstance(options, list) and new_value not in options: continue + if isinstance(options, tuple): + if len(options) != 3: + raise RuntimeError("Should be unreachable") + try: + as_float = float(new_value) + except ValueError: + continue + min_, max_, step = options + if as_float < min_ or as_float > max_: + continue + if step and as_float % step != 0: + continue APP_STORAGE["device_settings"][device][idx].set = new_value # Return to page for that scanner diff --git a/src/sanescansrv/templates/error_page.html.jinja b/src/sanescansrv/templates/error_page.html.jinja index 461b21a..f8fc155 100644 --- a/src/sanescansrv/templates/error_page.html.jinja +++ b/src/sanescansrv/templates/error_page.html.jinja @@ -24,7 +24,7 @@ diff --git a/src/sanescansrv/templates/root_get.html.jinja b/src/sanescansrv/templates/root_get.html.jinja index 05d49df..7bcedd7 100644 --- a/src/sanescansrv/templates/root_get.html.jinja +++ b/src/sanescansrv/templates/root_get.html.jinja @@ -55,7 +55,7 @@ diff --git a/src/sanescansrv/templates/scan-status_get.html.jinja b/src/sanescansrv/templates/scan-status_get.html.jinja index cf0b80a..f728b2e 100644 --- a/src/sanescansrv/templates/scan-status_get.html.jinja +++ b/src/sanescansrv/templates/scan-status_get.html.jinja @@ -12,7 +12,7 @@

{% if just_started %}Just Started Scanning{% else %}Scan Is In Progress{% endif %}

{% if just_started %}Just Started.{% elif progress[0] == progress[1] %}Just finished, saving file...{% else %}{{ (progress[0] / progress[1] * 100)|round(2) }}% Complete{% endif %}

-

Scan is estimated to be done in {{ estimated_wait }} second{% if estimated_wait > 1 %}s{% elif estimated_wait == 0 %}s{% endif %}.

+

Scan is estimated to be done in {{ estimated_wait }}.


This page will automatically refresh after {{ refreshes_after }} second{% if refreshes_after > 1 %}s{% elif refreshes_after == 0 %}s{% endif %}. @@ -21,7 +21,7 @@ diff --git a/src/sanescansrv/templates/scanners_get.html.jinja b/src/sanescansrv/templates/scanners_get.html.jinja index 4d74270..c4a0454 100644 --- a/src/sanescansrv/templates/scanners_get.html.jinja +++ b/src/sanescansrv/templates/scanners_get.html.jinja @@ -33,7 +33,7 @@ diff --git a/src/sanescansrv/templates/settings_get.html.jinja b/src/sanescansrv/templates/settings_get.html.jinja index f6df18e..9f2fca0 100644 --- a/src/sanescansrv/templates/settings_get.html.jinja +++ b/src/sanescansrv/templates/settings_get.html.jinja @@ -26,7 +26,7 @@