Skip to content

Commit

Permalink
Merge pull request python-pillow#8061 from radarhere/type_hint
Browse files Browse the repository at this point in the history
  • Loading branch information
hugovk authored May 17, 2024
2 parents 0da83a1 + e419fd7 commit 8c7be25
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/PIL/DdsImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def _open(self) -> None:
else:
self.tile = [ImageFile._Tile("raw", extents, 0, rawmode or self.mode)]

def load_seek(self, pos):
def load_seek(self, pos: int) -> None:
pass


Expand Down
4 changes: 2 additions & 2 deletions src/PIL/EpsImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
gs_windows_binary = None


def has_ghostscript():
def has_ghostscript() -> bool:
global gs_binary, gs_windows_binary
if gs_binary is None:
if sys.platform.startswith("win"):
Expand Down Expand Up @@ -404,7 +404,7 @@ def load(self, scale=1, transparency=False):
self.tile = []
return Image.Image.load(self)

def load_seek(self, pos):
def load_seek(self, pos: int) -> None:
# we can't incrementally load, so force ImageFile.parser to
# use our custom load method by defining this method.
pass
Expand Down
2 changes: 1 addition & 1 deletion src/PIL/FtexImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _open(self) -> None:
self.fp.close()
self.fp = BytesIO(data)

def load_seek(self, pos):
def load_seek(self, pos: int) -> None:
pass


Expand Down
2 changes: 1 addition & 1 deletion src/PIL/IcoImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def load(self):

self.size = im.size

def load_seek(self, pos):
def load_seek(self, pos: int) -> None:
# Flag the ImageFile.Parser so that it
# just does all the decode at the end.
pass
Expand Down
4 changes: 2 additions & 2 deletions src/PIL/ImageFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ def load_end(self) -> None:
pass

# may be defined for contained formats
# def load_seek(self, pos):
# def load_seek(self, pos: int) -> None:
# pass

# may be defined for blocked formats (e.g. PNG)
# def load_read(self, read_bytes):
# def load_read(self, read_bytes: int) -> bytes:
# pass

def _seek_check(self, frame):
Expand Down
2 changes: 1 addition & 1 deletion src/PIL/JpegImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ def _open(self):
msg = "no marker found"
raise SyntaxError(msg)

