"""Unit test VTA's instructions """ import tvm import numpy as np import topi from tvm.contrib import util import vta import vta.testing from vta.testing import simulator def test_save_load_out(): """Test save/store output command""" def _run(env, remote): n = 6 x = tvm.placeholder( (n, n, env.BATCH, env.BLOCK_OUT), name="x", dtype=env.acc_dtype) x_buf = tvm.compute( (n, n, env.BATCH, env.BLOCK_OUT), lambda *i: x(*i), "x_buf") # insert no-op that won't be optimized away y_buf = tvm.compute( (n, n, env.BATCH, env.BLOCK_OUT), lambda *i: x_buf(*i)>>0, "y_buf") y = tvm.compute( (n, n, env.BATCH, env.BLOCK_OUT), lambda *i: y_buf(*i).astype(env.inp_dtype), "y") # schedule s = tvm.create_schedule(y.op) s[x_buf].set_scope(env.acc_scope) s[x_buf].pragma(x_buf.op.axis[0], env.dma_copy) s[y_buf].set_scope(env.acc_scope) s[y_buf].pragma(y_buf.op.axis[0], env.alu) s[y].pragma(y.op.axis[0], env.dma_copy) # verification with vta.build_config(): m = vta.build(s, [x, y], "ext_dev", env.target_host) if not remote: return temp = util.tempdir() m.save(temp.relpath("load_act.o")) remote.upload(temp.relpath("load_act.o")) f = remote.load_module("load_act.o") # verify ctx = remote.ext_dev(0) x_np = np.random.randint( 1, 10, size=(n, n, env.BATCH, env.BLOCK_OUT)).astype(x.dtype) y_np = x_np.astype(y.dtype) x_nd = tvm.nd.array(x_np, ctx) y_nd = tvm.nd.empty(y_np.shape, ctx=ctx, dtype=y_np.dtype) f(x_nd, y_nd) np.testing.assert_equal(y_np, y_nd.asnumpy()) vta.testing.run(_run) def test_padded_load(): """Test padded load.""" def _run(env, remote): # declare n = 21 m = 20 pad_before = [0, 1, 0, 0] pad_after = [1, 3, 0, 0] x = tvm.placeholder( (n, m, env.BATCH, env.BLOCK_OUT), name="x", dtype=env.acc_dtype) x_buf = topi.nn.pad(x, pad_before, pad_after, name="y") # insert no-op that won't be optimized away y_buf = tvm.compute((n + pad_before[0] + pad_after[0], m + pad_before[1] + pad_after[1], env.BATCH, env.BLOCK_OUT), lambda *i: x_buf(*i)>>0, "y_buf") y = tvm.compute((n + pad_before[0] + pad_after[0], m + pad_before[1] + pad_after[1], env.BATCH, env.BLOCK_OUT), lambda *i: y_buf(*i).astype(env.inp_dtype), "y") # schedule s = tvm.create_schedule(y.op) s[x_buf].set_scope(env.acc_scope) s[x_buf].pragma(x_buf.op.axis[0], env.dma_copy) s[y_buf].set_scope(env.acc_scope) s[y_buf].pragma(y_buf.op.axis[0], env.alu) s[y].pragma(y.op.axis[0], env.dma_copy) # build with vta.build_config(): mod = vta.build(s, [x, y], "ext_dev", env.target_host) if not remote: return temp = util.tempdir() mod.save(temp.relpath("padded_load.o")) remote.upload(temp.relpath("padded_load.o")) f = remote.load_module("padded_load.o") # verify ctx = remote.ext_dev(0) x_np = np.random.randint(1, 2, size=( n, m, env.BATCH, env.BLOCK_OUT)).astype(x.dtype) y_np = np.zeros((n + pad_before[0] + pad_after[0], m + pad_before[1] + pad_after[1], env.BATCH, env.BLOCK_OUT)).astype(y.dtype) y_np[pad_before[0]:pad_before[0] + n, pad_before[1]:pad_before[1] + m, :] = x_np x_nd = tvm.nd.array(x_np, ctx) y_nd = tvm.nd.empty(y_np.shape, ctx=ctx, dtype=y_np.dtype) f(x_nd, y_nd) np.testing.assert_equal(y_np, y_nd.asnumpy()) vta.testing.run(_run) def test_gemm(): """Test GEMM.""" def _run(env, remote): # declare o = 4 n = 1 m = 4 x = tvm.placeholder((o, n, env.BATCH, env.BLOCK_IN), name="x", dtype=env.inp_dtype) w = tvm.placeholder((m, n, env.BLOCK_OUT, env.BLOCK_IN), name="w", dtype=env.wgt_dtype) x_buf = tvm.compute((o, n, env.BATCH, env.BLOCK_IN), lambda *i: x(*i), "x_buf") w_buf = tvm.compute((m, n, env.BLOCK_OUT, env.BLOCK_IN), lambda *i: w(*i), "w_buf") ko = 

def test_gemm():
    """Test GEMM."""
    def _run(env, remote):
        # declare
        o = 4
        n = 1
        m = 4
        x = tvm.placeholder((o, n, env.BATCH, env.BLOCK_IN),
                            name="x", dtype=env.inp_dtype)
        w = tvm.placeholder((m, n, env.BLOCK_OUT, env.BLOCK_IN),
                            name="w", dtype=env.wgt_dtype)
        x_buf = tvm.compute((o, n, env.BATCH, env.BLOCK_IN),
                            lambda *i: x(*i), "x_buf")
        w_buf = tvm.compute((m, n, env.BLOCK_OUT, env.BLOCK_IN),
                            lambda *i: w(*i), "w_buf")
        ko = tvm.reduce_axis((0, n), name="ko")
        ki = tvm.reduce_axis((0, env.BLOCK_IN), name="ki")
        y_gem = tvm.compute(
            (o, m, env.BATCH, env.BLOCK_OUT),
            lambda bo, co, bi, ci:
            tvm.sum(x_buf[bo, ko, bi, ki].astype(env.acc_dtype) *
                    w_buf[co, ko, ci, ki].astype(env.acc_dtype),
                    axis=[ko, ki]),
            name="y_gem")
        y_shf = tvm.compute(
            (o, m, env.BATCH, env.BLOCK_OUT),
            lambda *i: y_gem(*i) >> 8,
            name="y_shf")
        y_max = tvm.compute(
            (o, m, env.BATCH, env.BLOCK_OUT),
            lambda *i: tvm.max(y_shf(*i), 0),
            "y_max")  # relu
        y_min = tvm.compute(
            (o, m, env.BATCH, env.BLOCK_OUT),
            lambda *i: tvm.min(y_max(*i), (1 << (env.INP_WIDTH - 1)) - 1),
            "y_min")  # relu
        y = tvm.compute(
            (o, m, env.BATCH, env.BLOCK_OUT),
            lambda *i: y_min(*i).astype(env.inp_dtype),
            name="y")

        if not remote:
            return

        def verify(s):
            mod = vta.build(s, [x, w, y], "ext_dev", env.target_host)
            temp = util.tempdir()
            mod.save(temp.relpath("gemm.o"))
            remote.upload(temp.relpath("gemm.o"))
            f = remote.load_module("gemm.o")
            # verify
            ctx = remote.ext_dev(0)
            x_np = np.random.randint(
                -128, 128, size=(o, n, env.BATCH, env.BLOCK_IN)).astype(x.dtype)
            w_np = np.random.randint(
                -128, 128, size=(m, n, env.BLOCK_OUT, env.BLOCK_IN)).astype(w.dtype)
            y_np = np.zeros((o, m, env.BATCH, env.BLOCK_OUT)).astype(y.dtype)
            x_nd = tvm.nd.array(x_np, ctx)
            w_nd = tvm.nd.array(w_np, ctx)
            y_nd = tvm.nd.array(y_np, ctx)
            y_np = y_np.astype(env.acc_dtype)
            for b in range(o):
                for i in range(m):
                    for j in range(n):
                        y_np[b, i, :] += np.dot(
                            x_np[b, j, :].astype(env.acc_dtype),
                            w_np[i, j].T.astype(env.acc_dtype))
            y_np = np.right_shift(y_np, 8)
            y_np = np.clip(y_np, 0, (1 << (env.INP_WIDTH - 1)) - 1).astype(y.dtype)

            if env.TARGET == "sim":
                simulator.clear_stats()
                f(x_nd, w_nd, y_nd)
                print(simulator.stats())
            else:
                f(x_nd, w_nd, y_nd)

            np.testing.assert_equal(y_np, y_nd.asnumpy())

        def test_schedule1():
            # default schedule with no smt
            s = tvm.create_schedule(y.op)
            # set the scope of the SRAM buffers
            s[x_buf].set_scope(env.inp_scope)
            s[w_buf].set_scope(env.wgt_scope)
            s[y_gem].set_scope(env.acc_scope)
            s[y_shf].set_scope(env.acc_scope)
            s[y_max].set_scope(env.acc_scope)
            s[y_min].set_scope(env.acc_scope)
            # set pragmas for DMA transfer and ALU ops
            s[x_buf].compute_at(s[y_gem], ko)
            s[x_buf].pragma(s[x_buf].op.axis[0], env.dma_copy)
            s[w_buf].compute_at(s[y_gem], ko)
            s[w_buf].pragma(s[w_buf].op.axis[0], env.dma_copy)
            s[y_shf].pragma(s[y_shf].op.axis[0], env.alu)
            s[y_max].pragma(s[y_max].op.axis[0], env.alu)
            s[y_min].pragma(s[y_min].op.axis[0], env.alu)
            s[y].pragma(s[y].op.axis[0], env.dma_copy)
            # tensorization
            s[y_gem].reorder(
                ko,
                s[y_gem].op.axis[0],
                s[y_gem].op.axis[1],
                s[y_gem].op.axis[2],
                s[y_gem].op.axis[3],
                ki)
            s[y_gem].tensorize(s[y_gem].op.axis[2], env.gemm)
            verify(s)

        def test_smt():
            # test smt schedule
            s = tvm.create_schedule(y.op)
            s[x_buf].set_scope(env.inp_scope)
            s[w_buf].set_scope(env.wgt_scope)
            s[y_gem].set_scope(env.acc_scope)
            s[y_shf].set_scope(env.acc_scope)
            s[y_max].set_scope(env.acc_scope)
            s[y_min].set_scope(env.acc_scope)
            abo, aco, abi, aci = s[y].op.axis
            abo1, abo2 = s[y].split(abo, nparts=2)
            s[y].bind(abo1, tvm.thread_axis("cthread"))
            s[y_gem].compute_at(s[y], abo1)
            s[y_shf].compute_at(s[y], abo1)
            s[y_max].compute_at(s[y], abo1)
            s[y_min].compute_at(s[y], abo1)
            s[y_gem].reorder(
                ko,
                s[y_gem].op.axis[0],
                s[y_gem].op.axis[1],
                s[y_gem].op.axis[2],
                s[y_gem].op.axis[3],
                ki)
            s[y_gem].tensorize(s[y_gem].op.axis[2], env.gemm)
            s[y_shf].pragma(s[y_shf].op.axis[0], env.alu)
            s[y_max].pragma(s[y_max].op.axis[0], env.alu)
            s[y_min].pragma(s[y_min].op.axis[0], env.alu)
            s[x_buf].compute_at(s[y_gem], ko)
            s[x_buf].pragma(s[x_buf].op.axis[0], env.dma_copy)
            s[w_buf].compute_at(s[y_gem], ko)
            s[w_buf].pragma(s[w_buf].op.axis[0], env.dma_copy)
            s[y].pragma(abo2, env.dma_copy)
            verify(s)

        test_schedule1()
        test_smt()

    vta.testing.run(_run)
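
# ---------------------------------------------------------------------------
# Illustrative sketch (an assumption for exposition, not VTA code): the
# split(abo, nparts=2) + bind(..., "cthread") pattern in test_smt above
# partitions the outer batch axis into two halves that VTA's virtual
# threading executes as two interleaved instruction streams. In NumPy terms:
def _smt_split_sketch():
    o = 4
    work = np.arange(o)  # one entry per outer batch tile
    # nparts=2 -> outer extent 2, inner extent o // 2; row t is the share of
    # work assigned to virtual thread t
    return work.reshape(2, o // 2)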

def test_alu():
    def _run(env, remote):
        def check_alu(tvm_op, np_op=None, use_imm=False):
            """Test ALU"""
            m = 8
            n = 8
            imm = np.random.randint(1, 5)
            # compute
            a = tvm.placeholder(
                (m, n, env.BATCH, env.BLOCK_OUT),
                name="a",
                dtype=env.acc_dtype)
            a_buf = tvm.compute(
                (m, n, env.BATCH, env.BLOCK_OUT),
                lambda *i: a(*i), "a_buf")  # DRAM->SRAM
            if use_imm:
                res_buf = tvm.compute(
                    (m, n, env.BATCH, env.BLOCK_OUT),
                    lambda *i: tvm_op(a_buf(*i), imm), "res_buf")  # compute
            else:
                b = tvm.placeholder(
                    (m, n, env.BATCH, env.BLOCK_OUT),
                    name="b",
                    dtype=env.acc_dtype)
                b_buf = tvm.compute(
                    (m, n, env.BATCH, env.BLOCK_OUT),
                    lambda *i: b(*i), "b_buf")  # DRAM->SRAM
                res_buf = tvm.compute(
                    (m, n, env.BATCH, env.BLOCK_OUT),
                    lambda *i: tvm_op(a_buf(*i), b_buf(*i)), "res_buf")  # compute
            res = tvm.compute(
                (m, n, env.BATCH, env.BLOCK_OUT),
                lambda *i: res_buf(*i).astype(env.inp_dtype), "res")  # SRAM->DRAM
            # schedule
            s = tvm.create_schedule(res.op)
            s[a_buf].set_scope(env.acc_scope)                # SRAM
            s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy)  # DRAM->SRAM
            s[res_buf].set_scope(env.acc_scope)              # SRAM
            s[res_buf].pragma(res_buf.op.axis[0], env.alu)   # compute
            s[res].pragma(res.op.axis[0], env.dma_copy)      # SRAM->DRAM
            if not use_imm:
                s[b_buf].set_scope(env.acc_scope)                # SRAM
                s[b_buf].pragma(b_buf.op.axis[0], env.dma_copy)  # DRAM->SRAM

            if not remote:
                return
            # build
            with vta.build_config():
                if use_imm:
                    mod = vta.build(s, [a, res], "ext_dev", env.target_host)
                else:
                    mod = vta.build(s, [a, b, res], "ext_dev", env.target_host)
            temp = util.tempdir()
            mod.save(temp.relpath("load_act.o"))
            remote.upload(temp.relpath("load_act.o"))
            f = remote.load_module("load_act.o")
            # verify
            ctx = remote.ext_dev(0)
            a_np = np.random.randint(
                -16, 16, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
            if use_imm:
                res_np = np_op(a_np, imm) if np_op else tvm_op(a_np, imm)
            else:
                b_np = np.random.randint(
                    -16, 16, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(b.dtype)
                res_np = np_op(a_np, b_np) if np_op else tvm_op(a_np, b_np)
            res_np = res_np.astype(res.dtype)
            a_nd = tvm.nd.array(a_np, ctx)
            res_nd = tvm.nd.array(
                np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
            if use_imm:
                f(a_nd, res_nd)
            else:
                b_nd = tvm.nd.array(b_np, ctx)
                f(a_nd, b_nd, res_nd)
            np.testing.assert_equal(res_np, res_nd.asnumpy())

        check_alu(lambda x, y: x << y, np.left_shift, use_imm=True)
        check_alu(tvm.max, np.maximum, use_imm=True)
        check_alu(tvm.max, np.maximum)
        check_alu(lambda x, y: x + y, use_imm=True)
        check_alu(lambda x, y: x + y)
        check_alu(lambda x, y: x >> y, np.right_shift, use_imm=True)

    vta.testing.run(_run)
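
# ---------------------------------------------------------------------------
# Small illustrative helper (an assumption for exposition, not part of the
# VTA API): the bound (1 << (env.INP_WIDTH - 1)) - 1 used by test_gemm and
# test_relu is the largest signed INP_WIDTH-bit value, so the max(.., 0) /
# min(.., bound) pair implements a ReLU that saturates into env.inp_dtype.
def _signed_max_sketch(width):
    # e.g. _signed_max_sketch(8) == 127 == np.iinfo(np.int8).max
    return (1 << (width - 1)) - 1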

def test_relu():
    """Test RELU on ALU"""
    def _run(env, remote):
        m = 8
        n = 10
        # compute
        a = tvm.placeholder(
            (m, n, env.BATCH, env.BLOCK_OUT),
            name="a",
            dtype=env.acc_dtype)
        a_buf = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: a(*i), "a_buf")  # DRAM->SRAM
        max_buf = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: tvm.max(a_buf(*i), 0), "max_buf")  # relu
        min_buf = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: tvm.min(max_buf(*i), (1 << (env.INP_WIDTH - 1)) - 1),
            "min_buf")  # relu
        res = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: min_buf(*i).astype(env.inp_dtype), "res")  # SRAM->DRAM
        # schedule
        s = tvm.create_schedule(res.op)
        s[a_buf].set_scope(env.acc_scope)                # SRAM
        s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy)  # DRAM->SRAM
        s[max_buf].set_scope(env.acc_scope)              # SRAM
        s[min_buf].set_scope(env.acc_scope)              # SRAM
        s[max_buf].pragma(max_buf.op.axis[0], env.alu)   # compute
        s[min_buf].pragma(min_buf.op.axis[0], env.alu)   # compute
        s[res].pragma(res.op.axis[0], env.dma_copy)      # SRAM->DRAM
        # build
        with vta.build_config():
            mod = vta.build(s, [a, res], "ext_dev", env.target_host)
        if not remote:
            return
        temp = util.tempdir()
        mod.save(temp.relpath("load_act.o"))
        remote.upload(temp.relpath("load_act.o"))
        f = remote.load_module("load_act.o")
        # verify
        ctx = remote.ext_dev(0)
        a_np = np.random.randint(
            -256, 256, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
        res_np = np.clip(a_np, 0, (1 << (env.INP_WIDTH - 1)) - 1).astype(res.dtype)
        a_nd = tvm.nd.array(a_np, ctx)
        res_nd = tvm.nd.array(
            np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
        f(a_nd, res_nd)
        np.testing.assert_equal(res_np, res_nd.asnumpy())

    vta.testing.run(_run)


def test_shift_and_scale():
    """Test shift and scale on ALU"""
    def _run(env, remote):
        m = 2
        n = 8
        imm_shift = np.random.randint(0, 8)
        imm_scale = np.random.randint(1, 5)
        # compute
        a = tvm.placeholder(
            (m, n, env.BATCH, env.BLOCK_OUT),
            name="a", dtype=env.acc_dtype)
        a_buf = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: a(*i), "a_buf")  # DRAM->SRAM
        res_shift = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: a_buf(*i) + imm_shift, "res_shift")  # compute: add imm
        res_scale = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: res_shift(*i) >> imm_scale, "res_scale")  # compute: shift right
        res = tvm.compute(
            (m, n, env.BATCH, env.BLOCK_OUT),
            lambda *i: res_scale(*i).astype(env.inp_dtype), "res")  # SRAM->DRAM
        # schedule
        s = tvm.create_schedule(res.op)
        s[a_buf].set_scope(env.acc_scope)                      # SRAM
        s[res_shift].set_scope(env.acc_scope)                  # SRAM
        s[res_scale].set_scope(env.acc_scope)                  # SRAM
        s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy)        # DRAM->SRAM
        s[res_shift].pragma(res_shift.op.axis[0], env.alu)     # compute
        s[res_scale].pragma(res_scale.op.axis[0], env.alu)     # compute
        s[res].pragma(res.op.axis[0], env.dma_copy)            # SRAM->DRAM
        # build
        mod = vta.build(s, [a, res], "ext_dev", env.target_host)
        if not remote:
            return
        temp = util.tempdir()
        mod.save(temp.relpath("load_act.o"))
        remote.upload(temp.relpath("load_act.o"))
        f = remote.load_module("load_act.o")
        # verify
        ctx = remote.ext_dev(0)
        a_np = np.random.randint(
            -10, 10, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
        res_np = np.right_shift((a_np + imm_shift), imm_scale)
        res_np = res_np.astype(res.dtype)
        a_nd = tvm.nd.array(a_np, ctx)
        res_nd = tvm.nd.array(
            np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), ctx)
        f(a_nd, res_nd)
        np.testing.assert_equal(res_np, res_nd.asnumpy())

    vta.testing.run(_run)


def test_runtime_array():
    def _run(env, remote):
        n = 100
        ctx = remote.ext_dev(0)
        x_np = np.random.randint(
            1, 10, size=(n, n, env.BATCH, env.BLOCK_OUT)).astype("int8")
        x_nd = tvm.nd.array(x_np, ctx)
        np.testing.assert_equal(x_np, x_nd.asnumpy())

    vta.testing.run(_run)


if __name__ == "__main__":
    print("Array test")
    test_runtime_array()
    print("Load/store test")
    test_save_load_out()
    # print("Padded load test")
    # test_padded_load()
    print("GEMM test")
    test_gemm()
    print("ALU test")
    test_alu()
    print("Relu test")
    test_relu()
    print("Shift and scale test")
    test_shift_and_scale()