"""Tests for the CombineParallelConv2D pass (relay.ir_pass.combine_parallel_conv2d)."""
from tvm import relay


def test_combine_parallel_conv2d():
    """Simple testcase."""
    def before(x, w1, w2, w3, w4):
        args = [x, w1, w2, w3, w4]
        y1 = relay.nn.conv2d(x, w1)
        y2 = relay.nn.conv2d(x, w2)
        # y3 cannot be combined: w3 is a 3x3 kernel, the others are 1x1
        y3 = relay.nn.conv2d(x, w3)
        y4 = relay.nn.conv2d(x, w4)
        # y5 is not a conv2d, so the pass leaves it untouched
        y5 = relay.nn.max_pool2d(x)
        y = relay.Tuple((y1, y2, y3, y4, y5))
        return relay.Function(args, y)

    def expected(x, w1, w2, w3, w4, channels1, channels2, channels3, channels4):
        # use a fixed order of args so the alpha-equal check can pass
        args = [x, w1, w2, w3, w4]
        w = relay.concatenate((w1, w2, w4), axis=0)
        y = relay.nn.conv2d(x, w, channels=channels1 + channels2 + channels4)
        y1 = relay.strided_slice(y, [0, 0], [None, channels1])
        y2 = relay.strided_slice(y, [0, channels1], [None, channels1 + channels2])
        y3 = relay.nn.conv2d(x, w3)
        y4 = relay.strided_slice(y, [0, channels1 + channels2],
                                 [None, channels1 + channels2 + channels4])
        y5 = relay.nn.max_pool2d(x)
        y = relay.Tuple((y1, y2, y3, y4, y5))
        return relay.Function(args, y)

    def check(x_shape, channels1, channels2, channels3, channels4):
        x = relay.var("x", shape=x_shape)
        in_c = x_shape[1]
        w1 = relay.var("w1", shape=(channels1, in_c, 1, 1))
        w2 = relay.var("w2", shape=(channels2, in_c, 1, 1))
        w3 = relay.var("w3", shape=(channels3, in_c, 3, 3))
        w4 = relay.var("w4", shape=(channels4, in_c, 1, 1))

        y_before = before(x, w1, w2, w3, w4)
        y = relay.ir_pass.infer_type(y_before)
        y = relay.ir_pass.combine_parallel_conv2d(y)
        y = relay.ir_pass.infer_type(y)
        y_expected = expected(x, w1, w2, w3, w4,
                              channels1, channels2, channels3, channels4)
        y_expected = relay.ir_pass.infer_type(y_expected)
        assert relay.ir_pass.alpha_equal(y, y_expected)

    check((1, 4, 16, 16), 4, 4, 4, 4)
    check((1, 4, 16, 16), 4, 8, 4, 7)


def test_combine_parallel_conv2d_scale_relu():
    """Testcase of combining conv2d + scale + relu."""
    def before(x, w1, w2, scale1, scale2, bias):
        args = [x, w1, w2, scale1, scale2, bias]
        y1 = relay.nn.conv2d(x, w1)
        y1 = relay.multiply(y1, scale1)
        y1 = relay.nn.relu(y1)
        y2 = relay.nn.conv2d(x, w2)
        y2 = relay.multiply(y2, scale2)
        y2 = relay.nn.relu(y2)
        y2 = relay.add(y2, bias)
        y = relay.Tuple((y1, y2))
        return relay.Function(args, y)

    def expected(x, w1, w2, scale1, scale2, bias, channels1, channels2):
        args = [x, w1, w2, scale1, scale2, bias]
        # the per-channel scales and the relu follow both branches, so they can
        # be concatenated along the channel axis and folded into the combined conv
        w = relay.concatenate((w1, w2), axis=0)
        scale = relay.concatenate((scale1, scale2), axis=0)
        y = relay.nn.conv2d(x, w, channels=channels1 + channels2)
        y = relay.multiply(y, scale)
        y = relay.nn.relu(y)
        y1 = relay.strided_slice(y, [0, 0], [None, channels1])
        y2 = relay.strided_slice(y, [0, channels1], [None, channels1 + channels2])
        y2 = relay.add(y2, bias)
        y = relay.Tuple((y1, y2))
        return relay.Function(args, y)

    def check(x_shape, channels1, channels2):
        x = relay.var("x", shape=x_shape)
        in_c = x_shape[1]
        w1 = relay.var("w1", shape=(channels1, in_c, 1, 1))
        w2 = relay.var("w2", shape=(channels2, in_c, 1, 1))
        scale1 = relay.var("scale1", shape=(channels1, 1, 1))
        scale2 = relay.var("scale2", shape=(channels2, 1, 1))
        bias = relay.var("bias", shape=(channels2, 1, 1))

        y_before = before(x, w1, w2, scale1, scale2, bias)
        y = relay.ir_pass.infer_type(y_before)
        y = relay.ir_pass.combine_parallel_conv2d(y)
        y = relay.ir_pass.infer_type(y)
        y_expected = expected(x, w1, w2, scale1, scale2, bias,
                              channels1, channels2)
        y_expected = relay.ir_pass.infer_type(y_expected)
        assert relay.ir_pass.alpha_equal(y, y_expected)

    check((1, 4, 16, 16), 4, 8)


