summaryrefslogtreecommitdiffstats
path: root/contrib
diff options
context:
space:
mode:
authorWilliam Yardley <wyardley@users.noreply.github.com>2020-08-21 22:50:55 -0700
committerWilliam Yardley <wyardley@users.noreply.github.com>2020-08-22 10:13:47 -0700
commit2f5a949617cc13eb76758c0c1306a62207623a56 (patch)
tree48c34118957d2ea342e3619b72088b2e31af9964 /contrib
parentc0218adedc6eb420a656d9d061933d93f66126e7 (diff)
mutt_oauth2: update for pylint / flake8 warnings
* Update style to resolve some pylint / flake8 warnings * Run pylint / flake8 in CI
Diffstat (limited to 'contrib')
-rw-r--r--contrib/mutt_oauth2.py31
1 files changed, 20 insertions, 11 deletions
diff --git a/contrib/mutt_oauth2.py b/contrib/mutt_oauth2.py
index 4fd632ac..ed241fc6 100644
--- a/contrib/mutt_oauth2.py
+++ b/contrib/mutt_oauth2.py
@@ -84,7 +84,7 @@ This script obtains and prints a valid OAuth2 access token. State is maintained
encrypted TOKENFILE. Run with "--verbose --authorize" to get started or whenever all
tokens have expired, optionally with "--authflow" to override the default authorization
flow. To truly start over from scratch, first delete TOKENFILE. Use "--verbose --test"
-to test the IMAP/POP/SMTP endpoints.
+to test the IMAP/POP/SMTP endpoints.
''')
ap.add_argument('-v', '--verbose', action='store_true', help='increase verbosity')
ap.add_argument('-d', '--debug', action='store_true', help='enable debug output')
@@ -108,6 +108,7 @@ if path.exists():
'non-interactive usage, or an appropriate environment variable such as '
'GPG_TTY set to allow interactive agent usage from inside a pipe?')
+
def writetokenfile():
'''Writes global token dictionary into token file.'''
if not path.exists():
@@ -118,6 +119,7 @@ def writetokenfile():
capture_output=True)
path.write_bytes(sub2.stdout)
+
if args.debug:
print('Obtained from token file:', json.dumps(token))
if not token:
@@ -147,11 +149,13 @@ baseparams = {'client_id': registration['client_id']}
if 'tenant' in registration:
baseparams['tenant'] = registration['tenant']
+
def access_token_valid():
'''Returns True when stored access token exists and is still valid at this time.'''
token_exp = token['access_token_expiration']
return token_exp and datetime.now() < datetime.fromisoformat(token_exp)
+
def update_tokens(r):
'''Takes a response dictionary, extracts tokens out of it, and updates token file.'''
token['access_token'] = r['access_token']
@@ -172,14 +176,14 @@ if args.authorize:
verifier = secrets.token_urlsafe(90)
challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest())[:-1]
redirect_uri = registration['redirect_uri']
- port = 0
+ listen_port = 0
if authflow == 'localhostauthcode':
# Find an available port to listen on
s = socket.socket()
s.bind(('127.0.0.1', 0))
- port = s.getsockname()[1]
+ listen_port = s.getsockname()[1]
s.close()
- redirect_uri = 'http://localhost:'+str(port)+'/'
+ redirect_uri = 'http://localhost:'+str(listen_port)+'/'
# Probably should edit the port number into the actual redirect URL.
p.update({'login_hint': token['email'],
@@ -197,15 +201,20 @@ if args.authorize:
else:
print('Visit displayed URL to authorize this application. Waiting...',
end='', flush=True)
+
class MyHandler(http.server.BaseHTTPRequestHandler):
'''Handles the browser query resulting from redirect to redirect_uri.'''
+
+ # pylint: disable=C0103
def do_HEAD(self):
'''Response to a HEAD requests.'''
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
+
def do_GET(self):
'''For GET request, extract code parameter from URL.'''
+ # pylint: disable=W0603
global authcode
querystring = urllib.parse.urlparse(self.path).query
querydict = urllib.parse.parse_qs(querystring)
@@ -215,7 +224,7 @@ if args.authorize:
self.wfile.write(b'<html><head><title>Authorizaton result</title></head>')
self.wfile.write(b'<body><p>Authorization redirect completed. You may '
b'close this window.</p></body></html>')
- with http.server.HTTPServer(('127.0.0.1', port), MyHandler) as httpd:
+ with http.server.HTTPServer(('127.0.0.1', listen_port), MyHandler) as httpd:
try:
httpd.handle_request()
except KeyboardInterrupt:
@@ -309,7 +318,7 @@ if not access_token_valid():
print('NOTICE: Invalid or expired access token; using refresh token '
'to obtain new access token.')
if not token['refresh_token']:
- sys.exit(f'ERROR: No refresh token. Run script with "--authorize".')
+ sys.exit('ERROR: No refresh token. Run script with "--authorize".')
p = baseparams.copy()
p.update({'client_secret': registration['client_secret'],
'refresh_token': token['refresh_token'],
@@ -328,7 +337,7 @@ if not access_token_valid():
print(response['error'])
if 'error_description' in response:
print(response['error_description'])
- print(f'Perhaps refresh token invalid. Try running once with "--authorize"')
+ print('Perhaps refresh token invalid. Try running once with "--authorize"')
sys.exit(1)
update_tokens(response)
@@ -342,12 +351,12 @@ if args.verbose:
print(token['access_token'])
-def build_sasl_string(user, host, port, token):
+def build_sasl_string(user, host, port, bearer_token):
'''Build appropriate SASL string, which depends on cloud server's supported SASL method.'''
if registration['sasl_method'] == 'OAUTHBEARER':
- return f'n,a={user},\1host={host}\1port={port}\1auth=Bearer {token}\1\1'
+ return f'n,a={user},\1host={host}\1port={port}\1auth=Bearer {bearer_token}\1\1'
if registration['sasl_method'] == 'XOAUTH2':
- return f'user={user}\1auth=Bearer {token}\1\1'
+ return f'user={user}\1auth=Bearer {bearer_token}\1\1'
sys.exit(f'Unknown SASL method {registration["sasl_method"]}.')
@@ -380,6 +389,7 @@ if args.test:
try:
# poplib doesn't have an auth command taking an authenticator object
# Microsoft requires a two-line SASL for POP
+ # pylint: disable=W0212
pop_conn._shortcmd('AUTH ' + registration['sasl_method'])
pop_conn._shortcmd(base64.standard_b64encode(sasl_string.encode()).decode())
if args.verbose:
@@ -407,4 +417,3 @@ if args.test:
if errors:
sys.exit(1)
-