Skip to content

Commit

Permalink
fix: Reach 100% test coverage and fix edge cases
Browse files Browse the repository at this point in the history
In detail:
fix: Fail on parts without header/data separator.
fix: Fail on completely empty parts.
fix: Fail on streams without end boundary.
fix: Do not fail if there is junk after end boundary.
  • Loading branch information
defnull committed Aug 26, 2024
1 parent c97df75 commit 53cf784
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 33 deletions.
49 changes: 23 additions & 26 deletions multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ class MultiDict(DictMixin):

def __init__(self, *args, **kwargs):
self.dict = {}
for k, v in args:
self[k] = v
for arg in args:
if hasattr(arg, 'items'):
for k, v in arg.items():
self[k] = v
else:
for k, v in arg:
self[k] = v
for k, v in kwargs.items():
self[k] = v

Expand Down Expand Up @@ -284,17 +289,13 @@ def _iterparse(self):
# Consume first boundary. Ignore any preamble, as required by RFC
# 2046, section 5.1.1.
for line, nl in lines:
if line in (separator, terminator):
if line == separator:
break
if line == terminator:
return # Empty form
else:
raise MultipartError("Stream does not contain boundary")

# Check for empty data
if line == terminator:
for _ in lines:
raise MultipartError("Data after end of stream")
return

# For each part in stream...
mem_used, disk_used = 0, 0 # Track used resources to prevent DoS
is_tail = False # True if the last line was incomplete (cutted)
Expand All @@ -309,7 +310,7 @@ def _iterparse(self):

for line, nl in lines:
if line == terminator and not is_tail:
part.file.seek(0)
part.end_part()
yield part
break

Expand All @@ -318,8 +319,8 @@ def _iterparse(self):
mem_used += part.size
else:
disk_used += part.size
part.file.seek(0)

part.end_part()
yield part

part = MultipartPart(**opts)
Expand All @@ -337,13 +338,9 @@ def _iterparse(self):
except MultipartError:
part.close()
raise
else:
# If we run off the end of the loop, the current MultipartPart
# will not have been yielded, so it's our responsibility to
# close it.
part.close()

if line != terminator:
part.close()
raise MultipartError("Unexpected end of multipart stream.")


Expand All @@ -368,6 +365,11 @@ def feed(self, line, nl=""):

return self.write_header(line, nl)

def end_part(self):
if not self.file:
raise MultipartError("Unexpected end of part within header section.")
self.file.seek(0)

def write_header(self, line, nl):
line = line.decode(self.charset)

Expand All @@ -387,8 +389,8 @@ def write_header(self, line, nl):
self.headerlist.append((name.strip(), value.strip()))

def write_body(self, line, nl):
if not line and not nl:
return # This does not even flush the buffer
if not line and not nl: # pragma: no cover
return # No new data (should not happen)

self.size += len(line) + len(self._buf)
self.file.write(self._buf + line)
Expand Down Expand Up @@ -431,17 +433,12 @@ def value(self):

@property
def raw(self):
""" Data without decoding """
""" Raw binary data """
pos = self.file.tell()
self.file.seek(0)

try:
val = self.file.read()
except IOError:
raise
finally:
self.file.seek(pos)

val = self.file.read()
self.file.seek(pos)
return val

def save_as(self, path):
Expand Down
52 changes: 52 additions & 0 deletions test/test_multdict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import unittest
import os.path, tempfile

import multipart as mp

class testMultiDict(unittest.TestCase):

def test_init(self):
md = mp.MultiDict([("a", "1")], {"a": "2"}, a="3")
self.assertEqual(md.dict, {"a": ["1", "2", "3"]})

def test_append(self):
md = mp.MultiDict()
md["a"] = "1"
md["a"] = "2"
md.append("a", "3")
md.update(a="4")
self.assertEqual(md.dict, {"a": ["1", "2", "3", "4"]})

def test_behaves_like_dict(self):
md = mp.MultiDict([("a", "1"), ("a", "2")])
self.assertTrue("a" in md)
self.assertFalse("b" in md)
self.assertTrue("a" in md.keys())
del md["a"]
self.assertTrue("a" not in md)

def test_access_last(self):
md = mp.MultiDict([("a", "1"), ("a", "2")])
self.assertEqual(md["a"], "2")
self.assertEqual(md.get("a"), "2")
self.assertEqual(md.get("b"), None)

