Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[microTVM] Clean-up test_crt.py and add to pylint #13886

Merged
merged 1 commit into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/lint/pylint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ set -euxo pipefail

python3 -m pylint python/tvm --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint vta/python/vta --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/unittest/test_crt.py --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/unittest/test_tvmscript_type.py --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/contrib/test_cmsisnn --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/contrib/test_ethosn --rcfile="$(dirname "$0")"/pylintrc
Expand Down
167 changes: 71 additions & 96 deletions tests/python/unittest/test_crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
# specific language governing permissions and limitations
# under the License.

import numpy as np
import os
"""Test C runtime"""

import pathlib
import shutil
import pytest

pytest.importorskip("pty")

import pytest
import numpy as np

import tvm
import tvm.relay
Expand All @@ -32,6 +29,8 @@
from tvm.relay.backend import Runtime
from tvm.relay.backend import Executor

pytest.importorskip("pty")

BUILD = True
DEBUG = False

Expand All @@ -57,38 +56,30 @@ def _make_session(temp_dir, mod):


def _make_add_sess(temp_dir):
A = tvm.te.placeholder((2,), dtype="int8")
B = tvm.te.placeholder((1,), dtype="int8")
C = tvm.te.compute(A.shape, lambda i: A[i] + B[0], name="C")
sched = tvm.te.create_schedule(C.op)
return _make_sess_from_op(temp_dir, "add", sched, [A, B, C])


def _make_ident_sess(temp_dir):
A = tvm.te.placeholder((2,), dtype="int8")
B = tvm.te.compute(A.shape, lambda i: A[i], name="B")
sched = tvm.te.create_schedule(B.op)
return _make_sess_from_op(temp_dir, "ident", sched, [A, B])
a = tvm.te.placeholder((2,), dtype="int8")
b = tvm.te.placeholder((1,), dtype="int8")
c = tvm.te.compute(a.shape, lambda i: a[i] + b[0], name="c")
sched = tvm.te.create_schedule(c.op)
return _make_sess_from_op(temp_dir, "add", sched, [a, b, c])


@tvm.testing.requires_micro
def test_compile_runtime():
"""Test compiling the on-device runtime."""
import tvm.micro

temp_dir = tvm.contrib.utils.tempdir()

with _make_add_sess(temp_dir) as sess:
A_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device)
assert (A_data.numpy() == np.array([2, 3])).all()
B_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device)
assert (B_data.numpy() == np.array([4])).all()
C_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device)
assert (C_data.numpy() == np.array([0, 0])).all()
a_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device)
assert (a_data.numpy() == np.array([2, 3])).all()
b_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device)
assert (b_data.numpy() == np.array([4])).all()
c_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device)
assert (c_data.numpy() == np.array([0, 0])).all()

system_lib = sess.get_system_lib()
system_lib.get_function("add")(A_data, B_data, C_data)
assert (C_data.numpy() == np.array([6, 7])).all()
system_lib.get_function("add")(a_data, b_data, c_data)
assert (c_data.numpy() == np.array([6, 7])).all()


@tvm.testing.requires_micro
Expand All @@ -111,8 +102,6 @@ def test_compile_runtime_llvm():
@tvm.testing.requires_micro
def test_reset():
"""Test when the remote end resets during a session."""
import tvm.micro
from tvm.micro import transport

temp_dir = tvm.contrib.utils.tempdir()

Expand All @@ -128,10 +117,7 @@ def test_reset():
def test_graph_executor():
"""Test use of the graph executor with microTVM."""

