-
Notifications
You must be signed in to change notification settings - Fork 114
Expand file tree
/
Copy path_session.py
More file actions
121 lines (96 loc) · 4.01 KB
/
_session.py
File metadata and controls
121 lines (96 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.
from logging import getLogger
import os
import sys
import urllib.parse
import requests
from .exceptions import BugzillaHTTPError
# Module-level logger, named after this module.
log = getLogger(__name__)
class _BugzillaSession(object):
    """
    Class to handle the backend agnostic 'requests' setup

    Wraps a ``requests`` session, applies the user agent / TLS / auth
    header configuration to it, and funnels every HTTP call through
    :meth:`request` so that a default timeout is applied and the API key
    is scrubbed out of any error that is raised.
    """

    def __init__(self, url, user_agent,
                 sslverify, cert, tokencache, api_key,
                 is_redhat_bugzilla,
                 requests_session=None):
        """
        Configure the underlying ``requests`` session.

        :param url: bugzilla URL; its scheme must be ``http`` or ``https``
        :param user_agent: value stored in the session ``User-Agent`` header
        :param sslverify: when exactly ``False``, certificate verification
            is disabled on the session
        :param cert: when truthy, assigned to the session ``cert`` attribute
        :param tokencache: object whose ``get_value(url)`` is consulted by
            :meth:`get_auth_params`
        :param api_key: API key, or a falsy value when none is configured
        :param is_redhat_bugzilla: when truthy together with ``api_key``,
            the key is sent as an ``Authorization: Bearer`` header
        :param requests_session: existing session to configure; a new
            ``requests.Session()`` is created when this is falsy
        :raises ValueError: if the URL scheme is not ``http``/``https``
        """
        self._url = url
        self._user_agent = user_agent
        self._scheme = urllib.parse.urlparse(url)[0]
        self._tokencache = tokencache
        self._api_key = api_key
        self._is_xmlrpc = False
        self._use_auth_bearer = False

        if self._scheme not in ["http", "https"]:
            raise ValueError("Invalid URL scheme: %s (%s)" % (
                self._scheme, url))

        self._session = requests_session
        if not self._session:
            self._session = requests.Session()

        if cert:
            self._session.cert = cert
        if sslverify is False:
            self._session.verify = False
        self._session.headers["User-Agent"] = self._user_agent

        if is_redhat_bugzilla and self._api_key:
            self._use_auth_bearer = True
            self._session.headers["Authorization"] = (
                "Bearer %s" % self._api_key)

    def _get_timeout(self):
        """
        Return the request timeout in seconds as a float.

        The value comes from the ``PYTHONBUGZILLA_REQUESTS_TIMEOUT``
        environment variable when it is set and non-empty, otherwise the
        default is used.

        :raises ValueError: if the environment variable is not a number
        """
        # Default to 5 minutes. This is longer than bugzilla.redhat.com's
        # apparent 3 minute timeout so shouldn't affect legitimate usage,
        # but saves us from indefinite hangs
        DEFAULT_TIMEOUT = 300
        envname = "PYTHONBUGZILLA_REQUESTS_TIMEOUT"
        envtimeout = os.environ.get(envname)
        if not envtimeout:
            return float(DEFAULT_TIMEOUT)
        try:
            return float(envtimeout)
        except ValueError:
            # Same exception type as before, but say which setting is bad
            # instead of a bare "could not convert string to float".
            raise ValueError(
                "Invalid %s value: %r (expected a number of seconds)" % (
                    envname, envtimeout)) from None

    def _scrub_api_key(self, text):
        """
        Return ``text`` with every occurrence of the API key removed.

        Both the literal key and its URL-quoted forms are stripped, since
        the key may appear percent-encoded inside a request URL that ends
        up in an error message. ``text`` is returned unchanged when no API
        key is configured.
        """
        key = self._api_key
        if not key:
            return text
        variants = {
            key,
            urllib.parse.quote(key, safe=""),
            urllib.parse.quote_plus(key),
        }
        # Longest first so an encoded form is removed whole before the
        # literal key gets a chance to match a fragment of it.
        for variant in sorted(variants, key=len, reverse=True):
            text = text.replace(variant, "")
        return text

    def set_rest_defaults(self):
        """
        Set the session Content-Type header for JSON (REST) requests.
        """
        self._session.headers["Content-Type"] = "application/json"

    def set_xmlrpc_defaults(self):
        """
        Mark the session as XMLRPC and set the Content-Type header to XML.
        """
        self._is_xmlrpc = True
        self._session.headers["Content-Type"] = "text/xml"

    def get_user_agent(self):
        """
        Return the user agent string passed at construction.
        """
        return self._user_agent

    def get_scheme(self):
        """
        Return the URL scheme parsed from the construction URL.
        """
        return self._scheme

    def get_auth_params(self):
        """
        Return the dict of authentication parameters to add to a request.

        The result is empty when bearer auth is in use or when neither an
        API key nor a cached token is available.
        """
        # bugzilla.redhat.com will error if there's auth bits in params
        # when Authorization header is used
        if self._use_auth_bearer:
            return {}

        # Don't add a token to the params list if an API key is set.
        # Keeping API key solo means bugzilla will definitely fail
        # if the key expires. Passing in a token could hide that
        # fact, which could make it confusing to pinpoint the issue.
        if self._api_key:
            # Bugzilla 5.0 only supports api_key as a query parameter.
            # Bugzilla 5.1+ takes it as a X-BUGZILLA-API-KEY header as well,
            # with query param taking preference.
            return {"Bugzilla_api_key": self._api_key}

        token = self._tokencache.get_value(self._url)
        if token:
            return {"Bugzilla_token": token}

        return {}

    def get_requests_session(self):
        """
        Return the underlying session object.
        """
        return self._session

    def request(self, *args, **kwargs):
        """
        Issue a request through the session and return the response.

        Arguments are passed straight to the session's ``request``. A
        ``timeout`` keyword is added from :meth:`_get_timeout` unless the
        caller supplied one.

        :raises BugzillaHTTPError: when the session raises
            ``requests.HTTPError`` (including via ``raise_for_status``)
        :raises Exception: any other failure is re-raised as the same
            exception type with the API key removed from its message
        """
        if "timeout" not in kwargs:
            kwargs["timeout"] = self._get_timeout()

        try:
            response = self._session.request(*args, **kwargs)

            if self._is_xmlrpc:
                # This still appears to matter for properly decoding unicode
                # code points in bugzilla.redhat.com content
                response.encoding = "UTF-8"

            response.raise_for_status()
        except Exception as e:
            # Scrape the api key out of the returned exception string
            message = self._scrub_api_key(str(e))
            traceback = sys.exc_info()[2]

            # 'from None' matters: without it the original exception is
            # chained as __context__ and printed alongside the scrubbed
            # one, which would put the API key right back in the output.
            if isinstance(e, requests.HTTPError):
                response = getattr(e, "response", None)
                raise BugzillaHTTPError(
                    message, response=response).with_traceback(
                        traceback) from None
            raise type(e)(message).with_traceback(traceback) from None

        return response