# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
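"""Tests for the MPS (Metal Performance Shaders) contrib bindings.

These tests exercise the extern calls ``tvm.contrib.mps.matmul`` and
``tvm.contrib.mps.conv2d``; they are skipped unless the Metal runtime and
the corresponding extern functions are available.
"""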
import tvm
import tvm.testing
import numpy as np
from tvm.contrib import mps

def test_matmul():
    if not tvm.module.enabled("metal"):
        print("skip because %s is not enabled..." % "metal")
        return
    n = 1024
    l = 128
    m = 256
    A = tvm.placeholder((n, l), name='A')
    B = tvm.placeholder((l, m), name='B')
    C = mps.matmul(A, B)
    D = tvm.compute(
        C.shape,
        lambda *i: C(*i) + 1.
    )
    s = tvm.create_schedule(D.op)
    yo, xo = D.op.axis
    # Tile the elementwise epilogue into 16x16 tiles and bind them to
    # GPU blocks and threads.
    block_y = tvm.thread_axis("blockIdx.y")
    block_x = tvm.thread_axis("blockIdx.x")
    thread_y = tvm.thread_axis("threadIdx.y")
    thread_x = tvm.thread_axis("threadIdx.x")
    by, ty = s[D].split(yo, factor=16)
    bx, tx = s[D].split(xo, factor=16)
    s[D].bind(by, block_y)
    s[D].bind(bx, block_x)
    s[D].bind(ty, thread_y)
    s[D].bind(tx, thread_x)

    def verify(A, B, D, s, target="metal"):
        if not tvm.get_global_func("tvm.contrib.mps.matmul", True):
            print("skip because extern function is not available")
            return
        ctx = tvm.metal(0)
        f = tvm.build(s, [A, B, D], "metal")
        a = tvm.nd.array(np.random.uniform(size=(n, l)).astype(A.dtype), ctx)
        b = tvm.nd.array(np.random.uniform(size=(l, m)).astype(B.dtype), ctx)
        c = tvm.nd.array(np.zeros((n, m), dtype=D.dtype), ctx)
        f(a, b, c)
        tvm.testing.assert_allclose(
            c.asnumpy(), np.dot(a.asnumpy(), b.asnumpy()) + 1, rtol=1e-5)
    verify(A, B, D, s)

def test_conv2d():
    if not tvm.module.enabled("metal"):
        print("skip because %s is not enabled..." % "metal")
        return
    n = 1
    h = 14
    w = 14
    ci = 2
    co = 4
    kh = 3
    kw = 3
    stride = 2
    A = tvm.placeholder((n, h, w, ci), name="x")
    B = tvm.placeholder((co, kh, kw, ci), name="w")
    C = mps.conv2d(A, B, 'SAME', stride)
    s1 = tvm.create_schedule(C.op)

    def verify(A, B, C, target="llvm"):
        if not tvm.get_global_func("tvm.contrib.mps.conv2d", True):
            print("skip because extern function is not available")
            return
        ctx = tvm.metal(0)
        f = tvm.build(s1, [A, B, C], "metal")
        a = tvm.nd.array(np.random.uniform(size=(n, h, w, ci)).astype(A.dtype), ctx)
        b = tvm.nd.array(np.random.uniform(size=(co, kh, kw, ci)).astype(B.dtype), ctx)
        c = tvm.nd.array(np.zeros((n, h // stride, w // stride, co), dtype=C.dtype), ctx)
        f(a, b, c)
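        # Hedged NumPy reference (an addition, not part of the original test):
        # this assumes TensorFlow-style 'SAME' padding and plain
        # cross-correlation over an NHWC input with an
        # (out_channel, kh, kw, in_channel) filter layout, matching the
        # placeholder shapes above. The exact MPS padding/flip conventions may
        # differ, so only the maximum absolute difference is reported instead
        # of asserting.
        def conv2d_ref(x, w, stride):
            xn, xh, xw, _ = x.shape
            oc, fkh, fkw, _ = w.shape
            oh = (xh + stride - 1) // stride
            ow = (xw + stride - 1) // stride
            ph = max((oh - 1) * stride + fkh - xh, 0)
            pw = max((ow - 1) * stride + fkw - xw, 0)
            xp = np.pad(x, ((0, 0), (ph // 2, ph - ph // 2),
                            (pw // 2, pw - pw // 2), (0, 0)),
                        mode="constant")
            out = np.zeros((xn, oh, ow, oc), dtype=x.dtype)
            for i in range(oh):
                for j in range(ow):
                    patch = xp[:, i * stride:i * stride + fkh,
                               j * stride:j * stride + fkw, :]
                    out[:, i, j, :] = np.tensordot(
                        patch, w, axes=([1, 2, 3], [1, 2, 3]))
            return out

        ref = conv2d_ref(a.asnumpy(), b.asnumpy(), stride)
        print("max abs diff vs NumPy reference:",
              np.abs(c.asnumpy() - ref).max())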

    verify(A, B, C, s1)


if __name__ == "__main__":
    #test_matmul()
    test_conv2d()