ws_root = pathlib.Path(os.path.dirname(__file__) + "/micro-workspace")
if ws_root.exists():
shutil.rmtree(ws_root)
temp_dir = tvm.contrib.utils.tempdir(ws_root.resolve())
temp_dir = tvm.contrib.utils.tempdir()
relay_mod = tvm.parser.fromtext(
"""
#[version = "0.0.5"]
Expand All @@ -147,15 +133,15 @@ def @main(%a : Tensor[(1, 2), uint8], %b : Tensor[(1, 2), uint8]) {

def do_test(graph_mod):

A_data = tvm.nd.array(np.array([2, 3], dtype="uint8"), device=sess.device)
assert (A_data.numpy() == np.array([2, 3])).all()
B_data = tvm.nd.array(np.array([4, 7], dtype="uint8"), device=sess.device)
assert (B_data.numpy() == np.array([4, 7])).all()
a_data = tvm.nd.array(np.array([2, 3], dtype="uint8"), device=sess.device)
assert (a_data.numpy() == np.array([2, 3])).all()
b_data = tvm.nd.array(np.array([4, 7], dtype="uint8"), device=sess.device)
assert (b_data.numpy() == np.array([4, 7])).all()

assert graph_mod.get_input_index("a") == 0
assert graph_mod.get_input_index("b") == 1

graph_mod.run(a=A_data, b=B_data)
graph_mod.run(a=a_data, b=b_data)

out = graph_mod.get_output(0)
assert (out.numpy() == np.array([6, 10])).all()
Expand All @@ -179,10 +165,7 @@ def do_test(graph_mod):
def test_aot_executor():
"""Test use of the AOT executor with microTVM."""

ws_root = pathlib.Path(os.path.dirname(__file__) + "/micro-workspace")
if ws_root.exists():
shutil.rmtree(ws_root)
temp_dir = tvm.contrib.utils.tempdir(ws_root.resolve())
temp_dir = tvm.contrib.utils.tempdir()
relay_mod = tvm.parser.fromtext(
"""
#[version = "0.0.5"]
Expand Down Expand Up @@ -210,20 +193,20 @@ def do_test():
assert aot_executor.get_num_inputs() == 2
assert aot_executor.get_num_outputs() == 1

A_np = np.array([[2, 3]], dtype="uint8")
B_np = np.array([[4, 7]], dtype="uint8")
a_np = np.array([[2, 3]], dtype="uint8")
b_np = np.array([[4, 7]], dtype="uint8")

A_data = aot_executor.get_input("a").copyfrom(A_np)
B_data = aot_executor.get_input("b").copyfrom(B_np)
aot_executor.get_input("a").copyfrom(a_np)
b_data = aot_executor.get_input("b").copyfrom(b_np)

aot_executor.run()

out = aot_executor.get_output(0)
assert (out.numpy() == np.array([6, 10])).all()

B_np_new = np.array([[5, 8]])
aot_executor.set_input("b", B_np_new)
assert (B_data.numpy() == B_np_new).all()
b_np_new = np.array([[5, 8]])
aot_executor.set_input("b", b_np_new)
assert (b_data.numpy() == b_np_new).all()

with _make_session(temp_dir, factory) as sess:
do_test()
Expand All @@ -233,10 +216,7 @@ def do_test():
def test_aot_executor_usmp_const_pool():
"""Test the AOT executor with microTVM using USMP to generate a constant data pool."""

ws_root = pathlib.Path(os.path.dirname(__file__) + "/micro-workspace-usmp")
if ws_root.exists():
shutil.rmtree(ws_root)
temp_dir = tvm.contrib.utils.tempdir(ws_root.resolve())
temp_dir = tvm.contrib.utils.tempdir()
relay_mod = tvm.parser.fromtext(
"""
#[version = "0.0.5"]
Expand All @@ -251,9 +231,8 @@ def @main(%a : Tensor[(1, 2), uint8], %b : Tensor[(1, 2), uint8], %c : Tensor[(1
executor = Executor("aot")
main_func = relay_mod["main"]
type_dict = {p.name_hint: p.checked_type.dtype for p in main_func.params}
B_np = np.array([[4, 7]], dtype="uint8").astype(type_dict["b"])
C_np = np.array([[8, 9]], dtype="uint8").astype(type_dict["c"])
params = {"c": C_np}
c_np = np.array([[8, 9]], dtype="uint8").astype(type_dict["c"])
params = {"c": c_np}
with tvm.transform.PassContext(
opt_level=3, config={"tir.disable_vectorize": True, "tir.usmp.enable": True}
):
Expand All @@ -272,28 +251,28 @@ def do_test():
sess.get_system_lib(), sess.device, "default"
)
)
except tvm._ffi.base.TVMError as e:
raise e
except tvm._ffi.base.TVMError as excpt:
raise excpt

assert aot_executor.get_input_index("a") == 0
assert aot_executor.get_input_index("b") == 1

assert aot_executor.get_num_inputs() == 2
assert aot_executor.get_num_outputs() == 1

A_np = np.array([[2, 3]], dtype="uint8")
B_np = np.array([[4, 7]], dtype="uint8")
a_np = np.array([[2, 3]], dtype="uint8")
b_np = np.array([[4, 7]], dtype="uint8")

A_data = aot_executor.get_input("a").copyfrom(A_np)
B_data = aot_executor.get_input("b").copyfrom(B_np)
a_data = aot_executor.get_input("a").copyfrom(a_np)
b_data = aot_executor.get_input("b").copyfrom(b_np)
aot_executor.run()

out = aot_executor.get_output(0)
assert (out.numpy() == np.array([14, 19])).all()

B_np_new = np.array([[5, 8]])
aot_executor.set_input("b", B_np_new)
assert (B_data.numpy() == B_np_new).all()
b_np_new = np.array([[5, 8]])
aot_executor.set_input("b", b_np_new)
assert (b_data.numpy() == b_np_new).all()

with _make_session(temp_dir, factory) as sess:
do_test()
Expand All @@ -302,78 +281,74 @@ def do_test():
@tvm.testing.requires_micro
def test_std_math_functions():
"""Verify that standard math functions can be used."""
import tvm.micro

temp_dir = tvm.contrib.utils.tempdir()

with _make_add_sess(temp_dir) as sess:
A_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device)
assert (A_data.numpy() == np.array([2, 3])).all()
B_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device)
assert (B_data.numpy() == np.array([4])).all()
C_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device)
assert (C_data.numpy() == np.array([0, 0])).all()
a_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device)
assert (a_data.numpy() == np.array([2, 3])).all()
b_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device)
assert (b_data.numpy() == np.array([4])).all()
c_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device)
assert (c_data.numpy() == np.array([0, 0])).all()

system_lib = sess.get_system_lib()
system_lib.get_function("add")(A_data, B_data, C_data)
system_lib.get_function("add")(a_data, b_data, c_data)

temp_dir = tvm.contrib.utils.tempdir()
A = tvm.te.placeholder((2,), dtype="float32", name="A")
B = tvm.te.compute(A.shape, lambda i: tvm.te.exp(A[i]), name="B")
s = tvm.te.create_schedule(B.op)
a = tvm.te.placeholder((2,), dtype="float32", name="a")
b = tvm.te.compute(a.shape, lambda i: tvm.te.exp(a[i]), name="b")
s = tvm.te.create_schedule(b.op)

with _make_sess_from_op(temp_dir, "myexpf", s, [A, B]) as sess:
A_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
B_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
with _make_sess_from_op(temp_dir, "myexpf", s, [a, b]) as sess:
a_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
b_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
lib = sess.get_system_lib()
func = lib["myexpf"]
func(A_data, B_data)
np.testing.assert_allclose(B_data.numpy(), np.array([7.389056, 20.085537]))
func(a_data, b_data)
np.testing.assert_allclose(b_data.numpy(), np.array([7.389056, 20.085537]))


@tvm.testing.requires_micro
def test_platform_timer():
"""Verify the platform timer can be used to time remote functions."""
import tvm.micro

temp_dir = tvm.contrib.utils.tempdir()
A = tvm.te.placeholder((2,), dtype="float32", name="A")
B = tvm.te.compute(A.shape, lambda i: tvm.te.exp(A[i]), name="B")
s = tvm.te.create_schedule(B.op)
a = tvm.te.placeholder((2,), dtype="float32", name="a")
b = tvm.te.compute(a.shape, lambda i: tvm.te.exp(a[i]), name="b")
s = tvm.te.create_schedule(b.op)

with _make_sess_from_op(temp_dir, "myexpf", s, [A, B]) as sess:
A_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
B_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
with _make_sess_from_op(temp_dir, "myexpf", s, [a, b]) as sess:
a_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
b_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device)
lib = sess.get_system_lib()
time_eval_f = lib.time_evaluator(
"myexpf", sess.device, number=2000, repeat=3, min_repeat_ms=40
)
result = time_eval_f(A_data, B_data)
result = time_eval_f(a_data, b_data)
assert result.mean > 0
assert len(result.results) == 3


@tvm.testing.requires_micro
def test_autotune():
"""Verify that autotune works with micro."""
import tvm.relay as relay
from tvm.micro.testing.utils import check_tune_log

runtime = Runtime("crt", {"system-lib": True})

data = relay.var("data", relay.TensorType((1, 3, 64, 64), "float32"))
weight = relay.var("weight", relay.TensorType((8, 3, 5, 5), "float32"))
y = relay.nn.conv2d(
data = tvm.relay.var("data", tvm.relay.TensorType((1, 3, 64, 64), "float32"))
weight = tvm.relay.var("weight", tvm.relay.TensorType((8, 3, 5, 5), "float32"))
y = tvm.relay.nn.conv2d(
data,
weight,
padding=(2, 2),
kernel_size=(5, 5),
kernel_layout="OIHW",
out_dtype="float32",
)
f = relay.Function([data, weight], y)
f = tvm.relay.Function([data, weight], y)
mod = tvm.IRModule.from_expr(f)
mod = relay.transform.InferType()(mod)
mod = tvm.relay.transform.InferType()(mod)

main_func = mod["main"]
shape_dict = {p.name_hint: p.checked_type.concrete_shape for p in main_func.params}
Expand Down