Skip to content

Commit

Permalink
Fix boolean setting input verification and show errors to user
Browse files Browse the repository at this point in the history
Replace all instances of `send_error` with `get_exception_page` so that HTTP error codes are sent as well, not just "oh look there is an error here"
Replace debug input verification prints with displaying issues to user (may be security issue though, needs more deep thought)
Fix boolean settings not having their constraints set and therefore failing input verification
  • Loading branch information
CoolCat467 committed Jun 3, 2024
1 parent 5fa87dc commit f6e9922
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions src/sanescansrv/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,17 @@ async def send_error(
)


async def get_exception_page(code: int, name: str, desc: str) -> tuple[AsyncIterator[str], int]:
async def get_exception_page(
code: int,
name: str,
desc: str,
return_link: str | None = None,
) -> tuple[AsyncIterator[str], int]:
"""Return Response for exception."""
resp_body = await send_error(
page_title=f"{code} {name}",
error_body=desc,
return_link=return_link,
)
return (resp_body, code)

Expand Down Expand Up @@ -291,6 +297,9 @@ def get_device_settings(device_addr: str) -> list[DeviceSetting]:
type_ = sane.TYPE_STR[option.type].removeprefix("TYPE_")
# print(f'{type_ = }')

if type_ == "BOOL":
constraints = [0, 1]

option_type = type_

default = "None"
Expand Down Expand Up @@ -474,7 +483,8 @@ async def scan_status_get() -> AsyncIterator[str] | WerkzeugResponse:
"""Handle scan status GET request."""
raw_status = APP_STORAGE.get("scan_status")
if raw_status is None:
return await send_error(
return await get_exception_page(
404, # not found
"No Scan Currently Running",
"There are no scan requests running currently. "
"Start one by pressing the `Scan!` button on the main page.",
Expand All @@ -486,7 +496,8 @@ async def scan_status_get() -> AsyncIterator[str] | WerkzeugResponse:
if status == ScanStatus.ERROR:
exception = data[0]
name = pretty_exception_name(exception)
return await send_error(
return await get_exception_page(
500, # internal server error
"Scan Error",
"The following error occurred attempting to process the scan "
f"request: {name!r} (See server console for more details).",
Expand Down Expand Up @@ -573,7 +584,8 @@ async def root_post() -> WerkzeugResponse | AsyncIterator[str]:
if raw_status is not None:
status, *_data = raw_status
if status not in {ScanStatus.ERROR, ScanStatus.DONE}:
return await send_error(
return await get_exception_page(
403, # forbidden
"Scan Already Currently Running",
"There is a scan request already running. Please wait for the previous scan to complete.",
return_link="/scan-status",
Expand Down Expand Up @@ -611,7 +623,8 @@ async def update_scanners_get() -> WerkzeugResponse | AsyncIterator[str]:
"""Update scanners get handling."""
success = await update_scanners_async()
if not success:
return await send_error(
return await get_exception_page(
403, # forbidden
"Scan Currently Running",
"There is a scan request currently running, updating the device list at this time might not be smart.",
return_link="/update_scanners",
Expand Down Expand Up @@ -764,36 +777,47 @@ async def settings_post() -> WerkzeugResponse:
if data.get("settings_update_submit_button"):
data.pop("settings_update_submit_button")

errors: list[str] = []

for setting_name, new_value in data.items():
# Input validation
if setting_name not in valid_settings:
print(f"{setting_name!r} not valid")
errors.append(f"{setting_name} not valid")
continue
idx = valid_settings[setting_name]
if not scanner_settings[idx].usable:
print(f"{setting_name!r} not usable")
errors.append(f"{setting_name} not usable")
continue
options = scanner_settings[idx].options
if isinstance(options, list) and str(new_value) not in set(map(str, options)):
print(f"{setting_name!r}[{new_value!r}] invalid option")
errors.append(f"{setting_name}[{new_value}] invalid option)")
continue
if isinstance(options, tuple):
if len(options) != 3:
raise RuntimeError("Should be unreachable")
try:
as_float = float(new_value)
except ValueError:
print(f"{setting_name!r}[{new_value!r}] invalid float")
errors.append(f"{setting_name}[{new_value}] invalid float")
continue
min_, max_, step = options
if as_float < min_ or as_float > max_:
print(f"{setting_name!r}[{new_value!r}] out of bounds")
errors.append(f"{setting_name}[{new_value}] out of bounds")
continue
if step and as_float % step != 0:
print(f"{setting_name!r}[{new_value!r}] bad step multiple")
errors.append(f"{setting_name}[{new_value}] bad step multiple")
continue
APP_STORAGE["device_settings"][device][idx].set = new_value

if errors:
errors.insert(0, "Request succeeded, but the following errors were encountered:")
return await get_exception_page(
400, # bad request
"Bad Request",
"<br>".join(errors),
request.url,
)

# Return to page for that scanner
return app.redirect(request.url)

Expand Down

0 comments on commit f6e9922

Please sign in to comment.