Skip to content

Commit

Permalink
Merge pull request #1166 from codalab/develop
Browse files Browse the repository at this point in the history
Merge develop into master
  • Loading branch information
Didayolo authored Sep 22, 2023
2 parents 7e9ab22 + b728d06 commit a5e1976
Show file tree
Hide file tree
Showing 16 changed files with 277 additions and 52 deletions.
8 changes: 8 additions & 0 deletions .env_sample
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ AWS_STORAGE_PRIVATE_BUCKET_NAME=private
AWS_S3_ENDPOINT_URL=http://minio:9000/
AWS_QUERYSTRING_AUTH=False

# -----------------------------------------------------------------------------
# Limit for re-running submissions
# Users may only re-run submissions on the default queue
# when the number of submissions is less than RERUN_SUBMISSION_LIMIT
# -----------------------------------------------------------------------------
RERUN_SUBMISSION_LIMIT=30


# # S3 storage example
# STORAGE_TYPE=s3
# AWS_ACCESS_KEY_ID=12312312312312312331223
Expand Down
104 changes: 94 additions & 10 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
from kombu import Queue, Exchange
from urllib3 import Retry


logger = logging.getLogger()


# -----------------------------------------------
# Celery + Rabbit MQ
# -----------------------------------------------
# Init celery + rabbit queue definitions
app = Celery()
app.config_from_object('celery_config') # grabs celery_config.py
Expand All @@ -38,13 +41,20 @@
]


# -----------------------------------------------
# Directories
# -----------------------------------------------
# Setup base directories used by all submissions
# note: we need to pass this directory to docker-compose so it knows where to store things!
HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/")
BASE_DIR = "/codabench/" # base directory inside the container
CACHE_DIR = os.path.join(BASE_DIR, "cache")
MAX_CACHE_DIR_SIZE_GB = float(os.environ.get('MAX_CACHE_DIR_SIZE_GB', 10))


# -----------------------------------------------
# Submission status
# -----------------------------------------------
# Status options for submissions
STATUS_NONE = "None"
STATUS_SUBMITTING = "Submitting"
Expand All @@ -65,6 +75,10 @@
STATUS_FAILED,
)


# -----------------------------------------------
# Container Engine
# -----------------------------------------------
# Setup the container engine that we are using
if os.environ.get("CONTAINER_ENGINE_EXECUTABLE"):
CONTAINER_ENGINE_EXECUTABLE = os.environ.get("CONTAINER_ENGINE_EXECUTABLE")
Expand All @@ -75,9 +89,18 @@
CONTAINER_ENGINE_EXECUTABLE = "docker"


# -----------------------------------------------
# Exceptions
# -----------------------------------------------
class SubmissionException(Exception):
    """Generic submission-processing failure.

    Caught by run_wrapper, which marks the submission STATUS_FAILED with
    the exception message.
    """
    pass

class DockerImagePullException(Exception):
    """Raised when pulling the submission's container image fails.

    Raised by _get_container_image after a non-zero exit from the container
    engine pull; run_wrapper catches it and marks the submission STATUS_FAILED.
    """
    pass

class ExecutionTimeLimitExceeded(Exception):
    """Raised (via alarm_handler) when a submission exceeds its execution
    time limit; Run.start catches it and converts it to a SubmissionException.
    """
    pass


# -----------------------------------------------------------------------------
# The main compute worker entrypoint, this is how a job is ran at the highest
Expand All @@ -94,6 +117,8 @@ def run_wrapper(run_args):
if run.is_scoring:
run.push_scores()
run.push_output()
except DockerImagePullException as e:
run._update_status(STATUS_FAILED, str(e))
except SubmissionException as e:
run._update_status(STATUS_FAILED, str(e))
except SoftTimeLimitExceeded:
Expand Down Expand Up @@ -160,14 +185,14 @@ def is_valid_zip(zip_path):
return False


class ExecutionTimeLimitExceeded(Exception):
pass


def alarm_handler(signum, frame):
    # Signal handler that turns the alarm into an exception so the running
    # submission is aborted when its time limit expires.
    # NOTE(review): presumably installed for signal.alarm / SIGALRM — the
    # signal.signal registration is not visible in this chunk; confirm.
    raise ExecutionTimeLimitExceeded


