-
Notifications
You must be signed in to change notification settings - Fork 98
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
IPF: Rename to CAPS and support both IPF and CTRaw (requires library …
…v5.x)
- Loading branch information
Showing
3 changed files
with
150 additions
and
86 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,11 @@ | ||
# greaseweazle/image/ipf.py | ||
# greaseweazle/image/caps.py | ||
# | ||
# Written & released by Keir Fraser <[email protected]> | ||
# | ||
# This is free and unencumbered software released into the public domain. | ||
# See the file COPYING for more details, or visit <http://unlicense.org>. | ||
|
||
from __future__ import annotations | ||
from typing import cast, List, Tuple, Optional, Generator | ||
|
||
import os, sys | ||
|
@@ -104,6 +105,133 @@ class DI_LOCK: | |
|
||
RangeList = List[Tuple[int,int]] | ||
|
||
class CAPSTrackInfo(CapsTrackInfoT2): | ||
|
||
class NoTrack(Exception): | ||
pass | ||
|
||
def __init__(self, image: CAPS, cyl: int, head: int) -> None: | ||
pi = image.pi | ||
if head < pi.minhead or head > pi.maxhead: | ||
raise CAPSTrackInfo.NoTrack() | ||
if cyl < pi.mincylinder or cyl > pi.maxcylinder: | ||
raise CAPSTrackInfo.NoTrack() | ||
|
||
ti = self | ||
CapsTrackInfoT2.__init__(ti, 2) | ||
res = image.lib.CAPSLockTrack(ct.byref(ti), image.iid, | ||
cyl, head, DI_LOCK.def_flags) | ||
error.check(res == 0, "Could not lock CAPS track %d.%d" % (cyl, head)) | ||
|
||
if not ti.trackbuf: | ||
raise CAPSTrackInfo.NoTrack() # unformatted/empty | ||
|
||
carray_type = ct.c_ubyte * ((ti.tracklen+7)//8) | ||
carray = carray_type.from_address( | ||
ct.addressof(ti.trackbuf.contents)) | ||
bits = bitarray(endian='big') | ||
bits.frombytes(bytes(carray)) | ||
bits = bits = bits[:ti.tracklen] | ||
|
||
ticks: Optional[List[float]] = None | ||
if ti.timebuf: | ||
carray_type = ct.c_uint * ti.timelen | ||
carray = carray_type.from_address( | ||
ct.addressof(ti.timebuf.contents)) | ||
# Unpack the per-byte timing info into per-bitcell | ||
ticks = [] | ||
for i in carray: | ||
for j in range(8): | ||
ticks.append(i) | ||
# Pad the timing info with normal cell lengths as necessary | ||
for j in range(len(carray)*8, ti.tracklen): | ||
ticks.append(1000) | ||
# Clip the timing info, if necessary. | ||
ticks = ticks = ticks[:ti.tracklen] | ||
|
||
ti.bits = bits | ||
ti.ticks = ticks | ||
|
||
# We don't really have access to the bitrate. It depends on RPM. | ||
# So we assume a rotation rate of 300 RPM (5 rev/sec). | ||
ti.rpm = 300 | ||
|
||
|
||
class CAPS(Image): | ||
|
||
iid: int | ||
pi: CapsImageInfo | ||
imagetype: str | ||
|
||
read_only = True | ||
|
||
def __init__(self) -> None: | ||
self.lib = get_libcaps() | ||
|
||
def __del__(self) -> None: | ||
try: | ||
self.lib.CAPSUnlockAllTracks(self.iid) | ||
self.lib.CAPSUnlockImage(self.iid) | ||
self.lib.CAPSRemImage(self.iid) | ||
del(self.iid) | ||
except AttributeError: | ||
pass | ||
|
||
def __str__(self) -> str: | ||
raise NotImplementedError | ||
|
||
def get_track(self, cyl: int, head: int) -> Optional[MasterTrack]: | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def from_file(cls, name: str, _fmt) -> Image: | ||
|
||
caps = cls() | ||
errprefix = f'CAPS: {cls.imagetype}' | ||
|
||
caps.iid = caps.lib.CAPSAddImage() | ||
error.check(caps.iid >= 0, | ||
f"{errprefix}: Could not create image container") | ||
cname = ct.c_char_p(name.encode()) | ||
res = caps.lib.CAPSLockImage(caps.iid, cname) | ||
error.check(res == 0, | ||
f"{errprefix}: Could not open image '{name}'") | ||
res = caps.lib.CAPSLoadImage(caps.iid, DI_LOCK.def_flags) | ||
error.check(res == 0, | ||
f"{errprefix}: Could not load image '%s'" % name) | ||
caps.pi = CapsImageInfo() | ||
res = caps.lib.CAPSGetImageInfo(ct.byref(caps.pi), caps.iid) | ||
error.check(res == 0, | ||
f"{errprefix}: Could not get info for image '{name}'") | ||
print(caps) | ||
|
||
return caps | ||
|
||
|
||
class CTRaw(CAPS): | ||
|
||
imagetype = 'CTRaw' | ||
|
||
def __str__(self) -> str: | ||
pi = self.pi | ||
s = "CTRaw Image File:" | ||
s += ("\n Cyls: %d-%d Heads: %d-%d" | ||
% (pi.mincylinder, pi.maxcylinder, pi.minhead, pi.maxhead)) | ||
return s | ||
|
||
def get_track(self, cyl: int, head: int) -> Optional[MasterTrack]: | ||
|
||
try: | ||
ti = CAPSTrackInfo(self, cyl, head) | ||
except CAPSTrackInfo.NoTrack: | ||
return None | ||
|
||
return MasterTrack( | ||
bits = ti.bits, | ||
time_per_rev = 60/ti.rpm, | ||
bit_ticks = ti.ticks) | ||
|
||
|
||
class IPFTrack(MasterTrack): | ||
|
||
verify_revs: float = 2 | ||
|
@@ -153,29 +281,18 @@ def verify_track(self, flux: Flux) -> bool: | |
return False | ||
return True | ||
|
||
class IPF(Image): | ||
|
||
iid: int | ||
pi: CapsImageInfo | ||
|
||
read_only = True | ||
|
||
def __init__(self) -> None: | ||
self.lib = get_libcaps() | ||
class IPF(CAPS): | ||
|
||
def __del__(self) -> None: | ||
try: | ||
self.lib.CAPSUnlockAllTracks(self.iid) | ||
self.lib.CAPSUnlockImage(self.iid) | ||
self.lib.CAPSRemImage(self.iid) | ||
del(self.iid) | ||
except AttributeError: | ||
pass | ||
imagetype = 'IPF' | ||
|
||
def __str__(self) -> str: | ||
pi = self.pi | ||
s = "IPF Image File:" | ||
s += "\n SPS ID: %04d (rev %d)" % (pi.release, pi.revision) | ||
if pi.release == 0x843265bb: # disk-utilities:IPF_ID | ||
s += "\n SPS ID: None (https://github.com/keirf/disk-utilities)" | ||
else: | ||
s += "\n SPS ID: %04d (rev %d)" % (pi.release, pi.revision) | ||
s += "\n Platform: " | ||
nr_platforms = 0 | ||
for p in pi.platform: | ||
|
@@ -192,46 +309,12 @@ def __str__(self) -> str: | |
% (pi.mincylinder, pi.maxcylinder, pi.minhead, pi.maxhead)) | ||
return s | ||
|
||
@classmethod | ||
def from_file(cls, name: str, _fmt) -> Image: | ||
|
||
ipf = cls() | ||
|
||
ipf.iid = ipf.lib.CAPSAddImage() | ||
error.check(ipf.iid >= 0, "Could not create IPF image container") | ||
cname = ct.c_char_p(name.encode()) | ||
res = ipf.lib.CAPSLockImage(ipf.iid, cname) | ||
error.check(res == 0, "Could not open IPF image '%s'" % name) | ||
res = ipf.lib.CAPSLoadImage(ipf.iid, DI_LOCK.def_flags) | ||
error.check(res == 0, "Could not load IPF image '%s'" % name) | ||
ipf.pi = CapsImageInfo() | ||
res = ipf.lib.CAPSGetImageInfo(ct.byref(ipf.pi), ipf.iid) | ||
error.check(res == 0, "Could not get info for IPF '%s'" % name) | ||
print(ipf) | ||
|
||
return ipf | ||
|
||
|
||
def get_track(self, cyl: int, head: int) -> Optional[MasterTrack]: | ||
pi = self.pi | ||
if head < pi.minhead or head > pi.maxhead: | ||
return None | ||
if cyl < pi.mincylinder or cyl > pi.maxcylinder: | ||
return None | ||
|
||
ti = CapsTrackInfoT2(2) | ||
res = self.lib.CAPSLockTrack(ct.byref(ti), self.iid, | ||
cyl, head, DI_LOCK.def_flags) | ||
error.check(res == 0, "Could not lock IPF track %d.%d" % (cyl, head)) | ||
|
||
if not ti.trackbuf: | ||
return None # unformatted/empty | ||
carray_type = ct.c_ubyte * ((ti.tracklen+7)//8) | ||
carray = carray_type.from_address( | ||
ct.addressof(ti.trackbuf.contents)) | ||
trackbuf = bitarray(endian='big') | ||
trackbuf.frombytes(bytes(carray)) | ||
trackbuf = trackbuf[:ti.tracklen] | ||
try: | ||
ti = CAPSTrackInfo(self, cyl, head) | ||
except CAPSTrackInfo.NoTrack: | ||
return None | ||
|
||
data = [] | ||
for i in range(ti.sectorcnt): | ||
|
@@ -252,36 +335,16 @@ def get_track(self, cyl: int, head: int) -> Optional[MasterTrack]: | |
# Adjust the range start to be splice- rather than index-relative | ||
weak.append(((wi.start - ti.overlap) % ti.tracklen, wi.size)) | ||
|
||
timebuf = None | ||
if ti.timebuf: | ||
carray_type = ct.c_uint * ti.timelen | ||
carray = carray_type.from_address( | ||
ct.addressof(ti.timebuf.contents)) | ||
# Unpack the per-byte timing info into per-bitcell | ||
timebuf = [] | ||
for i in carray: | ||
for j in range(8): | ||
timebuf.append(i) | ||
# Pad the timing info with normal cell lengths as necessary | ||
for j in range(len(carray)*8, ti.tracklen): | ||
timebuf.append(1000) | ||
# Clip the timing info, if necessary. | ||
timebuf = timebuf[:ti.tracklen] | ||
|
||
# Rotate the track to start at the splice rather than the index. | ||
if ti.overlap: | ||
trackbuf = trackbuf[ti.overlap:] + trackbuf[:ti.overlap] | ||
if timebuf: | ||
timebuf = timebuf[ti.overlap:] + timebuf[:ti.overlap] | ||
|
||
# We don't really have access to the bitrate. It depends on RPM. | ||
# So we assume a rotation rate of 300 RPM (5 rev/sec). | ||
rpm = 300 | ||
ti.bits = ti.bits[ti.overlap:] + ti.bits[:ti.overlap] | ||
if ti.ticks: | ||
ti.ticks = ti.ticks[ti.overlap:] + ti.ticks[:ti.overlap] | ||
|
||
track = IPFTrack( | ||
bits = trackbuf, | ||
time_per_rev = 60/rpm, | ||
bit_ticks = cast(List[float], timebuf), # mypy | ||
bits = ti.bits, | ||
time_per_rev = 60/ti.rpm, | ||
bit_ticks = ti.ticks, | ||
splice = ti.overlap, | ||
weak = weak | ||
) | ||
|
@@ -328,13 +391,13 @@ def open_libcaps(): | |
pass | ||
|
||
error.check("lib" in locals(), """\ | ||
Could not find SPS/CAPS IPF decode library | ||
Could not find SPS/CAPS library | ||
For installation instructions please read the wiki: | ||
<https://github.com/keirf/greaseweazle/wiki/IPF-Images>""") | ||
|
||
# We have opened the library. Now initialise it. | ||
res = lib.CAPSInit() | ||
error.check(res == 0, "Failure initialising IPF library '%s'" % name) | ||
error.check(res == 0, "Failure initialising CAPS/SPS library '%s'" % name) | ||
|
||
return lib | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters