-
Notifications
You must be signed in to change notification settings - Fork 26
/
__init__.py
237 lines (163 loc) · 6.97 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# Copyright (C) 2020 Istituto Italiano di Tecnologia (IIT). All rights reserved.
# This software may be modified and distributed under the terms of the
# GNU Lesser General Public License v2.1 or any later version.
import os
import sys
from enum import Enum, auto
from pathlib import Path
import packaging.specifiers
import packaging.version
def supported_versions_specifier_set() -> packaging.specifiers.SpecifierSet:
    """
    Return the specifier set describing the supported Ignition Gazebo versions.

    If 6 is the Ignition distribution major version, the specifier is meant to
    enable the compatibility with all the following version forms:

    - 6.Y.Z.devK
    - 6.Y.Z.alphaK
    - 6.Y.Z.betaK
    - 6.Y.Z.rcK
    - 6.Y.Z.preK
    - 6.Y.Z.postK

    Returns:
        The specifier set built from the ">=6.0.0.pre,<7.0.0.dev" constraint.
    """

    supported_versions = packaging.specifiers.SpecifierSet(">=6.0.0.pre,<7.0.0.dev")
    return supported_versions
class InstallMode(Enum):
    """Enumeration of the install modes distinguished by detect_install_mode."""

    # Selected when the install prefix reported by the bindings is empty
    User = auto()

    # Selected when the install prefix contains the "$PREFIX" string
    CondaBuild = auto()

    # Selected for any other (non-empty) install prefix
    Developer = auto()
def detect_install_mode() -> InstallMode:
    """
    Detect how the package was installed from the install prefix of the bindings.

    Returns:
        InstallMode.CondaBuild if the prefix contains the "$PREFIX" string,
        InstallMode.User if the prefix is empty, InstallMode.Developer otherwise.
    """

    import scenario.bindings.core

    raw_prefix = scenario.bindings.core.get_install_prefix()

    # In conda, there are null bytes terminating the returned string
    prefix = raw_prefix.replace("\x00", "")

    if "$PREFIX" in prefix:
        return InstallMode.CondaBuild

    return InstallMode.User if prefix == "" else InstallMode.Developer
def setup_gazebo_environment() -> None:
    """
    Append the ScenarIO plugins folder to IGN_GAZEBO_SYSTEM_PLUGIN_PATH.

    Nothing is done in the CondaBuild install mode. In Developer mode the
    plugins folder is resolved from the install prefix reported by the
    bindings, in User mode from the folder containing this file.

    Raises:
        ValueError: If the detected install mode is not handled.
    """

    import scenario.bindings.core

    # Detect the install mode only once, all the branches below depend on it
    install_mode = detect_install_mode()

    # Exporting this env variable is done by the conda "libscenario" package
    if install_mode is InstallMode.CondaBuild:
        return

    # Find the install prefix containing the plugins
    if install_mode is InstallMode.Developer:
        install_prefix = scenario.bindings.core.get_install_prefix()

        # In conda, there are null bytes terminating the returned string
        install_prefix = Path(install_prefix.replace("\x00", ""))
    elif install_mode is InstallMode.User:
        install_prefix = Path(os.path.dirname(__file__))
    else:
        raise ValueError(install_mode)

    plugin_dir = install_prefix / "lib" / "scenario" / "plugins"

    # Add the plugins path after the existing value. Empty components are
    # dropped so that an unset (or empty) variable does not produce a leading
    # ':' separator, i.e. an empty entry in the resulting search path.
    existing_path = os.environ.get("IGN_GAZEBO_SYSTEM_PLUGIN_PATH", "")
    entries = [entry for entry in (existing_path, str(plugin_dir)) if entry]
    os.environ["IGN_GAZEBO_SYSTEM_PLUGIN_PATH"] = ":".join(entries)
def preload_tensorflow_shared_libraries() -> None:
    """
    Load the shared libraries of tensorflow with ctypes, if tensorflow is found.

    The libraries matching "*tensorflow*.so*" in the tensorflow top-level folder
    are loaded first, followed by those matching "_*.so" in tensorflow/python.

    The preload is skipped without raising if tensorflow is not installed, if
    its spec has no origin (e.g. a namespace package), or if the expected
    folders do not exist.
    """

    import ctypes
    import importlib.util
    import pathlib

    # Check if tensorflow is installed
    spec = importlib.util.find_spec("tensorflow")

    # A namespace package has no __init__ file and therefore no origin
    if spec is None or spec.origin is None:
        return

    # Get the tensorflow top-level folder from the __init__ location
    tensorflow_dir = pathlib.Path(spec.origin).parent

    if not tensorflow_dir.is_dir():
        return

    # Load the main shared library
    for lib in tensorflow_dir.glob("*tensorflow*.so*"):
        ctypes.CDLL(str(lib))

    # Get the tensorflow/python folder
    tensorflow_python_dir = tensorflow_dir / "python"

    if not tensorflow_python_dir.is_dir():
        return

    # Load all the shared libraries inside tensorflow/python
    for lib in tensorflow_python_dir.glob("_*.so"):
        ctypes.CDLL(str(lib))
