forked from Xilinx/mlir-aie
-
Notifications
You must be signed in to change notification settings - Fork 0
/
aie2.py
97 lines (81 loc) · 3.21 KB
/
aie2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# passthrough_kernel/aie2.py -*- Python -*-
#
# This file is licensed under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# (c) Copyright 2024 Advanced Micro Devices, Inc. or its affiliates
import numpy as np
import sys
from aie.dialects.aie import *
from aie.dialects.aiex import *
from aie.extras.context import mlir_mod_ctx
from aie.helpers.dialects.ext.scf import _for as range_
import aie.utils.trace as trace_utils
def passthroughKernel(vector_size, trace_size):
N = vector_size
lineWidthInBytes = N // 4 # chop input in 4 sub-tensors
@device(AIEDevice.npu1_1col)
def device_body():
# define types
vector_ty = np.ndarray[(N,), np.dtype[np.uint8]]
line_ty = np.ndarray[(lineWidthInBytes,), np.dtype[np.uint8]]
# AIE Core Function declarations
passThroughLine = external_func(
"passThroughLine", inputs=[line_ty, line_ty, np.int32]
)
# Tile declarations
ShimTile = tile(0, 0)
ComputeTile2 = tile(0, 2)
# Set up a circuit-switched flow from core to shim for tracing information
if trace_size > 0:
flow(ComputeTile2, WireBundle.Trace, 0, ShimTile, WireBundle.DMA, 1)
# AIE-array data movement with object fifos
of_in = object_fifo("in", ShimTile, ComputeTile2, 2, line_ty)
of_out = object_fifo("out", ComputeTile2, ShimTile, 2, line_ty)
# Set up compute tiles
# Compute tile 2
@core(ComputeTile2, "passThrough.cc.o")
def core_body():
for _ in range_(sys.maxsize):
elemOut = of_out.acquire(ObjectFifoPort.Produce, 1)
elemIn = of_in.acquire(ObjectFifoPort.Consume, 1)
passThroughLine(elemIn, elemOut, lineWidthInBytes)
of_in.release(ObjectFifoPort.Consume, 1)
of_out.release(ObjectFifoPort.Produce, 1)
# print(ctx.module.operation.verify())
@runtime_sequence(vector_ty, vector_ty, vector_ty)
def sequence(inTensor, outTensor, notUsed):
if trace_size > 0:
trace_utils.configure_simple_tracing_aie2(
ComputeTile2,
ShimTile,
ddr_id=1,
size=trace_size,
offset=N,
)
npu_dma_memcpy_nd(
metadata=of_in,
bd_id=0,
mem=inTensor,
sizes=[1, 1, 1, N],
issue_token=True,
)
npu_dma_memcpy_nd(
metadata=of_out,
bd_id=1,
mem=outTensor,
sizes=[1, 1, 1, N],
)
dma_wait(of_in, of_out)
try:
vector_size = int(sys.argv[1])
if vector_size % 64 != 0 or vector_size < 512:
print("Vector size must be a multiple of 64 and greater than or equal to 512")
raise ValueError
trace_size = 0 if (len(sys.argv) != 3) else int(sys.argv[2])
except ValueError:
print("Argument has inappropriate value")
with mlir_mod_ctx() as ctx:
passthroughKernel(vector_size, trace_size)
print(ctx.module)