# -----------------------------------------------
# Class Run
# Responsible for running a submission inside a docker container
# -----------------------------------------------
class Run:
"""A "Run" in Codalab is composed of some program, some data to work with, and some signed URLs to upload results
to. There is also a secret key to do special commands for just this submission.
Expand Down Expand Up @@ -340,8 +365,55 @@ def _get_container_image(self, image_name):
container_engine_pull = check_output(cmd)
logger.info("Pull complete for image: {0} with output of {1}".format(image_name, container_engine_pull))
except CalledProcessError:
logger.info("Pull for image: {} returned a non-zero exit code!")
raise SubmissionException(f"Pull for {image_name} failed!")
error_message = f"Pull for image: {image_name} returned a non-zero exit code! Check if the docker image exists on docker hub."
logger.info(error_message)
# Prepare data to be sent to submissions api
docker_pull_fail_data = {
"type": "Docker_Image_Pull_Fail",
"error_message": error_message,
}
# Send data to be written to ingestion logs
self._update_submission(docker_pull_fail_data)
# Send error through web socket to the frontend
asyncio.run(self._send_data_through_socket(error_message))
raise DockerImagePullException(f"Pull for {image_name} failed!")

async def _send_data_through_socket(self, error_message):
"""
This function gets an error messages and sends it through a web socket. This function is used for sending
- Docker image pull failure logs
- Execution time limit exceeded logs
"""
logger.info(f"Connecting to {self.websocket_url} to send docker image pull error")

# connect to web socket
websocket = await websockets.connect(self.websocket_url)

# define websocket errors
websocket_errors = (socket.gaierror, websockets.WebSocketException, websockets.ConnectionClosedError, ConnectionRefusedError)

try:
# send message
await websocket.send(json.dumps({
"kind": "stderr",
"message": error_message
}))

except websocket_errors:
# handle websocket errors
logger.info(f"Error sending failed through websocket")
try:
await websocket.close()
except Exception as e:
logger.error(e)
else:
# no error in websocket message sending
logger.info(f"Error sent successfully through websocket")

logger.info(f"Disconnecting from websocket {self.websocket_url}")

# close websocket
await websocket.close()

def _get_bundle(self, url, destination, cache=True):
"""Downloads zip from url and unzips into destination. If cache=True then url is hashed and checked
Expand Down Expand Up @@ -384,7 +456,7 @@ def _get_bundle(self, url, destination, cache=True):
raise # Re-raise the last caught BadZipFile exception
else:
logger.info("Failed. Retrying in 60 seconds...")
time.sleep(60) # Wait 60 seconds before retrying
time.sleep(60) # Wait 60 seconds before retrying
# Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it
return bundle_file

