# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import tvm def lower(s, args): binds = {} arg_list = [] for x in args: assert isinstance(x, tvm.tensor.Tensor) buf = tvm.decl_buffer(x.shape, dtype=x.dtype, name=x.op.name) binds[x] = buf arg_list.append(buf) s.normalize() bounds = tvm.schedule.InferBound(s) stmt = tvm.schedule.ScheduleOps(s, bounds) stmt = tvm.ir_pass.StorageFlatten(stmt, binds, 64) stmt = tvm.ir_pass.CanonicalSimplify(stmt) stmt = tvm.ir_pass.Simplify(stmt) return stmt def test_basic_pipeline(): n = tvm.convert(128) A = tvm.placeholder((n,), name='A') stages = [] num_stage = 3 B = A for k in range(num_stage): stages.append(B) B = tvm.compute((n,), lambda i: B[i] + k, name="A%s" % k) s = tvm.create_schedule(B.op) xo, xi = s[B].split(B.op.axis[0], nparts=1) s[B].bind(xo, tvm.thread_axis("pipeline")) xo, xi = s[B].split(xi, factor=4) for S in stages: s[S].compute_at(s[B], xo) stmt = lower(s, [A, B]) stmt = tvm.ir_pass.SplitPipeline(stmt, False) print(stmt) stmt = tvm.ir_pass.NarrowChannelAccess(stmt) print(stmt) assert(tvm.ir_pass.VerifySSA(stmt)) def test_conv1d(): n = tvm.var('n') A = tvm.compute((n+2), lambda i: 1, name='A') def computeB(ii): i = ii + 1 return A[i-1] + A[i] + A[i+1] B = tvm.compute(n, computeB, name='B') s = tvm.create_schedule(B.op) px, xi = s[B].split(B.op.axis[0], nparts=1) s[B].bind(px, tvm.thread_axis("pipeline")) s[A].compute_at(s[B], px) stmt = lower(s, [B]) stmt = tvm.ir_pass.SplitPipeline(stmt, False) print(stmt) stmt = tvm.ir_pass.NarrowChannelAccess(stmt) print(stmt) if __name__ == "__main__": test_basic_pipeline() test_conv1d()