-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathperipherals.py
More file actions
234 lines (201 loc) · 8.23 KB
/
peripherals.py
File metadata and controls
234 lines (201 loc) · 8.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#
# Copyright (2025) Ciro Cattuto <ciro.cattuto@gmail.com>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import os, fcntl, selectors, tty
from ram import MemoryAccessError
# Base class for peripherals with memory-mapped IO
class MMIOPeripheral:
REG_BASE = None
REG_END = None
def read32(self, addr):
return 0
def write32(self, addr, value):
pass
# a run() method, if defined, will be registered and called periodically by the emulator
#def run(self):
# pass
# mtime
class MMIOTimer(MMIOPeripheral):
REG_BASE = 0x0200_4000
REG_MTIMECMP_LO = REG_BASE + 0x00
REG_MTIMECMP_HI = REG_MTIMECMP_LO + 0x04
REG_MTIME_LO = 0x0200_BFF8
REG_MTIME_HI = REG_MTIME_LO + 0x04
REG_END = REG_MTIME_HI + 0x04
def __init__(self, cpu):
super().__init__()
self.cpu = cpu
self.mtime_lo = 0
self.mtime_hi = 0
self.mtime_lo_updated = False
self.mtime_hi_updated = False
def read32(self, addr):
if addr == self.REG_MTIME_LO:
return self.cpu.mtime & 0xFFFFFFFF
elif addr == self.REG_MTIME_HI:
return self.cpu.mtime >> 32
if addr == self.REG_MTIMECMP_LO:
return self.cpu.mtimecmp & 0xFFFFFFFF
elif addr == self.REG_MTIMECMP_HI:
return self.cpu.mtimecmp >> 32
else:
raise MemoryAccessError(f"Invalid MMIO register read at 0x{addr:08X}")
def write32(self, addr, value):
if addr == self.REG_MTIMECMP_LO:
self.cpu.mtimecmp = (self.cpu.mtimecmp & 0xFFFFFFFF_00000000) | (value & 0xFFFFFFFF)
elif addr == self.REG_MTIMECMP_HI:
self.cpu.mtimecmp = (self.cpu.mtimecmp & 0x00000000_FFFFFFFF) | ((value & 0xFFFFFFFF) << 32)
elif addr == self.REG_MTIME_LO:
self.mtime_lo = value
self.mtime_lo_updated = True
elif addr == self.REG_MTIME_HI:
self.mtime_hi = value
self.mtime_hi_updated = True
else:
raise MemoryAccessError(f"Invalid MMIO register write at 0x{addr:08X}")
# atomic update of mtime after writing both high and low words
if self.mtime_lo_updated and self.mtime_hi_updated:
self.cpu.mtime = (self.mtime_hi << 32) | self.mtime_lo
self.mtime_lo_updated = False
self.mtime_hi_updated = False
# UART exposed as a host pseudo-terminal
class PtyUART(MMIOPeripheral):
def __init__(self, reg_base=0x1000_0000, logger=None):
super().__init__()
self.REG_BASE = reg_base
self.REG_TX = reg_base + 0x00
self.REG_RX = reg_base + 0x04
self.REG_END = reg_base + 0x08
self.logger = logger
# create master/slave pty pair
self.master_fd, self.slave_fd = os.openpty()
self.slave_name = os.ttyname(self.slave_fd)
if self.logger is not None:
self.logger.info(f"[UART] PTY created: {self.slave_name}")
# put slave in raw mode so the terminal program sees bytes verbatim
tty.setraw(self.slave_fd, when=tty.TCSANOW)
# non-blocking master for polling
fl = fcntl.fcntl(self.master_fd, fcntl.F_GETFL)
fcntl.fcntl(self.master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.rx_buf = [] # RX buffer
self.selector = selectors.DefaultSelector()
self.selector.register(self.master_fd, selectors.EVENT_READ)
# RX polling, to be called periodically by the emulator
def run(self):
for _key, _mask in self.selector.select(timeout=0):
try:
data = os.read(self.master_fd, 64)
self.rx_buf.extend(data)
except BlockingIOError:
pass
# Memory-mapped interface
def read32(self, addr):
if addr == self.REG_RX:
self.run()
if self.rx_buf:
return self.rx_buf.pop(0) # return first char in RX buffer
else:
return 1 << 31 # RX empty bit
elif addr == self.REG_TX:
return 0 # always ready to write
else:
raise MemoryAccessError(f"Invalid MMIO register read at 0x{addr:08X}")
def write32(self, addr, value):
if addr == self.REG_TX:
try:
os.write(self.master_fd, bytes([value & 0xFF]))
except BlockingIOError:
pass
else:
raise MemoryAccessError(f"Invalid MMIO register write at 0x{addr:08X}")
# Block device
class MMIOBlockDevice(MMIOPeripheral):
def __init__(self, reg_base=0x1001_0000, image_path=None, ram=None, block_size=512, size=1024, logger=None):
super().__init__()
self.REG_BASE = reg_base
self.REG_CMD = reg_base + 0x00 # 0 = read, 1 = write
self.REG_BLK = reg_base + 0x04 # block number
self.REG_PTR = reg_base + 0x08 # guest pointer to buffer
self.REG_CTRL = reg_base + 0x0C # write 1 to trigger
self.REG_STATUS = reg_base + 0x10 # 1 = ready
self.REG_END = reg_base + 0x14
self.logger = logger
self.block_size = block_size
self.num_blocks = size
self.image_path = image_path
self.ram = ram
self.cmd = 0
self.blk = 0
self.ptr = 0
self.status = 1
self.fd = None
self._open_or_create_image()
def _open_or_create_image(self):
total_bytes = self.num_blocks * self.block_size
if not os.path.exists(self.image_path):
if self.logger is not None:
self.logger.info(f"[BLOCK] Creating new block device image: {self.image_path}")
with open(self.image_path, "wb") as f:
f.write(b"\xFF" * total_bytes) # emulating the initial state of a blank flash
if self.logger is not None:
self.logger.info(f"[BLOCK] Opening block device image: {self.image_path}")
self.fd = open(self.image_path, "r+b") # read/write, binary
def read32(self, addr):
if addr == self.REG_CMD:
return self.cmd
elif addr == self.REG_BLK:
return self.blk
elif addr == self.REG_PTR:
return self.ptr
elif addr == self.REG_CTRL:
return 0
elif addr == self.REG_STATUS:
return self.status
else:
raise MemoryAccessError(f"Invalid MMIO register read at 0x{addr:08X}")
def write32(self, addr, value):
if addr == self.REG_CMD:
self.cmd = value
elif addr == self.REG_BLK:
self.blk = value
elif addr == self.REG_PTR:
self.ptr = value
elif addr == self.REG_CTRL:
if value == 1:
self._execute_cmd()
else:
raise MemoryAccessError(f"Invalid MMIO register write at 0x{addr:08X}")
def _execute_cmd(self):
offset = self.blk * self.block_size
if offset >= self.num_blocks * self.block_size:
self.status = 1
if self.logger is not None:
self.logger.warning(f"[BLOCK] Invalid block {self.blk}")
return
if self.cmd == 0: # READ
self.fd.seek(offset)
data = self.fd.read(self.block_size)
self.ram.store_binary(self.ptr, data)
#if self.logger is not None:
# self.logger.debug(f"[BLOCK] READ blk={self.blk} -> 0x{self.ptr:08x}")
elif self.cmd == 1: # WRITE
data = self.ram.load_binary(self.ptr, self.block_size)
self.fd.seek(offset)
self.fd.write(data)
self.fd.flush()
#if self.logger is not None:
# self.logger.debug(f"[BLOCK] WRITE blk={self.blk} <- 0x{self.ptr:08x}")
self.status = 1