Expand Down Expand Up @@ -627,7 +699,7 @@ def _put_file(self, url, file=None, raw_data=None, content_type='application/zip
"""
if file and raw_data:
raise Exception("Cannot put both a file and raw_data")

headers = {
# For Azure only, other systems ignore these headers
'x-ms-blob-type': 'BlockBlob',
Expand Down Expand Up @@ -731,7 +803,19 @@ def start(self):
try:
loop.run_until_complete(gathered_tasks)
except ExecutionTimeLimitExceeded:
raise SubmissionException(f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds")
error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
logger.info(error_message)
# Prepare data to be sent to submissions api
execution_time_limit_exceeded_data = {
"type": "Execution_Time_Limit_Exceeded",
"error_message": error_message,
"is_scoring": self.is_scoring
}
# Send data to be written to ingestion/scoring std_err
self._update_submission(execution_time_limit_exceeded_data)
# Send error through web socket to the frontend
asyncio.run(self._send_data_through_socket(error_message))
raise SubmissionException(error_message)
finally:
self.watch = False
for kind, logs in self.logs.items():
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ services:
# Rabbitmq & Flower monitoring tool
#-----------------------------------------------
rabbit:
image: rabbitmq:3.6-management
image: rabbitmq:management
# setting hostname here makes data persist properly between
# containers being destroyed..!
hostname: rabbit
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ bleach==3.1.4
# Heroku staging debug tools
django-debug-toolbar==3.2
django-querycount==0.7.0
blessings==1.7

# User impersonation
django-su==0.9.0
Expand Down
1 change: 1 addition & 0 deletions src/apps/api/serializers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class Meta:
fields = (
'id',
'created_when',
'created_by',
'key',
'name',
'solutions',
Expand Down
54 changes: 48 additions & 6 deletions src/apps/api/views/competitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from api.permissions import IsOrganizerOrCollaborator
from datetime import datetime
from django.db import transaction
from django.conf import settings


class CompetitionViewSet(ModelViewSet):
Expand Down Expand Up @@ -614,15 +615,56 @@ def manually_migrate(self, request, pk):

@action(detail=True, url_name='rerun_submissions')
def rerun_submissions(self, request, pk):

phase = self.get_object()
comp = phase.competition
if request.user not in comp.all_organizers and not request.user.is_superuser:
raise PermissionDenied('You do not have permission to re-run submissions')

# Get submissions
submissions = phase.submissions.all()
for submission in submissions:
submission.re_run()
rerun_count = len(submissions)
return Response({"count": rerun_count})

can_re_run_submissions = False
error_message = ""

# Super admin can rerun without any restrictions
if request.user.is_superuser:
can_re_run_submissions = True

# competition admin can run only if
elif request.user in comp.all_organizers:

# submissions are in limit
if len(submissions) <= int(settings.RERUN_SUBMISSION_LIMIT):
can_re_run_submissions = True

# submissions are not in limit
else:
# Codabemch public queue
if comp.queue is None:
can_re_run_submissions = False
error_message = f"You cannot rerun more than {settings.RERUN_SUBMISSION_LIMIT} submissions on Codabench public queue! Contact us on `info@codalab.org` to request a rerun."

# Other queue where user is not owner and not organizer
elif request.user != comp.queue.owner and request.user not in comp.queue.organizers.all():
can_re_run_submissions = False
error_message = f"You cannot rerun more than {settings.RERUN_SUBMISSION_LIMIT} submissions on a queue which is not yours! Contact us on `info@codalab.org` to request a rerun."

# User can rerun submissions where he is owner or organizer
else:
can_re_run_submissions = True

else:
can_re_run_submissions = False
error_message = 'You do not have permission to re-run submissions'

# error when user is not super user or admin of the competition
if can_re_run_submissions:
# rerun all submissions
for submission in submissions:
submission.re_run()
rerun_count = len(submissions)
return Response({"count": rerun_count})
else:
raise PermissionDenied(error_message)

@swagger_auto_schema(responses={200: PhaseResultsSerializer})
@action(detail=True, methods=['GET'])
Expand Down
44 changes: 43 additions & 1 deletion src/apps/api/views/submissions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import uuid
import logging

from django.db.models import Q
from django_filters.rest_framework import DjangoFilterBackend
Expand All @@ -14,14 +15,17 @@
from rest_framework.settings import api_settings
from rest_framework.viewsets import ModelViewSet
from rest_framework_csv import renderers
from django.core.files.base import ContentFile

from profiles.models import Organization, Membership
from tasks.models import Task
from api.serializers.submissions import SubmissionCreationSerializer, SubmissionSerializer, SubmissionFilesSerializer
from competitions.models import Submission, Phase, CompetitionParticipant
from competitions.models import Submission, SubmissionDetails, Phase, CompetitionParticipant
from leaderboards.strategies import put_on_leaderboard_by_submission_rule
from leaderboards.models import SubmissionScore, Column, Leaderboard

logger = logging.getLogger()


class SubmissionViewSet(ModelViewSet):
queryset = Submission.objects.all().order_by('-pk')
Expand Down Expand Up @@ -50,6 +54,44 @@ def check_object_permissions(self, request, obj):
hostname = request.data['status_details'].replace('scoring_hostname-', '')
obj.scoring_worker_hostname = hostname
obj.save()

# check if type is in request data. type can have the following values
# - Docker_Image_Pull_Fail
# - Execution_Time_Limit_Exceeded
if "type" in self.request.data.keys():

if request.data["type"] in ["Docker_Image_Pull_Fail", "Execution_Time_Limit_Exceeded"]:

# Get the error message
error_message = request.data['error_message']

# Set file name to ingestion std error as default
error_file_name = "prediction_ingestion_stderr"

# Change error file name when error comes from execution time limit
# and error occured during scoring
if request.data["type"] == "Execution_Time_Limit_Exceeded" and request.data['is_scoring'] == "True":
error_file_name = "scoring_stderr"

try:
# Get submission detail for this submission
submission_detail = SubmissionDetails.objects.get(
name=error_file_name,
submission=obj,
)

# Read the existing content from the file
existing_content = submission_detail.data_file.read().decode("utf-8")

# Append the new error message to the existing content
modified_content = existing_content + "\n" + error_message

# write error message to the file
submission_detail.data_file.save(submission_detail.data_file.name, ContentFile(modified_content.encode("utf-8")))

except SubmissionDetails.DoesNotExist:
logger.warning("SubmissionDetails object not found.")

not_bot_user = self.request.user.is_authenticated and not self.request.user.is_bot

if self.action in ['update_fact_sheet', 're_run_submission']:
Expand Down
7 changes: 7 additions & 0 deletions src/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,10 @@
# Django-Su (User impersonation)
SU_LOGIN_CALLBACK = 'profiles.admin.su_login_callback'
AJAX_LOOKUP_CHANNELS = {'django_su': dict(model='profiles.User', search_field='username')}

# =============================================================================
# Limit for re-running submissions
# Users may only re-run submissions on the default queue when the number of
# submissions is less than RERUN_SUBMISSION_LIMIT
# =============================================================================
# Cast once here so the setting is always an int: os.environ.get returns a
# string when the variable is set, but the default is the int 30.
RERUN_SUBMISSION_LIMIT = int(os.environ.get('RERUN_SUBMISSION_LIMIT', 30))
Binary file added src/static/img/paper.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 6 additions & 1 deletion src/static/riot/competitions/detail/_tabs.tag
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@
<th>Phase</th>
<th>Task</th>
<th>Type</th>
<th>Available</th>
<th>Available <span class="ui mini circular icon button"
data-tooltip="Available for download to participants."
data-position="top center">
<i class="question icon"></i>
</span>
</th>
<th>Size</th>
</tr>
</thead>
Expand Down
Loading

0 comments on commit a5e1976

Please sign in to comment.