# -*- coding: utf-8 -*-
import cgi
import collections
from io import BytesIO, BufferedReader
from watson.common.contextmanagers import suppress
__all__ = ['get_form_vars']
def read_binary(self):
    """Override for FieldStorage.read_binary method.

    Existing FieldStorage method raises a "TypeError: must be str, not bytes"
    when CONTENT_LENGTH is specified for a body that isn't key=value pairs.
    Decoding the data into the relevant encoding resolves the issue.

    Reads at most ``self.length`` bytes from ``self.fp`` in chunks of at most
    ``self.bufsize`` bytes and writes them to ``self.file``, creating the file
    with ``self.make_file()`` if none exists yet. ``self.bytes_read`` is
    advanced by the number of raw bytes consumed, and ``self.done`` is set to
    ``-1`` if the stream ends before ``self.length`` bytes were read.

    Raises:
        ValueError: if ``self.fp.read`` returns something other than bytes.
        UnicodeDecodeError: if the data is written to a text file and cannot
            be decoded using ``self.encoding``.
    """
    import codecs

    decoder = None
    remaining = self.length
    # A non-positive length simply means there is nothing to read.
    while remaining > 0:
        chunk = self.fp.read(min(remaining, self.bufsize))
        if not isinstance(chunk, bytes):
            raise ValueError("%s should return bytes, got %s"
                             % (self.fp, type(chunk).__name__))
        # Progress is tracked in raw bytes: the decoded text may contain
        # fewer characters than the number of bytes consumed.
        chunk_size = len(chunk)
        self.bytes_read += chunk_size
        if not chunk:
            # The stream ended before the declared length was reached.
            self.done = -1
            break
        if not self.file:
            # chunk is known to be bytes here, so a binary file is created.
            self._binary_file = isinstance(chunk, bytes)
            self.file = self.make_file()
        if self._binary_file:
            self.file.write(chunk)
        else:
            # The fix: decode before writing to a text file. An incremental
            # decoder keeps multi-byte characters that straddle two chunks
            # intact.
            if decoder is None:
                decoder = codecs.getincrementaldecoder(self.encoding)()
            self.file.write(decoder.decode(chunk))
        remaining -= chunk_size
    if decoder is not None:
        # Flush the decoder; raises if the data ended mid-character.
        tail = decoder.decode(b'', final=True)
        if tail:
            self.file.write(tail)
# Monkey-patch: install the read_binary override defined above on the
# cgi.FieldStorage class, so it replaces the standard library method for
# every FieldStorage instance once this module has been imported.
cgi.FieldStorage.read_binary = read_binary
# Environ key under which copy_wsgi_input caches the raw request body.
WSGI_BODY = 'wsgi.body.original'


def copy_wsgi_input(environ):
    """Copies a wsgi.input key so that it is seekable.

    On the first call the request body is read from ``environ['wsgi.input']``
    (``CONTENT_LENGTH`` bytes, or nothing when the header is missing or
    empty), stored as bytes under ``environ[WSGI_BODY]`` and
    ``environ['wsgi.input']`` is replaced with an in-memory buffered reader
    over that copy. Subsequent calls leave the environ untouched.

    Args:
        environ: the WSGI environ dict; modified in place.

    Raises:
        ValueError: if ``CONTENT_LENGTH`` is not an integer.
    """
    if WSGI_BODY in environ:
        # Already copied; the original stream has been consumed.
        return
    content_length = environ.get('CONTENT_LENGTH', '')
    # Clamp at zero: a negative size passed to read() means "read until
    # EOF", which would ignore the declared length entirely.
    length = max(int(content_length), 0) if content_length else 0
    body = environ['wsgi.input'].read(length)
    environ[WSGI_BODY] = body
    environ['wsgi.input'] = BufferedReader(BytesIO(body))
# Plain, indexable record describing one uploaded file. The fields are
# filled from the attributes of the originating field object, which is
# itself kept in ``raw_field``.
File = collections.namedtuple(
    'File',
    [
        'data',
        'filename',
        'name',
        'type',
        'type_options',
        'disposition',
        'disposition_options',
        'headers',
        'raw_field',
    ],
)
def _process_field_storage(fs, post, files):
    """Sort the fields of a field storage into ``post`` and ``files``.

    Args:
        fs: an iterable yielding either field names (strings, looked up via
            ``fs[name]``) or field objects themselves.
        post: mapping that receives ``field.name -> field.value`` for fields
            without a filename.
        files: mapping that receives ``field.name -> File`` for fields with
            a filename.

    Returns:
        The ``(post, files)`` pair that was passed in, updated in place. A
        later field overwrites an earlier one with the same name.

    Processing is best-effort: the walk runs under ``suppress(Exception)``
    (presumably swallowing any Exception -- the helper is defined outside
    this module), so whatever was collected before an error is returned.
    """
    with suppress(Exception):
        for entry in fs:
            # String entries are keys into the storage; anything else is
            # already a field (this happens when recursing into a list).
            item = entry if not isinstance(entry, str) else fs[entry]
            if isinstance(item, list):
                _process_field_storage(item, post, files)
                continue
            if not item.filename:
                post[item.name] = item.value
                continue
            # An uploaded file, create a new File tuple to resolve the
            # not indexable issue.
            files[item.name] = File(
                data=item.file,
                filename=item.filename,
                name=item.name,
                type=item.type,
                type_options=item.type_options,
                disposition=item.disposition,
                disposition_options=item.disposition_options,
                headers=item.headers,
                raw_field=item,
            )
    return post, files