def load_read(self, read_bytes):
def load_read(self, read_bytes: int) -> bytes:
"""
internal: read more image data
For premature EOF and LOAD_TRUNCATED_IMAGES adds EOI marker
Expand Down
2 changes: 1 addition & 1 deletion src/PIL/MpoImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def _after_jpeg_open(self, mpheader=None):
# for now we can only handle reading and individual frame extraction
self.readonly = 1

def load_seek(self, pos):
def load_seek(self, pos: int) -> None:
self._fp.seek(pos)

def seek(self, frame: int) -> None:
Expand Down
74 changes: 41 additions & 33 deletions src/PIL/PngImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import warnings
import zlib
from enum import IntEnum
from typing import IO

from . import Image, ImageChops, ImageFile, ImagePalette, ImageSequence
from ._binary import i16be as i16
Expand Down Expand Up @@ -149,14 +150,15 @@ def _crc32(data, seed=0):


class ChunkStream:
def __init__(self, fp):
self.fp = fp
self.queue = []
def __init__(self, fp: IO[bytes]) -> None:
self.fp: IO[bytes] | None = fp
self.queue: list[tuple[bytes, int, int]] | None = []

def read(self):
def read(self) -> tuple[bytes, int, int]:
"""Fetch a new chunk. Returns header information."""
cid = None

assert self.fp is not None
if self.queue:
cid, pos, length = self.queue.pop()
self.fp.seek(pos)
Expand All @@ -173,7 +175,7 @@ def read(self):

return cid, pos, length

def __enter__(self):
def __enter__(self) -> ChunkStream:
return self

def __exit__(self, *args):
Expand All @@ -182,7 +184,8 @@ def __exit__(self, *args):
def close(self) -> None:
self.queue = self.fp = None

def push(self, cid, pos, length):
def push(self, cid: bytes, pos: int, length: int) -> None:
assert self.queue is not None
self.queue.append((cid, pos, length))

def call(self, cid, pos, length):
Expand All @@ -191,7 +194,7 @@ def call(self, cid, pos, length):
logger.debug("STREAM %r %s %s", cid, pos, length)
return getattr(self, f"chunk_{cid.decode('ascii')}")(pos, length)

def crc(self, cid, data):
def crc(self, cid: bytes, data: bytes) -> None:
"""Read and verify checksum"""

# Skip CRC checks for ancillary chunks if allowed to load truncated
Expand All @@ -201,6 +204,7 @@ def crc(self, cid, data):
self.crc_skip(cid, data)
return

assert self.fp is not None
try:
crc1 = _crc32(data, _crc32(cid))
crc2 = i32(self.fp.read(4))
Expand All @@ -211,12 +215,13 @@ def crc(self, cid, data):
msg = f"broken PNG file (incomplete checksum in {repr(cid)})"
raise SyntaxError(msg) from e

def crc_skip(self, cid, data):
def crc_skip(self, cid: bytes, data: bytes) -> None:
"""Read checksum"""

assert self.fp is not None
self.fp.read(4)

def verify(self, endchunk=b"IEND"):
def verify(self, endchunk: bytes = b"IEND") -> list[bytes]:
# Simple approach; just calculate checksum for all remaining
# blocks. Must be called directly after open.

Expand Down Expand Up @@ -361,7 +366,7 @@ def __init__(self, fp):

self.text_memory = 0

def check_text_memory(self, chunklen):
def check_text_memory(self, chunklen: int) -> None:
self.text_memory += chunklen
if self.text_memory > MAX_TEXT_MEMORY:
msg = (
Expand All @@ -382,7 +387,7 @@ def rewind(self) -> None:
self.im_tile = self.rewind_state["tile"]
self._seq_num = self.rewind_state["seq_num"]

def chunk_iCCP(self, pos, length):
def chunk_iCCP(self, pos: int, length: int) -> bytes:
# ICC profile
s = ImageFile._safe_read(self.fp, length)
# according to PNG spec, the iCCP chunk contains:
Expand All @@ -409,7 +414,7 @@ def chunk_iCCP(self, pos, length):
self.im_info["icc_profile"] = icc_profile
return s

def chunk_IHDR(self, pos, length):
def chunk_IHDR(self, pos: int, length: int) -> bytes:
# image header
s = ImageFile._safe_read(self.fp, length)
if length < 13:
Expand Down Expand Up @@ -446,14 +451,14 @@ def chunk_IEND(self, pos, length):
msg = "end of PNG image"
raise EOFError(msg)

def chunk_PLTE(self, pos, length):
def chunk_PLTE(self, pos: int, length: int) -> bytes:
# palette
s = ImageFile._safe_read(self.fp, length)
if self.im_mode == "P":
self.im_palette = "RGB", s
return s

def chunk_tRNS(self, pos, length):
def chunk_tRNS(self, pos: int, length: int) -> bytes:
# transparency
s = ImageFile._safe_read(self.fp, length)
if self.im_mode == "P":
Expand All @@ -473,13 +478,13 @@ def chunk_tRNS(self, pos, length):
self.im_info["transparency"] = i16(s), i16(s, 2), i16(s, 4)
return s

def chunk_gAMA(self, pos, length):
def chunk_gAMA(self, pos: int, length: int) -> bytes:
# gamma setting
s = ImageFile._safe_read(self.fp, length)
self.im_info["gamma"] = i32(s) / 100000.0
return s

def chunk_cHRM(self, pos, length):
def chunk_cHRM(self, pos: int, length: int) -> bytes:
# chromaticity, 8 unsigned ints, actual value is scaled by 100,000
# WP x,y, Red x,y, Green x,y Blue x,y

Expand All @@ -488,7 +493,7 @@ def chunk_cHRM(self, pos, length):
self.im_info["chromaticity"] = tuple(elt / 100000.0 for elt in raw_vals)
return s

def chunk_sRGB(self, pos, length):
def chunk_sRGB(self, pos: int, length: int) -> bytes:
# srgb rendering intent, 1 byte
# 0 perceptual
# 1 relative colorimetric
Expand All @@ -504,7 +509,7 @@ def chunk_sRGB(self, pos, length):
self.im_info["srgb"] = s[0]
return s

def chunk_pHYs(self, pos, length):
def chunk_pHYs(self, pos: int, length: int) -> bytes:
# pixels per unit
s = ImageFile._safe_read(self.fp, length)
if length < 9:
Expand All @@ -521,7 +526,7 @@ def chunk_pHYs(self, pos, length):
self.im_info["aspect"] = px, py
return s

def chunk_tEXt(self, pos, length):
def chunk_tEXt(self, pos: int, length: int) -> bytes:
# text
s = ImageFile._safe_read(self.fp, length)
try:
Expand All @@ -540,7 +545,7 @@ def chunk_tEXt(self, pos, length):

return s

def chunk_zTXt(self, pos, length):
def chunk_zTXt(self, pos: int, length: int) -> bytes:
# compressed text
s = ImageFile._safe_read(self.fp, length)
try:
Expand Down Expand Up @@ -574,7 +579,7 @@ def chunk_zTXt(self, pos, length):

return s

def chunk_iTXt(self, pos, length):
def chunk_iTXt(self, pos: int, length: int) -> bytes:
# international text
r = s = ImageFile._safe_read(self.fp, length)
try:
Expand Down Expand Up @@ -614,13 +619,13 @@ def chunk_iTXt(self, pos, length):

return s

def chunk_eXIf(self, pos, length):
def chunk_eXIf(self, pos: int, length: int) -> bytes:
s = ImageFile._safe_read(self.fp, length)
self.im_info["exif"] = b"Exif\x00\x00" + s
return s

# APNG chunks
def chunk_acTL(self, pos, length):
def chunk_acTL(self, pos: int, length: int) -> bytes:
s = ImageFile._safe_read(self.fp, length)
if length < 8:
if ImageFile.LOAD_TRUNCATED_IMAGES:
Expand All @@ -640,7 +645,7 @@ def chunk_acTL(self, pos, length):
self.im_custom_mimetype = "image/apng"
return s

def chunk_fcTL(self, pos, length):
def chunk_fcTL(self, pos: int, length: int) -> bytes:
s = ImageFile._safe_read(self.fp, length)
if length < 26:
if ImageFile.LOAD_TRUNCATED_IMAGES:
Expand Down Expand Up @@ -669,7 +674,7 @@ def chunk_fcTL(self, pos, length):
self.im_info["blend"] = s[25]
return s

def chunk_fdAT(self, pos, length):
def chunk_fdAT(self, pos: int, length: int) -> bytes:
if length < 4:
if ImageFile.LOAD_TRUNCATED_IMAGES:
s = ImageFile._safe_read(self.fp, length)
Expand Down Expand Up @@ -701,7 +706,7 @@ class PngImageFile(ImageFile.ImageFile):
format = "PNG"
format_description = "Portable network graphics"

def _open(self):
def _open(self) -> None:
if not _accept(self.fp.read(8)):
msg = "not a PNG file"
raise SyntaxError(msg)
Expand All @@ -711,8 +716,8 @@ def _open(self):
#
# Parse headers up to the first IDAT or fDAT chunk

self.private_chunks = []
self.png = PngStream(self.fp)
self.private_chunks: list[tuple[bytes, bytes] | tuple[bytes, bytes, bool]] = []
self.png: PngStream | None = PngStream(self.fp)

while True:
#
Expand Down Expand Up @@ -793,6 +798,7 @@ def verify(self) -> None:
# back up to beginning of IDAT block
self.fp.seek(self.tile[0][2] - 8)

assert self.png is not None
self.png.verify()
self.png.close()

Expand Down Expand Up @@ -921,9 +927,10 @@ def load_prepare(self) -> None:
self.__idat = self.__prepare_idat # used by load_read()
ImageFile.ImageFile.load_prepare(self)

def load_read(self, read_bytes):
def load_read(self, read_bytes: int) -> bytes:
"""internal: read more image data"""

assert self.png is not None
while self.__idat == 0:
# end of chunk, skip forward to next one

Expand Down Expand Up @@ -956,6 +963,7 @@ def load_read(self, read_bytes):

def load_end(self) -> None:
"""internal: finished reading image data"""
assert self.png is not None
if self.__idat != 0:
self.fp.read(self.__idat)
while True:
Expand Down Expand Up @@ -1079,7 +1087,7 @@ def __init__(self, fp, chunk):
self.fp = fp
self.chunk = chunk

def write(self, data):
def write(self, data: bytes) -> None:
self.chunk(self.fp, b"IDAT", data)


Expand All @@ -1091,7 +1099,7 @@ def __init__(self, fp, chunk, seq_num):
self.chunk = chunk
self.seq_num = seq_num

def write(self, data):
def write(self, data: bytes) -> None:
self.chunk(self.fp, b"fdAT", o32(self.seq_num), data)
self.seq_num += 1

Expand Down Expand Up @@ -1436,10 +1444,10 @@ def getchunks(im, **params):
class collector:
data = []

def write(self, data):
def write(self, data: bytes) -> None:
pass

def append(self, chunk):
def append(self, chunk: bytes) -> None:
self.data.append(chunk)

def append(fp, cid, *data):
Expand Down
2 changes: 1 addition & 1 deletion src/PIL/WebPImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def load(self):

return super().load()

def load_seek(self, pos):
def load_seek(self, pos: int) -> None:
pass

def tell(self) -> int:
Expand Down
Loading

0 comments on commit 8c7be25

Please sign in to comment.