forked from Foundation-Devices/passport-firmware
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsffile.py
233 lines (179 loc) · 6.27 KB
/
sffile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# SPDX-FileCopyrightText: 2020 Foundation Devices, Inc. <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later
#
# SPDX-FileCopyrightText: 2018 Coinkite, Inc. <coldcardwallet.com>
# SPDX-License-Identifier: GPL-3.0-only
#
# (c) Copyright 2018 by Coinkite Inc. This file is part of Coldcard <coldcardwallet.com>
# and is covered by GPLv3 license found in COPYING.
#
# sffile.py - file-like objects stored in SPI Flash
#
# - implements stream IO protoccol
# - does erasing for you
# - random read, sequential write
# - only a few of these are possible
# - the offset is the file name
# - last 64k of memory reserved for settings
#
import trezorcrypto
from uasyncio import sleep_ms
from uio import BytesIO
from common import system
blksize = const(65536)
def PADOUT(n):
# rounds up
return (n + blksize - 1) & ~(blksize - 1)
class SFFile:
def __init__(self, start, length=0, max_size=1, message=None, pre_erased=False):
if not pre_erased:
assert start % blksize == 0 # 'misaligned'
self.start = start
self.pos = 0
self.length = length # byte-wise length
self.message = message
if max_size != None:
self.max_size = PADOUT(max_size) if not pre_erased else max_size
self.readonly = False
self.checksum = trezorcrypto.sha256()
else:
self.readonly = True
from common import sf
self.sf = sf
def tell(self):
# where are we?
return self.pos
def is_eof(self):
# we are positioned at end of file
return (self.pos >= self.length)
def seek(self, offset, whence=0):
# whence:
# 0 -- start of stream (the default); offset should be zero or positive
# 1 -- current stream position; offset may be negative
# 2 -- end of stream; offset is usually negative
# except no clipping; force their math to be right.
if whence == 0:
pass
elif whence == 1:
# move relative
offset = self.pos + offset
elif whence == 2:
offset = self.length + offset
else:
raise ValueError(whence)
assert 0 <= offset <= self.length # "bad offset"
self.pos = offset
async def erase(self):
# must be used by caller before writing any bytes
assert not self.readonly
assert self.length == 0 # 'already wrote?'
for i in range(0, self.max_size, blksize):
self.sf.block_erase(self.start + i)
if i and self.message:
from common import dis
system.progress_bar((i*100)//self.max_size)
# expect block erase to take up to 2 seconds
while self.sf.is_busy():
await sleep_ms(50)
def __enter__(self):
if self.message:
from common import dis
dis.fullscreen(self.message)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.message:
from common import dis
system.progress_bar(100)
return False
def wait_writable(self):
# TODO: Could add some timeout handling here.
while self.sf.is_busy():
pass
def write(self, b):
# immediate write, no buffering
assert not self.readonly
assert self.pos == self.length # "can only append"
# "past end: %r" % [self.pos, len(b), self.max_size]
assert self.pos + len(b) <= self.max_size
left = len(b)
# must perform page-aligned (256) writes, but can start
# anywhere in the page, and can write just one byte
sofar = 0
while left:
if (self.pos + sofar) % 256 != 0:
# start is unaligned, do a partial write to align
# , (sofar, (self.pos+sofar)) # can only happen on first page
assert sofar == 0
runt = min(left, 256 - (self.pos % 256))
here = memoryview(b)[0:runt]
assert len(here) == runt
else:
# write full pages, or final runt
here = memoryview(b)[sofar:sofar+256]
assert 1 <= len(here) <= 256
self.wait_writable()
self.sf.write(self.start + self.pos + sofar, here)
left -= len(here)
sofar += len(here)
self.checksum.update(here)
assert left >= 0
assert sofar == len(b)
self.pos += sofar
self.length = self.pos
return sofar
def read(self, ll=None):
if ll == 0:
return b''
elif ll is None:
ll = self.length - self.pos
else:
ll = min(ll, self.length - self.pos)
if ll <= 0:
# at EOF
return b''
rv = bytearray(ll)
self.sf.read(self.start + self.pos, rv)
self.pos += ll
if self.message and ll > 1:
from common import dis
system.progress_bar((self.pos * 100) // self.length)
# altho tempting to return a bytearray (which we already have) many
# callers expect return to be bytes and have those methods, like "find"
return bytes(rv)
def readinto(self, b):
# limitation: this will read past end of file, but not tell the caller
actual = min(self.length - self.pos, len(b))
if actual <= 0:
return 0
self.sf.read(self.start + self.pos, b)
self.pos += actual
return actual
def close(self):
pass
class SizerFile(SFFile):
# looks like a file, but forgets everything except file position
# - used to measure length of an output
def __init__(self):
self.pos = self.length = 0
async def erase(self):
return
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return False
def wait_writable(self):
return
def write(self, b):
# immediate write, no buffering
assert self.pos == self.length # "can only append"
here = len(b)
self.pos += here
self.length += here
return here
def read(self, ll=None):
raise ValueError
def readinto(self, b):
raise ValueError
def close(self):
pass
# EOF