def test_combine_parallel_conv2d_scale():
    """Testcase of un-combinable scale."""
    def before(x, w1, w2, scale1, scale2):
        args = [x, w1, w2, scale1, scale2]
        y1 = relay.nn.conv2d(x, w1)
        y1 = relay.multiply(y1, scale1)
        y2 = relay.nn.conv2d(x, w2)
        y2 = relay.multiply(y2, scale2)
        y = relay.Tuple((y1, y2))
        return relay.Function(args, y)

    def expected(x, w1, w2, scale1, scale2, channels1, channels2):
        args = [x, w1, w2, scale1, scale2]
        w = relay.concatenate((w1, w2), axis=0)
        y = relay.nn.conv2d(x, w, channels=channels1 + channels2)
        y1 = relay.strided_slice(y, [0, 0], [None, channels1])
        y2 = relay.strided_slice(y, [0, channels1], [None, channels1 + channels2])
        # the scales are scalars (shape (1,)), not per-channel, so they cannot
        # be folded into the combined conv2d and stay after the slices
        y1 = relay.multiply(y1, scale1)
        y2 = relay.multiply(y2, scale2)
        y = relay.Tuple((y1, y2))
        return relay.Function(args, y)

    def check(x_shape, channels1, channels2):
        x = relay.var("x", shape=x_shape)
        in_c = x_shape[1]
        w1 = relay.var("w1", shape=(channels1, in_c, 1, 1))
        w2 = relay.var("w2", shape=(channels2, in_c, 1, 1))
        scale1 = relay.var("scale1", shape=(1,))
        scale2 = relay.var("scale2", shape=(1,))

        y_before = before(x, w1, w2, scale1, scale2)
        y = relay.ir_pass.infer_type(y_before)
        y = relay.ir_pass.combine_parallel_conv2d(y)
        y = relay.ir_pass.infer_type(y)
        y_expected = expected(x, w1, w2, scale1, scale2, channels1, channels2)
        y_expected = relay.ir_pass.infer_type(y_expected)
        assert relay.ir_pass.alpha_equal(y, y_expected)

    check((1, 4, 16, 16), 4, 8)


def test_combine_parallel_conv2d_multiple_blocks():
    """Testcase of combining parallel conv2d in repeated blocks."""
    def before(x, w, repeat):
        args = [x, w]
        y = x
        for i in range(repeat):
            y1 = relay.nn.conv2d(y, w)
            y2 = relay.nn.conv2d(y, w)
            y = relay.concatenate((y1, y2), axis=1)
        return relay.Function(args, y)

    def expected(x, w, channels, repeat):
        args = [x, w]
        y = x
        # each block's pair of parallel convs is combined independently
        for i in range(repeat):
            w_concat = relay.concatenate((w, w), axis=0)
            y = relay.nn.conv2d(y, w_concat, channels=channels * 2)
            y1 = relay.strided_slice(y, [0, 0], [None, channels])
            y2 = relay.strided_slice(y, [0, channels], [None, channels * 2])
            y = relay.concatenate((y1, y2), axis=1)
        return relay.Function(args, y)

    def check(x_shape, repeat):
        x = relay.var("x", shape=x_shape)
        in_c = x_shape[1]
        out_c = in_c // 2
        w = relay.var("w", shape=(out_c, in_c, 1, 1))

        y_before = before(x, w, repeat)
        y = relay.ir_pass.infer_type(y_before)
        y = relay.ir_pass.combine_parallel_conv2d(y)
        y = relay.ir_pass.infer_type(y)
        y_expected = expected(x, w, out_c, repeat)
        y_expected = relay.ir_pass.infer_type(y_expected)
        assert relay.ir_pass.alpha_equal(y, y_expected)

    check((1, 4, 16, 16), 4)


if __name__ == "__main__":
    test_combine_parallel_conv2d()
    test_combine_parallel_conv2d_scale_relu()
    test_combine_parallel_conv2d_scale()
    test_combine_parallel_conv2d_multiple_blocks()