def pre_import_gym() -> None:
    """Import the gym package if it can be found, otherwise do nothing."""

    from importlib import util as importlib_util

    # Import gym only if it is installed
    if importlib_util.find_spec("gym") is not None:
        import gym
def check_gazebo_installation() -> None:
    """
    Check that a supported version of Ignition Gazebo is installed.

    The installed versions are read from the output of "ign gazebo --versions"
    and compared against supported_versions_specifier_set(). The check passes
    as soon as one of the listed versions is supported.

    Raises:
        RuntimeError: If the 'ign' command is not found, if the command fails,
            if its output cannot be parsed, or if none of the versions found
            is supported.
    """

    import subprocess

    command = ["ign", "gazebo", "--versions"]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
    except FileNotFoundError as exc:
        msg = "Failed to find the 'ign' command in your PATH. "
        msg += "Make sure that Ignition is installed "
        msg += "and your environment is properly configured."
        raise RuntimeError(msg) from exc
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(f"Failed to execute command: {' '.join(command)}") from exc

    # Strip the command output
    gazebo_versions_string = result.stdout.strip()

    # Get the gazebo version from the command line.
    # Since the releases could be in the "6.0.0~preK" form, we replace '~' with '.' to
    # be compatible with the 'packaging' package.
    gazebo_version_string_normalized = gazebo_versions_string.replace("~", ".")

    # The output could be multiline, listing all the Ignition Gazebo versions found.
    # In text mode the line endings are already translated to '\n', therefore the
    # lines are split independently of the platform-specific os.linesep.
    gazebo_versions = gazebo_version_string_normalized.splitlines()

    try:
        # Parse the gazebo versions
        gazebo_versions_parsed = [packaging.version.Version(v) for v in gazebo_versions]
    except packaging.version.InvalidVersion as exc:
        raise RuntimeError(
            f"Failed to parse the output of: {' '.join(command)} ({gazebo_versions})"
        ) from exc

    # Build the specifier set once instead of once per version
    supported_versions = supported_versions_specifier_set()

    if any(version in supported_versions for version in gazebo_versions_parsed):
        return

    msg = f"Failed to find Ignition Gazebo {supported_versions} "
    msg += f"(found incompatible version(s): {gazebo_versions_parsed})"
    raise RuntimeError(msg)
def import_gazebo() -> None:
    """
    Import scenario.bindings.gazebo, using custom dlopen flags where supported.

    On Linux and macOS the import is performed with os.RTLD_GLOBAL added to the
    dlopen flags, which are restored afterwards even if the import fails.
    Unless disabled through the SCENARIO_DISABLE_TENSORFLOW_PRELOAD and
    SCENARIO_DISABLE_GYM_PREIMPORT environment variables (value "1"), the
    tensorflow shared libraries are preloaded and gym is imported beforehand.

    Raises:
        ImportError: If scenario.bindings._gazebo was already loaded, or if
            importing the bindings fails.
    """

    # Check that the module was never loaded by someone else
    if "scenario.bindings._gazebo" in sys.modules:
        raise ImportError("Failed to load ScenarIO Gazebo with custom dlopen flags")

    # Preload the shared libraries of tensorflow if the package is installed.
    # If tensorflow is imported after scenario.bindings.gazebo, the application segfaults.
    if os.environ.get("SCENARIO_DISABLE_TENSORFLOW_PRELOAD") != "1":
        preload_tensorflow_shared_libraries()

    # Import gym before scenario.bindings.gazebo. Similarly to tensorflow, also gym
    # includes a module that imports protobuf, producing a similar segfault.
    if os.environ.get("SCENARIO_DISABLE_GYM_PREIMPORT") != "1":
        pre_import_gym()

    # Import SWIG bindings
    # See https://github.com/robotology/gym-ignition/issues/7
    #     https://stackoverflow.com/a/45473441/12150968
    if sys.platform.startswith(("linux", "darwin")):

        # Update the dlopen flags
        dlopen_flags = sys.getdlopenflags()
        sys.setdlopenflags(dlopen_flags | os.RTLD_GLOBAL)

        try:
            import scenario.bindings.gazebo
        finally:
            # Restore the flags also when the import fails, so that the custom
            # flags do not leak into the imports performed afterwards
            sys.setdlopenflags(dlopen_flags)

    else:
        import scenario.bindings.gazebo
def create_home_dot_folder() -> None:
    """Create the ~/.ignition/gazebo folder, with its parents, if it is missing."""

    # Resolve the dot folder inside the user's home
    dot_folder = Path("~/.ignition/gazebo").expanduser()

    # Make sure that it exists
    dot_folder.mkdir(mode=0o755, parents=True, exist_ok=True)
# ===================
# Import the bindings
# ===================

# The Gazebo bindings are configured and imported only if exactly one
# _gazebo.* shared lib is found in the bindings folder
if len(list(Path(__file__).parent.joinpath("bindings").glob("_gazebo.*"))) == 1:
    check_gazebo_installation()
    import_gazebo()
    create_home_dot_folder()
    setup_gazebo_environment()
    from .bindings import gazebo

# The YARP bindings are imported only if exactly one _yarp.* shared lib is found
if len(list(Path(__file__).parent.joinpath("bindings").glob("_yarp.*"))) == 1:
    from .bindings.yarp import yarp

# The core bindings are always imported
from .bindings import core