-
Notifications
You must be signed in to change notification settings - Fork 139
/
_docker_interface.py
303 lines (265 loc) · 13 KB
/
_docker_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.
"""
(not asyncio) Interface to Docker
"""
import os
from datetime import datetime
from typing import List, Tuple, Dict
import docker
import logging
from docker.types import Ulimit
from inginious.agent.docker_agent._docker_runtime import DockerRuntime
# Agent version that grading images must declare in their
# "org.inginious.grading.agent_version" label; get_containers() ignores
# (with a warning) any image declaring another value.
DOCKER_AGENT_VERSION = 3
class DockerInterface(object):  # pragma: no cover
    """
        (not asyncio) Interface to Docker

        We do not test coverage here, as it is a bit complicated to interact with docker in tests.
        Docker-py itself is already well tested.
    """

    @property
    def _docker(self):
        """ A docker client configured from the environment. A new client is built on every access. """
        return docker.from_env()

    def get_containers(self, runtimes: "List[DockerRuntime]") -> Dict[str, Dict[str, Dict[str, str]]]:
        """
        :param runtimes: a list of DockerRuntime. Each DockerRuntime.envtype must appear only once.
        :return: a dict of available containers in the form
            {
                "envtype": {                    # the value of DockerRuntime.envtype. Eg "docker".
                    "name": {                   # for example, "default"
                        "id": "container img id",   # "sha256:715c5cb5575cdb2641956e42af4a53e69edf763ce701006b2c6e0f4f39b68dd3"
                        "title": "name",            # same value as the key, taken from the image label
                        "created": 12345678,        # create date
                        "ports": [22, 434],         # list of ports needed
                        "runtime": "runtime"        # the value of DockerRuntime.runtime. Eg "runc".
                    }
                }
            }
            When several images share the same name, only the most recently created one is kept.
        """
        assert len(set(x.envtype for x in runtimes)) == len(runtimes)  # no duplicates in the envtypes

        logger = logging.getLogger("inginious.agent.docker")
        expected_version = str(DOCKER_AGENT_VERSION)

        # First, create a dict with {"env": {"id": {"title": "alias", "created": 000, "ports": [0, 1]}}}
        images = {x.envtype: {} for x in runtimes}

        for image in self._docker.images.list(filters={"label": "org.inginious.grading.name"}):
            title = None
            try:
                labels = image.labels
                title = labels["org.inginious.grading.name"]
                created = image.history()[0]['Created']
                if "org.inginious.grading.ports" in labels:
                    ports = [int(y) for y in labels["org.inginious.grading.ports"].split(",")]
                else:
                    ports = []

                # The version does not depend on the runtime: check it once per image, before
                # announcing that any runtime can use the container.
                found_version = labels.get("org.inginious.grading.agent_version")
                if found_version != expected_version:
                    logger.warning(
                        "Container %s is made for an old/newer version of the agent (container version is "
                        "%s, but it should be %i). INGInious will ignore the container.", title,
                        str(found_version), DOCKER_AGENT_VERSION)
                    continue

                needs_root = "org.inginious.grading.need_root" in labels
                needs_gpu = "org.inginious.grading.need_gpu" in labels

                for docker_runtime in runtimes:
                    if needs_root and not docker_runtime.run_as_root:
                        continue
                    if needs_gpu and not docker_runtime.enables_gpu:
                        continue

                    logger.info("Envtype %s (%s) can use container %s", docker_runtime.envtype, docker_runtime.runtime, title)
                    images[docker_runtime.envtype][image.attrs['Id']] = {
                        "title": title,
                        "created": created,
                        "ports": ports,
                        "runtime": docker_runtime.runtime
                    }
            except Exception:  # a broken image must not prevent the other ones from being listed
                logging.getLogger("inginious.agent").exception("Container %s is badly formatted", title or "[cannot load title]")

        # Then, we keep only the last version of each name
        latest = {}
        for envtype, content in images.items():
            newest = latest[envtype] = {}
            for img_id, img_c in content.items():
                current = newest.get(img_c["title"])
                if current is None or current["created"] < img_c["created"]:
                    newest[img_c["title"]] = {"id": img_id, **img_c}
        return latest

    def get_host_ip(self, env_with_dig='ingi/inginious-c-default'):
        """
        Get the external IP of the host of the docker daemon. Uses OpenDNS internally.

        :param env_with_dig: any container image that has dig
        :return: the IP as a string, or None if it could not be determined
        """
        logger = logging.getLogger("inginious.agent.docker")
        container = None
        try:
            container = self._docker.containers.create(env_with_dig, command="dig +short myip.opendns.com @resolver1.opendns.com")
            container.start()
            response = container.wait()
            # wait() gives either a dict with a "StatusCode" entry or the bare status code
            status = response["StatusCode"] if isinstance(response, dict) else response
            if status != 0:
                return None
            return container.logs(stdout=True, stderr=False).decode('utf8').strip()
        except Exception:
            logger.warning("Unable to get the external IP of the docker host", exc_info=True)
            return None
        finally:
            # Always remove the helper container, including when dig failed or an error occurred.
            if container is not None:
                try:
                    container.remove(v=True, link=False, force=True)
                except Exception:
                    logger.warning("Unable to remove the container used to get the host IP", exc_info=True)

    def create_container(self, image, network_grading, mem_limit, task_path, sockets_path,
                         course_common_path, course_common_student_path, fd_limit, runtime: str, ports=None):
        """
        Creates a container.

        :param image: env to start (name/id of a docker image)
        :param network_grading: boolean to indicate if the network should be enabled in the container or not
        :param mem_limit: in MB
        :param task_path: path to the task directory that will be mounted in the container (on /task)
        :param sockets_path: path to the socket directory that will be mounted in the container (on /sockets)
        :param course_common_path: path mounted read-only on /course/common
        :param course_common_student_path: path mounted read-only on /course/common/student
        :param fd_limit: Tuple with the soft and hard limits used for the "nofile" ulimit
        :param runtime: name of the docker runtime to use
        :param ports: dictionary in the form {docker_port: external_port}
        :return: the container id
        """
        task_path = os.path.abspath(task_path)
        sockets_path = os.path.abspath(sockets_path)
        course_common_path = os.path.abspath(course_common_path)
        course_common_student_path = os.path.abspath(course_common_student_path)
        if ports is None:
            ports = {}

        # Exposed ports need a network, even when network_grading is disabled
        net_mode = "bridge" if (network_grading or ports) else 'none'
        memory = str(mem_limit) + "M"
        nofile_limit = Ulimit(name='nofile', soft=fd_limit[0], hard=fd_limit[1])

        response = self._docker.containers.create(
            image,
            stdin_open=True,
            mem_limit=memory,
            memswap_limit=memory,
            mem_swappiness=0,
            oom_kill_disable=True,
            network_mode=net_mode,
            ports=ports,
            volumes={
                task_path: {'bind': '/task'},
                sockets_path: {'bind': '/sockets'},
                course_common_path: {'bind': '/course/common', 'mode': 'ro'},
                course_common_student_path: {'bind': '/course/common/student', 'mode': 'ro'}
            },
            runtime=runtime,
            ulimits=[nofile_limit]
        )
        return response.id

    def create_container_student(self, runtime: str, image: str, mem_limit, student_path,
                                 socket_path, systemfiles_path, course_common_student_path,
                                 parent_runtime: str, fd_limit, share_network_of_container: str = None, ports=None):
        """
        Creates a student container

        :param runtime: name of the docker runtime to use
        :param image: env to start (name/id of a docker image)
        :param mem_limit: in MB
        :param student_path: path to the task directory that will be mounted in the container (on /task/student).
            Its "scripts" subdirectory is mounted on /task/student/scripts.
        :param socket_path: path to the socket that will be mounted in the container (on /__parent.sock)
        :param systemfiles_path: path to the systemfiles folder containing files that can override partially some defined system files
        :param course_common_student_path: path mounted read-only on /course/common/student
        :param parent_runtime: name of a runtime, given to the container start script after `runtime`
        :param fd_limit: Tuple with the soft and hard limits used for the "nofile" ulimit
        :param share_network_of_container: (deprecated) if a container id is given, the new container will share its
            network stack. Ignored when ports are given.
        :param ports: dictionary in the form {docker_port: external_port}
        :return: the container id
        """
        student_path = os.path.abspath(student_path)
        socket_path = os.path.abspath(socket_path)
        systemfiles_path = os.path.abspath(systemfiles_path)
        course_common_student_path = os.path.abspath(course_common_student_path)
        secured_scripts_path = os.path.join(student_path, "scripts")

        if ports is None:
            ports = {}

        if ports:
            net_mode = "bridge"  # TODO: better to use "bridge" or "container:" + grading_container_id ?
        elif not share_network_of_container:
            net_mode = "none"
        else:
            net_mode = 'container:' + share_network_of_container

        memory = str(mem_limit) + "M"
        nofile_limit = Ulimit(name='nofile', soft=fd_limit[0], hard=fd_limit[1])

        response = self._docker.containers.create(
            image,
            stdin_open=True,
            command="_run_student_intern " + runtime + " " + parent_runtime,  # the script takes the runtimes as arguments
            mem_limit=memory,
            memswap_limit=memory,
            mem_swappiness=0,
            oom_kill_disable=True,
            network_mode=net_mode,
            ports=ports,
            volumes={
                student_path: {'bind': '/task/student'},
                secured_scripts_path: {'bind': '/task/student/scripts'},
                socket_path: {'bind': '/__parent.sock'},
                systemfiles_path: {'bind': '/task/systemfiles', 'mode': 'ro'},
                course_common_student_path: {'bind': '/course/common/student', 'mode': 'ro'}
            },
            runtime=runtime,
            ulimits=[nofile_limit]
        )
        return response.id

    def start_container(self, container_id):
        """ Starts a container (obviously) """
        self._docker.containers.get(container_id).start()

    def attach_to_container(self, container_id):
        """ A socket attached to the stdin/stdout of a container. The object returned contains a get_socket() function to get a socket.socket
            object and close_socket() to close the connection """
        sock = self._docker.containers.get(container_id).attach_socket(params={
            'stdin': 1,
            'stdout': 1,
            'stderr': 0,
            'stream': 1,
        })
        # fix a problem with docker-py; we must keep a reference of sock at every time
        return FixDockerSocket(sock)

    def get_logs(self, container_id):
        """
        Return the full stdout/stderr of a container

        :param container_id: id of the container
        :return: a tuple (stdout, stderr), both decoded as utf8 strings
        """
        container = self._docker.containers.get(container_id)  # resolve the container only once
        stdout = container.logs(stdout=True, stderr=False).decode('utf8')
        stderr = container.logs(stdout=False, stderr=True).decode('utf8')
        return stdout, stderr

    def get_stats(self, container_id):
        """
        :param container_id:
        :return: an iterable that contains dictionaries with the stats of the running container. See the docker api for content.
        """
        return self._docker.containers.get(container_id).stats(decode=True)

    def list_running_containers(self):
        """ Returns a set of running container ids """
        return {x.attrs.get('Id') for x in self._docker.containers.list(all=False, sparse=True)}

    def remove_container(self, container_id):
        """
        Removes a container (with fire): removal is forced and the associated volumes are removed too.
        """
        self._docker.containers.get(container_id).remove(v=True, link=False, force=True)

    def kill_container(self, container_id, signal=None):
        """
        Kills a container

        :param container_id: id of the container
        :param signal: custom signal. Default is SIGKILL.
        """
        self._docker.containers.get(container_id).kill(signal)

    def event_stream(self, filters=None, since=None):
        """
        :param filters: filters to apply on messages. See docker api.
        :param since: time since when the events should be sent. See docker api.
        :return: an iterable that contains events from docker. See the docker api for content.
        """
        if filters is None:
            filters = {}
        return self._docker.events(decode=True, filters=filters, since=since)

    def list_runtimes(self) -> Dict[str, str]:
        """
        :return: dict of runtime: path_to_runtime
        """
        return {name: x["path"] for name, x in self._docker.info()["Runtimes"].items()}
class FixDockerSocket():  # pragma: no cover
    """
    Wrapper around the object returned by docker-py's attach_socket, hiding the fact
    that the actual socket may or may not be stored in a private ``_sock`` attribute.
    """

    def __init__(self, docker_py_sock):
        # Keep the docker-py object itself, so it stays referenced as long as this wrapper is.
        self.docker_py_sock = docker_py_sock

    def get_socket(self):
        """
        Gives the underlying socket.

        :return: the ``_sock`` attribute of the wrapped object when it has one, else the wrapped object itself
        """
        wrapped = self.docker_py_sock
        return getattr(wrapped, "_sock", wrapped)

    def close_socket(self):
        """
        Closes the inner socket (when there is one), then the wrapped object.

        :return:
        """
        wrapped = self.docker_py_sock
        try:
            wrapped._sock.close()  # pylint: disable=protected-access
        except AttributeError:
            # nothing more to close than the wrapped object itself
            pass
        wrapped.close()