Skip to content

Commit

Permalink
gw convert: Initial support for hard-sectored images
Browse files Browse the repository at this point in the history
New option: --hard-sectors

Converts input index pulses into hard sectors. Currently this is
useful:
(a) to extract the true index pulses
(b) to create hard-sectored HFEv3 image files
  • Loading branch information
keirf committed Jul 6, 2024
1 parent 01ca2c3 commit 4d5a8c8
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/greaseweazle/codec/ibm/ibm.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def mfm_decode_raw(raw: PLLTrack) -> List[TrackArea]:

# Convert to offsets within track
areas.sort(key=lambda x:x.start)
index = iter(raw.revolutions)
index = iter([x.nr_bits for x in raw.revolutions])
p, n = 0, next(index)
for a in areas:
if a.start >= n:
Expand Down Expand Up @@ -593,7 +593,7 @@ def fm_decode_raw(raw: PLLTrack,

# Convert to offsets within track
areas.sort(key=lambda x:x.start)
index = iter(raw.revolutions)
index = iter([x.nr_bits for x in raw.revolutions])
p, n = 0, next(index)
for a in areas:
if a.start >= n:
Expand Down
50 changes: 46 additions & 4 deletions src/greaseweazle/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self,
sample_freq: float,
index_cued = True) -> None:
self.index_list = index_list
self.sector_list: Optional[List[List[float]]] = None
self.list = flux_list
self.sample_freq = sample_freq
self.splice: Optional[float] = None
Expand All @@ -38,10 +39,12 @@ def __str__(self) -> str:
s = "\nFlux: %.2f MHz" % (self.sample_freq*1e-6)
s += ("\n Total: %u samples, %.2fms\n"
% (len(self.list), sum(self.list)*1000/self.sample_freq))
rev = 0
for t in self.index_list:
for rev, t in enumerate(self.index_list):
s += " Revolution %u: %.2fms\n" % (rev, t*1000/self.sample_freq)
rev += 1
if self.sector_list:
for sec, t in enumerate(self.sector_list[rev]):
s += (" Sector %u: %.2fms\n"
% (sec, t*1000/self.sample_freq))
return s[:-1]


Expand All @@ -50,6 +53,38 @@ def summary_string(self) -> str:
% (len(self.list), sum(self.list)*1000/self.sample_freq))


def identify_hard_sectors(self) -> None:
assert not self.sector_list
error.check(len(self.index_list) > 3,
"Not enough index marks for a hard-sectored track")
self.cue_at_index()
x = [self.index_list[0], self.index_list[2]]
x.sort()
thresh = x[1] * 3 / 4
ticks_to_index: float = 0
short_ticks: float = 0
sectors: List[float] = []
index_list = self.index_list
self.index_list = []
self.sector_list = []
for t in index_list:
if t < thresh:
short_ticks += t
else:
if short_ticks != 0:
sectors.append(short_ticks)
ticks_to_index += short_ticks
self.index_list.append(ticks_to_index)
self.sector_list.append(sectors)
sectors = []
short_ticks = ticks_to_index = 0
ticks_to_index += t
sectors.append(t)
self.index_cued = (
(len(self.index_list) >= 2) and
(len(self.sector_list[0]) == len(self.sector_list[1])))


def append(self, flux: Flux) -> None:
# Scale the new flux if required, to match existing sample frequency.
# This will result in floating-point flux values.
Expand All @@ -64,6 +99,8 @@ def append(self, flux: Flux) -> None:
rev0 = i_list[0] + sum(self.list) - sum(self.index_list)
self.index_list += [rev0] + i_list[1:]
self.list += f_list
# TODO: Work with hard-sectored disks
self.sector_list = None


def cue_at_index(self) -> None:
Expand All @@ -83,6 +120,8 @@ def cue_at_index(self) -> None:
self.list = []
self.index_list = self.index_list[1:]
self.index_cued = True
if self.sector_list:
self.sector_list = self.sector_list[1:]


def set_nr_revs(self, revs:int) -> None:
Expand All @@ -93,6 +132,8 @@ def set_nr_revs(self, revs:int) -> None:

