Source code for watson.http.headers
# -*- coding: utf-8 -*-
from watson.common.datastructures import MultiDict
from watson.http.cookies import CookieDict
[docs]class HeaderDict(MultiDict):
"""A dictionary of headers and their values.
Contains a collection of key/value pairs that define a set of headers
for either a http request or response (e.g. HTTP_ACCEPT)
"""
[docs] def add(self, field, value, replace=False, **options):
"""
Adds a header to the collection.
Example:
.. code-block:: python
# Content-Type: text/html; charset=utf-8
headers = HeaderCollection()
headers.add('Content-Type', 'text/html', charset='utf-8')
Args:
field: the field name of the header
value: the value for the header
options: any other keyword args to add to the value
"""
vals = [str(value)]
if options:
vals.extend(['{0}={1}'.format(key, val) for
key, val in options.items()])
self.set(
parse_from_environ_header_field(field),
'; '.join(vals),
replace)
[docs] def get_option(self, field, option, default=None):
"""Retrieve an individual option from a header.
Example:
.. code-block:: python
# Content-Type: text/html; charset=utf-8
headers = HeaderCollection()
headers.add('Content-Type', 'text/html', charset='utf-8')
option = headers.get_option('Content-Type', 'charset') # utf-8
Args:
field: the header field
option: the option to retrieve from the field
default: the default value if the option does not exist
Returns:
The default value or the value from the option
"""
real_field = parse_from_environ_header_field(field)
if real_field not in self:
return default
options = self.get(real_field).split('; ')
found = [opt.split('=')[1]
for opt in options if opt.split('=')[0] == option]
return found[0] if found else default
def __getitem__(self, field, default=None):
field = parse_from_environ_header_field(field)
return super(HeaderDict, self).get(field, default)
def get(self, field, default=None):
return self.__getitem__(field, default)
def __delitem__(self, field):
if field in self:
super(HeaderDict, self).__delitem__(field)
def __call__(self):
"""Output in a format suitable for a wsgi callable.
Outputs the header collection as a list of tuple pairs for use in a
wsgi application.
Returns:
A list of tuple pairs
"""
tuple_pairs = []
for field, value in sorted(self.items()):
if (isinstance(value, list)):
for multi_val in value:
tuple_pairs.append((field, multi_val))
else:
tuple_pairs.append((field, value))
return tuple_pairs
def __str__(self):
return (
'\r\n'.join(['{0}: {1}'.format(field, value)
for field, value in self()])
)
[docs]def is_header(field):
"""Determine if a field is an acceptable http header.
"""
return (
field[:5] == 'HTTP_' or field in (
'CONTENT_TYPE',
'CONTENT_LENGTH',
'HTTPS')
)
[docs]def http_header(field):
"""Return the correct header field name.
"""
return field if field[:5] != 'HTTP_' else field[5:]
[docs]def parse_to_environ_header_field(field):
"""Converts a http header field into an uppercase form.
"""
return field.replace('-', '_').upper()
[docs]def parse_from_environ_header_field(field):
"""Converts a http header field into a lowercase form.
"""
return http_header(field).replace('_', ' ').title().replace(' ', '-')
[docs]def split_headers_server_vars(environ):
"""Splits the environ into headers and server pairs.
"""
headers = HeaderDict()
server = MultiDict()
cookies = CookieDict()
for key in environ:
if is_header(key):
headers.add(http_header(key), environ[key])
if key == 'HTTP_COOKIE':
cookies = CookieDict(environ[key])
cookies.modified = False
else:
server[key] = environ[key]
return headers, server, cookies