def test_replace(self):
md = mp.MultiDict([("a", "1"), ("a", "2")])
md.replace("a", "3")
self.assertEqual(md.dict, {"a": ["3"]})

def test_str_repr(self):
md = mp.MultiDict([("a", "1"), ("a", "2")])
self.assertEqual(str(md), str(md.dict))
self.assertEqual(repr(md), repr(md.dict))

def test_access_index(self):
md = mp.MultiDict([("a", "1"), ("a", "2")])
self.assertEqual(md.get("a", index=0), "1")

def test_access_all(self):
md = mp.MultiDict([("a", "1"), ("a", "2")])
self.assertEqual(md.getall("a"), ["1", "2"])
self.assertEqual(list(md.iterallitems()), [("a", "1"),("a", "2")])
40 changes: 33 additions & 7 deletions test/test_multipart.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
# -*- coding: utf-8 -*-
import unittest
import base64
import sys, os.path, tempfile
import os.path, tempfile

from io import BytesIO

try:
import multipart as mp
except ModuleNotFoundError:
raise SystemExit("multipart not resolveable. Try 'python setup.py develop'.")

import multipart as mp
from multipart import to_bytes

#TODO: bufsize=10, line=1234567890--boundary\n
Expand Down Expand Up @@ -260,6 +256,7 @@ def setUp(self):
'wsgi.input': self.data}

def write(self, *lines):
self.data.truncate(0)
for line in lines:
self.data.write(to_bytes(line))

Expand Down Expand Up @@ -304,6 +301,14 @@ def test_missing_boundary(self):
self.env['CONTENT_TYPE'] = 'multipart/form-data'
self.assertMPError()

def test_part_ends_after_header(self):
self.write('--foo\r\n', 'Header: value\r\n', '--foo--')
self.assertMPError()

def test_part_ends_in_header(self):
self.write('--foo\r\n', 'Header: value', '--foo--')
self.assertMPError()

def test_no_terminator(self):
self.write('--foo\r\n',
'Content-Disposition: form-data; name="file1"; filename="random.png"\r\n',
Expand All @@ -325,7 +330,7 @@ def test_no_newline_after_middle_content(self):
self.assertEqual(len(files), 1)
self.assertTrue(to_bytes('name="file2"') in files['file1'].file.read())

def test_preamble_before_start_boundary(self):
def test_ignore_junk_before_start_boundary(self):
forms, files = self.parse('Preamble\r\n', '--foo\r\n'
'Content-Disposition: form-data; name="file1"; filename="random.png"\r\n',
'Content-Type: image/png\r\n', '\r\n', 'abc\r\n', '--foo--')
Expand All @@ -334,12 +339,33 @@ def test_preamble_before_start_boundary(self):
self.assertEqual(files['file1'].name, 'file1')
self.assertEqual(files['file1'].content_type, 'image/png')

def test_allow_junk_after_end_boundary(self):
self.parse('--foo--\r\njunk')
self.parse('--foo\r\n'
'Content-Disposition: form-data; name="file1"; filename="random.png"\r\n',
'Content-Type: image/png\r\n', '\r\n', 'abc\r\n', '--foo--\r\n', 'junk')

def test_no_start_boundary(self):
self.write('--bar\r\n','--nonsense\r\n'
'Content-Disposition: form-data; name="file1"; filename="random.png"\r\n',
'Content-Type: image/png\r\n', '\r\n', 'abc\r\n', '--nonsense--')
self.assertMPError()

def test_no_end_boundary(self):
self.write('--foo\r\n',
'Content-Disposition: form-data; name="file1"; filename="random.png"\r\n',
'Content-Type: image/png\r\n', '\r\n', 'abc\r\n')
self.assertMPError()

def test_just_end_boundary(self):
files, forms = self.parse('--foo--') # is fine
self.assertTrue(not files)
self.assertTrue(not forms)

def test_empty_part(self):
self.write('--foo\r\n', '--foo--')
self.assertMPError()

def test_disk_limit(self):
self.write('--foo\r\n',
'Content-Disposition: form-data; name="file1"; filename="random.png"\r\n',
Expand Down

0 comments on commit 53cf784

Please sign in to comment.