if len(self.index_list) > revs:
self.index_list = self.index_list[:revs]
if self.sector_list:
self.sector_list = self.sector_list[:revs]
to_index = sum(self.index_list)
for i in range(len(self.list)):
to_index -= self.list[i]
Expand All @@ -113,7 +154,8 @@ def set_nr_revs(self, revs:int) -> None:
if self.list:
self.list = l + [to_index + self.list[0]] + self.list[1:]
self.index_list = self.index_list[:nr] + self.index_list

if self.sector_list:
self.sector_list = self.sector_list[:nr] + self.sector_list

def flux_for_writeout(self, cue_at_index) -> WriteoutFlux:

Expand Down
61 changes: 51 additions & 10 deletions src/greaseweazle/image/hfe.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ class HFEOpts(ImageOpts):
"""

w_settings = [ 'bitrate', 'version', 'interface', 'encoding',
'double_step' ]
'double_step', 'uniform' ]

def __init__(self) -> None:
self._bitrate: Optional[int] = None
self._version = 1
self._interface = 0xff
self._encoding = 0xff
self.double_step = False
self.uniform = False

@property
def bitrate(self) -> Optional[int]:
Expand Down Expand Up @@ -262,9 +263,12 @@ def emit_track(self, cyl: int, side: int, track) -> None:
flux.cue_at_index()
raw = PLLTrack(clock = 5e-4 / self.opts.bitrate, data = flux)
bits, bit_ticks = raw.get_revolution(0)
if self.opts.uniform:
bit_ticks = None
mt = MasterTrack(
bits = bits, time_per_rev = flux.time_per_rev,
bit_ticks = bit_ticks)
bit_ticks = bit_ticks,
hardsector_bits = raw.revolutions[0].hardsector_bits)
self.to_track[cyl,side] = HFETrack(mt)


Expand Down Expand Up @@ -417,6 +421,8 @@ def add_weak(weak: List[HFEv3_Range], pos: int, nr: int) -> None:
bits.frombytes(bytes(1))
ticks += [rate]*8
else:
error.check((x & 0xf0) != 0xf0,
f'T{cyl}.{head}: HFEv3: unrecognised opcode {x:02x}')
bits.frombytes(bytes([x]))
ticks += [rate]*8

Expand All @@ -443,10 +449,12 @@ def add_weak(weak: List[HFEv3_Range], pos: int, nr: int) -> None:
# HFEv3_Chunk: Represents a consecutive range of input bitcells with identical
# bitcell timings and weakness property.
class HFEv3_Chunk:
def __init__(self, nbits: int, time_per_bit: float, is_random: bool):
def __init__(self, nbits: int, time_per_bit: float, is_random: bool,
emit_index: bool):
self.nbits = nbits
self.time_per_bit = time_per_bit
self.is_random = is_random
self.emit_index = emit_index
def __str__(self) -> str:
s = "%d bits, %.4fus per bit" % (self.nbits, self.time_per_bit*1e6)
if self.is_random:
Expand All @@ -461,7 +469,16 @@ def __init__(self, track: MasterTrack) -> None:
self.ticks_per_rev = sum(track.bit_ticks)
else:
self.ticks_per_rev = len(track.bits)
self.time_per_tick = self.track.time_per_rev / self.ticks_per_rev
self.time_per_tick = track.time_per_rev / self.ticks_per_rev
# index_positions: Bit positions at which to insert index pulses
index_positions = track.hardsector_bits
if not index_positions:
index_positions = [ 0 ]
else:
index_positions[-1] //= 2
index_positions = list(it.accumulate([ 0 ] + index_positions))
index_positions.append(len(track.bits))
self.index_positions = index_positions
# tick_iter: An iterator over ranges of consecutive bitcells with
# identical ticks per bitcell.
self.ticks: List[Tuple[int,int,float]]
Expand All @@ -483,7 +500,7 @@ def __init__(self, track: MasterTrack) -> None:
it.chain(track.weak, [(len(track.bits),0)]))
self.weak_cur = next(self.weak_iter)
# out: The raw HFEv3 output bytestream that we are generating.
self.out = bytearray([HFEv3_Op.Index])
self.out = bytearray()
# time: Input track current time, in seconds.
# hfe_time: HFE track output current time, in seconds.
self.time = self.hfe_time = 0.0
Expand All @@ -492,18 +509,26 @@ def __init__(self, track: MasterTrack) -> None:
# rate: Current HFE output rate, as set by HFEv3_Op.Bitrate.
# rate_change_pos: Byte position we last updated rate in HFE output.
self.rate, self.rate_change_pos = -1, 0
# sec: Which sector are we emitting
self.sec = 0
# chunk: Chunk of input bitcells currently being processed.
self.chunk = self.next_chunk()

def next_chunk(self) -> Optional[HFEv3_Chunk]:
# All done? Then return nothing.
if self.pos >= len(self.track.bits):
return None
# Position to next sector pulse
emit_index = False
while (n := self.index_positions[self.sec] - self.pos) <= 0:
assert n == 0 and not emit_index
self.sec += 1
emit_index = True
# Position among bitcells with identical timing.
while self.pos >= self.tick_cur.e:
self.tick_cur = next(self.tick_iter)
assert self.pos >= self.tick_cur.s
n = self.tick_cur.e - self.pos
n = min(n, self.tick_cur.e - self.pos)
# Position relative to weak ranges.
while self.pos >= self.weak_cur.e:
self.weak_cur = next(self.weak_iter)
Expand All @@ -514,7 +539,7 @@ def next_chunk(self) -> Optional[HFEv3_Chunk]:
n = min(n, self.weak_cur.e - self.pos)
is_random = True
return HFEv3_Chunk(n, self.time_per_tick * self.tick_cur.val,
is_random)
is_random, emit_index)

def increment_position(self, n: int) -> None:
c = self.chunk
Expand Down Expand Up @@ -601,6 +626,11 @@ def hfev3_get_image(hfe: HFE) -> bytes:
rate_distance = 64 # byte-cells
tpb = c.time_per_bit + (x.time - x.hfe_time) / (rate_distance*8)

if c.emit_index:
c.emit_index = False
x.out.append(HFEv3_Op.Index)
diff -= 8

# Do a rate change if the rate has significantly changed or,
# for a change of +/-1, if we haven't changed rate in a while.
rate = round(tpb * 36e6)
Expand All @@ -625,7 +655,16 @@ def hfev3_get_image(hfe: HFE) -> bytes:
if c.is_random:
x.out.append(HFEv3_Op.Rand)
else:
x.out.append(x.track.bits[x.pos:x.pos+n].tobytes()[0]>>(8-n))
# Extract next bitcells into a stream-ready byte.
b = x.track.bits[x.pos:x.pos+n].tobytes()[0] >> (8-n)
# If the byte looks like an opcode, skip a bit.
if (b & 0xf0) == 0xf0:
n = 7
b >>= 1
x.out.append(HFEv3_Op.SkipBits)
x.out.append(8 - n)
# Emit the fixed-up byte.
x.out.append(b)

# Update tallies.
x.increment_position(n)
Expand All @@ -645,7 +684,9 @@ def hfev3_get_image(hfe: HFE) -> bytes:
'''\
HFEv3: Track too long to fit in image!
Are you trying to convert raw flux (SCP, KF, etc)?
If so: You can't (yet). If not: Report a bug.''')
If so: Try specifying 'uniform': eg. name.hfe::version=3:uniform
(will break variable-rate copy protections such as Copylock)
If not: Report a bug.''')

nr_blocks = (nr_bytes + 0xff) // 0x100
for x in s:
Expand Down
9 changes: 9 additions & 0 deletions src/greaseweazle/tools/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ def process_input_track(
if track is None:
return None

if args.hard_sectors:
track = track.flux()
track.identify_hard_sectors()
assert track.sector_list is not None # mypy
print('T%u.%u: Converted to %u hard sectors'
% (cyl, head, len(track.sector_list[-1])))

if args.adjust_speed is not None:
if isinstance(track, codec.Codec):
track = track.master_track()
Expand Down Expand Up @@ -123,6 +130,8 @@ def main(argv) -> None:
help="do not overwrite an existing file")
parser.add_argument("--pll", type=track.PLL, metavar="PLLSPEC",
help="manual PLL parameter override")
parser.add_argument("--hard-sectors", action="store_true",
help="convert index positions to hard sectors")
parser.add_argument("in_file", help="input filename")
parser.add_argument("out_file", help="output filename")
parser.description = description
Expand Down
Loading

0 comments on commit 4d5a8c8

Please sign in to comment.