test_vpi_mem_interface.py 3.65 KB
Newer Older
1 2
import tvm
import numpy as np
3
from tvm.contrib import verilog
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53

class FIFOReader(object):
    """Auxiliary class to read from FIFO """
    def __init__(self, read_data, read_valid):
        self.read_data = read_data
        self.read_valid = read_valid
        self.data = []

    def __call__(self):
        if self.read_valid.get_int():
            self.data.append(self.read_data.get_int())

class FIFOWriter(object):
    """Auxiliary class to write to FIFO """
    def __init__(self, write_data, write_enable, write_pend, data):
        self.write_data = write_data
        self.write_enable = write_enable
        self.write_pend = write_pend
        self.data = data

    def __call__(self):
        if self.data and self.write_pend.get_int():
            self.write_enable.put_int(1)
            self.write_data.put_int(int(self.data[0]))
            del self.data[0]
        else:
            self.write_enable.put_int(0)


def test_ram_read():
    n = 10
    # context for VPI RAM
    ctx = tvm.vpi(0)
    a_np = np.arange(n).astype('int8')
    a = tvm.nd.array(a_np, ctx)

    # head ptr of a
    a_ptr = int(a.handle[0].data)
    sess = verilog.session([
        verilog.find_file("test_vpi_mem_interface.v"),
        verilog.find_file("tvm_vpi_mem_interface.v")
    ])
    rst = sess.main.rst
    read_data = sess.main.read_data
    read_valid = sess.main.read_data_valid
    read_en = sess.main.read_en
    host_read_req = sess.main.read_req
    host_read_addr = sess.main.read_addr
    host_read_size = sess.main.read_size
    rst.put_int(1)
54
    sess.yield_until_next_cycle()
55 56 57 58 59 60 61 62 63
    rst.put_int(0)
    # hook up reader
    reader = FIFOReader(read_data, read_valid)
    sess.yield_callbacks.append(reader)
    # request read
    host_read_req.put_int(1)
    host_read_addr.put_int(a_ptr)
    host_read_size.put_int(a.shape[0])

64
    sess.yield_until_next_cycle()
65 66 67 68
    # second read request
    host_read_addr.put_int(a_ptr + 2)
    host_read_size.put_int(a.shape[0] - 2)

69
    sess.yield_until_next_cycle()
70 71 72 73 74
    host_read_req.put_int(0)
    read_en.put_int(1)

    # yield until read is done
    for i in range(a.shape[0] * 3):
75
        sess.yield_until_next_cycle()
76
    sess.shutdown()
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
    # check if result matches
    r = np.concatenate((a_np, a_np[2:]))
    np.testing.assert_equal(np.array(reader.data), r)

def test_ram_write():
    n = 10
    # read from offset
    offset = 2
    # context for VPI RAM
    ctx = tvm.vpi(0)
    a_np = np.zeros(n).astype('int8')
    a = tvm.nd.array(a_np, ctx)
    w_data = list(range(2, n))
    r_data = np.array(w_data, dtype='int8')

    # head ptr of a
    a_ptr = int(a.handle[0].data)

    sess = verilog.session([
        verilog.find_file("test_vpi_mem_interface.v"),
        verilog.find_file("tvm_vpi_mem_interface.v")
    ])
    rst = sess.main.rst
    write_data = sess.main.write_data
    write_en = sess.main.write_en
    write_ready = sess.main.write_data_ready
    host_write_req = sess.main.write_req
    host_write_addr = sess.main.write_addr
    host_write_size = sess.main.write_size

    rst.put_int(1)
108
    sess.yield_until_next_cycle()
109 110 111 112 113 114 115 116 117 118
    rst.put_int(0)
    # hook up writeer
    writer = FIFOWriter(write_data, write_en, write_ready, w_data)

    sess.yield_callbacks.append(writer)
    # request write
    host_write_req.put_int(1)
    host_write_addr.put_int(a_ptr + offset)
    host_write_size.put_int(a.shape[0] - offset)

119
    sess.yield_until_next_cycle()
120 121 122 123
    host_write_req.put_int(0)

    # yield until write is done
    for i in range(a.shape[0]+2):
124
        sess.yield_until_next_cycle()
125
    sess.shutdown()
126 127 128 129 130 131 132
    # check if result matches
    np.testing.assert_equal(a.asnumpy()[2:], r_data)


if __name__ == "__main__":
    test_ram_read()
    test_ram_write()