test_tuning.py 4.65 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
"""
Test the tuner
"""
import logging
import time

import tvm

from tvm import autotvm
from tvm.autotvm.tuner import RandomTuner

@autotvm.template
def conv2d_no_batching(N, H, W, CI, CO, KH, KW):
    """An example template for testing"""
    assert N == 1, "Only consider batch_size = 1 in this template"

    data = tvm.placeholder((N, CI, H, W), name='data')
    kernel = tvm.placeholder((CO, CI, KH, KW), name='kernel')

    rc = tvm.reduce_axis((0, CI), name='rc')
    ry = tvm.reduce_axis((0, KH), name='ry')
    rx = tvm.reduce_axis((0, KW), name='rx')

    conv = tvm.compute(
        (N, CO, H - KH + 1, W - KW + 1),
        lambda nn, ff, yy, xx: tvm.sum(
            data[nn, rc, yy + ry, xx + rx] * kernel[ff, rc, ry, rx],
            axis=[rc, ry, rx]), tag="conv2d_nchw")

    s = tvm.create_schedule([conv.op])

    output = conv
    OL = s.cache_write(conv, 'local')

    # create cache stage
    AA = s.cache_read(data, 'shared', [OL])
    WW = s.cache_read(kernel, 'shared', [OL])
    AL = s.cache_read(AA, 'local', [OL])
    WL = s.cache_read(WW, 'local', [OL])

    # tile and bind spatial axes
    n, f, y, x = s[output].op.axis
    cfg = autotvm.get_config()
    cfg.define_split("tile_f", cfg.axis(f), num_outputs=4)
    cfg.define_split("tile_y", cfg.axis(y), num_outputs=4)
    cfg.define_split("tile_x", cfg.axis(x), num_outputs=4)
    bf, vf, tf, fi = cfg["tile_f"].apply(s, output, f)
    by, vy, ty, yi = cfg["tile_y"].apply(s, output, y)
    bx, vx, tx, xi = cfg["tile_x"].apply(s, output, x)
    kernel_scope = n  # this is the scope to attach global config inside this kernel

    s[output].bind(bf, tvm.thread_axis("blockIdx.z"))
    s[output].bind(by, tvm.thread_axis("blockIdx.y"))
    s[output].bind(bx, tvm.thread_axis("blockIdx.x"))
    s[output].bind(vf, tvm.thread_axis("vthread"))
    s[output].bind(vy, tvm.thread_axis("vthread"))
    s[output].bind(vx, tvm.thread_axis("vthread"))
    s[output].bind(tf, tvm.thread_axis("threadIdx.z"))
    s[output].bind(ty, tvm.thread_axis("threadIdx.y"))
    s[output].bind(tx, tvm.thread_axis("threadIdx.x"))
    s[output].reorder(n, bf, by, bx, vf, vy, vx, tf, ty, tx, fi, yi, xi)
    s[OL].compute_at(s[output], tx)

    # tile and bind reduction axes
    n, f, y, x = s[OL].op.axis
    rc, ry, rx = s[OL].op.reduce_axis
    cfg.define_split("tile_rc", cfg.axis(rc), num_outputs=3)
    cfg.define_split("tile_ry", cfg.axis(ry), num_outputs=3)
    cfg.define_split("tile_rx", cfg.axis(rx), num_outputs=3)
    rco, rcm, rci = cfg['tile_rc'].apply(s, OL, rc)
    ryo, rym, ryi = cfg['tile_rx'].apply(s, OL, ry)
    rxo, rxm, rxi = cfg['tile_ry'].apply(s, OL, rx)
    s[OL].reorder(rco, ryo, rxo, rcm, rym, rxm, rci, ryi, rxi, n, f, y, x)

    s[AA].compute_at(s[OL], rxo)
    s[WW].compute_at(s[OL], rxo)
    s[AL].compute_at(s[OL], rxm)
    s[WL].compute_at(s[OL], rxm)

    # cooperative fetching
    for load in [AA, WW]:
        n, f, y, x = s[load].op.axis
        fused = s[load].fuse(n, f, y, x)
        tz, fused = s[load].split(fused, nparts=cfg["tile_f"].size[2])
        ty, fused = s[load].split(fused, nparts=cfg["tile_y"].size[2])
        tx, fused = s[load].split(fused, nparts=cfg["tile_x"].size[2])
        s[load].bind(tz, tvm.thread_axis("threadIdx.z"))
        s[load].bind(ty, tvm.thread_axis("threadIdx.y"))
        s[load].bind(tx, tvm.thread_axis("threadIdx.x"))

    # tune unroll
    cfg.define_knob("auto_unroll_max_step", [0, 512, 1500])
    cfg.define_knob("unroll_explicit", [0, 1])
    s[output].pragma(kernel_scope, 'auto_unroll_max_step', cfg['auto_unroll_max_step'].val)
    s[output].pragma(kernel_scope, 'unroll_explicit', cfg['unroll_explicit'].val)

    return s, [data, kernel, conv]

def get_sample_task(target=tvm.target.cuda(), target_host=None):
    """return a sample task for testing"""
    task = autotvm.task.create(conv2d_no_batching,
                               args=(1, 7, 7, 512, 512, 3, 3),
                               target=target, target_host=target_host)
    return task, target

106
def test_tuning():
107 108 109 110 111 112 113 114 115 116
    def check(target, target_host):
        ctx = tvm.context(target, 0)
        if not ctx.exist:
            logging.info("Skip test because %s is not available" % target)
            return

        # init task
        task, target = get_sample_task(target, target_host)
        logging.info("%s", task.config_space)

117 118 119
        measure_option = autotvm.measure_option(
            autotvm.LocalBuilder(),
            autotvm.LocalRunner())
120 121

        tuner = RandomTuner(task)
122
        tuner.tune(n_trial=20, measure_option=measure_option)
123 124 125 126 127 128

    check("cuda", None)
    check("opencl", None)

if __name__ == "__main__":
    # only print log when invoked from main
129
    logging.basicConfig(level=logging.DEBUG)
130

131
    test_tuning()