diff --git a/.env_sample b/.env_sample
index ce6916ef1..84a0888dc 100644
--- a/.env_sample
+++ b/.env_sample
@@ -59,6 +59,14 @@
 AWS_STORAGE_PRIVATE_BUCKET_NAME=private
 AWS_S3_ENDPOINT_URL=http://minio:9000/
 AWS_QUERYSTRING_AUTH=False
+# -----------------------------------------------------------------------------
+# Limit for re-running submissions
+# This limits how many submissions organizers can re-run at once on the
+# default queue: re-runs are allowed only when the number of submissions is <= RERUN_SUBMISSION_LIMIT
+# -----------------------------------------------------------------------------
+RERUN_SUBMISSION_LIMIT=30
+
+
 # # S3 storage example
 # STORAGE_TYPE=s3
 # AWS_ACCESS_KEY_ID=12312312312312312331223
diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py
index 875051090..5df5995eb 100644
--- a/compute_worker/compute_worker.py
+++ b/compute_worker/compute_worker.py
@@ -26,9 +26,12 @@
 from kombu import Queue, Exchange
 from urllib3 import Retry
 
-
 logger = logging.getLogger()
 
+
+# -----------------------------------------------
+# Celery + RabbitMQ
+# -----------------------------------------------
 # Init celery + rabbit queue definitions
 app = Celery()
 app.config_from_object('celery_config')  # grabs celery_config.py
@@ -38,6 +41,9 @@
 ]
 
+
+# -----------------------------------------------
+# Directories
+# -----------------------------------------------
 # Setup base directories used by all submissions
 # note: we need to pass this directory to docker-compose so it knows where to store things!
 HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/")
@@ -45,6 +51,10 @@
 CACHE_DIR = os.path.join(BASE_DIR, "cache")
 MAX_CACHE_DIR_SIZE_GB = float(os.environ.get('MAX_CACHE_DIR_SIZE_GB', 10))
 
+
+# -----------------------------------------------
+# Submission status
+# -----------------------------------------------
 # Status options for submissions
 STATUS_NONE = "None"
 STATUS_SUBMITTING = "Submitting"
@@ -65,6 +75,10 @@
     STATUS_FAILED,
 )
 
+
+# -----------------------------------------------
+# Container Engine
+# -----------------------------------------------
 # Setup the container engine that we are using
 if os.environ.get("CONTAINER_ENGINE_EXECUTABLE"):
     CONTAINER_ENGINE_EXECUTABLE = os.environ.get("CONTAINER_ENGINE_EXECUTABLE")
@@ -75,9 +89,18 @@
     CONTAINER_ENGINE_EXECUTABLE = "docker"
 
 
+# -----------------------------------------------
+# Exceptions
+# -----------------------------------------------
 class SubmissionException(Exception):
     pass
 
+class DockerImagePullException(Exception):
+    pass
+
+class ExecutionTimeLimitExceeded(Exception):
+    pass
+
 
 # -----------------------------------------------------------------------------
 # The main compute worker entrypoint, this is how a job is ran at the highest
@@ -94,6 +117,8 @@ def run_wrapper(run_args):
         if run.is_scoring:
             run.push_scores()
         run.push_output()
+    except DockerImagePullException as e:
+        run._update_status(STATUS_FAILED, str(e))
     except SubmissionException as e:
         run._update_status(STATUS_FAILED, str(e))
     except SoftTimeLimitExceeded:
@@ -160,14 +185,14 @@ def is_valid_zip(zip_path):
         return False
 
 
-class ExecutionTimeLimitExceeded(Exception):
-    pass
-
-
 def alarm_handler(signum, frame):
     raise ExecutionTimeLimitExceeded
 
 
+# -----------------------------------------------
+# Class Run
+# Responsible for running a submission inside a docker container
+# -----------------------------------------------
 class Run:
     """A "Run" in Codalab is composed of some program, some data to work with, and some signed URLs to upload results to.
     There is also a secret key to do special commands for just this submission.
@@ -340,8 +365,55 @@ def _get_container_image(self, image_name):
             container_engine_pull = check_output(cmd)
             logger.info("Pull complete for image: {0} with output of {1}".format(image_name, container_engine_pull))
         except CalledProcessError:
-            logger.info("Pull for image: {} returned a non-zero exit code!")
-            raise SubmissionException(f"Pull for {image_name} failed!")
+            error_message = f"Pull for image: {image_name} returned a non-zero exit code! Check if the docker image exists on Docker Hub."
+            logger.info(error_message)
+            # Prepare data to be sent to the submissions API
+            docker_pull_fail_data = {
+                "type": "Docker_Image_Pull_Fail",
+                "error_message": error_message,
+            }
+            # Send data to be written to the ingestion logs
+            self._update_submission(docker_pull_fail_data)
+            # Send the error through a websocket to the frontend
+            asyncio.run(self._send_data_through_socket(error_message))
+            raise DockerImagePullException(f"Pull for {image_name} failed!")
+
+    async def _send_data_through_socket(self, error_message):
+        """
+        Sends an error message through a websocket. This is used for sending
+        - Docker image pull failure logs
+        - Execution time limit exceeded logs
+        """
+        logger.info(f"Connecting to {self.websocket_url} to send an error message")
+
+        # connect to the websocket
+        websocket = await websockets.connect(self.websocket_url)
+
+        # websocket errors we want to handle
+        websocket_errors = (socket.gaierror, websockets.WebSocketException, websockets.ConnectionClosedError, ConnectionRefusedError)
+
+        try:
+            # send the message
+            await websocket.send(json.dumps({
+                "kind": "stderr",
+                "message": error_message
+            }))
+
+        except websocket_errors:
+            # sending failed, make sure the socket is closed
+            logger.info("Failed to send the error message through the websocket")
+            try:
+                await websocket.close()
+            except Exception as e:
+                logger.error(e)
+        else:
+            # the message was sent successfully
+            logger.info("Error message sent successfully through the websocket")
+
+        logger.info(f"Disconnecting from websocket {self.websocket_url}")
+
+        # close the websocket
+        await websocket.close()
 
     def _get_bundle(self, url, destination, cache=True):
         """Downloads zip from url and unzips into destination. If cache=True then url is hashed and checked
@@ -384,7 +456,7 @@ def _get_bundle(self, url, destination, cache=True):
                     raise  # Re-raise the last caught BadZipFile exception
                 else:
                     logger.info("Failed. Retrying in 60 seconds...")
-                    time.sleep(60) # Wait 60 seconds before retrying
+                    time.sleep(60)  # Wait 60 seconds before retrying
 
         # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it
         return bundle_file
@@ -627,7 +699,7 @@ def _put_file(self, url, file=None, raw_data=None, content_type='application/zip
         """
         if file and raw_data:
             raise Exception("Cannot put both a file and raw_data")
-        
+
         headers = {
             # For Azure only, other systems ignore these headers
             'x-ms-blob-type': 'BlockBlob',
@@ -731,7 +803,19 @@ def start(self):
         try:
             loop.run_until_complete(gathered_tasks)
         except ExecutionTimeLimitExceeded:
-            raise SubmissionException(f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds")
+            error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
+            logger.info(error_message)
+            # Prepare data to be sent to the submissions API
+            execution_time_limit_exceeded_data = {
+                "type": "Execution_Time_Limit_Exceeded",
+                "error_message": error_message,
+                "is_scoring": self.is_scoring
+            }
+            # Send data to be written to the ingestion/scoring stderr
+            self._update_submission(execution_time_limit_exceeded_data)
+            # Send the error through a websocket to the frontend
+            asyncio.run(self._send_data_through_socket(error_message))
+            raise SubmissionException(error_message)
         finally:
             self.watch = False
             for kind, logs in self.logs.items():
diff --git a/docker-compose.yml b/docker-compose.yml
index e4e9e8180..22311bb68 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -126,7 +126,7 @@ services:
   # Rabbitmq & Flower monitoring tool
   #-----------------------------------------------
   rabbit:
-    image: rabbitmq:3.6-management
+    image: rabbitmq:management
     # setting hostname here makes data persist properly between
     # containers being destroyed..!
     hostname: rabbit
diff --git a/requirements.txt b/requirements.txt
index 5a14473b8..291f4d483 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,6 +26,7 @@ bleach==3.1.4
 # Heroku staging debug tools
 django-debug-toolbar==3.2
 django-querycount==0.7.0
+blessings==1.7
 
 # User impersonation
 django-su==0.9.0
diff --git a/src/apps/api/serializers/tasks.py b/src/apps/api/serializers/tasks.py
index b98ce36ea..1526a1980 100644
--- a/src/apps/api/serializers/tasks.py
+++ b/src/apps/api/serializers/tasks.py
@@ -138,6 +138,7 @@ class Meta:
         fields = (
             'id',
             'created_when',
+            'created_by',
             'key',
             'name',
             'solutions',
diff --git a/src/apps/api/views/competitions.py b/src/apps/api/views/competitions.py
index d58c057e3..fa34b4d55 100644
--- a/src/apps/api/views/competitions.py
+++ b/src/apps/api/views/competitions.py
@@ -38,6 +38,7 @@
 from api.permissions import IsOrganizerOrCollaborator
 from datetime import datetime
 from django.db import transaction
+from django.conf import settings
 
 
 class CompetitionViewSet(ModelViewSet):
@@ -614,15 +615,56 @@ def manually_migrate(self, request, pk):
 
     @action(detail=True, url_name='rerun_submissions')
     def rerun_submissions(self, request, pk):
+
         phase = self.get_object()
         comp = phase.competition
-        if request.user not in comp.all_organizers and not request.user.is_superuser:
-            raise PermissionDenied('You do not have permission to re-run submissions')
+
+        # Get submissions
         submissions = phase.submissions.all()
-        for submission in submissions:
-            submission.re_run()
-        rerun_count = len(submissions)
-        return Response({"count": rerun_count})
+
+        can_re_run_submissions = False
+        error_message = ""
+
+        # Superusers can rerun without any restrictions
+        if request.user.is_superuser:
+            can_re_run_submissions = True
+
+        # Competition organizers can rerun only in the cases below
+        elif request.user in comp.all_organizers:
+
+            # The number of submissions is within the limit
+            if len(submissions) <= int(settings.RERUN_SUBMISSION_LIMIT):
+                can_re_run_submissions = True
+
+            # The number of submissions exceeds the limit
+            else:
+                # Codabench public queue
+                if comp.queue is None:
+                    can_re_run_submissions = False
+                    error_message = f"You cannot rerun more than {settings.RERUN_SUBMISSION_LIMIT} submissions on the Codabench public queue! Contact us on `info@codalab.org` to request a rerun."
+
+                # Another queue, where the user is neither the owner nor an organizer
+                elif request.user != comp.queue.owner and request.user not in comp.queue.organizers.all():
+                    can_re_run_submissions = False
+                    error_message = f"You cannot rerun more than {settings.RERUN_SUBMISSION_LIMIT} submissions on a queue which is not yours! Contact us on `info@codalab.org` to request a rerun."
+
+                # The user owns or organizes this queue, so rerunning is allowed
+                else:
+                    can_re_run_submissions = True
+
+        else:
+            can_re_run_submissions = False
+            error_message = 'You do not have permission to re-run submissions'
+
+        # Rerun all submissions if allowed, otherwise raise a permission error
+        if can_re_run_submissions:
+            # rerun all submissions
+            for submission in submissions:
+                submission.re_run()
+            rerun_count = len(submissions)
+            return Response({"count": rerun_count})
+        else:
+            raise PermissionDenied(error_message)
 
     @swagger_auto_schema(responses={200: PhaseResultsSerializer})
     @action(detail=True, methods=['GET'])
diff --git a/src/apps/api/views/submissions.py b/src/apps/api/views/submissions.py
index d4d82fba2..6ce9da84b 100644
--- a/src/apps/api/views/submissions.py
+++ b/src/apps/api/views/submissions.py
@@ -1,5 +1,6 @@
 import json
 import uuid
+import logging
 
 from django.db.models import Q
 from django_filters.rest_framework import DjangoFilterBackend
@@ -14,14 +15,17 @@
 from rest_framework.settings import api_settings
 from rest_framework.viewsets import ModelViewSet
 from rest_framework_csv import renderers
+from django.core.files.base import ContentFile
 
 from profiles.models import Organization, Membership
 from tasks.models import Task
 from api.serializers.submissions import SubmissionCreationSerializer, SubmissionSerializer, SubmissionFilesSerializer
-from competitions.models import Submission, Phase, CompetitionParticipant
+from competitions.models import Submission, SubmissionDetails, Phase, CompetitionParticipant
 from leaderboards.strategies import put_on_leaderboard_by_submission_rule
 from leaderboards.models import SubmissionScore, Column, Leaderboard
 
+logger = logging.getLogger()
+
 
 class SubmissionViewSet(ModelViewSet):
     queryset = Submission.objects.all().order_by('-pk')
@@ -50,6 +54,44 @@ def check_object_permissions(self, request, obj):
             hostname = request.data['status_details'].replace('scoring_hostname-', '')
             obj.scoring_worker_hostname = hostname
             obj.save()
+
+        # Check if "type" is in the request data. It can have one of the following values:
+        # - Docker_Image_Pull_Fail
+        # - Execution_Time_Limit_Exceeded
+        if "type" in self.request.data.keys():
+
+            if request.data["type"] in ["Docker_Image_Pull_Fail", "Execution_Time_Limit_Exceeded"]:
+
+                # Get the error message
+                error_message = request.data['error_message']
+
+                # Default the error file name to the ingestion stderr
+                error_file_name = "prediction_ingestion_stderr"
+
+                # Use the scoring stderr instead when the execution time limit
+                # was exceeded during scoring
+                if request.data["type"] == "Execution_Time_Limit_Exceeded" and request.data['is_scoring'] == "True":
+                    error_file_name = "scoring_stderr"
+
+                try:
+                    # Get the submission detail object for this submission
+                    submission_detail = SubmissionDetails.objects.get(
+                        name=error_file_name,
+                        submission=obj,
+                    )
+
+                    # Read the existing content from the file
+                    existing_content = submission_detail.data_file.read().decode("utf-8")
+
+                    # Append the new error message to the existing content
+                    modified_content = existing_content + "\n" + error_message
+
+                    # Write the modified content back to the file
+                    submission_detail.data_file.save(submission_detail.data_file.name, ContentFile(modified_content.encode("utf-8")))
+
+                except SubmissionDetails.DoesNotExist:
+                    logger.warning("SubmissionDetails object not found.")
+
         not_bot_user = self.request.user.is_authenticated and not self.request.user.is_bot
 
         if self.action in ['update_fact_sheet', 're_run_submission']:
diff --git a/src/settings/base.py b/src/settings/base.py
index 0c756334d..62d3871e8 100644
--- a/src/settings/base.py
+++ b/src/settings/base.py
@@ -450,3 +450,10 @@
 # Django-Su (User impersonation)
 SU_LOGIN_CALLBACK = 'profiles.admin.su_login_callback'
 AJAX_LOOKUP_CHANNELS = {'django_su': dict(model='profiles.User', search_field='username')}
+
+# =============================================================================
+# Limit for re-running submissions
+# This limits how many submissions organizers can re-run at once on the
+# default queue: re-runs are allowed only when the number of submissions is <= RERUN_SUBMISSION_LIMIT
+# =============================================================================
+RERUN_SUBMISSION_LIMIT = os.environ.get('RERUN_SUBMISSION_LIMIT', 30)
diff --git a/src/static/img/paper.png b/src/static/img/paper.png
new file mode 100644
index 000000000..8cf0231d8
Binary files /dev/null and b/src/static/img/paper.png differ
diff --git a/src/static/riot/competitions/detail/_tabs.tag b/src/static/riot/competitions/detail/_tabs.tag
index dccee3d94..74ba34972 100644
--- a/src/static/riot/competitions/detail/_tabs.tag
+++ b/src/static/riot/competitions/detail/_tabs.tag
@@ -60,7 +60,12 @@
 Phase
 Task
 Type
-Available
+Available
+
+
+Size
diff --git a/src/static/riot/competitions/detail/submission_manager.tag b/src/static/riot/competitions/detail/submission_manager.tag
index 057cd089b..7b9aa43d7 100644
--- a/src/static/riot/competitions/detail/submission_manager.tag
+++ b/src/static/riot/competitions/detail/submission_manager.tag
@@ -284,6 +284,9 @@
                     toastr.success(`Rerunning ${response.count} submissions`)
                     self.update_submissions()
                 })
+                .fail(function (response) {
+                    toastr.error(response.responseJSON.detail)
+                })
         }
     }
     self.filter = function () {
diff --git a/src/static/riot/competitions/editor/_phases.tag b/src/static/riot/competitions/editor/_phases.tag
index 748c2609f..002bdf7a5 100644
--- a/src/static/riot/competitions/editor/_phases.tag
+++ b/src/static/riot/competitions/editor/_phases.tag
@@ -389,6 +389,25 @@
             $(self.refs.modal).modal('hide')
         }
 
+        self.formatDateToYYYYMMDD = function(input) {
+            // Format a date as a YYYY-MM-DD string
+
+            // convert the input to a Date
+            var dateObject = new Date(input)
+
+            // check that the input parsed to a valid date
+            if (!isNaN(dateObject.getTime())) {
+                // Extract the year
+                var year = dateObject.getFullYear()
+                // Extract the month (zero-padded)
+                var month = (dateObject.getMonth() + 1).toString().padStart(2, '0')
+                // Extract the day (zero-padded)
+                var day = dateObject.getDate().toString().padStart(2, '0')
+                return `${year}-${month}-${day}`
+            }
+            return input
+        }
+
         self.form_updated = function () {
             // This checks phases overall to make sure they are ready to go
             var is_valid = true
@@ -406,9 +425,8 @@
             })
             _.forEach(_.range(self.phases.length), i => {
                 if (i !== 0) {
-                    let end = Date.parse(self.phases[i - 1].end)
-                    let start = Date.parse(self.phases[i].start)
-
+                    let end = Date.parse(self.formatDateToYYYYMMDD(self.phases[i - 1].end))
+                    let start = Date.parse(self.formatDateToYYYYMMDD(self.phases[i].start))
                     if (end > start || !end) {
                         let message = `Phase "${_.get(self.phases[i], 'name', i + 1)}" must start after phase "${_.get(self.phases[i - 1], 'name', i)}" ends`
                         if (!self.warnings.includes(message)) {
diff --git a/src/static/riot/competitions/public-list.tag b/src/static/riot/competitions/public-list.tag
index 3b51ddbc9..8a3e09683 100644
--- a/src/static/riot/competitions/public-list.tag
+++ b/src/static/riot/competitions/public-list.tag
@@ -8,30 +8,33 @@
-
-
-

- {competition.title} -

-

- { pretty_description(competition.description)} -

-

- Organized by: {competition.created_by} -

-
+
+
+

+ {competition.title} +

+

+ { pretty_description(competition.description)} +

+

+ Organized by: {competition.created_by} +

+
+
{pretty_date(competition.created_when)} -
-
+
+
+ + +
{competition.participant_count} Participants
-
@@ -142,7 +145,10 @@
     margin auto
 
 .link-no-deco
+    all unset
     text-decoration none
+    cursor pointer
+    width 100%
 
 .tile-wrapper
     border solid 1px gainsboro
diff --git a/src/static/riot/competitions/tile/competition_tile.tag b/src/static/riot/competitions/tile/competition_tile.tag
index 5d2dbbf7b..5c4fdcb7e 100644
--- a/src/static/riot/competitions/tile/competition_tile.tag
+++ b/src/static/riot/competitions/tile/competition_tile.tag
@@ -1,9 +1,10 @@
-
+
+

{title} @@ -15,14 +16,17 @@ Organized by: {created_by}

-
+ +
{pretty_date(created_when)} -
-
+
+
+ + +
{participant_count} Participants
-
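
Note: the permission branching added to rerun_submissions in src/apps/api/views/competitions.py above reduces to a single decision. The sketch below factors that decision into a standalone helper for readability; the helper name, its signature, and the way it would be called are hypothetical and not part of this diff, but the rules it encodes mirror the view code above.

# Hypothetical helper mirroring the rerun permission rules added above; not part of this diff.
def can_rerun_submissions(user, competition, submission_count, limit):
    """Return (allowed, error_message) for a bulk re-run request."""
    # Superusers may always re-run
    if user.is_superuser:
        return True, ""

    # Non-organizers may never re-run
    if user not in competition.all_organizers:
        return False, "You do not have permission to re-run submissions"

    # Organizers may re-run batches within the limit on any queue
    if submission_count <= int(limit):
        return True, ""

    # Larger batches are only allowed on a queue the organizer owns or helps organize
    queue = competition.queue
    if queue is None:
        return False, (
            f"You cannot rerun more than {limit} submissions on the Codabench public queue! "
            "Contact us on `info@codalab.org` to request a rerun."
        )
    if user != queue.owner and user not in queue.organizers.all():
        return False, (
            f"You cannot rerun more than {limit} submissions on a queue which is not yours! "
            "Contact us on `info@codalab.org` to request a rerun."
        )
    return True, ""

Keeping the decision in one pure function like this would also make the limit behaviour easy to unit-test without constructing a full request.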
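
Note: _send_data_through_socket in compute_worker.py above opens the websocket manually and closes it on several paths. Below is a minimal sketch of the same send, assuming the same websockets package and the {"kind": "stderr", "message": ...} payload used above, but wrapped in the library's async context manager so the connection is closed on every path; the standalone function, its parameters, and the example URL are illustrative only.

# Illustrative sketch only -- the real logic lives in Run._send_data_through_socket above.
import asyncio
import json
import logging

import websockets

logger = logging.getLogger()


async def send_error_over_socket(websocket_url, error_message):
    """Send an error message to the frontend websocket, closing the connection on every path."""
    try:
        # the context manager closes the connection even if send() raises
        async with websockets.connect(websocket_url) as websocket:
            await websocket.send(json.dumps({
                "kind": "stderr",
                "message": error_message,
            }))
    except (OSError, websockets.WebSocketException):
        # covers DNS/connection failures and websocket-level errors
        logger.info("Failed to send the error message through the websocket")
    else:
        logger.info("Error message sent successfully through the websocket")


if __name__ == "__main__":
    # Example usage with a placeholder URL
    asyncio.run(send_error_over_socket("ws://localhost/submission_input/", "Execution Time Limit exceeded."))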